接口定义
io.netty.channel.EventLoopGroup extends EventExecutorGroup
方法
|
说明
|
ChannelFuture register(Channel channel)
|
把一个 channel 注册到一个 EventLoop
|
ChannelFuture register(Channel channel, ChannelPromise promise);
|
同上
|
io.netty.channel.EventLoop extends OrderedEventExecutor, EventLoopGroup
方法
|
说明
|
EventLoopGroup parent()
|
得到创建这个 eventLoop 的 EventLoopGroup
|
EventLoopGroup 定义的主要方法是 register, 这个方法的语义是把 channel 和 eventLoop 绑定在一起. 一个 channel 对应一个 eventLoop, 一个 eventLoop 会持有多个 channel.
I/O 线程 EventLoopGroup 的抽象实现
- io.netty.channel.MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup
- io.netty.channel.SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop
两个类主功能都是实现了 EventLoopGroup 定义的 register 方法
- MultithreadEventLoopGroup
- public ChannelFuture register(Channel channel) {
- return next().register(channel);
- }
- public ChannelFuture register(Channel channel, ChannelPromise promise) {
- return next().register(channel, promise);
- }
- SingleThreadEventLoop
- public ChannelFuture register(Channel channel) {
- return register(channel, new DefaultChannelPromise(channel, this));
- }
- public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
- channel.unsafe().register(this, promise);
- return promise;
- }
register 的实现主要是为了调用 Channel.Unsafe 实例的 register 方法.
NIO 实现
- io.netty.channel.nio.NioEventLoopGroup extends MultithreadEventLoopGroup
- io.netty.channel.nio.NioEventLoop extends SingleThreadEventLoop
NioEventLoopGroup 是在 MultithreadEventLoopGroup 基础上实现了对 JDK NIO Selector 的封装, 它实现以下几个功能:
创建 selector
在 selector 上注册 channel 感兴趣的 NIO 事件
实现 EventExecutor 的 run 方法, 定义 NIO 事件和 Executor 任务的处理流程.
把 NIO 事件转换成对 channel unsafe 的调用或 NioTask 的调用
控制线程执行 I/O 操作和排队任务的用时比例
处理 epoll selector CPU 100% 的 bug
下面来具体分析这几个功能的实现.
创建 Selector
- NioEventLoop#openSelector() 实现了创建 selector 的功能, 默认情况下, 使用 SelectorProvider#openSelector() 方法创建一个新个 selector:
- final Selector unwrappedSelector = provider.openSelector();
如果设置环境变量 io.netty.noKeySetOptimization=true, 会创建一个 selectedKeySet = new SelectedSelectionKeySet(), 然后使用 java 的反射机制把 selector 的 selectedKeys 和 publicSelectedKeys 替换成 selectedKeySet, 具体步骤是:
1. 得到 selector 的真正类型: sun.nio.ch.SelectorImpl
- Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
- @Override
- public Object run() {
- try {
- return Class.forName(
- "sun.nio.ch.SelectorImpl",
- false,
- PlatformDependent.getSystemClassLoader());
- } catch (Throwable cause) {
- return cause;
- }
- }
- });
- final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
2. 替换 selector 是属性 unwrappedSelector
- Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
- Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
- selectedKeysField.set(unwrappedSelector, selectedKeySet);
- publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
之所以会设计一个这样的优化选项, 是因为一般情况下调用完 selector 的 select 或 selectNow 方法后需要调用 Selector#selectedKeys() 得到触发 NIO 事件的的 SelectableChannel, 这样优化之后, 可以直接从 selectedKeySet 中得到已经触发了 NIO 事件的 SelectableChannel.
在 selector 上注册 channel 感兴趣的 NIO 事件
NioEventLoop 提供了 unwrappedSelector 方法, 这个方法返回了它创建好的 Selector 实例. 这样任何的外部类都可以把任意的 SelectableChannel 注册到这 selector 上. 在 AbstractNioChannel 中, doRegister 方法的实现就是使用了这个方法:
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
另外, 它还提供了一个 register 方法:
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task)
这个方法会把 task 当成 SelectableChannel 的附件注册到 selector 上:
ch.register(selector, interestOps, task);
实现 EventExecutor 的 run 方法, 定义 NIO 事件和 Executor 任务的处理流程
在 NioEventLoop 的 run 方法中实现 NIO 事件和 EventExecutor 的任务处理逻辑, 这个 run 方法在 io.netty.util.concurrent.SingleThreadEventExecutor 中定义. 在上一章中, 我们看到了 DefaultEventExecutor 中是如何实现这个 run 方法的, 这里我们将要看到这 run 方法的另一个实现. 和 SingleThreadEventExecutor 中的 run 方法相比, NioEventLoop 的 run 方法不仅要及时地执行 taskQueue 中的任务, 还要能及时地处理 NIO 事件, 因此它会同时检查 selector 中的 NIO 事件和和 taskQueue 队列, 任何一个中有事件需要处理或有任务需要执行, 它不会阻塞线程. 同时它也保证了在没有 NIO 事件和任务的情况下线程不会无谓的空转浪费 CUP 资源.
run 主要实现如下, 为了更清晰的说明它的主要功能, 我对原来的代码进行了一些删减.
- for(;;){
- try{
- //phase1: 同时检查 NIO 事件和任务
- switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
- case SelectStrategy.CONTINUE:
- continue;
- case SelectStrategy.SELECT:
- select(wakenUp.getAndSet(false)); // 在 taskQueue 中没有任务的时候执行 select
- }
- //phase2: 进入处理 NIO 事件, 执行 executor 任务
- try{
- // 处理 NIO 事件
- processSelectedKeys();
- }finally{
- // 处理 taskQueu 中的任务
- runAllTasks();
- }
- }catch(Throwable t){
- handleLoopException(t);
- }
- }
run 方法有两个阶段构成:
phase1: 检查 NIO 事件或 executor 任务, 如果有任何的 NIO 事件或 executor 任务进入 phase2.
这样阶段的主要工作在 selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()) 和 select 中完成.
selectStrategy.calculateStrategy 实现
selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())
这行代码的含义是: 如果 hasTasks() == true, 调用以下 selector#selectNow, 然后进入 phase2. 否则调用 select. 这里使用了 strategy 模式, 默认的 strategy 实现是 io.netty.channe.DefaultSelectStrategy implements SelectStrategy
- @Override
- public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
- return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
- }
DefaultSelectStrategy 实现了 SelectStrategy 接口, 这接口定义了两个常量:
- int SELECT = -1;
- int CONTINUE = -2;
运行时 selectSuppler 参数传入的是 selectNowSupplier, 它的实现如下:
- private final IntSupplier selectNowSupplier = new IntSupplier() {
- @Override
- public int get() throws Exception {
- return selectNow();
- }
- };
这里的 get 方法调用了 selectNow, selectNow 调用的是 Selector#selectNew 方法, 这个方法的返回值是 >=0.
hashTasks 的传入的参数是 hasTask() 的返回值: return !taskQueue.isEmpty();
代码读到这里就会发现, 使用默认的的 SelectStrategy 实现, calculateStrategy 在 hasTasks()==true 时返回值 >=0, hasTasks() == false 时返回值是 SelectStrategy.SELECT, 不会返回 SelectStrategy.CONTINUE.
select 实现
select 的执行逻辑是:
1. 计算超 select 方法的结束时间 selectDeadLineNanos
- long currentTimeNanos = System.nanoTime();
- long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
2. 进入循环, 检查超时 -- 超时跳出循环.
- long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
- if (timeoutMillis <= 0) {
- if (selectCnt == 0) {
- selector.selectNow();
- selectCnt = 1;
- }
- break;
- }
3. 如果在 select 执行过程中有 executor 任务提交或可以当前的 wakeUp 由 false 变成 true, 跳出循环
- if (hasTasks() && wakenUp.compareAndSet(false, true)) {
- selector.selectNow();
- selectCnt = 1;
- break;
- }
4. 调用 selector#select 等待 NIO 事件.
- int selectedKeys = selector.select(timeoutMillis);
- selectCnt ++;
5. 如果满足这些条件的任何一个, 跳出循环: 有 NIO 事件, wakeUp 的新旧值都是 true,taskQueue 中有任务, 有定时任务到期.
- if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
- break;
- }
6. 如果线程被中断, 跳出循环.
- if (Thread.interrupted()) {
- break;
- }
7. 如果 selector.select 超时, 没有检查到任何 NIO 事件, 会在下次循环开始时跳出循环. 如果每次超时, 跳到第 2 步继续下一次循环.
- long time = System.nanoTime();
- if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis)>= currentTimeNanos) {
- selectCnt = 1;
- }
- currentTimeNanos = time;
select 最迟会在当前时间 >= selectDeadLineNanos 时返回, 这个时间是最近一个到期的定时任务执行的时间, 换言之如果没有任何的 NIO 事件或 executor 任务, select 会在定时任务到期时返回. 如果没有定时任务, delayNanos(currentTimeNanos) 返回的值是 TimeUnit.SECONDS.toNanos(1), 即 1 秒. select 会在检查到任何 NIO 事件或 executor 任务时返回, 为了保证这点, 在 selector.select(timeoutMillis) 前后都会调用 hasTasks 检查 executor 任务, 为了能在调用 executet 提交任务时唤醒 selector.select,NioEventLoop 覆盖了 SingleThreadEventExecutor 的 wake 方法:
- protected void wakeup(boolean inEventLoop) {
- if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
- selector.wakeup();
- }
- }
这个方法会及时的唤醒 selector.select, 保证新提交的任务可以得到及时的执行.
phase2: 进入处理 NIO 事件, 执行 executor 任务
这个阶段是先调用 processSelectedKeys() 处理 NIO 事件, 然后掉用 runAllTasks() 处理所有已经到期的定时任务和已经在排队的任务. 这个阶段还实现了 NIO 事件和 executor 任务的用时比例管理, 这个特性稍后会详细分析.
来源: https://www.cnblogs.com/brandonli/p/10100139.html