上一节, 我们研究了 NioEventLoop 的创建过程, 其实也就是做了一些初始化, 把该准备的准备好了. 重点有两个
准备了一个
ThreadPerTaskExecutor
, 以后添加的 task, 每次执行的时候其实是实例化了了一个 netty 自定义的 Thread :(
FastThreadLocalThread
) 然后再调用 start() 执行任务.
准备了一个 NioEventLoop 数组 (EventExecutor), 配套了一个循环使用该数组的选择器 chooser, 该选择器根据 "线程数是否是 2 的幂次方" 提供了不同选择策略.
准备了 taskQueue 和 tailQueue 用于存放 eventLoop 任务, 和外部线程任务
这一节, 具体研究 NioEventLoop 的的启动过程.
入口: 端口绑定的时候 AbstractBootstrap#doBind0()
- // 入口
- ChannelFuture future = serverBootstrap.bind(PORT).sync();
- doBind(localAddress)
- doBind0(regFuture, channel, localAddress, promise);
- private static void doBind0(
- final ChannelFuture regFuture, final Channel channel,
- final SocketAddress localAddress, final ChannelPromise promise) {
- // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
- // the pipeline in its channelRegistered() implementation.
- channel.eventLoop().execute(new Runnable() {
- @Override
- public void run() {
- if (regFuture.isSuccess()) {
- channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
- } else {
- promise.setFailure(regFuture.cause());
- }
- }
- });
- }
首先是 channel.eventLoop() 获取的肯定是 NioEventLoop, 但是具体是怎么获取的呢? 往下研究
- @Override
- public NioEventLoop eventLoop() {
- return (NioEventLoop) super.eventLoop();
- }
- @Override
- public EventLoop eventLoop() {
- EventLoop eventLoop = this.eventLoop;
- if (eventLoop == null) {
- throw new IllegalStateException("channel not registered to an event loop");
- }
- return eventLoop;
- }
问题来了. 该方法确实是获取的 NioEventLoop, 但是是什么时候初始化 AbstractChannel#eventLoop 这个成员变量的呢? 翻看之前的《注册 Selector》 过程的分析, 发现确实是注册的时候做的. 也就是 AbstractChannel.AbstractUnsafe#register() 方法中, 通过方法参数讲 eventLoop 传入, 以下这段代码
- @Override
- public final void register(EventLoop eventLoop, final ChannelPromise promise) {
- // 省略代码
- AbstractChannel.this.eventLoop = eventLoop;
- // 省略代码
- }
该方法被 SingleThreadEventLoop#register() 调用并将自己传入的, 其本身就是 NioEventLoop 的父类, 在创建的时候就被存储子啊 EventExecutor[] 数组中了. 那么是怎么从数组中取出来的呢?
- @Override
- public ChannelFuture register(final ChannelPromise promise) {
- ObjectUtil.checkNotNull(promise, "promise");
- promise.channel().unsafe().register(this, promise);
- return promise;
- }
前面也提到过, 注册的过程我们发现从 EventExecutor[] 中选择 EventLoop 的代码是在 MultithreadEventLoopGroup#register() 中
- @Override
- public ChannelFuture register(Channel channel) {
- return next().register(channel);
- }
- @Override
- public EventLoop next() {
- return (EventLoop) super.next();
- }
- //io.netty.util.concurrent.MultithreadEventExecutorGroup#next()
- @Override
- public EventExecutor next() {
- return chooser.next();
- }
- @UnstableApi
- interface EventExecutorChooser {
- /**
- * Returns the new {@link EventExecutor} to use.
- */
- EventExecutor next();
- }
终于, 在这里获取了之前创建 NioEventLoop 准备好的选择器. 那么 next() 方法是在两个选择器策略里面定义的, 用于继续按下标后, 循环获取 EventExecutor[] 数组中的 EventLoop(这里是 NioEventLoop)
- private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
- private final AtomicInteger idx = new AtomicInteger();
- private final EventExecutor[] executors;
- PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
- this.executors = executors;
- }
- @Override
- public EventExecutor next() {
- return executors[idx.getAndIncrement() & executors.length - 1];
- }
- }
- private static final class GenericEventExecutorChooser implements EventExecutorChooser {
- private final AtomicInteger idx = new AtomicInteger();
- private final EventExecutor[] executors;
- GenericEventExecutorChooser(EventExecutor[] executors) {
- this.executors = executors;
- }
- @Override
- public EventExecutor next() {
- return executors[Math.abs(idx.getAndIncrement() % executors.length)];
- }
- }
回过到启动过程, NioEventLoop 已经获取, 调用 execute() 方法, 并且实例化了一个 Runnable, 该 Runnable 就是一个新的 task
- channel.eventLoop().execute(new Runnable() {
- @Override
- public void run() {
- if (regFuture.isSuccess()) {
- channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
- } else {
- promise.setFailure(regFuture.cause());
- }
- }
- });
而 execute() 方法其实是在 NioEventLoop 的父类, SingleThreadEventExecutor#execute(). 这里有三个动作.
inEventLoop() : 判断当前线程是否是 EventLoop 持有的线程
startThread() : 开启自定义线程
addTask(task) : 添加到任务队列
- @Override
- public void execute(Runnable task) {
- if (task == null) {
- throw new NullPointerException("task");
- }
- // 判断当前线程是否是 EventLoop 持有的线程, 这个时间点返回的是 false
- boolean inEventLoop = inEventLoop();
- if (inEventLoop) {
- addTask(task);
- } else {
- // 不在同一线程, 开启自定义线程
- startThread();
- // 添加任务队列
- addTask(task);
- if (isShutdown() && removeTask(task)) {
- reject();
- }
- }
- if (!addTaskWakesUp && wakesUpForTask(task)) {
- wakeup(inEventLoop);
- }
- }
inEventLoop() 很多地方都用到, 值得深究, 追综到
SingleThreadEventExecutor#inEventLoop()
, 确实是跟持有的成员变量
private volatile Thread thread;
进行对比. 注意, 该成员变量 this.thread 此时并没有被初始化, 因此为 null, 返回的是 false. 可以 Debug 进行跟踪.
- @Override
- public boolean inEventLoop() {
- // 传入当前线程, 当前线程为: main 线程
- return inEventLoop(Thread.currentThread());
- }
- @Override
- public boolean inEventLoop(Thread thread) {
- return thread == this.thread;
- }
startThread() 开启一个线程用于执行 NioEventLoop 的 run 方法, 并且让 NioEventLoop 持有这个线程. 这一刻, NioEventLoop 跑起来了.
- private void startThread() {
- // 判断当前线程是否是未启动的
- if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
- if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
- // 启动
- doStartThread();
- }
- }
- }
- private void doStartThread() {
- assert thread == null;
- //ThreadPerTaskExecutor#execute() 开启 netty 自定义的线程 FastThreadLocalThread, 并执行
- executor.execute(new Runnable() {
- @Override
- public void run() {
- // 运行时, 开始持有运行时线程
- thread = Thread.currentThread();
- // 省略代码
- // 执行 NioEventLoop.run()
- SingleThreadEventExecutor.this.run();
- // 省略代码
- });
- }
addTask(task) 是在 NioEventLoop 的线程跑起来了之后, 将最初绑定端口的任务 offer() 到了 taskQueue, 存储起来, 异步执行.
- protected void addTask(Runnable task) {
- if (task == null) {
- throw new NullPointerException("task");
- }
- if (!offerTask(task)) {
- reject(task);
- }
- }
- final boolean offerTask(Runnable task) {
- if (isShutdown()) {
- reject();
- }
- return taskQueue.offer(task);
- }
来源: http://www.jianshu.com/p/128db5367936