我前段时间的一篇博客 java 网络编程 -- 多线程数据收发并行总结了服务端与客户端之间的收发并行实践. 原理很简单, 就是针对单一客户端, 服务端起两个线程分别负责 read 和 write 操作, 然后线程保持阻塞等待读写执行.
事实上, 这样的模式非常糟糕. 因为每一个客户端在服务端需要占用两条线程, 假如有 1000 个客户端, 则需要 2000 + 条线程. CPU 需要花费大量的时间进行线程上下文切换, 造成系统资源浪费.
想要缩减线程数量, 先要解决阻塞问题. 而 NIO 可以通过 IO 多路复用将 read 和 write 的阻塞给抹去. 再配合线程池, 即可实现用少量的线程支撑起上百万个客户端的连接.
什么是 NIO
NIO 与 IO 多路复用
java NIO 全称 java non-blocking IO. 字面意思即非阻塞式 IO. 实际上这里的非阻塞只是宏观的说法.
关于 IO 模式, 这里引一个别人的博客, 介绍了几种 IO 模式的区别:
简述同步 IO, 异步 IO, 阻塞 IO, 非阻塞 IO 之间的联系与区别
本博客不再赘述这些, 只是想说 NIO 属于其中的 IO 复用模型.(实验室里有一本《UNIX 网络编程》疫情结束回学校一定把这部分好好看看)
多路复用 IO 模型中, 会有一个线程去不断轮询多个 socket 的状态, 当 socket 有读写事件时, 才来调用 IO 操作. 因为是一个线程来管理多个 socket, 系统不需要建立其它线程, 维护线程, 只有 socket 就绪时, 才会使用 IO 资源, 所以它大大降低了资源占用.
java NIO 中, 使用 selector.select() 监听多个通道是否有到达事件, 没有事件就一直阻塞, 有事件就调用 IO 进行处理.
三大核心
通道 (Channel)
缓冲区 (Buffer)
选择器 (Selectors)
详细介绍如下
NIO 使用举例
这里以服务端读取客户端消息的流程为例, 介绍 NIO 的使用 (完整内容只有输入, 暂且不管输出). 画了一个流程图, 如下所示:
建立 selector 和 ServerSocketChannel, 并绑定注册, 用于监听客户端连接请求, 代码如下:
- selector = Selector.open();
- ServerSocketChannel server = ServerSocketChannel.open();
- // 设置为非阻塞
- server.configureBlocking(false);
- // 绑定本地端口
- server.socket().bind(new InetSocketAddress(port));
- // 注册客户端连接到达监听
- server.register(selector, SelectionKey.OP_ACCEPT);
同时还要建立 readSelector 和 writeSelector. 其实线程池也是提前建立的, 这里暂且不写.
- readSelector = Selector.open();
- writeSelector = Selector.open();
监听通道, 得到客户端, 并建立 SocketChannel, 用于监听后续客户端消息
- //select() 方法返回已就绪的通道数
- if (selector.select() == 0) {
- continue;
- }
- Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
- while (iterator.hasNext()) {
- SelectionKey key = iterator.next();
- iterator.remove();
- // 检查当前 Key 的状态是否是 accept 的
- // 客户端到达状态
- if (key.isAcceptable()) {
- ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
- // 非阻塞状态拿到客户端连接
- SocketChannel socketChannel = serverSocketChannel.accept();
- try {
- // 客户端构建异步线程
- // 添加同步处理
- // 此处代码暂且忽略
- } catch (IOException e) {
- e.printStackTrace();
- System.out.println("客户端连接异常:" + e.getMessage());
- }
- }
将 SocketChannel 注册进 readSelector 和 writeSelector
- /**
- * 参数分别是: channel, 对应的 selector, 以及
- *registerOps: 待注册的操作集, 这个在后文中有详细解析;
- *locker: 用于标识同步代码块的状态, 是锁定还是可用;
- *runnable: 执行具体读写操作的类, 送给线程池执行;
- *map: 建立 SelectionKey 与 Runnable 映射关系的 HashMap.
- */
- private static SelectionKey registerSelection(SocketChannel channel, Selector selector,
- int registerOps, AtomicBoolean locker,
- HashMap<SelectionKey, Runnable> map,
- Runnable runnable) {
- synchronized (locker) {
- // 设置锁定状态
- locker.set(true);
- try {
- // 唤醒当前的 selector, 让 selector 不处于 select() 状态
- // 注册 channel 时一定要将 selector 唤醒, 否则当前 select 中没有刚注册的 channel
- selector.wakeup();
- SelectionKey key = null;
- if (channel.isRegistered()) {
- // 查询是否已经注册过
- key = channel.keyFor(selector);
- if (key != null) {
- // 将新的 Ops 添加进去
- key.interestOps(key.readyOps() | registerOps);
- }
- }
- if (key == null) {
- // 注册 selector 得到 Key
- key = channel.register(selector, registerOps);
- // 注册回调
- map.put(key, runnable);
- }
- return key;
- } catch (ClosedChannelException e) {
- return null;
- } finally {
- // 解除锁定状态
- locker.set(false);
- try {
- // 通知
- locker.notify();
- } catch (Exception ignored) {
- }
- }
- }
- }
监听各个客户端消息, 通过 selectionKeys 获取 channel, 再执行输入操作
- try {
- if (readSelector.select() == 0) {
- continue;
- }
- Set<SelectionKey> selectionKeys = readSelector.selectedKeys();
- for (SelectionKey selectionKey : selectionKeys) {
- if (selectionKey.isValid()) {
- //IO 处理代码, 暂且忽略
- }
- }
- selectionKeys.clear();
- } catch (IOException e) {
- e.printStackTrace();
- }
注意: 以上都是一些代码片段, 没有完全串联起来, 省略了一些类对象调用, 方法调用以及关键的线程池操作等等. 但是基本的方法已经展示出来, 剩下的后面的博客再去填坑.
光看上面的代码, 对于一些 NIO 方法的认知还是很模糊的. 下面通过阅读 selector 类和 SelectionKey 类的源码注释, 来加深对部分方法的理解.
selector 类
selector 是 NIO 的核心类, 下面是选择器的一些重要方法:
open 相关
open() 开启一个 selector
public abstract boolean isOpen(); 判断是否开启
- public static Selector open() throws IOException {
- return SelectorProvider.provider().openSelector();
- }
keys 相关
public abstract Set keys(); 返回所有 key 的集合
public abstract Set selectedKeys(); 返回已被选择的 key 的集合
select
下面几个方法都是返回已就绪通道的数量, 可能是 0;
selectNow(), 非阻塞方法;
select(), 仅在三种情况下返回, 1. 通道被选择; 2. 调用 wakeup 方法; 3. 线程中断.
select(timeout), 比 select() 多一个解除阻塞的条件, 即超时.
wakeup(), 解除正在阻塞的 select 方法的阻塞, 立即返回
close(), 关闭 selector.
SelectionKey 类
注册进 selector 的任何一个 channel 都用一个 SelectionKey 对象来指代.
操作集
Operation-set: 操作集, 一些常量 int 值, 代表各种类型的操作; 一个 selection key 包含两个操作集, interest set 和 ready-operation set
- public static final int OP_READ = 1 << 0;
- public static final int OP_WRITE = 1 << 2;
- public static final int OP_CONNECT = 1 << 3;
- public static final int OP_ACCEPT = 1 << 4;
interest set: 兴趣集; 一个 channel 所有的操作集; 可通过 interestOps(int) 方法来更新
ready-operation set: 就绪操作集, 只包含使得 channel 被报告就绪的操作, 底层通过与或操作来更新; 例如当一个 channel 读取就绪时, 将 read 操作集加入到就绪集中.
方法列表
public abstract SelectableChannel channel(): 返回此选择键所关联的通道. 即使此 key 已经被取消, 仍然会返回;
public abstract Selector selector(): 返回此选择键所关联的选择器, 即使此键已经被取消, 仍然会返回;
public abstract boolean isValid(): 检测此 key 是否有效. 当 key 被取消, 或者通道被关闭, 或者 selector 被关闭, 都将导致此 key 无效. 在 AbstractSelector.removeKey(key) 中, 会导致 selectionKey 被置为无效;
public abstract void cancel(): 请求将此键取消注册. 一旦返回成功, 那么该键就是无效的, 被添加到 selector 的 cancelledKeys 中. cancel 操作将 key 的 valid 属性置为 false, 并执行 selector.cancel(key)(即将 key 加入 cancelledkey 集合);
public abstract int interesOps(): 获得此键的 interes 集合;
public abstract SelectionKey interestOps(int ops): 将此键的 interst 设置为指定值. 此操作会对 ops 和 channel.validOps 进行校验. 如果此 ops 不会当前 channel 支持, 将抛出异常;
public abstract int readyOps(): 获取此键上 ready 操作集合. 即在当前通道上已经就绪的事件;
public final boolean isReadable(): 检测此键 "read" 事件是否就绪. 等效于:(readyOps() & OP_READ) != 0; 还有 isWritable(),isConnectable(),isAcceptable()
public final Object attach(Object ob): 将给定的对象作为附件添加到此 key 上. 在 key 有效期间, 附件可以在多个 ops 事件中传递;
public final Object attachment(): 获取附件. 一个 channel 的附件, 可以再当前 Channel(或者说是 SelectionKey) 生命周期中共享, 但是 attachment 数据不会作为 socket 数据在网络中传输.
终于写完了, 这篇博客只能算是对 NIO 简单介绍, 一些东西还没讲到. channel 和 buffer 部分的方法没有分析, 线程池部分没有加上, 还有输出操作那一套, 都没讲. 总想尽可能多地详细完整一点, 但是越深入, 知识点就越庞大, 所以只能放弃一部分内容, 于是成了现在这个样子. 如果详细规划一下拆开多个博客写会更好.
来源: https://www.cnblogs.com/buptleida/p/12633675.html