-- 日拱一卒, 不期而至!
你好, 我是彤哥, 本篇是 netty 系列的第七篇.
简介
上一章我们一起学习了 Java NIO 的核心组件 Buffer, 它通常跟 Channel 一起使用, 但是它们在网络 IO 中又该如何使用呢, 今天我们将一起学习另一个 NIO 核心组件 --Selector, 没有它可以说就干不起来网络 IO.
概念
我们先来看两段 Selector 的注释, 见类 java.nio.channels.Selector.
注释 I
A multiplexor of {@link SelectableChannel} objects.
它是 SelectableChannel 对象的多路复用器, 从这里我们也可以知道 Java NIO 实际上是多路复用 IO.
SelectableChannel 有几个子类, 你会非常熟悉:
DatagramChannel,UDP 协议连接
SocketChannel,TCP 协议连接
ServerSocketChannel, 专门处理 TCP 协议 Accept 事件
我们有必要复习一下多路复用 IO 的流程:
第一阶段通过 select 去轮询检查有没有连接准备好数据, 第二阶段把数据从内核空间拷贝到用户空间.
在 Java 中, 就是通过 Selector 这个多路复用器来实现第一阶段的.
注释 II
A selector may be created by invoking the {@link #open open} method of this class, which will use the system's default {@link java.nio.channels.spi.SelectorProvider selector provider} to create a new selector. A selector may also be created by invoking the {@link java.nio.channels.spi.SelectorProvider#openSelector openSelector} method of a custom selector provider. A selector remains open until it is closed via its {@link #close close} method.
Selector 可以通过它自己的 open()方法创建, 它将通过默认的 java.nio.channels.spi.SelectorProvider 类创建一个新的 Selector. 也可以通过实现 java.nio.channels.spi.SelectorProvider 类的抽象方法 openSelector()来自定义实现一个 Selector.Selector 一旦创建将会一直处于 open 状态直到调用了 close()方法为止.
那么, 默认使用的 Selector 究竟是哪个呢?
通过跟踪源码:
- > java.nio.channels.Selector#open()
- 1> java.nio.channels.spi.SelectorProvider#provider()
- 1.1> sun.nio.ch.DefaultSelectorProvider#create() // 返回 WindowsSelectorProvider
- 2> sun.nio.ch.WindowsSelectorProvider#openSelector() // 返回 WindowsSelectorImpl
可以看到, 在 Windows 平台下, 默认实现的 Provider 是 WindowsSelectorProvider, 它的 openSelector()方法返回的是 WindowsSelectorImpl, 它就是 Windows 平台默认的 Selector 实现.
为什么要提到在 Windows 平台呢, 难道在 Linux 下面实现不一样?
是滴, 因为网络 IO 是跟操作系统息息相关的, 不同的操作系统的实现可能都不一样, Linux 下面 JDK 的实现完全不一样, 那么我们为什么没有感知到呢? 我的代码在 Windows 下面写的, 拿到 Linux 下面不是一样运行? 那是 Java 虚拟机 (或者说 Java 运行时环境) 帮我们把这个事干了, 它屏蔽了跟操作系统相关的细节, 这也是 Java 代码可以 "Write Once, Run Anywhere" 的精髓所在.
Selector 与 Channel 的关系
上面我们说了 selector 是多路复用器, 它是在网络 IO 的第一阶段用来轮询检查有没有连接准备好数据的, 那么它和 Channel 是什么关系呢?
Selector 通过不断轮询的方式同时监听多个 Channel 的事件, 注意, 这里是同时监听, 一旦有 Channel 准备好了, 它就会返回这些准备好了的 Channel, 交给处理线程去处理.
所以, 在 NIO 编程中, 通过 Selector 我们就实现了一个线程同时处理多个连接请求的目标, 也可以一定程序降低服务器资源的消耗.
基本用法
创建 Selector
通过调用 Selector.open()方法是我们常用的方式:
Selector selector = Selector.open();
当然, 也可以通过实现 java.nio.channels.spi.SelectorProvider.openSelector()抽象方法自定义一个 Selector.
将 Channel 注册到 Selector 上
为了将 Channel 跟 Selector 绑定在一起, 需要将 Channel 注册到 Selector 上, 调用 Channel 的 register()方法即可:
- channel.configureBlocking(false);
- SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
- Connect
- Accept
- Read
- Write
- int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
- SelectionKey
- int interestSet = selectionKey.interestOps();
- boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
- boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
- boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
- boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
- int readySet = selectionKey.readyOps();
- selectionKey.isAcceptable();
- selectionKey.isConnectable();
- selectionKey.isReadable();
- selectionKey.isWritable();
- Channel channel = selectionKey.channel();
- Selector selector = selectionKey.selector();
- selectionKey.attach(theObject);
- Object attachedObj = selectionKey.attachment();
- SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
- Selector.select()
- Set<SelectionKey> selectedKeys = selector.selectedKeys();
- Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
- while(keyIterator.hasNext()) {
- SelectionKey key = keyIterator.next();
- if(key.isAcceptable()) {
- // a connection was accepted by a ServerSocketChannel.
- } else if (key.isConnectable()) {
- // a connection was established with a remote server.
- } else if (key.isReadable()) {
- // a channel is ready for reading
- } else if (key.isWritable()) {
- // a channel is ready for writing
- }
- keyIterator.remove();
- }
- public class EchoServer {
- public static void main(String[] args) throws IOException {
- // 创建一个 Selector
- Selector selector = Selector.open();
- // 创建 ServerSocketChannel
- ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
- // 绑定 8080 端口
- serverSocketChannel.bind(new InetSocketAddress(8080));
- // 设置为非阻塞模式, 本文来源于工从号彤哥读源码
- serverSocketChannel.configureBlocking(false);
- // 将 Channel 注册到 selector 上, 并注册 Accept 事件
- serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
- while (true) {
- // 阻塞在 select 上
- selector.select();
- // 如果使用的是 select(timeout)或 selectNow()需要判断返回值是否大于 0
- // 有就绪的 Channel
- Set<SelectionKey> selectionKeys = selector.selectedKeys();
- // 遍历 selectKeys
- Iterator<SelectionKey> iterator = selectionKeys.iterator();
- while (iterator.hasNext()) {
- SelectionKey selectionKey = iterator.next();
- // 如果是 accept 事件
- if (selectionKey.isAcceptable()) {
- // 强制转换为 ServerSocketChannel
- ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
- SocketChannel socketChannel = ssc.accept();
- System.out.println("accept new conn:" + socketChannel.getRemoteAddress());
- socketChannel.configureBlocking(false);
- // 将 SocketChannel 注册到 Selector 上, 并注册读事件
- socketChannel.register(selector, SelectionKey.OP_READ);
- } else if (selectionKey.isReadable()) {
- // 如果是读取事件
- // 强制转换为 SocketChannel
- SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
- // 创建 Buffer 用于读取数据
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- // 将数据读入到 buffer 中
- int length = socketChannel.read(buffer);
- if (length> 0) {
- buffer.flip();
- byte[] bytes = new byte[buffer.remaining()];
- // 将数据读入到 byte 数组中
- buffer.get(bytes);
- // 换行符会跟着消息一起传过来
- String content = new String(bytes, "UTF-8").replace("\r\n", "");
- if (content.equalsIgnoreCase("quit")) {
- selectionKey.cancel();
- socketChannel.close();
- } else {
- System.out.println("receive msg:" + content);
- }
- }
- }
- iterator.remove();
- }
- }
- }
- }
来源: https://www.cnblogs.com/tong-yuan/p/11992860.html