EventLoopGroup
(如果使用到的是 NIO, 那么通常是 NioEventLoopGroup), 那么这个 NioEventLoopGroup 在 Netty 中到底扮演着什么角色呢?
NIO 的 Reactor 模型
补充多线程的 reactor 模式
Reactor 多线程模型 有如下特点:
有专门一个线程, 即 Acceptor 线程用于监听客户端的 TCP 连接请求.
客户端连接的 IO 操作都是由一个特定的 NIO 线程池负责. 每个客户端连接都与一个特定的 NIO 线程绑定, 因此在这个客户端连接中的所有 IO 操作都是在同一个线程中完成的.
客户端连接有很多, 但是 NIO 线程数是比较少的, 因此一个 NIO 线程可以同时绑定到多个客户端连接中.
图片. PNG
Netty 是 Reactor 模型与 NIO 的 Reactor 本质上区别不是很大. 那么和 nio 中的实现有哪些不同的. 下面我们分析:
reactor 一般是服务端用的最多, 这里我们以 EchoServer 分析
单线程模式:
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup)
- .channel(NioServerSocketChannel.class)
- ...
多线程模式
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- ...
上面两端代码, 区别其实就是单线程重载方法 group.
- @Override
- public ServerBootstrap group(EventLoopGroup group) {
- return group(group, group);
- }
接下来分析 reactor 的核心 NioEventLoopGroup, 来确定这是个什么玩意, 为什么它能充当一个线程组
类图如下:
图片. PNG
- public NioEventLoopGroup(int nThreads) {
- this(nThreads, (Executor) null);
- }
- // 调用下面
- public NioEventLoopGroup(int nThreads, Executor executor) {
- this(nThreads, executor, SelectorProvider.provider());
- }
- // 调用下面
- public NioEventLoopGroup(
- int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
- this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
- }
- /// 调用下面
- public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
- final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
- super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
- }
- // 继续调用父类 MultithreadEventLoopGroup
- protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
- super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
- }
- // 在次调用父类的父类 MultithreadEventExecutorGroup
- protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
- this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
- }
注意: 这里我们初始话 executor 为 null 那么后续我们猜测应该 netty 会为我们创建默认的 executor.SelectorProvider.provider() 这个方法前面介绍过, 会根据当前系统来选择核实的 io 多路复用 (select,poll,epoll).DefaultSelectStrategy 默认策略 .Execution 的拒绝策略 reject(线程池的拒绝策略)
最后在父类的父类 MultithreadEventExecutorGroup 构造器中
- protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
- EventExecutorChooserFactory chooserFactory, Object... args) {
- if (nThreads <= 0) {
- throw new IllegalArgumentException(String.format("nThreads: %d (expected:> 0)", nThreads));
- }
- if (executor == null) {
- executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
- }
- // 创建一个大小为 nThreads 的 EventExecutor 数组
- children = new EventExecutor[nThreads];
- for (int i = 0; i <nThreads; i ++) {
- boolean success = false;
- try {
- //newChild 的实现类在 NioEventLoopGroup 中, 返回 NioEventLoop
- children[i] = newChild(executor, args);
- success = true;
- } catch (Exception e) {
- // TODO: Think about if this is a good exception type
- throw new IllegalStateException("failed to create a child event loop", e);
- } finally {
- if (!success) {
- for (int j = 0; j < i; j ++) {
- children[j].shutdownGracefully();
- }
- for (int j = 0; j < i; j ++) {
- EventExecutor e = children[j];
- try {
- while (!e.isTerminated()) {
- e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
- }
- } catch (InterruptedException interrupted) {
- // Let the caller handle the interruption.
- Thread.currentThread().interrupt();
- break;
- }
- }
- }
- }
- }
- // 从 DefaultEventExecutorChooserFactory 工厂实现类中的 newChooser 方法: 根据线程数在 children 数组中选出一个合适的 EventExecutor 实例
- chooser = chooserFactory.newChooser(children);
- final FutureListener<Object> terminationListener = new FutureListener<Object>() {
- @Override
- public void operationComplete(Future<Object> future) throws Exception {
- if (terminatedChildren.incrementAndGet() == children.length) {
- terminationFuture.setSuccess(null);
- }
- }
- };
- for (EventExecutor e: children) {
- e.terminationFuture().addListener(terminationListener);
- }
- Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
- Collections.addAll(childrenSet, children);
- readonlyChildren = Collections.unmodifiableSet(childrenSet);
- }
已知 children 是一个 EventExecutor 数组, 而 ThreadPerTaskExecutor 是 Executor, 最后使用 newChild 方法将 ThreadPerTaskExecutor 封装成 EventLoop 放到数组中
- public final class ThreadPerTaskExecutor implements Executor {
- private final ThreadFactory threadFactory;
- public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
- if (threadFactory == null) {
- throw new NullPointerException("threadFactory");
- }
- this.threadFactory = threadFactory;
- }
- @Override
- public void execute(Runnable command) {
- threadFactory.newThread(command).start();
- }
- }
newChild 方法将 ThreadPerTaskExecutor 封装成 EventLoop 放到数组中
- @Override
- protected EventLoop newChild(Executor executor, Object... args) throws Exception {
- return new NioEventLoop(this, executor, (SelectorProvider) args[0],
- ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
- }
综上所述, 我们可以先猜测这个 EventLoop 的作用, 可能是客户端一旦和服务端 accept 后会将 task 丢到从 EventExecutor 数组取出一个 EventLoop 来执行, 那么会是这样吗? 我们来继续
简要分析下 NioEventLoop:
图片. PNG
NioEventLoop 的继承很多, 这里我们只需了解他的父类 SingleThreadEventExecutor 构造器中, 通过 threadFactory.newThread 创建了一个新的 Java 线程. 在这个线程中所做的事情主要就是调用 SingleThreadEventExecutor.this.run() 方法, 而因为 NioEventLoop 实现了这个方法, 因此根据多态性, 其实调用的是 NioEventLoop.run() 方法.
接下来我们追踪这个 NioEventLoop 是在哪里器作用的, 需要注意的是我们使用了两个 NioEventLoopGroup, 一个是 bossGroup 一个是 workerGroup
- public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
- super.group(parentGroup);
- if (childGroup == null) {
- throw new NullPointerException("childGroup");
- }
- if (this.childGroup != null) {
- throw new IllegalStateException("childGroup set already");
- }
- this.childGroup = childGroup;
- return this;
- }
作为服务端我们肯定是要从启动的 bind 入手分析:
根据之前服务端的分析, 我们一路找到 ServerBootstrap 父类 AbstractBootstrap 中 doBind0 这个方法
- private static void doBind0(
- final ChannelFuture regFuture, final Channel channel,
- final SocketAddress localAddress, final ChannelPromise promise) {
- // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
- // the pipeline in its channelRegistered() implementation.
- channel.eventLoop().execute(new Runnable() {
- @Override
- public void run() {
- if (regFuture.isSuccess()) {
- channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
- } else {
- promise.setFailure(regFuture.cause());
- }
- }
- });
- }
根据前面的分析 channel.eventLoop() 取得为 bossgroup, 也就是应该 accept 的线程, 正好 channel.bind 也同时印证了我们的猜想. 那么接下来 workgroup 从哪里来呢
想一下处理 io 阻塞事件在 netty 中一般是一何种形式处理的呢, 对了就是 handler, 一般在 ServerBootstrapAcceptor 这 handler 和客户端连接后就会交个后面的 handler 处理, 在哪里处理就是在 childgroup 线程组中处理
回想一下, 在分析 server 端的是我们有介绍过 ServerBootstrap 实现的 init 初始化 handler, 这里出现过 childGroup, 正是我们苦苦寻找的 workgroup
- // 这里初始化的为 nioserverchannel
- @Override
- void init(Channel channel) throws Exception {
- final Map<ChannelOption<?>, Object> options = options0();
- synchronized (options) {
- setChannelOptions(channel, options, logger);
- }
- final Map<AttributeKey<?>, Object> attrs = attrs0();
- synchronized (attrs) {
- for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
- @SuppressWarnings("unchecked")
- AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
- channel.attr(key).set(e.getValue());
- }
- }
- ChannelPipeline p = channel.pipeline();
- final EventLoopGroup currentChildGroup = childGroup;
- final ChannelHandler currentChildHandler = childHandler;
- final Entry<ChannelOption<?>, Object>[] currentChildOptions;
- final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
- synchronized (childOptions) {
- currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
- }
- synchronized (childAttrs) {
- currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
- }
- p.addLast(new ChannelInitializer<Channel>() {
- @Override
- public void initChannel(final Channel ch) throws Exception {
- final ChannelPipeline pipeline = ch.pipeline();
- // 这里从 config 获取的 handler 为 parent handler
- ChannelHandler handler = config.handler();
- if (handler != null) {
- pipeline.addLast(handler);
- }
- //currentChildGroup,currentChildHandler 客户端的连接的 IO 交互
- ch.eventLoop().execute(new Runnable() {
- @Override
- public void run() {
- pipeline.addLast(new ServerBootstrapAcceptor(
- ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
- }
- });
- }
- });
- }
简要分析下, 像 pipeline 中添加 ChannelInitializer, 前面分析 pipeline 已经知道之后再 register 掉 initChannel 方法. 添加的 ServerBootstrapAcceptor 这个 handler
在它抄写了 channelread 事件, 然后交给 childgroup 线程处理自定义 handler
ServerBootstrapAcceptor 中 channelRead 方法
- //inbound 事件到来时, 这里就是客户端和
- @Override
- @SuppressWarnings("unchecked")
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- final Channel child = (Channel) msg;
- child.pipeline().addLast(childHandler);
- setChannelOptions(child, childOptions, logger);
- for (Entry<AttributeKey<?>, Object> e: childAttrs) {
- child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
- }
- try {
- // 将操作 io 的 handler 绑定到 childGroup, 执行完成后断开 childchannel
- childGroup.register(child).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- forceClose(child, future.cause());
- }
- }
- });
- } catch (Throwable t) {
- forceClose(child, t);
- }
- }
基本上就暂时分析 enveloop 作为 netty 的 reactor 模式的核心. bossgroup,workgroup 等作用
来源: http://www.jianshu.com/p/39b9194202b0