一, 为何要使用 Netty 作为高性能的通信库? http://www.iocoder.cn/RocketMQ/huzhongtang/rpc-2/
二, RocketMQ 中 RPC 通信的 Netty 多线程模型 http://www.iocoder.cn/RocketMQ/huzhongtang/rpc-2/
2.1,Netty 的 Reactor 多线程模型设计概念与简述 http://www.iocoder.cn/RocketMQ/huzhongtang/rpc-2/
2.2,RocketMQ 中 RPC 通信的 1+N+M1+M2 的 Reactor 多线程设计与实现 http://www.iocoder.cn/RocketMQ/huzhongtang/rpc-2/
三, 总结 http://www.iocoder.cn/RocketMQ/huzhongtang/rpc-2/
666. 彩蛋 http://www.iocoder.cn/RocketMQ/huzhongtang/rpc-2/
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都将得到认真回复. 甚至不知道如何读源码也可以请教噢.
新的源码解析文章实时收到通知. 每周更新一篇左右.
认真的源码交流微信群.
文章摘要: 如何设计 RPC 通信层模型是任何一款性能强劲的 MQ 所要重点考虑的问题
在 (一) 篇中主要介绍了 RocketMQ 的协议格式, 消息编解码, 通信方式(同步 / 异步 / 单向), 消息发送 / 接收以及异步回调的主要通信流程. 而本篇将主要对 RocketMQ 消息队列 RPC 通信部分的 Netty 多线程模型进行重点介绍.
一, 为何要使用 Netty 作为高性能的通信库?
在看 RocketMQ 的 RPC 通信部分时候, 可能有不少同学有这样子的疑问, RocketMQ 为何要选择 Netty 而不直接使用 JDK 的 NIO 进行网络编程呢? 这里有必要先来简要介绍下 Netty. Netty 是一个封装了 JDK 的 NIO 库的高性能网络通信开源框架. 它提供异步的, 事件驱动的网络应用程序框架和工具, 用以快速开发高性能, 高可靠性的网络服务器和客户端程序. 下面主要列举了下一般系统的 RPC 通信模块会选择 Netty 作为底层通信库的理由(作者认为 RocketMQ 的 RPC 同样也是基于此选择了 Netty):
(1)Netty 的编程 API 使用简单, 开发门槛低, 无需编程者去关注和了解太多的 NIO 编程模型和概念;
(2)对于编程者来说, 可根据业务的要求进行定制化地开发, 通过 Netty 的 ChannelHandler 对通信框架进行灵活的定制化扩展;
(3)Netty 框架本身支持拆包 / 解包, 异常检测等机制, 让编程者可以从 JAVA NIO 的繁琐细节中解脱, 而只需要关注业务处理逻辑;
(4)Netty 解决了(准确地说应该是采用了另一种方式完美规避了)JDK NIO 的 Bug(Epoll bug, 会导致 Selector 空轮询, 最终导致 CPU 100%);
(5)Netty 框架内部对线程, selector 做了一些细节的优化, 精心设计的 reactor 多线程模型, 可以实现非常高效地并发处理;
(6)Netty 已经在多个开源项目 (Hadoop 的 RPC 框架 avro 使用 Netty 作为通信框架) 中都得到了充分验证, 健壮性 / 可靠性比较好.
二, RocketMQ 中 RPC 通信的 Netty 多线程模型
RocketMQ 的 RPC 通信部分采用了 **"1+N+M1+M2"** 的 Reactor 多线程模式, 对网络通信部分进行了一定的扩展与优化, 这一节主要让我们来看下这一部分的具体设计与实现内容.
2.1,Netty 的 Reactor 多线程模型设计概念与简述
这里有必要先来简要介绍下 Netty 的 Reactor 多线程模型. Reactor 多线程模型的设计思想是分而治之 + 事件驱动.
(1)分而治之 一般来说, 一个网络请求连接的完整处理过程可以分为接受 (accept), 数据读取(read), 解码 / 编码(decode/encode), 业务处理(process), 发送响应(send) 这几步骤. Reactor 模型将每个步骤都映射成为一个任务, 服务端线程执行的最小逻辑单元不再是一次完整的网络请求, 而是这个任务, 且采用以非阻塞方式执行.
(2)事件驱动 每个任务对应特定网络事件. 当任务准备就绪时, Reactor 收到对应的网络事件通知, 并将任务分发给绑定了对应网络事件的 Handler 执行.
2.2,RocketMQ 中 RPC 通信的 1+N+M1+M2 的 Reactor 多线程设计与实现
(1)RocketMQ 中 RPC 通信的 Reactor 多线程设计与流程 RocketMQ 的 RPC 通信采用 Netty 组件作为底层通信库, 同样也遵循了 Reactor 多线程模型, 同时又在这之上做了一些扩展和优化. 下面先给出一张 RocketMQ 的 RPC 通信层的 Netty 多线程模型框架图, 让大家对 RocketMQ 的 RPC 通信中的多线程分离设计有一个大致的了解.
从上面的框图中可以大致了解 RocketMQ 中 NettyRemotingServer 的 Reactor 多线程模型. 一个 Reactor 主线程 (eventLoopGroupBoss, 即为上面的 1) 负责监听 TCP 网络连接请求, 建立好连接后丢给 Reactor 线程池 (eventLoopGroupSelector, 即为上面的 "N", 源码中默认设置为 3), 它负责将建立好连接的 socket 注册到 selector 上去(RocketMQ 的源码中会自动根据 OS 的类型选择 NIO 和 Epoll, 也可以通过参数配置), 然后监听真正的网络数据. 拿到网络数据后, 再丢给 Worker 线程池(defaultEventExecutorGroup, 即为上面的 "M1", 源码中默认设置为 8). 为了更为高效的处理 RPC 的网络请求, 这里的 Worker 线程池是专门用于处理 Netty 网络通信相关的(包括编码 / 解码, 空闲链接管理, 网络连接管理以及网络请求处理). 而处理业务操作放在业务线程池中执行(这个内容在 "RocketMQ 的 RPC 通信(一) 篇" 中也有提到), 根据 RomotingCommand 的业务请求码 code 去 processorTable 这个本地缓存变量中找到对应的 processor, 然后封装成 task 任务后, 提交给对应的业务 processor 处理线程池来执行(sendMessageExecutor, 以发送消息为例, 即为上面的 "M2"). 下面以表格的方式列举了下上面所述的 "1+N+M1+M2"Reactor 多线程模型
线程数 | 线程名 | 线程具体说明 |
---|---|---|
1 | NettyBoss_%d | |
N | NettyServerEPOLLSelector_%d_%d | |
M1 | NettyServerCodecThread_%d | |
M2 | RemotingExecutorThread_%d |
(2)RocketMQ 中 RPC 通信的 Reactor 多线程的代码具体实现 说完了 Reactor 多线程整体的设计与流程, 大家应该就对 RocketMQ 的 RPC 通信的 Netty 部分有了一个比较全面的理解了, 那接下来就从源码上来看下一些细节部分(在看该部分代码时候需要读者对 JAVA NIO 和 Netty 的相关概念与技术点有所了解). 在 NettyRemotingServer 的实例初始化时, 会初始化各个相关的变量包括 serverBootstrap,nettyServerConfig 参数, channelEventListener 监听器并同时初始化 eventLoopGroupBoss 和 eventLoopGroupSelector 两个 Netty 的 EventLoopGroup 线程池(这里需要注意的是, 如果是 Linux 平台, 并且开启了 native epoll, 就用 EpollEventLoopGroup, 这个也就是用 JNI, 调的 c 写的 epoll; 否则, 就用 Java NIO 的 NioEventLoopGroup.), 具体代码如下:
- public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
- final ChannelEventListener channelEventListener) {
- super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
- this.serverBootstrap = new ServerBootstrap();
- this.nettyServerConfig = nettyServerConfig;
- this.channelEventListener = channelEventListener;
- // 省略部分代码
- // 初始化时候 nThreads 设置为 1, 说明 RemotingServer 端的 Disptacher 链接管理和分发请求的线程为 1, 用于接收客户端的 TCP 连接
- this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
- }
- });
- /**
- * 根据配置设置 NIO 还是 Epoll 来作为 Selector 线程池
- * 如果是 Linux 平台, 并且开启了 native epoll, 就用 EpollEventLoopGroup, 这个也就是用 JNI, 调的 c 写的 epoll; 否则, 就用 Java NIO 的 NioEventLoopGroup.
- *
- */
- if (useEpoll()) {
- this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
- private int threadTotal = nettyServerConfig.getServerSelectorThreads();
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
- }
- });
- } else {
- this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
- private int threadTotal = nettyServerConfig.getServerSelectorThreads();
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
- }
- });
- }
- // 省略部分代码
在 NettyRemotingServer 实例初始化完成后, 就会将其启动. Server 端在启动阶段会将之前实例化好的 1 个 acceptor 线程 (eventLoopGroupBoss),N 个 IO 线程(eventLoopGroupSelector),M1 个 worker 线程(defaultEventExecutorGroup) 绑定上去. 前面部分也已经介绍过各个线程池的作用了. 这里需要说明的是, Worker 线程拿到网络数据后, 就交给 Netty 的 ChannelPipeline(其采用责任链设计模式), 从 Head 到 Tail 的一个个 Handler 执行下去, 这些 Handler 是在创建 NettyRemotingServer 实例时候指定的. NettyEncoder 和 NettyDecoder 负责网络传输数据和 RemotingCommand 之间的编解码. NettyServerHandler 拿到解码得到的 RemotingCommand 后, 根据 RemotingCommand.type 来判断是 request 还是 response 来进行相应处理, 根据业务请求码封装成不同的 task 任务后, 提交给对应的业务 processor 处理线程池处理.
- @Override
- public void start() {
- // 默认的处理线程池组, 使用默认的处理线程池组用于处理后面的多个 Netty Handler 的逻辑操作
- this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
- nettyServerConfig.getServerWorkerThreads(),
- new ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
- }
- });
- /**
- * 首先来看下 RocketMQ NettyServer 的 Reactor 线程模型,
- * 一个 Reactor 主线程负责监听 TCP 连接请求;
- * 建立好连接后丢给 Reactor 线程池, 它负责将建立好连接的 socket 注册到 selector
- * 上去(这里有两种方式, NIO 和 Epoll, 可配置), 然后监听真正的网络数据;
- * 拿到网络数据后, 再丢给 Worker 线程池;
- *
- */
- //RocketMQ-> Java NIO 的 1+N+M 模型: 1 个 acceptor 线程, N 个 IO 线程, M1 个 worker 线程.
- ServerBootstrap childHandler =
- this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
- .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 1024)
- // 服务端处理客户端连接请求是顺序处理的, 所以同一时间只能处理一个客户端连接, 多个客户端来的时候, 服务端将不能处理的客户端连接请求放在队列中等待处理, backlog 参数指定了队列的大小
- .option(ChannelOption.SO_REUSEADDR, true)// 这个参数表示允许重复使用本地地址和端口
- .option(ChannelOption.SO_KEEPALIVE, false)// 当设置该选项以后, 如果在两小时内没有数据的通信时, TCP 会自动发送一个活动探测数据报文.
- .childOption(ChannelOption.TCP_NODELAY, true)// 该参数的作用就是禁止使用 Nagle 算法, 使用于小数据即时传输
- .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())// 这两个参数用于操作接收缓冲区和发送缓冲区
- .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
- .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline()
- .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
- new HandshakeHandler(TlsSystemConfig.tlsMode))
- .addLast(defaultEventExecutorGroup,
- new NettyEncoder(),//rocketmq 解码器, 他们分别覆盖了父类的 encode 和 decode 方法
- new NettyDecoder(),//rocketmq 编码器
- new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),//Netty 自带的心跳管理器
- new NettyConnectManageHandler(),// 连接管理器, 他负责捕获新连接, 连接断开, 异常等事件, 然后统一调度到 NettyEventExecuter 处理器处理.
- new NettyServerHandler()// 当一个消息经过前面的解码等步骤后, 然后调度到 channelRead0 方法, 然后根据消息类型进行分发
- );
- }
- });
- if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
- childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- }
- try {
- ChannelFuture sync = this.serverBootstrap.bind().sync();
- InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
- this.port = addr.getPort();
- } catch (InterruptedException e1) {
- throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
- }
- if (this.channelEventListener != null) {
- this.nettyEventExecutor.start();
- }
- // 定时扫描 responseTable, 获取返回结果, 并且处理超时
- this.timer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- try {
- NettyRemotingServer.this.scanResponseTable();
- } catch (Throwable e) {
- log.error("scanResponseTable exception", e);
- }
- }
- }, 1000 * 3, 1000);
- }
从上面的描述中可以概括得出 RocketMQ 的 RPC 通信部分的 Reactor 线程池模型框图.
整体可以看出 RocketMQ 的 RPC 通信借助 Netty 的多线程模型, 其服务端监听线程和 IO 线程分离, 同时将 RPC 通信层的业务逻辑与处理具体业务的线程进一步相分离. 时间可控的简单业务都直接放在 RPC 通信部分来完成, 复杂和时间不可控的业务提交至后端业务线程池中处理, 这样提高了通信效率和 MQ 整体的性能.(ps: 其中抽象出 NioEventLoop 来表示一个不断循环执行处理任务的线程, 每个 NioEventLoop 有一个 selector, 用于监听绑定在其上的 socket 链路.)
三, 总结
仔细阅读 RocketMQ 的过程中收获了很多关于网络通信设计技术和知识点. 对于刚接触开源版的 RocketMQ 的童鞋来说, 想要自己掌握 RPC 通信部分的各个技术知识点, 还需要不断地使用本地环境进行 debug 调试和阅读源码反复思考. 限于笔者的才疏学浅, 对本文内容可能还有理解不到位的地方, 如有阐述不合理之处还望留言一起探讨. 后续还会陆续发布 RocketMQ 其他模块 (Client,Broker 和 NameServer 等) 的相关技术文章, 敬请关注. 在此顺便为自己打个 Call, 有兴趣的朋友可以关注下我的个人公众号:"匠心独运的博客", 对于 Java 并发, Spring, 数据库和消息队列的一些细节, 问题的文章将会在这个公众号上发布, 欢迎交流与讨论.
666. 彩蛋
如果你对 Dubbo 感兴趣, 欢迎加入我的知识星球一起交流.
来源: https://juejin.im/entry/5c785a09e51d457fd23d5005