文章摘要: 很多人喜欢把 RocketMQ 与 Kafka 做对比, 其实这两款消息队列的网络通信层还是比较相似的, 本文就为大家简要地介绍下 Kafka 的 NIO 网络通信模型
前面写的两篇 RocketMQ 源码研究笔记系列:
(1)消息中间件 - RocketMQ 的 RPC 通信(一)
(2)消息中间件 - RocketMQ 的 RPC 通信(二)
基本上已经较为详细地将 RocketMQ 这款分布式消息队列的 RPC 通信部分的协议格式, 消息编解码, 通信方式(同步 / 异步 / 单向), 消息收发流程和 Netty 的 Reactor 多线程分离处理架构讲了一遍. 同时, 联想业界大名鼎鼎的另一款开源分布式消息队列 - Kafka, 具备高吞吐量和高并发的特性, 其网络通信层是如何做到消息的高效传输的呢? 为了解开自己心中的疑虑, 就查阅了 Kafka 的 Network 通信模块的源码, 乘机会写本篇文章.
本文主要通过对 Kafka 源码的分析来简述其 Reactor 的多线程网络通信模型和总体框架结构, 同时简要介绍 Kafka 网络通信层的设计与具体实现.
一, Kafka 网络通信模型的整体框架概述
Kafka 的网络通信模型是基于 NIO 的 Reactor 多线程模型来设计的. 这里先引用 Kafka 源码中注释的一段话:
An NIO socket server. The threading model is
1 Acceptor thread that handles new connections.
Acceptor has N Processor threads that each have their own selector and read requests from sockets.
M Handler threads that handle requests and produce responses back to the processor threads for writing.
相信大家看了上面的这段引文注释后, 大致可以了解到 Kafka 的网络通信层模型, 主要采用了 1(1 个 Acceptor 线程)+N(N 个 Processor 线程)+M(M 个业务处理线程). 下面的表格简要的列举了下(这里先简单的看下后面还会详细说明):
线程数 | 线程名 | 线程具体说明 |
---|---|---|
1 | kafka-socket-acceptor_%x | Acceptor 线程,负责监听 Client 端发起的请求 |
N | kafka-network-thread_%d | Processor 线程,负责对 Socket 进行读写 |
M | kafka-request-handler-_%d | Worker 线程,处理具体的业务逻辑并生成 Response 返回 |
Kafka 网络通信层的完整框架图如下图所示:
Kafka 消息队列的通信层模型 - 1+N+M 模型. png
刚开始看到上面的这个框架图可能会有一些不太理解, 并不要紧, 这里可以先对 Kafka 的网络通信层框架结构有一个大致了解. 本文后面会结合 Kafka 的部分重要源码来详细阐述上面的过程. 这里可以简单总结一下其网络通信模型中的几个重要概念:
(1),Acceptor:1 个接收线程, 负责监听新的连接请求, 同时注册 OP_ACCEPT 事件, 将新的连接按照 "round robin" 方式交给对应的 Processor 线程处理;
(2),Processor:N 个处理器线程, 其中每个 Processor 都有自己的 selector, 它会向 Acceptor 分配的 SocketChannel 注册相应的 OP_READ 事件, N 的大小由 "num.networker.threads" 决定;
(3),KafkaRequestHandler:M 个请求处理线程, 包含在线程池 - KafkaRequestHandlerPool 内部, 从 RequestChannel 的全局请求队列 - requestQueue 中获取请求数据并交给 KafkaApis 处理, M 的大小由 "num.io.threads" 决定;
(4),RequestChannel: 其为 Kafka 服务端的请求通道, 该数据结构中包含了一个全局的请求队列 requestQueue 和多个与 Processor 处理器相对应的响应队列 responseQueue, 提供给 Processor 与请求处理线程 KafkaRequestHandler 和 KafkaApis 交换数据的地方.
(5),NetworkClient: 其底层是对 Java NIO 进行相应的封装, 位于 Kafka 的网络接口层. Kafka 消息生产者对象 - KafkaProducer 的 send 方法主要调用 NetworkClient 完成消息发送;
(6),SocketServer: 其是一个 NIO 的服务, 它同时启动一个 Acceptor 接收线程和多个 Processor 处理器线程. 提供了一种典型的 Reactor 多线程模式, 将接收客户端请求和处理请求相分离;
(7),KafkaServer: 代表了一个 Kafka Broker 的实例; 其 startup 方法为实例启动的入口;
(8),KafkaApis:Kafka 的业务逻辑处理 Api, 负责处理不同类型的请求; 比如 "发送消息","获取消息偏移量 - offset" 和 "处理心跳请求" 等;
二, Kafka 网络通信层的设计与具体实现
这一节将结合 Kafka 网络通信层的源码来分析其设计与实现, 这里主要详细介绍网络通信层的几个重要元素 - SocketServer,Acceptor,Processor,RequestChannel 和 KafkaRequestHandler. 本文分析的源码部分均基于 Kafka 的 0.11.0 版本.
1,SocketServer
SocketServer 是接收客户端 Socket 请求连接, 处理请求并返回处理结果的核心类, Acceptor 及 Processor 的初始化, 处理逻辑都是在这里实现的. 在 KafkaServer 实例启动时会调用其 startup 的初始化方法, 会初始化 1 个 Acceptor 和 N 个 Processor 线程(每个 EndPoint 都会初始化, 一般来说一个 Server 只会设置一个端口), 其实现如下:
- def startup() {
- this.synchronized {
- connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
- val sendBufferSize = config.socketSendBufferBytes
- val recvBufferSize = config.socketReceiveBufferBytes
- val brokerId = config.brokerId
- var processorBeginIndex = 0
- // 一个 broker 一般只设置一个端口
- config.listeners.foreach { endpoint =>
- val listenerName = endpoint.listenerName
- val securityProtocol = endpoint.securityProtocol
- val processorEndIndex = processorBeginIndex + numProcessorThreads
- //N 个 processor
- for (i <- processorBeginIndex until processorEndIndex)
- processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol, memoryPool)
- //1 个 Acceptor
- val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
- processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
- acceptors.put(endpoint, acceptor)
- KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
- acceptor.awaitStartup()
- processorBeginIndex = processorEndIndex
- }
- }
- 2,Acceptor
Acceptor 是一个继承自抽象类 AbstractServerThread 的线程类. Acceptor 的主要任务是监听并且接收客户端的请求, 同时建立数据传输通道 - SocketChannel, 然后以轮询的方式交给一个后端的 Processor 线程处理(具体的方式是添加 socketChannel 至并发队列并唤醒 Processor 线程处理).
在该线程类中主要可以关注以下两个重要的变量:
(1),nioSelector: 通过 NSelector.open()方法创建的变量, 封装了 JAVA NIO Selector 的相关操作;
(2),serverChannel: 用于监听端口的服务端 Socket 套接字对象;
下面来看下 Acceptor 主要的 run 方法的源码:
- def run() {
- // 首先注册 OP_ACCEPT 事件
- serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
- startupComplete()
- try {
- var currentProcessor = 0
- // 以轮询方式查询并等待关注的事件发生
- while (isRunning) {
- try {
- val ready = nioSelector.select(500)
- if (ready> 0) {
- val keys = nioSelector.selectedKeys()
- val iter = keys.iterator()
- while (iter.hasNext && isRunning) {
- try {
- val key = iter.next
- iter.remove()
- if (key.isAcceptable)
- // 如果事件发生则调用 accept 方法对 OP_ACCEPT 事件处理
- accept(key, processors(currentProcessor))
- else
- throw new IllegalStateException("Unrecognized key state for acceptor thread.")
- // 轮询算法
- // round robin to the next processor thread
- currentProcessor = (currentProcessor + 1) % processors.length
- } catch {
- case e: Throwable => error("Error while accepting connection", e)
- }
- }
- }
- }
- // 代码省略
- }
- def accept(key: SelectionKey, processor: Processor) {
- val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
- val socketChannel = serverSocketChannel.accept()
- try {
- connectionQuotas.inc(socketChannel.socket().getInetAddress)
- socketChannel.configureBlocking(false)
- socketChannel.socket().setTcpNoDelay(true)
- socketChannel.socket().setKeepAlive(true)
- if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
- socketChannel.socket().setSendBufferSize(sendBufferSize)
- processor.accept(socketChannel)
- } catch {
- // 省略部分代码
- }
- }
- def accept(socketChannel: SocketChannel) {
- newConnections.add(socketChannel)
- wakeup()
- }
在上面源码中可以看到, Acceptor 线程启动后, 首先会向用于监听端口的服务端套接字对象 - ServerSocketChannel 上注册 OP_ACCEPT 事件. 然后以轮询的方式等待所关注的事件发生. 如果该事件发生, 则调用 accept()方法对 OP_ACCEPT 事件进行处理. 这里, Processor 是通过 round robin 方法选择的, 这样可以保证后面多个 Processor 线程的负载基本均匀.
Acceptor 的 accept()方法的作用主要如下:
(1)通过 SelectionKey 取得与之对应的 serverSocketChannel 实例, 并调用它的 accept()方法与客户端建立连接;
(2)调用 connectionQuotas.inc()方法增加连接统计计数; 并同时设置第 (1) 步中创建返回的 socketChannel 属性(如 sendBufferSize,KeepAlive,TcpNoDelay,configureBlocking 等)
(3)将 socketChannel 交给 processor.accept()方法进行处理. 这里主要是将 socketChannel 加入 Processor 处理器的并发队列 newConnections 队列中, 然后唤醒 Processor 线程从队列中获取 socketChannel 并处理. 其中, newConnections 会被 Acceptor 线程和 Processor 线程并发访问操作, 所以 newConnections 是 ConcurrentLinkedQueue 队列(一个基于链接节点的无界线程安全队列)
3,Processor
Processor 同 Acceptor 一样, 也是一个线程类, 继承了抽象类 AbstractServerThread. 其主要是从客户端的请求中读取数据和将 KafkaRequestHandler 处理完响应结果返回给客户端. 在该线程类中主要关注以下几个重要的变量:
(1),newConnections: 在上面的 Acceptor 一节中已经提到过, 它是一种 ConcurrentLinkedQueue[SocketChannel]类型的队列, 用于保存新连接交由 Processor 处理的 socketChannel;
(2),inflightResponses: 是一个 Map[String, RequestChannel.Response]类型的集合, 用于记录尚未发送的响应;
(3),selector: 是一个类型为 KSelector 变量, 用于管理网络连接;
下面先给出 Processor 处理器线程 run 方法执行的流程图:
Kafk_Processor 线程的处理流程图. png
从上面的流程图中能够可以看出 Processor 处理器线程在其主流程中主要完成了这样子几步操作:
(1), 处理 newConnections 队列中的 socketChannel. 遍历取出队列中的每个 socketChannel 并将其在 selector 上注册 OP_READ 事件;
(2), 处理 RequestChannel 中与当前 Processor 对应响应队列中的 Response. 在这一步中会根据 responseAction 的类型 (NoOpAction/SendAction/CloseConnectionAction) 进行判断, 若为 "NoOpAction", 表示该连接对应的请求无需响应; 若为 "SendAction", 表示该 Response 需要发送给客户端, 则会通过 "selector.send" 注册 OP_WRITE 事件, 并且将该 Response 从 responseQueue 响应队列中移至 inflightResponses 集合中;"CloseConnectionAction", 表示该连接是要关闭的;
(3), 调用 selector.poll()方法进行处理. 该方法底层即为调用 nioSelector.select()方法进行处理.
(4), 处理已接受完成的数据包队列 - completedReceives. 在 processCompletedReceives 方法中调用 "requestChannel.sendRequest" 方法将请求 Request 添加至 requestChannel 的全局请求队列 - requestQueue 中, 等待 KafkaRequestHandler 来处理. 同时, 调用 "selector.mute" 方法取消与该请求对应的连接通道上的 OP_READ 事件;
(5), 处理已发送完的队列 - completedSends. 当已经完成将 response 发送给客户端, 则将其从 inflightResponses 移除, 同时通过调用 "selector.unmute" 方法为对应的连接通道重新注册 OP_READ 事件;
(6), 处理断开连接的队列. 将该 response 从 inflightResponses 集合中移除, 同时将 connectionQuotas 统计计数减 1;
4,RequestChannel
在 Kafka 的网络通信层中, RequestChannel 为 Processor 处理器线程与 KafkaRequestHandler 线程之间的数据交换提供了一个数据缓冲区, 是通信过程中 Request 和 Response 缓存的地方. 因此, 其作用就是在通信中起到了一个数据缓冲队列的作用. Processor 线程将读取到的请求添加至 RequestChannel 的全局请求队列 - requestQueue 中; KafkaRequestHandler 线程从请求队列中获取并处理, 处理完以后将 Response 添加至 RequestChannel 的响应队列 - responseQueue 中, 并通过 responseListeners 唤醒对应的 Processor 线程, 最后 Processor 线程从响应队列中取出后发送至客户端.
5,KafkaRequestHandler
KafkaRequestHandler 也是一种线程类, 在 KafkaServer 实例启动时候会实例化一个线程池 - KafkaRequestHandlerPool 对象(包含了若干个 KafkaRequestHandler 线程), 这些线程以守护线程的方式在后台运行. 在 KafkaRequestHandler 的 run 方法中会循环地从 RequestChannel 中阻塞式读取 request, 读取后再交由 KafkaApis 来具体处理.
6,KafkaApis
KafkaApis 是用于处理对通信网络传输过来的业务消息请求的中心转发组件. 该组件反映出 Kafka Broker Server 可以提供哪些服务.
三, 总结
仔细阅读 Kafka 的 NIO 网络通信层的源码过程中还是可以收获不少关于 NIO 网络通信模块的关键技术. Apache 的任何一款开源中间件都有其设计独到之处, 值得借鉴和学习. 对于任何一位使用 Kafka 这款分布式消息队列的同学来说, 如果能够在一定实践的基础上, 再通过阅读其源码能起到更为深入理解的效果, 对于大规模 Kafka 集群的性能调优和问题定位都大有裨益.
对于刚接触 Kafka 的同学来说, 想要自己掌握其 NIO 网络通信层模型的关键设计, 还需要不断地使用本地环境进行 debug 调试和阅读源码反复思考. 限于笔者的才疏学浅, 对本文内容可能还有理解不到位的地方, 如有阐述不合理之处还望留言一起探讨. 后续还会根据自己的实践和研发, 陆续发布关于 Kafka 分布式消息队列的其他相关技术文章, 敬请关注.
来源: http://www.jianshu.com/p/a6b9e5342878