NioEventLoop 的启动时机是在服务端的 NioServerSocketChannel 中的 ServerSocketChannel 初始化完成, 且注册在 NioEventLoop 后执行的, 下一步就是去绑定端口, 但是在绑定端口前, 需要完成 NioEventLoop 的启动工作, 因为程序运行到这个阶段为止, 依然只有 MainThread 一条线程, 下面就开始阅读源码看 NioEventLoop 如何开启新的线程自立家门的
总想说 NioEventLoop 的整体结构, 像极了这个图
该图为, 是我画的 NioEventLoop 启动的流程草图, 很糙, 但是不画它, 总觉的少了点啥...
NioEventLoop 的继承体系图
NioEventLoop 的线程开启之路
程序的入口是 AbstractBootStrap, 这个抽象的启动辅助类, 找到它准备绑定端口的 doBind0() 方法, 下面是源码:
- private static void doBind0(
- final ChannelFuture regFuture, final Channel channel,
- final SocketAddress localAddress, final ChannelPromise promise) {
- // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
- // the pipeline in its channelRegistered() implementation.
- // todo 此方法在触发 channelRegistered() 之前调用, 给用户一个机会, 在 channelRegistered() 中设置 pipeline
- // todo 这是 eventLoop 启动的逻辑 , 下面的 Runable 就是一个 task 任务, 什么任务的呢? 绑定端口
- // todo 进入 exeute()
- System.out.println("00000");
- channel.eventLoop().execute(new Runnable() {
- @Override
- public void run() {
- if (regFuture.isSuccess()) {
- // todo channel 绑定端口并且添加了一个 listenner
- channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
- } else {
- promise.setFailure(regFuture.cause());
- }
- }
- });
- }
我们关注上面的 channel.execute(Runable) 方法, 如果我们直接使用鼠标点击进去, 会进入 java.util.concurrent 包下的 Executor 接口, 原因是因为, 它是 NioEventLoop 继承体系的超顶级接口, 见上图, 我们进入它的实现类, SingleThreadEventExcutor, 也就是 NioEventLoop 的间接父类, 源码如下:
- // todo eventLoop 事件循环里面的 task, 会在本类 SingleThreadEventExecutor 里面: execute() 执行
- @Override
- public void execute(Runnable task) {
- if (task == null) {
- throw new NullPointerException("task");
- }
- // todo 同样判断当前线程是不是 eventLoop 里面的那条唯一的线程, 如果是的话, 就把当前任务放到任务队列里面等着当前的线程执行
- // todo , 不是的话就开启新的线程去执行这个新的任务
- // todo , eventLoop 一生只会绑定一个线程, 服务器启动时只有一条主线程, 一直都是在做初始化的工作, 并没有任何一次 start()
- // todo 所以走的是 else, 在 else 中首先开启新的线程, 而后把任务添加进去
- boolean inEventLoop = inEventLoop();
- if (inEventLoop) {
- addTask(task);
- } else {
- // todo 开启线程 , 进入查看
- startThread();
- // todo 把任务丢进队列
- addTask(task);
- if (isShutdown() && removeTask(task)) {
- reject();
- }
- }
- if (!addTaskWakesUp && wakesUpForTask(task)) {
- wakeup(inEventLoop);
- }
- }
现在执行这些代码的线程依然是主线程, 主线程手上有绑定端口任务, 但是它想把这个任务提交给 NioEventLoop 去执行, 于是它就做出下面的判断
- boolean inEventLoop = inEventLoop();
- // 方法实现
- @Override
- public boolean inEventLoop(Thread thread) {
- return thread == this.thread;
- }
但是发现, 主线程并不是 NioEventLoop 唯一绑定的那个线程, 于是他就准备下面两件事:
开启激活当前 NioEventLoop 中的线程
把绑定端口的任务添加到任务队列
开启新线程的逻辑在下面, 我删除了一些收尾, 以及判断的代码, 保留了主要的逻辑
- private void doStartThread() {
- assert thread == null;
- // todo 断言线程为空, 然后才创建新的线程
- executor.execute(new Runnable() { // todo 每次 Execute 都是在使用 默认的线程工厂, 创建一个线程并执行 Runable 里面的任务
- @Override
- public void run() {
- // todo 获取刚才创建出来的线程, 保存在 NioEventLoop 中的 thread 变量里面, 这里其实就是在进行那个唯一的绑定
- thread = Thread.currentThread();
- updateLastExecutionTime();
- try {
- // todo 实际启动线程, 到这里 NioEventLoop 就启动完成了
- SingleThreadEventExecutor.this.run();
- }
- }
主要做了两件事第一波高潮来了 1. 调用了 NioEventLoop 的线程执行器的 execute, 这个方法的源码在下面, 可以看到, excute, 其实就是在创建线程, 线程创建完成后, 立即把新创建出来的线程当作是 NioEventLoop 相伴终生的线程;
- public final class ThreadPerTaskExecutor implements Executor {
- private final ThreadFactory threadFactory;
- public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
- if (threadFactory == null) {
- throw new NullPointerException("threadFactory");
- }
- this.threadFactory = threadFactory;
- }
- // todo 必须实现 Executor 里面唯一的抽象方法, execute , 执行性 任务
- @Override
- public void execute(Runnable command) {
- threadFactory.newThread(command).start();
- }
- }
创建 / 绑定完成了新的线程后, 第二波高潮来了, SingleThreadEventExecutor.this.run(); 这行代码的意思是, 调用本类的 Run() 方法, 这个 Run() 方法就是真正在干活的事件循环, 但是呢, 在本类中, Run() 是一个抽象方法, 因此我们要去找他的子类, 那么是谁重写的这个 Run() 呢? 就是 NioEventLoop, 它根据自己需求, 重写了这个方法
小结: 到现在, NioEventLoop 的线程已经开启了, 下面的重头戏就是看他是如何进行事件循环的
NioEventLoop 的事件循环 run()
我们来到了 NioEventLoop 的 run(), 他是个无限 for 循环, 主要完成了下面三件事
轮询 IO 事件
处理 IO 事件
处理非 IO 任务
这是 NioEventLoop 的 run() 的源码, 删除了部分注解和收尾工作,
- /**
- * todo select() 检查是否有 IO 事件
- * todo ProcessorSelectedKeys() 处理 IO 事件
- * todo RunAllTask() 处理异步任务队列
- */
- @Override
- protected void run() {
- for (; ; ) {
- try {
- switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
- case SelectStrategy.CONTINUE:
- continue;
- case SelectStrategy.SELECT:
- // todo 轮询 IO 事件, 等待事件的发生, 本方法下面的代码是处理接受到的感性趣的事件, 进入查看本方法
- select(wakenUp.getAndSet(false));
- if (wakenUp.get()) {
- selector.wakeup();
- }
- default:
- }
- cancelledKeys = 0;
- needsToSelectAgain = false;
- final int ioRatio = this.ioRatio; // todo 默认 50
- // todo 如果 ioRatio==100 就调用第一个 processSelectedKeys(); 否则就调用第二个
- if (ioRatio == 100) {
- try {
- // todo 处理 处理发生的感性趣的事件
- processSelectedKeys();
- } finally {
- // Ensure we always run tasks.
- // todo 用于处理 本 eventLoop 外的线程 扔到 taskQueue 中的任务
- runAllTasks();
- }
- } else {// todo 因为 ioRatio 默认是 50 , 所以来 else
- // todo 记录下开始的时间
- final long ioStartTime = System.nanoTime();
- try {
- // todo 处理 IO 事件
- processSelectedKeys();
- } finally {
- // Ensure we always run tasks.
- // todo 根据处理 IO 事件耗时 , 控制 下面的 runAllTasks 执行任务不能超过 ioTime 时间
- final long ioTime = System.nanoTime() - ioStartTime;
- // todo 这里面有聚合任务的逻辑
- runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
- }
- }
- } catch (Throwable t) {
- handleLoopException(t);
- }
- }
- }
下面进入它的 select(), 我们把 select() 称作: 基于 deadline 的任务穿插处理逻辑
下面直接贴出它的源码: 下面的代码中我写了一些注解了, 主要是分如下几步走
根据当前时间计算出本次 for() 的最迟截止时间, 也就是他的 deadline
判断 1 如果超过了 截止时间,
selector.selectNow();
直接退出
判断 2 如果任务队列中出现了新的任务
selector.selectNow();
直接退出
经过了上面 12 两次判断后, netty 进行阻塞式 select(time) , 默认是 1 秒这时可会会出现空轮询的 Bug
判断 3 如果经过阻塞式的轮询之后, 出现的感兴趣的事件, 或者任务队列又有新任务了, 或者定时任务中有新任务了, 或者被外部线程唤醒了 都直接退出循环
如果前面都没出问题, 最后检验是否出现了 JDK 空轮询的 BUG
- // todo 循环接受 IO 事件
- // todo 每次进行 select() 操作时, oldWakenUp 被标记为 false
- private void select(boolean oldWakenUp) throws IOException {
- Selector selector = this.selector;
- try {
- ///todo ----------------------------------------- 如下部分代码, 是 select() 的 deadLine 及任务穿插处理逻辑 -----------------------------------------------------
- // todo selectCnt 这个变量记录了 循环 select 的次数
- int selectCnt = 0;
- // todo 记录当前时间
- long currentTimeNanos = System.nanoTime();
- // todo 计算出估算的截止时间, 意思是, select() 操作不能超过 selectDeadLineNanos 这个时间, 不让它一直耗着, 外面也可能有任务等着当前线程处理
- long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
- // -------for 循环开始 -------
- for (; ; ) {
- // todo 计算超时时间
- long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
- if (timeoutMillis <= 0) {// todo 如果超时了 , 并且 selectCnt==0 , 就进行非阻塞的 select() , break, 跳出 for 循环
- if (selectCnt == 0) {
- selector.selectNow();
- selectCnt = 1;
- }
- break;
- }
- // todo 判断任务队列中时候还有别的任务, 如果有任务的话, 进入代码块, 非阻塞的 select() 并且 break; 跳出循环
- //todo 通过 cas 把线程安全的把 wakenU 设置成 true 表示退出 select() 方法, 已进入时, 我们设置 oldWakenUp 是 false
- if (hasTasks() && wakenUp.compareAndSet(false, true)) {
- selector.selectNow();
- selectCnt = 1;
- break;
- }
- ///todo ----------------------------------------- 如上部分代码, 是 select() 的 deadLine 及任务穿插处理逻辑 -----------------------------------------------------
- ///todo ----------------------------------------- 如下, 是 阻塞式的 select() -----------------------------------------------------
- // todo 上面设置的超时时间没到, 而且任务为空, 进行阻塞式的 select() , timeoutMillis 默认 1
- // todo netty 任务, 现在可以放心大胆的 阻塞 1 秒去轮询 channel 连接上是否发生的 selector 感性的事件
- int selectedKeys = selector.select(timeoutMillis);
- // todo 表示当前已经轮询了 SelectCnt 次了
- selectCnt++;
- // todo 阻塞完成轮询后, 马上进一步判断 只要满足下面的任意一条. 也将退出无限 for 循环, select()
- // todo selectedKeys != 0 表示轮询到了事件
- // todo oldWakenUp 当前的操作是否需要唤醒
- // todo wakenUp.get() 可能被外部线程唤醒
- // todo hasTasks() 任务队列中又有新任务了
- // todo hasScheduledTasks() 当时定时任务队列里面也有任务
- if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
- break;
- }
- ///todo ----------------------------------------- 如上, 是 阻塞式的 select() -----------------------------------------------------
- if (Thread.interrupted()) {
- if (logger.isDebugEnabled()) {
- logger.debug("Selector.select() returned prematurely because" +
- "Thread.currentThread().interrupt() was called. Use" +
- "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
- }
- selectCnt = 1;
- break;
- }
- // todo 每次执行到这里就说明, 已经进行了一次阻塞式操作 , 并且还没有监听到任何感兴趣的事件, 也没有新的任务添加到队列, 记录当前的时间
- long time = System.nanoTime();
- // todo 如果 当前的时间 - 超时时间 >= 开始时间 把 selectCnt 设置为 1 , 表明已经进行了一次阻塞式操作
- // todo 每次 for 循环都会判断, 当前时间 currentTimeNanos 不能超过预订的超时时间 timeoutMillis
- // todo 但是, 现在的情况是, 虽然已经进行了一次 时长为 timeoutMillis 时间的阻塞式 select 了,
- // todo 然而, 我执行到当前代码的 时间 - 开始的时间 >= 超时的时间
- // todo 但是 如果 当前时间 - 超时时间 < 开始时间, 也就是说, 并没有阻塞 select, 而是立即返回了, 就表明这是一次空轮询
- // todo 而每次轮询 selectCnt ++; 于是有了下面的判断,
- if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis)>= currentTimeNanos) {
- // timeoutMillis elapsed without anything selected.
- selectCnt = 1;
- } else if (SELECTOR_AUTO_REBUILD_THRESHOLD> 0 &&
- // todo selectCnt 如果大于 512 表示 CPU 确实在空轮询, 于是 rebuild Selector
- selectCnt>= SELECTOR_AUTO_REBUILD_THRESHOLD) {
- // The selector returned prematurely many times in a row.
- // Rebuild the selector to work around the problem.
- logger.warn(
- "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
- selectCnt, selector);
- // todo 它的逻辑创建一个新的 selectKey , 把老的 Selector 上面的 key 注册进这个新的 selector 上面 , 进入查看
- rebuildSelector();
- selector = this.selector;
- // Select again to populate selectedKeys.
- // todo 解决了 Select 空轮询的 bug
- selector.selectNow();
- selectCnt = 1;
- break;
- }
- currentTimeNanos = time;
- }
- //// -----------for 循环结束 --------------
- if (selectCnt> MIN_PREMATURE_SELECTOR_RETURNS) {
- if (logger.isDebugEnabled()) {
- logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
- selectCnt - 1, selector);
- }
- }
- } catch (CancelledKeyException e) {
- if (logger.isDebugEnabled()) {
- logger.debug(CancelledKeyException.class.getSimpleName() + "raised by a Selector {} - JDK bug?",
- selector, e);
- }
- // Harmless exception - log anyway
- }
- }
什么是 Jdk 的 Selector 空轮询
我们可以看到, 上面的 run() 方法, 经过两次判断后进入了指定时长的阻塞式轮询, 而我们常说的空轮询 bug, 指的就是本来该阻塞住轮询, 但是却直接返回了, 在这个死循环中, 它的畅通执行很可能使得 CPU 的使用率飙升, 于是把这种情况说是 jdk 的 selector 的空轮询的 bug
Netty 如何解决了 Jdk 的 Selector 空轮询 bug?
一个分支语句 if(){}else{} , 首先他记录下, 现在执行判断时的时间, 然后用下面的公式判断
当前的时间 t1 - 预订的 deadLine 截止时间 t2>= 开始进入 for 循环的时间 t3
我们想, 如果说, 上面的阻塞式 select(t2) 没出现任何问题, 那么 我现在来检验是否出现了空轮询是时间 t1 = t2 + 执行其他代码的时间, 如果是这样, 上面的等式肯定是成立的, 等式成立说没 bug, 顺道把 selectCnt = 1;
但是如果出现了空轮询, select(t2) 并没有阻塞, 而是之间返回了, 那么现在的时间 t1 = 0 + 执行其他代码的时间, 这时的 t1 相对于上一个没有 bug 的大小, 明显少了一个 t2, 这时再用 t1-t2 都可能是一个负数, 等式不成立, 就进入了 else 的代码块, netty 接着判断, 是否是真的在空轮询, 如果说循环的次数达到了 512 次, netty 就确定真的出现了空轮询, 于是 nettyrebuild()Selector , 从新开启一个 Selector, 循环老的 Selector 上面的上面的注册的时间, 重新注册进新的 Selector 上, 用这个中替换 Selector 的方法, 解决了空轮询的 bug
感性趣的事件, 是何时添加到 selectedkeys 中的?
ok, run() 的三部曲第一步轮询已经完成了, 下一步就是处理轮询出来的感兴趣的 IO 事件, processSelectedKeys() , 下面我们进入这个方法, 如果这个 selectedKeys 不为空, 就进去 processSelectedKeysOptimized(); 继续处理 IO 事件,
比较有趣的是, 这个 selectedKeys 是谁? , 别忘了我们是在 NioEventLoop 中, 是它开启了 Selector, 也是他使用反射的手段将 Selector, 存放感兴趣事件的 HashSet 集合替换成了 SelectedSelectionKeySet 这个名叫 set, 实为数组的数据结构, 当时的情况如下:
创建出
SelectedSelectionKeySet
的实例 selectedKeySet
使用反射, 将 unwrappedSelector 中的 selectedKeysField 字段, 替换成 selectedKeySet
最后一步, 也很重要
selectedKeys = selectedKeySet;
看到第三步没? 也就是说, 我们现在再想获取装有感兴趣 Key 的 HashSet 集合, 已经不可能了, 取而代之的是更优秀的 selectedKeySet, 也就是下面我们使用的 selectedKeys , 于是我们想处理感性趣的事件, 直接从 selectedKeys 中取, Selector 轮询到感兴趣的事件, 也会直接往 selectedKeys 中放
- private void processSelectedKeys() {
- // todo selectedKeys 就是经过优化后的 keys(底层是数组)
- if (selectedKeys != null) {
- processSelectedKeysOptimized();
- } else {
- processSelectedKeysPlain(selector.selectedKeys());
- }
- }
下面接着跟进 processSelectedKeysOptimized();, 关于这个方法的有趣的地方, 我写在这段代码的下面
- private void processSelectedKeysOptimized() {
- for (int i = 0; i <selectedKeys.size; ++i) {
- final SelectionKey k = selectedKeys.keys[i];
- // null out entry in the array to allow to have it GC'ed once the Channel close
- // todo 数组输出空项, 从而允许在 channel 关闭时对其进行垃圾回收
- // See https://github.com/netty/netty/issues/2363
- // todo 数组中当前循环对应的 keys 质空, 这种感兴趣的事件只处理一次就行
- selectedKeys.keys[i] = null;
- // todo 获取出 attachment, 默认情况下就是注册进 Selector 时, 传入的第三个参数 this===> NioServerSocketChannel
- // todo 一个 Selector 中可能被绑定上了成千上万个 Channel, 通过 K+attachment 的手段, 精确的取出发生指定事件的 channel, 进而获取 channel 中的 unsafe 类进行下一步处理
- final Object a = k.attachment();
- // todo
- if (a instanceof AbstractNioChannel) {
- // todo 进入这个方法, 传进入 感兴趣的 key + NioSocketChannel
- processSelectedKey(k, (AbstractNioChannel) a);
- } else {
- @SuppressWarnings("unchecked")
- NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
- processSelectedKey(k, task);
- }
- if (needsToSelectAgain) {
- // null out entries in the array to allow to have it GC'ed once the Channel close
- // See https://github.com/netty/netty/issues/2363
- selectedKeys.reset(i + 1);
- selectAgain();
- i = -1;
- }
- }
- }
NioEventLoop 是如何在千百条 channel 中, 精确获取出现指定感兴趣事件的 channel 的?
上面这个方法, 就是在真真正正的处理 IO 事件, 看看这段代码, 我们发现了这样一行代码
final Object a = k.attachment();
并且, 判断出 Key 的类型后, 执行处理逻辑的代码中的入参都是一样的 processSelectedKey(a,k) , 这是在干什么呢?
其实, 我们知道, 每个 NioEventLoop 开始干活后, 会有很多客户端的连接 channel 前来和它建立连接, 一个事件循环同时为多条 channel 服务, 而且一条 channel 的整个生命周期都只和一个 NioEventLoop 关联
现在好了, 事件循环的选择器轮询出了诸多的 channel 中有 channel 出现了感兴趣的事件, 下一步处理这个事件的前提得知道, 究竟是哪个 channel?
使用的 attachment 特性, 早在 Channel 注册进 Selector 时, 进存放进去了, 下面是 Netty 中, Channel 注册进 Selector 的源码
- @Override
- protected void doRegister() throws Exception {
- boolean selected = false;
- for (;;) {
- try {
- // todo javaChannel() -- 返回 SelectableChanel 可选择的 Channel, 换句话说, 可以和 Selector 搭配使用, 他是 channel 体系的顶级抽象类, 实际的类型是 ServerSocketChannel
- // todo eventLoop().unwrappedSelector(), --> 获取选择器, 现在在 AbstractNioChannel 中 获取到的 eventLoop 是 BossGroup 里面的
- // todo 到目前看, 他是把 ServerSocketChannel(系统创建的) 注册进了 EventLoop 的选择器
- // todo 到目前为止, 虽然注册上了, 但是它不关心任何事件
- selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
- return;
- } catch (CancelledKeyException e) {
这里的 最后一个参数是 this 是当前的 channel , 意思是把当前的 Channel 当成是一个 attachment(附件) 绑定到 selector 上 作用如下:
当 channel 在这里注册进 selector 中返回一个 selectionKey, 这个 key 告诉 selector 这个 channel 是自己的
当 selector 轮询到 有 channel 出现了自己的感兴趣的事件时, 需要从成百上千的 channel 精确的匹配出 出现 Io 事件的 channel, 于是 seleor 就在这里提前把 channel 存放入 attachment 中, 后来使用
最后一个 this 参数, 如果是服务启动时, 他就是 NioServerSocketChannel 如果是客户端他就是 NioSocketChannel
ok, 现在就捋清楚了, 挖坑, 填坑的过程; 下面进入 processSelectedKey(SelectionKey k, AbstractNioChannel ch) 执行 IO 任务, 源码如下: 我们可以看到, 具体的处理 IO 的任务都是用 Channel 的内部类 unSafe() 完成的, 到这里就不往下跟进了, 后续写新博客连载
- private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
- // todo 这个 unsafe 也是可 channel 也是和 Channel 进行唯一绑定的对象
- final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
- if (!k.isValid()) { // todo 确保 Key 的合法
- final EventLoop eventLoop;
- try {
- eventLoop = ch.eventLoop();
- } catch (Throwable ignored) {
- // If the channel implementation throws an exception because there is no event loop, we ignore this
- // because we are only trying to determine if ch is registered to this event loop and thus has authority
- // to close ch.
- return;
- }
- // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
- // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
- // still healthy and should not be closed.
- // See https://github.com/netty/netty/issues/5125
- if (eventLoop != this || eventLoop == null) { // todo 确保多线程下的安全性
- return;
- }
- // close the channel if the key is not valid anymore
- unsafe.close(unsafe.voidPromise());
- return;
- }
- // todo NioServerSocketChannel 和 selectKey 都合法的话, 就进入下面的 处理阶段
- try {
- // todo 获取 SelectedKey 的 关心的选项
- int readyOps = k.readyOps();
- // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
- // the NIO JDK channel implementation may throw a NotYetConnectedException.
- // todo 在 read() write() 之前我们需要调用 finishConnect() 方法, 否则 NIO JDK 抛出异常
- if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
- // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
- // See https://github.com/netty/netty/issues/924
- int ops = k.interestOps();
- ops &= ~SelectionKey.OP_CONNECT;
- k.interestOps( );
- unsafe.finishConnect();
- }
- // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
- if ((readyOps & SelectionKey.OP_WRITE) != 0) {
- // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
- ch.unsafe().forceFlush();
- }
- // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
- // to a spin loop
- // todo 同样是检查 readOps 是否为零, 来检查是否出现了 jdk 空轮询的 bug
- if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
- unsafe.read();
- }
- } catch (CancelledKeyException ignored) {
- unsafe.close(unsafe.voidPromise());
- }
- }
处理非 IO 任务
上面的处理 IO 事件结束后, 第三波高潮就来了, 处理任务队列中的任务, runAllTask(timeOutMinils), 他也是有生命时长限制的 deadline, 它主要完成了如下的几步:
聚合任务, 把到期的定时任务转移到普通任务队列
循环从普通队列获取任务
执行任务
每执行完 64 个任务, 判断是否到期了
收尾工作
源码如下:
- protected boolean runAllTasks(long timeoutNanos) {
- // todo 聚合任务, 会把定时任务放入普通的任务队列中 进入查看
- fetchFromScheduledTaskQueue();
- // todo 从普通的队列中拿出一个任务
- Runnable task = pollTask();
- if (task == null) {
- afterRunningAllTasks();
- return false;
- }
- // todo 计算截止时间, 表示任务的执行, 最好别超过这个时间
- final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
- long runTasks = 0;
- long lastExecutionTime;
- // todo for 循环执行任务
- for (;;) {
- // todo 执行任务, 方法里调用 task.run();
- safeExecute(task);
- runTasks ++;
- // Check timeout every 64 tasks because nanoTime() is relatively expensive.
- // XXX: Hard-coded value - will make it configurable if it is really a problem.
- // todo 因为 nanoTime(); 的执行也是个相对耗时的操作, 因此没执行完 64 个任务后, 检查有没有超时
- if ((runTasks & 0x3F) == 0) {
- lastExecutionTime = ScheduledFutureTask.nanoTime();
- if (lastExecutionTime>= deadline) {
- break;
- }
- }
- // todo 拿新的任务
- task = pollTask();
- if (task == null) {
- lastExecutionTime = ScheduledFutureTask.nanoTime();
- break;
- }
- }
- // todo 每个任务执行结束都有个收尾的构造
- afterRunningAllTasks();
- this.lastExecutionTime = lastExecutionTime;
- return true;
- }
NioEventLoop 如何聚合任务?
聚合任务就是把已经到执行时间的任务从定时任务队列中全部取出 , 放入普通任务队列然后执行, 我们进入上的第一个方法 fetchFromScheduledTaskQueue, 源码如下,
- private boolean fetchFromScheduledTaskQueue() {
- // todo 拉取第一个聚合任务
- long nanoTime = AbstractScheduledEventExecutor.nanoTime();
- // todo 从任务丢列中取出 截止时间是 nanoTime 的定时任务 ,
- // todo 往定时队列中添加 ScheduledFutureTask 任务, 排序的基准是 ScheduledFutureTask 的 compare 方法, 按照时间, 从小到大
- // todo 于是当我们发现队列中的第一个任务, 也就是截止时间最近的任务的截止时间比我们的
- Runnable scheduledTask = pollScheduledTask(nanoTime);
- while (scheduledTask != null) {
- // todo scheduledTask != null 表示定时任务该被执行了, 于是将定时任务添加到 普通任务队列
- if (!taskQueue.offer(scheduledTask)) {
- // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
- // todo 如果添加失败了, 把这个任务从新放入到定时任务队列中, 再尝试添加
- scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
- return false;
- }
- // todo 循环, 尝试拉取定时任务 , 循环结束后, 所有的任务全部会被添加到 task 里面
- scheduledTask = pollScheduledTask(nanoTime);
- }
- return true;
- }
根据指定的截止时间, 从定时任务队列中取出任务, 定时任务队列中任务按照时间排序, 时间越短的, 排在前面, 时间相同, 按照添加的顺序排序, 现在的任务就是检查定时任务队列中任务, 尝试把里面的任务挨个取出来, 于是 netty 使用这个方法 Runnable scheduledTask = pollScheduledTask(nanoTime); 然后马上在 while(){} 循环中判断是否存在, 这个方法实现源码如下, 不难看出, 他是在根据时间判断
- /**
- * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
- * You should use {@link #nanoTime()} to retrieve the the correct {@code nanoTime}.
- * todo 根据给定的纳秒值, 返回 Runable 定时任务 , 并且, 每次使用都要冲洗使用是 nanoTime() 来矫正时间
- */
- protected final Runnable pollScheduledTask(long nanoTime) {
- assert inEventLoop();
- Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
- ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
- if (scheduledTask == null) {
- return null;
- }
- // todo 如果定时任务的截止时间 <= 我们穿进来的时间, 就把他返回
- if (scheduledTask.deadlineNanos() <= nanoTime) {
- scheduledTaskQueue.remove();
- return scheduledTask;
- }
- // todo 否则返回 kong, 表示当前所有的定时任务都没到期, 没有可以执行的
- return null;
- }
经过循环之后, 到期的任务, 全被添加到 taskQueue 里面了, 下面就是执行 TaskQueue 里面的任务
任务队列中的任务是怎么执行的?
safeExecute(task); 方法, 执行任务队列中的任务
源码如下: 实际上就行执行了 task 这个 Runable 的 Run 方法
- /**
- * Try to execute the given {@link Runnable} and just log if it throws a {@link Throwable}.
- */
- protected static void safeExecute(Runnable task) {
- try {
- task.run();
- } catch (Throwable t) {
- logger.warn("A task raised an exception. Task: {}", task, t);
- }
- }
总结一下: 到现在为止, EventLoop 已经启动了, 一说到 NioEventLoop 总是想起上图, 现在他可以接受新的连接接入, 轮询, 处理任务...
来源: https://www.cnblogs.com/ZhuChangwu/p/11196791.html