好久没写博客了, 最近打算花些时间把 Netty 的源码好好读一读, 下面是本人在学习的过程中的一些笔记, 不能确保自己思考的完全是正确的, 如果有错误, 欢迎大家指正.
由于本人的语文功底烂的很, 通篇使用大白话来讲解 0.0, 有一些概念上的东西, 博主可能不会明确的给出定义, 建议使用过 Netty 的同学一起来研究.
好了, 我们一起来看下吧.
Netty 是一款用于快速开发的高性能的网络应用程序的 Java 框架. 说到 Netty, 我们先对几种 I/O 模型进行一下比对:
那么伪异步 IO 是啥呢?
其实就是加入了线程池 (ThreadPoolExecutor), 对接入的客户端的 Socket 封装成 task, 实现了 Runnable 接口, 然后投递到线程池中处理, 这样就避免了 BIO 那种一个客户端连接一个 IO 线程的情况, 防止资源耗尽和宕机. 但是这种方式底层的通信依然采用了同步阻塞模型, 无法从根本上解决问题.
那么 AIO 又是啥呢?
NIO2.0 引入了新的一步通道的概念, 并提供了异步文件通道和异步套接字的实现. 它不需要通过多路复用器对注册的通道进行轮询操作即可实现异步读写, 属于真正意义上的异步非阻塞 IO.
1, 通过 java.util.concurrent.Future 类来异步获取操作的结果.
2, 在执行异步操作的时候传入一个 CompletionHandler 接口的实现类, 作为操作完成的回调.
接口有以下两个方法.
- /**
- * Invoked when an operation has completed.
- *
- * @param result
- * The result of the I/O operation.
- * @param attachment
- * The object attached to the I/O operation when it was initiated.
- */
- void completed(V result, A attachment);
- /**
- * Invoked when an operation fails.
- *
- * @param exc
- * The exception to indicate why the I/O operation failed
- * @param attachment
- * The object attached to the I/O operation when it was initiated.
- */
- void failed(Throwable exc, A attachment);
好的, 下面也稍微回顾一下 NIO, 以及 NIO 涉及的几个关键组件:
缓冲区 Buffer
通道 Channel
多路复用器 Selector
Buffer : 看什么都不如看官方文档来的更准确, 下面是官方 Buffer javadoc 内容, 我们来看下:
里面讲述了, buffer 抽象类 是一个数据容器, 除了内容, 还有一些属性, capacity,limit,position.
capacity 是容器的容量, 这个值一旦被创建, 就无法修改. limit 是 不应该被读或写的第一个元素的位置. position 是指下一个将会被读或写的位置, 这个值一定小于等于 limit.
另外 javadoc 中还提到了 mark 和 reset, 其中 mark 其实就是打一个标记, 把当前的 position 赋给 mark. 那么 reset 的 描述是这样的 把当前的 position 改成之前 mark 的位置.
ok, 由上面的文档可以得出下面的顺序 0 <= mark <= position <= limit <= capacity
其实 Buffer 中还有一个非常重要的方法必须要说一下, 那就是 flip() , 看下 javadoc
这个其实就是把 当前的 limit = position, position = 0, 当然如果之前有 mark 也会失效, 设置成 - 1, 当你往 buffer 中写了数据的时候, 只有执行 flip() 方法, 才可以正确的读取数据, doc 中还指出这个方法经常和 compact() 方法连着用. 同样, 贴出 javadoc:
相当于什么呢, 就相当于是清理掉已经读取过得数据, 比如 position = 5 , limit = 10, 前 5 个数据经读取过了, 那么将新建一个 buffer, 将当前 position 到 limit 的数据拷贝到一个新的 Buffer 中, 那么新的 buffer 的 postion = limit-postion, limit = capacity, 好了, 看源码是这样的, 接下来就是验证一下了:
- ByteBuffer buffer = ByteBuffer.allocate(10);
- buffer.put("helloworld".getBytes());
- System.out.println(buffer.position() + ":" + buffer.limit());
- buffer.flip();
- System.out.println(buffer.position() + ":" + buffer.limit());
- byte[] bytes = new byte[buffer.limit() + 1];
- for(int i=0; i<6; i++) {
- bytes[i] = buffer.get();
- }
- System.out.println(new String(bytes));
- System.out.println(buffer.position() + ":" + buffer.limit());
- System.out.println(buffer);
- buffer.compact();
- System.out.println(buffer.position() + ":" + buffer.limit());
- System.out.println(buffer);
测试结果如下:
- 10:10
- 0:10
- hellow
- 6:10
- java.nio.HeapByteBuffer[pos=6 lim=10 cap=10]
- 4:10
- java.nio.HeapByteBuffer[pos=4 lim=10 cap=10]
好了, Buffer 的源码看到这里也算是差不多了.
2,Channel
Channel 是一个通道, 它就像自来水管一样, 网络数据通过 Channel 读取与写入, 通道与流的不同之处在于通道是双向的, 流只是在一个方向上移动 (一个流必须是 InputStream 或者 OutStream 的子类), 而通道可以用于读, 写或者二者同时进行, 属于全双工.
这里我们也来看下源码吧, 就看 ServerSocketChannel
提供了几个比较重要的 API:
public static ServerSocketChannel open() throws IOException; // 通过该方法创建一个 Channel
看下 javadoc , 明确说明了 新创建的 channel 是没有任何绑定的, 在进行 accepted 之前需要绑定一个地址.
- public final ServerSocketChannel bind(SocketAddress local);// 绑定一个端口号
- public abstract SocketChannel accept() throws IOException; // 接收新的客户端
3,Selector 多路复用器 , 简单来说呢, Selector 会不断的轮训注册在其上的 Channel, 如果某个 Channel 上面发生了读写等事件, 这个 Channel 就会处理就绪状态, 会被 Selector 轮训出来, 然后拿到 SelectionKey Set 集合, 从而获取到每一个就绪状态的 Channel, 进行后续的 I/O 操作.
由于 JDK 使用了 epoll() 代替传统的 select 实现, 所以没有最大句柄的 1024/2048 的限制, 只需要一个线程负责 Selector 的轮训, 就可以接入成千上万的客户端. NB
channel 将会通过一个 SelectionKey 注册到一个 selector 上, 一个 selector 通过 open 方法去创建.
这一段着重指出, selectionKey 集合只能通过 set 集合的 remove() 方法 或者 一个迭代器的 remove() 方法来移除. 其余的方法都不可以修改 selected-key .
好了, 看到这里, 有些朋友可能似懂非懂, 但是看下下面的单元测试一下子就懂了.
这段代码实现了 Nio 的服务器端, 接收到客户端消息后, 然后通知所有的客户端.
- private static final Map<String, SocketChannel> clientMap = new ConcurrentHashMap();
- public static void main(String[] args) {
- try {
- ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 创建一个 Channel
- serverSocketChannel.configureBlocking(false); // 设置为非阻塞
- serverSocketChannel.bind(new InetSocketAddress(8899)); // 绑定端口
- Selector selector = Selector.open(); // 创建一个 Selector
- serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 将 Channel 注册到 Selector 上, 设置 selectionKey 为 accept, 准备接收新的客户端连接
- while (true) { // 死循环不断轮训, 查看 是否有准备就绪的 channel
- selector.select(); // 阻塞等到就绪的 channel
- Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 获取到就绪的 selectionKeys 集合
- selectionKeys.forEach(value -> {
- try {
- if(value.isAcceptable()) { // 接收新的客户端事件
- ServerSocketChannel channel = (ServerSocketChannel)value.channel(); // 获取 channel
- SocketChannel clientChannel = channel.accept(); // 获取客户端的 socketChannel
- clientChannel.configureBlocking(false); // 设置为非阻塞
- String clientId = UUID.randomUUID().toString();
- System.out.println("客户端接入" + clientId);
- clientMap.put(clientId, clientChannel);
- clientChannel.register(selector, SelectionKey.OP_READ); // 这里重点说下, 当接收到新的客户端后, 接下来就是准备接收数据, 所以这里就是注册的是 Read 事件
- // 并且这里注册到 selector 上的是客户端对应的 SocketChannel, 而不是 ServerSocketChannel,
- // 因为 ServerScoketChannel 只负责接收新的客户端
- } else if(value.isReadable()) { // 接收到 read 事件
- SocketChannel clientChannel = (SocketChannel)value.channel(); // 所以这里是 SocketChannel
- ByteBuffer buffer = ByteBuffer.allocate(1024); // 分配内存
- int count = clientChannel.read(buffer); // 写 channel 中的数据到 Buffer 中
- if (count> 0) {
- buffer.flip(); // 写完之后, 一定要执行 flip. 转化成读
- Charset charset = Charset.forName("utf-8");
- String receiveMsg = String.valueOf(charset.decode(buffer).array());
- System.out.println("receiveMsg =" +receiveMsg);
- Iterator<Map.Entry<String, SocketChannel>> it = clientMap.entrySet().iterator();
- String sendClient = null;
- while (it.hasNext()) {
- Map.Entry<String, SocketChannel> next = it.next();
- if(next.getValue() == clientChannel) {
- sendClient = next.getKey();
- break;
- }
- }
- it = clientMap.entrySet().iterator();
- ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
- while (it.hasNext()) {
- SocketChannel socketChannel = it.next().getValue();
- writeBuffer.clear();
- writeBuffer.put(("sendClient:" + sendClient + "发送了消息").getBytes());
- writeBuffer.flip();
- socketChannel.write(writeBuffer);
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
- selectionKeys.clear(); // 每次处理完这一批 selectionKeys, 一定要清空掉集合.
- }
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- }
- }
ok, 上面是我自己的一些理解, 如果有问题欢迎大家指正. 下一篇, 我们将开始学习 Netty 的源码.
来源: https://www.cnblogs.com/huxipeng/p/10714404.html