无论是读取副本还是写入副本, 都是通过底层的 Partition 对象完成的, 而这些分区对象全部保存在上节课所学的 allPartitions 字段中. 可以说, 理解这些字段的用途, 是后续我们探索副本管理器类功能的重要前提.
现在, 我们就来学习下副本读写功能. 整个 Kafka 的同步机制, 本质上就是副本读取 + 副本写入, 搞懂了这两个功能, 你就知道了 Follower 副本是如何同步 Leader 副本数据的.
appendRecords - 副本写入
向副本底层日志写入消息的逻辑就实现在 ReplicaManager#appendRecords.
Kafka 需副本写入的场景:
生产者向 Leader 副本写入消息
Follower 副本拉取消息后写入副本
仅该场景调用 Partition 对象的方法, 其余 3 个都是调用 appendRecords 完成
消费者组写入组信息
事务管理器写入事务信息 (包括事务标记, 事务元数据等)
appendRecords 方法将给定的一组分区的消息写入对应 Leader 副本, 并根据 PRODUCE 请求中 acks 的设置, 有选择地等待其他副本写入完成. 然后, 调用指定回调逻辑.
appendRecords 向副本日志写入消息的过程:
执行流程
可见, appendRecords:
实现消息写入的方法是 appendToLocalLog
判断是否需要等待其他副本写入的方法 delayedProduceRequestRequired
appendToLocalLog 写入副本本地日志
利用 Partition#appendRecordsToLeader 写入消息集合, 就是利用 appendAsLeader 方法写入本地日志的.
delayedProduceRequestRequired
判断消息集合被写入到日志之后, 是否需要等待其它副本也写入成功:
- private def delayedProduceRequestRequired(
- requiredAcks: Short,
- entriesPerPartition: Map[TopicPartition, MemoryRecords],
- localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
- requiredAcks == -1 && entriesPerPartition.nonEmpty &&
- localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
- }
若等待其他副本的写入, 须同时满足:
requiredAcks==-1
依然有数据尚未写完
至少有一个分区的消息, 已成功被写入本地日志
2 和 3 可结合来看. 若所有分区的数据写入都不成功, 则可能出现严重错误, 此时应不再等待, 而是直接返回错误给发送方.
而有部分分区成功写入, 部分分区写入失败, 则可能偶发的瞬时错误导致. 此时, 不妨将本次写入请求放入 Purgatory, 给个重试机会.
副本读取: fetchMessages
ReplicaManager#fetchMessages 负责读取副本数据. 无论:
Java 消费者 API
Follower 副本
拉取消息的主途径都是向 Broker 发 FETCH 请求, Broker 端接收到该请求后, 调用 fetchMessages 从底层的 Leader 副本取出消息.
fetchMessages 也可能会延时处理 FETCH 请求, 因 Broker 端必须要累积足够多数据后, 才会返回 Response 给请求发送方.
整个方法分为:
读取本地日志
首先判断, 读取消息的请求方, 就能确定可读取的范围了.
fetchIsolation, 读取隔离级别:
对 Follower 副本, 它能读取到 Leader 副本 LEO 值以下的所有消息
普通 Consumer, 只能 "看到"Leader 副本高水位值以下的消息
确定可读取范围后, 调用 readFromLog 读取本地日志上的消息数据, 并将结果赋给 logReadResults 变量. readFromLog 调用 readFromLocalLog, 在待读取分区上依次调用其日志对象的 read 方法执行实际的消息读取.
根据读取结果确定 Response
根据上一步读取结果创建对应 Response:
根据上一步得到的读取结果, 统计可读取的总字节数, 然后判断此时是否能够立即返回 Reponse.
副本管理器读写副本的两个方法 appendRecords 和 fetchMessages 本质上在底层分别调用 Log 的 append 和 read 方法, 以实现本地日志的读写操作. 完成读写操作后, 这两个方法还定义了延时处理的条件. 一旦满足延时处理条件, 就交给对应 Purgatory 处理.
从这俩方法可见单个组件融合一起的趋势. 虽然我们学习单个源码文件的顺序是自上而下, 但串联 Kafka 主要组件功能的路径却是自下而上.
如副本写入操作, 日志对象 append 方法被上一层的 Partition 对象中的方法调用, 而后者又进一步被副本管理器中的方法调用. 我们按自上而下阅读了副本管理器, 日志对象等单个组件的代码, 了解了各自的独立功能.
现在开始慢慢地把它们融合一起, 构建 Kafka 操作分区副本日志对象的完整调用路径. 同时采用这两种方式来阅读源码, 就能更高效弄懂 Kafka 原理.
总结
Kafka 副本状态机类 ReplicaManager 读写副本的核心方法:
appendRecords: 向副本写入消息, 利用 Log#append 方法和 Purgatory 机制实现 Follower 副本向 Leader 副本获取消息后的数据同步操作
fetchMessages: 从副本读取消息, 为普通 Consumer 和 Follower 副本所使用. 当它们向 Broker 发送 FETCH 请求时, Broker 上的副本管理器调用该方法从本地日志中获取指定消息
来源: https://blog.csdn.net/qq_33589510/article/details/122401683