以下分析只讲 NIO
使用 java nio 做网络编程大致流程如下
这个流程有哪些可以优化的空间?
java nio 使用简介
java nio 启动源码分析
Netty 是对 java 网络框架的包装, 它本身肯定也会有类似的处理流程. 必定在这个方面做了自己的优化处理
Netty 使用入门
Netty Hello world 源码分析
获得 Selector
使用 Netty 的时候都会用到对应的 EventLoopGroup, 它实际上就完成了 Selector 的初始化过程
Netty 自定义了 SelectionKey 的集合, 做了层包装, 实际将 Selector 只有 1 个 SelectorKey 的集合换成了默认的两个集合
获得 Channel
使用 Netty 时会执行 channel 的类型, 然后在执行 bind 方法时, 此处就会对 channel 实行初始化
构建的方式为 class.newInstance(), 以 NioServerSocketChannel 为例, 它执行的就是对应的无参构造函数.
- public NioServerSocketChannel() {
- //newSocket 即返回 java 的 ServerSocketChannel
- this(newSocket(DEFAULT_SELECTOR_PROVIDER));
- }
- public NioServerSocketChannel(ServerSocketChannel channel) {
- // 指定当前 channel 用来接收连接请求, 并在父类中指定为非阻塞
- super(null, channel, SelectionKey.OP_ACCEPT);
- //javaChannel() 即这里的参数 channel
- config = new NioServerSocketChannelConfig(this, javaChannel().socket());
- }
紧接着 Netty 开始 channel 的初始化, 在 NioServerSocketChannel 的 pipeline 最后添加了一个 ChannelInboundHandlerAdapter 即 ServerBootstrapAcceptor, 它会执有 childGroup 和 childHandler,childHandler 即用户自定义的 channelHandler, 而 childGroup 则是处理请求所用的 EventLoop, 此时整个 pipeline 的结构为
childGroup 为源码中字段的命名, 对应为 group 中传递的 worker 线程池
channel 的注册与监听端口地址关联
注册即建立 channel 和 Selector 的关系, 值得注意的是, 注册使用的线程池为 group, 对应用户传入的线程池即 boss 线程池, 注册和端口, 地址关联则顺着 Netty 的启动流程进行
至此可以看到, java nio 所需要的准备工作都已经准备好了, 剩下的就是等待事件发生以及处理发生的事件. 与普通 java nio 的不同之处在于
Netty 准备了两个线程池, channel 注册, 端口绑定监听的只用到了其中同一个线程池
等待事件发生
NioEventLoop 实现了 Executor, 意味着它接受其它地方提交任务给它执行, execute 的大致结构如下
- // 判断当前正在执行的线程是否是 Netty 自己的 eventLoop 中保存的线程
- boolean inEventLoop = inEventLoop();
- if (inEventLoop) {
- // 往队列里添加任务
- addTask(task);
- } else {
- // 这里即运行 NioEventLoop 自身的 run 方法
- startThread();
- addTask(task);
- }
NioEventLoop 启动线程执行 run 方法, 整体结构如下
- for (;;) {
- if (hasTasks()) {
- selectNow();
- } else {
- select(oldWakenUp);
- }
- processSelectedKeys();
- runAllTasks();
- }
run 循环处理的流程如下
值得注意的是, 这是单个线程在运行, 而且非本线程的任务一概不处理
boss 线程的启动时机
在启动的过程中, 有 ServerBootstrap 来串起整个流程, 它的执行线程为主线程, 而注册事件都是交由线程池自己来执行的, 用程序表达来讲, 就是执行了 eventLoop 自己的 execute, 此时执行线程必定不是 EventLoop 自己的线程, 从而 boss 中的线程启动, 在队列任务中完成注册
新连接请求的到来
当 NioServerSocketChannel 绑定了端口之后, NioServerSocketChannel 对应的 NioEventLoop 会等待 channel 发生事件. 整个处理流程如下
读取消息的内容, 发生在 NioServerSocketChannel, 对于这个新的连接事件, 则包装成一个客户端的请求 channel 作为后续处理
- protected int doReadMessages(List<Object> buf) throws Exception {
- //1: 获取请求的 channel
- SocketChannel ch = javaChannel().accept();
- try {
- if (ch != null) {
- //2: 包装成一个请求, Socket channel 返回
- buf.add(new NioSocketChannel(this, ch));
- return 1;
- }
- } catch (Throwable t) {
- logger.warn("Failed to create a new channel from an accepted socket.", t);
- try {
- ch.close();
- } catch (Throwable t2) {
- logger.warn("Failed to close a socket.", t2);
- }
- }
- return 0;
- }
返回的 NioSocketChannel 则完成自身 channel 的初始化, 注册感兴趣的事件
- protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
- super(parent, ch, SelectionKey.OP_READ);
- }
回想到 boss 中的下一环即 ServerBootstrapAcceptor, 而它读取消息的处理则是添加用户自己的 handler, 并继续完成注册事件
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- final Channel child = (Channel) msg;
- child.pipeline().addLast(childHandler);
- for (Entry<ChannelOption<?>, Object> e: childOptions) {
- try {
- if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
- logger.warn("Unknown channel option:" + e);
- }
- } catch (Throwable t) {
- logger.warn("Failed to set a channel option:" + child, t);
- }
- }
- for (Entry<AttributeKey<?>, Object> e: childAttrs) {
- child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
- }
- try {
- childGroup.register(child).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- forceClose(child, future.cause());
- }
- }
- });
- } catch (Throwable t) {
- forceClose(child, t);
- }
- }
worker 线程的启动时机
worker 的注册发生在 boss 的线程执行中, 此刻必定不是同一个线程, 因而开始启动 worker 的线程, 并在内部完成注册事件, 等待读消息的到来
OP_read 消息处理
连接建立后的请求则是交由 NioSocketChannel 来处理, 它将读到的消息封装成 ByteBuf, 通过 InBound 处理器 fireChannelRead 依次传给其它的地方消费, 一直到 tailContext 消息处理完毕
此处也可以得知管道的 in 表示数据传入 netty, 回写则是通过 out 一直到 Head 然后写入 channel
Netty 中 Nio 的处理流程
从上述分析可以得到, Netty 的处理流程如下
boss 是否需要多个线程
mainReactor 多线程配置 , 对于多个端口监听是有益的, 当然 1 个也可以处理多端口
Reactor 模式
CPU 的处理速度快于 IO 处理速度, 在处理事情时, 最佳情况是 CPU 不会由于 IO 处理而遭到阻塞, 造成 CPU 的 "浪费", 当然可以用多线程去处理 IO 请求, 但是这会增加线程的上下文切换, 切换过去可能 IO 操作也还没有完成, 这也存在浪费的情况.
另一种方式是: 当 IO 操作完成之后, 再通知 CPU 进行处理. 那谁来知晓 IO 操作完成? 并将事件讲给 CPU 处理呢? 在 Reactor 模式中, 这就是 Reactor 的作用, 它启动一个不断执行的线程来等待 IO 发生, 并按照事件类型, 分发给不同的事先注册好的事件处理器来处理
Reactor 模式抽象如下
抽象图由作者提供 http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
reactor 参考
来源: https://juejin.im/post/5c88bbd5e51d455909681847