上次 Java NIO 分析(6): 从 BIO 到 NIO - 设计和概念 http://sound2gd.wang/2018/07/06/Java-NIO 分析 - 6-Java-NIO 中的概念 / 讲到了 NIO 的设计思想,
即 Doug Lea 大佬受 AWT 启发得到的事件驱动机制, 关键点在于
非阻塞处理器
事件分发组件
在 NIO 的 API 中, Channel 就是实现非阻塞的组件, 而事件分发 (Dispatcher) 使用的是 Selector 组件,
在传统的 I/O 流 (Stream) 是有方向的, 而 NIO 支持双向读写, 这样就需要将流中的数据读取到某个缓冲组件里,
即 Buffer 组件.
Buffer 组件还有个特殊的实现 DirectByteBuffer, 可以申请堆外内存, 关于为什么要申请堆外内存后续会谈.
1. Channel
Channel 是 NIO 中用来实现非阻塞数据操作的桥梁, 笔者猜测是借鉴的 Berkly Socket 的设计, 代表某种通道,
和 I/O Stream 只支持读或者写 (单向) 不一样, Channel 同时支持读和写, 但是只能读和写到 Buffer 中, 因为
支持了非阻塞, 读出的数据要找个地方临时存放.
Channel 主要实现有:
- SocketChannel
- ServerSocketChannel
- DatagramChannel
- FileChannel
基本类图如下:
点击查看大图
以上 Channel 涵盖了文件, TCP, UDP 网络的支持, 也是我们用的最多的.
Channel 都不是手动 new 出来的, 基本都是用静态方法 Open 出来的, 或者从 BIO 的 Stream 里封装得到的(本质上也是调用某 Channel 的 open 方法).
比如使用 FileChannel 来读写文件的一个例子:
- /**
- * 测试 FileChannel 模拟传统 IO 用竹筒多次取水的过程
- *
- * @author sound2gd
- *
- */
- public class FileChannelTest2 {
- public static void main(String[] args) throws Exception{
- FileInputStream sr = new FileInputStream("src/com/cris/chapter15/f6/FileChannelTest2.java");
- FileChannel fc = sr.getChannel();
- ByteBuffer bf = ByteBuffer.allocate(256);
- // 创建 Charset 对象
- Charset charset = Charset.forName("UTF-8");
- CharsetDecoder decoder = charset.newDecoder();
- while((fc.read(bf))!=-1){
- // 锁定 Buffer 的空白区
- bf.flip();
- // 转码
- CharBuffer cbuff = decoder.decode(bf);
- System.out.print(cbuff);
- //buffer 初始化, 用于下一次读取
- bf.clear();
- }
- }
- }
用法还是比较简单的, 从 Channel 读数据到 Buffer 用 read, 从 Buffer 写数据到 Channel 用 write
这里的 FileChannel 就是从 FileInputStream 上得到的, 查看其源码:
- public FileChannel getChannel() {
- synchronized (this) {
- if (channel == null) {
- channel = FileChannelImpl.open(fd, path, true, false, this);
- }
- return channel;
- }
- }
可以看到, 还是调用了 FileChannelImpl 的 open 方法
Scatter && Gather
上面的类图还可以看到
ScatteringByteChannel
和
GatheringByteChannel
, 它们分别代表 Scatter 和 Gather 操作
Scatter 是分散操作, 可以将一个 Channel 里的数据读取到多个 Buffer
Gather 是聚合操作, 可以将多个 Buffer 的数据读取到一个 Channel
在网络编程中这俩是常用操作, 比如 http 协议的解析通常会将 header 和 body 分散到俩 Buffer, 方便后续处理
Scatter 和 Gather 的细节限于篇幅不展开叙述, 感兴趣的读者可以自行了解
2. Buffer
Buffer 是一个容器, 本质上就是一个数组. 用于接受从 Channel 里传过来的数据
Buffer 的实现常见有:
看名字就知道是存放什么类型数据的 Buffer.
Buffer 的创建时通过 Buffer 类的静态方法来创建的. Buffer 有三个核心概念:
position: 位置, 用于指明下一个可以被读出的或者写入的缓冲区位置索引
limit: 界限, 第一个不应该被读出或者写入的缓冲区位置索引
capacity: 容量, 创建后不能改变
Buffer 类有一个实例方法: flip(). 其作用是将 limit 设置为 position 所在的位置, 然后将 position 置为 0 , 这就使得 Buffer 的读写指针又回到了开始位置. clear()方法就是将 position 置为 0,limit 置为 capacity.
为啥要有这种操作? 因为 Buffer 是支持读和写的, 写完了给别的地方用就要 flip, 免得数据处理出错
下面以 CharBuffer 为例举个简单的例子
- public static void main(String[] args) {
- // 创建 Buffer
- CharBuffer buffer = CharBuffer.allocate(8);
- System.out.println("buffer 的容量:" + buffer.capacity());
- System.out.println("buffer 的位置:" + buffer.position());
- System.out.println("buffer 的界限:" + buffer.limit());
- buffer.put('s');
- buffer.put('o');
- buffer.put('u');
- buffer.put('n');
- buffer.put('d');
- System.out.println("加入 5 个元素后 position:" + buffer.position());
- // 调用 flip
- buffer.flip();
- System.out.println("buffer 的容量:" + buffer.capacity());
- System.out.println("buffer 的位置:" + buffer.position());
- System.out.println("buffer 的界限:" + buffer.limit());
- // 取出第一个元素
- System.out.print("buffer 中的元素:" + buffer.get());
- while (buffer.hasRemaining()) {
- System.out.print(buffer.get());
- }
- System.out.println();
- System.out.println("取出第一个元素后 position=" + buffer.position());
- // 调用 clear
- buffer.clear();
- System.out.println("第 3 个元素" + buffer.get(2));
- }
输出结果请读者自行理解下
DirectByteBuffer
上面还有个特殊的类, 就是这个 DirectByteBuffer, 这个是有名的冰山对象,
分配 DirectByteBuffer 的时候, JVM 是申请一块直接内存(堆外), 然后地址关联到 DirectByteBufer 里的 address
它的回收器 sun.misc.Cleaner 使用的是虚引用, 当 DirectByteBuffer 被回收的时候, 其关联的堆外内存也会使用 Unsafe 释放掉
虽然在 DireactByteBuffer 堆内占用内存少, 但是可能关联一块非常大的堆外内存, 和冰山一样, 所以称为冰山对象
后面还会对 DirectByteBuffer 进行解析, 这个是 NIO 的一个重要 feature 之一
3. Selector
Selector 是 NIO 中用来实现事件分发的组件, 受 AWT 线程的启发, 用于接收 I/O 事件并分发到合适的处理器.
Selector 底层使用的依然是操作系统的 select,poll 和 epoll 系统调用, 支持使用一个线程来监听多个 fd 的 I/O 事件, 也即前面讲的 I/O 多路复用模型.
Selector 可以同时监控多个 SelectableChannel 的 IO 状况, 是非阻塞 IO 的核心, 一个 Selector 有三个 SelectionKey 集合
所有的 SelectionKey 集合, 代表了注册在该 Selector 上的 Channel
被选择的 SelectionKey 集合: 代表了所有可以通过 select 方法获取的, 需要进行 IO 处理的 Channel
被取消的 SelectionKey 集合: 代表了所有被取消注册关系的 Channel, 在下次执行 select 方法时. 这些 Channel 对应的 SelectKey 会被彻底删除
SelectableChannel 代表可以支持非阻塞 IO 操作的 Channel 对象, 它可以被注册到 Selector 上, 这种注册关系由 SelectionKey 实例表示
下面举个聊天室的例子
- import java.net.InetAddress;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.*;
- import java.nio.charset.Charset;
- import java.nio.charset.StandardCharsets;
- /**
- * 使用 NIO 来实现聊天室
- */
- public class NServer {
- // 用于检测所有 Channel 状态的 selector
- private Selector selector = null;
- // 定义实现编码, 解码的字符集对象
- private Charset charset = StandardCharsets.UTF_8;
- public void init() throws Exception {
- selector = Selector.open();
- // 通过 open 方法来打开一个未绑定的 ServerSocketChannel 实例
- ServerSocketChannel server = ServerSocketChannel.open();
- InetSocketAddress isa = new InetSocketAddress("127.0.0.1", 8888);
- // 绑定到指定地址
- server.bind(isa);
- // 设置以非阻塞的方式工作
- server.configureBlocking(false);
- // 将 Server 注册到指定的 Selector 对象
- server.register(selector, SelectionKey.OP_ACCEPT);
- while (selector.select()> 0) {
- // 依次处理 selector 上的已选择的 SelectionKey
- for (SelectionKey sk : selector.selectedKeys()) {
- // 从 selector 上的已选择 key 集中删除正在处理的 SelectionKey
- selector.selectedKeys().remove(sk);
- // 如果 sk 对应的 Channel 包含客户端的连接请求
- if (sk.isAcceptable()) {
- // 调用 accept 方法接受此连接, 产生服务器端的 SocketChannel
- SocketChannel accept = server.accept();
- // 采用非阻塞模式
- accept.configureBlocking(false);
- // 将该 SocketChannel 注册到 selector
- accept.register(selector, SelectionKey.OP_READ);
- // 将 sk 对应的 Channel 设置成准备接受其他请求
- sk.interestOps(SelectionKey.OP_ACCEPT);
- }
- // 如果 sk 对应的 Channel 有数据需要读取
- if (sk.isReadable()) {
- // 获取该 SelctionKey 对应的 Channel, 该 Channel 有可读的数据
- SocketChannel channel = (SocketChannel) sk.channel();
- // 定义准备执行读取数据的 ByteBuffer
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- String content = "";
- // 开始读取数据
- try {
- while (channel.read(buffer)> 0) {
- buffer.flip();
- content += charset.decode(buffer);
- }
- // 打印从该 SK 对应的 Channel 读取到的数据
- System.out.println("读取的数据" + content);
- // 将 sk 对应的 channel 设置成准备下一次读取
- sk.interestOps(SelectionKey.OP_READ);
- } catch (Exception e) {
- // 如果捕获到了该 SK 对应的 Channel 出现了异常, 即表明
- // 该 Channel 对应的 Client 出现了问题, 所以从 selctor 中取消该 Sk 的注册
- sk.cancel();
- if (sk.channel() != null) {
- sk.channel().close();
- }
- }
- // 如果 content 的长度大于 0, 即聊天信息不为空,
- if (content.length()> 0) {
- // 遍历该 selecor 里注册的所有 SelectionKey
- for (SelectionKey key : selector.keys()) {
- // 获取该 key 对应的 channel
- SelectableChannel target = key.channel();
- // 如果该 Channel 是 SocketChannel 对象
- if (target instanceof SocketChannel) {
- // 将读取到的内容写入该 channel 中
- SocketChannel dest = (SocketChannel) target;
- dest.write(charset.encode(content));
- }
- }
- }
- }
- }
- }
- }
- public static void main(String[] args) {
- try {
- new NServer().init();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
使用 nc localhost 8888 就可以测试了, 多开几个终端以模拟多个客户端.
这个例子里使用了
ServerSocketChannel
, 类似于 BIO 中 ServerSocket, 用于监听某个地址和端口, 是 TCP 服务端的代表. 同时还可以看到 accept 之后得到了一个 SocketChannel, 代表一个 TCP socket 通道.
我们均使用了非阻塞模式, 在 read 的时候如果读取的数据不够, 也不会阻塞调用线程.
4. 总结
本节介绍了 NIO 的核心 Channel, Buffer 和 Selector, 它们的设计意图和解决的问题, 同时举了些简单的例子来说明用法.
NIO 的根本还是 I/O 多路复用, 操作系统告诉你哪个 fd 可读可写, 内核帮你做了 Event Loop, 比在应用层用户空间做无疑是提升了太多的.
来源: https://juejin.im/entry/5b515450f265da0f8456205b