MQ(消息队列)是跨进程通信的方式之一, 可理解为异步 rpc, 上游系统对调用结果的态度往往是重要不紧急. 使用消息队列有以下好处: 业务解耦, 流量削峰, 灵活扩展. 接下来介绍消息中间件 Kafka.
Kafka 是什么?
Kafka 是一个分布式的消息引擎. 具有以下特征
能够发布和订阅消息流(类似于消息队列)
以容错的, 持久的方式存储消息流
多分区概念, 提高了并行能力
Kafka 架构总览
Topic
消息的主题, 队列, 每一个消息都有它的 topic,Kafka 通过 topic 对消息进行归类. Kafka 中可以将 Topic 从物理上划分成一个或多个分区 (Partition), 每个分区在物理上对应一个文件夹, 以 "topicName_partitionIndex" 的命名方式命名, 该 dir 包含了这个分区的所有消息(.log) 和索引文件(.index), 这使得 Kafka 的吞吐率可以水平扩展.
Partition
每个分区都是一个 顺序的, 不可变的消息队列, 并且可以持续的添加; 分区中的消息都被分了一个序列号, 称之为偏移量(offset), 在每个分区中此偏移量都是唯一的.
producer 在发布消息的时候, 可以为每条消息指定 Key, 这样消息被发送到 broker 时, 会根据分区算法把消息存储到对应的分区中(一个分区存储多个消息), 如果分区规则设置的合理, 那么所有的消息将会被均匀的分布到不同的分区中, 这样就实现了负载均衡.
Broker
Kafka server, 用来存储消息, Kafka 集群中的每一个服务器都是一个 Broker, 消费者将从 broker 拉取订阅的消息
Producer
向 Kafka 发送消息, 生产者会根据 topic 分发消息. 生产者也负责把消息关联到 Topic 上的哪一个分区. 最简单的方式从分区列表中轮流选择. 也可以根据某种算法依照权重选择分区. 算法可由开发者定义.
Cousumer
Consermer 实例可以是独立的进程, 负责订阅和消费消息. 消费者用 consumerGroup 来标识自己. 同一个消费组可以并发地消费多个分区的消息, 同一个 partition 也可以由多个 consumerGroup 并发消费, 但是在 consumerGroup 中一个 partition 只能由一个 consumer 消费
CousumerGroup
Consumer Group: 同一个 Consumer Group 中的 Consumers,Kafka 将相应 Topic 中的每个消息只发送给其中一个 Consumer
Kafka producer 设计原理
发送消息的流程
1. 序列化消息 &&. 计算 partition
根据 key 和 value 的配置对消息进行序列化, 然后计算 partition:
ProducerRecord 对象中如果指定了 partition, 就使用这个 partition. 否则根据 key 和 topic 的 partition 数目取余, 如果 key 也没有的话就随机生成一个 counter, 使用这个 counter 来和 partition 数目取余. 这个 counter 每次使用的时候递增.
2 发送到 batch&& 唤醒 Sender 线程
根据 topic-partition 获取对应的 batchs(Dueue ), 然后将消息 append 到 batch 中. 如果有 batch 满了则唤醒 Sender 线程. 队列的操作是加锁执行, 所以 batch 内消息时有序的. 后续的 Sender 操作当前方法异步操作.
3.Sender 把消息有序发到 broker(tp replia leader)
3.1 确定 tp relica leader 所在的 broker
Kafka 中 每台 broker 都保存了 kafka 集群的 metadata 信息, metadata 信息里包括了每个 topic 的所有 partition 的信息: leader, leader_epoch, controller_epoch, isr, replicas 等; Kafka 客户端从任一 broker 都可以获取到需要的 metadata 信息; sender 线程通过 metadata 信息可以知道 tp leader 的 brokerId
producer 也保存了 metada 信息, 同时根据 metadata 更新策略 (定期更新 metadata.max.age.ms, 失效检测, 强制更新: 检查到 metadata 失效以后, 调用 metadata.requestUpdate() 强制更新
- public class PartitionInfo {
- private final String topic;
- private final int partition;
- private final Node leader;
- private final Node[] replicas;
- private final Node[] inSyncReplicas;
- private final Node[] offlineReplicas;
- }
3.2 幂等性发送
为实现 Producer 的幂等性, Kafka 引入了 Producer ID(即 PID)和 Sequence Number. 对于每个 PID, 该 Producer 发送消息的每个
如果消息序号比 Broker 维护的序号差值比一大, 说明中间有数据尚未写入, 即乱序, 此时 Broker 拒绝该消息, Producer 抛出 InvalidSequenceNumber
如果消息序号小于等于 Broker 维护的序号, 说明该消息已被保存, 即为重复消息, Broker 直接丢弃该消息, Producer 抛出 DuplicateSequenceNumber
Sender 发送失败后会重试, 这样可以保证每个消息都被发送到 broker
4. Sender 处理 broker 发来的 produce response
一旦 broker 处理完 Sender 的 produce 请求, 就会发送 produce response 给 Sender, 此时 producer 将执行我们为 send()设置的回调函数. 至此 producer 的 send 执行完毕.
吞吐性 && 延时:
buffer.memory:buffer 设置大了有助于提升吞吐性, 但是 batch 太大会增大延迟, 可搭配 linger_ms 参数使用
linger_ms: 如果 batch 太大, 或者 producer qps 不高, batch 添加的会很慢, 我们可以强制在 linger_ms 时间后发送 batch 数据
ack:producer 收到多少 broker 的答复才算真的发送成功
0 表示 producer 无需等待 leader 的确认(吞吐最高, 数据可靠性最差)
1 代表需要 leader 确认写入它的本地 log 并立即确认
-1/all 代表所有的 ISR 都完成后确认(吞吐最低, 数据可靠性最高)
Sender 线程和长连接
每初始化一个 producer 实例, 都会初始化一个 Sender 实例, 新增到 broker 的长连接.
代码角度: 每初始化一次 KafkaProducer, 都赋一个空的 client
- public KafkaProducer(final Map configs) {
- this(configs, null, null, null, null, null, Time.SYSTEM);
- }
终端查看 TCP 连接数:
lsof -p portNum -np | grep TCP, 适当增大 producer 数量能提升吞吐
Consumer 设计原理
poll 消息
消费者通过 fetch 线程拉消息(单线程)
消费者通过心跳线程来与 broker 发送心跳. 超时会认为挂掉
每个 consumer group 在 broker 上都有一个 coordnator 来管理, 消费者加入和退出, 以及消费消息的位移都由 coordnator 处理.
位移管理
consumer 的消息位移代表了当前 group 对 topic-partition 的消费进度, consumer 宕机重启后可以继续从该 offset 开始消费. 在 kafka0.8 之前, 位移信息存放在 zookeeper 上, 由于 zookeeper 不适合高并发的读写, 新版本 Kafka 把位移信息当成消息, 发往__consumers_offsets 这个 topic 所在的 broker,__consumers_offsets 默认有 50 个分区. 消息的 key 是 groupId+topic_partition,value 是 offset.
Kafka Group 状态
Empty: 初始状态, Group 没有任何成员, 如果所有的 offsets 都过期的话就会变成 Dead
PreparingRebalance:Group 正在准备进行 Rebalance
AwaitingSync:Group 正在等待来 group leader 的 分配方案
Stable: 稳定的状态(Group is stable);
Dead:Group 内已经没有成员, 并且它的 Metadata 已经被移除
注意
重平衡 reblance
当一些原因导致 consumer 对 partition 消费不再均匀时, kafka 会自动执行 reblance, 使得 consumer 对 partition 的消费再次平衡.
什么时候发生 rebalance?:
组订阅 topic 数变更
topic partition 数变更
consumer 成员变更
consumer 加入群组或者离开群组的时候
consumer 被检测为崩溃的时候
reblance 过程
举例 1 consumer 被检测为崩溃引起的 reblance
比如心跳线程在 timeout 时间内没和 broker 发送心跳, 此时 coordnator 认为该 group 应该进行 reblance. 接下来其他 consumer 发来 fetch 请求后, coordnator 将回复他们进行 reblance 通知. 当 consumer 成员收到请求后, 只有 leader 会根据分配策略进行分配, 然后把各自的分配结果返回给 coordnator. 这个时候只有 consumer leader 返回的是实质数据, 其他返回的都为空. 收到分配方法后, consumer 将会把分配策略同步给各 consumer
举例 2 consumer 加入引起的 reblance
使用 join 协议, 表示有 consumer 要加入到 group 中
使用 sync 协议, 根据分配规则进行分配
(上图图片摘自网络)
引申: 以上 reblance 机制存在的问题
在大型系统中, 一个 topic 可能对应数百个 consumer 实例. 这些 consumer 陆续加入到一个空消费组将导致多次的 rebalance; 此外 consumer 实例启动的时间不可控, 很有可能超出 coordinator 确定的 rebalance timeout(即 max.poll.interval.ms), 将会再次触发 rebalance, 而每次 rebalance 的代价又相当地大, 因为很多状态都需要在 rebalance 前被持久化, 而在 rebalance 后被重新初始化.
新版本改进
通过延迟进入 PreparingRebalance 状态减少 reblance 次数
新版本新增了 group.initial.rebalance.delay.ms 参数. 空消费组接受到成员加入请求时, 不立即转化到 PreparingRebalance 状态来开启 reblance. 当时间超过 group.initial.rebalance.delay.ms 后, 再把 group 状态改为 PreparingRebalance(开启 reblance). 实现机制是在 coordinator 底层新增一个 group 状态: InitialReblance. 假设此时有多个 consumer 陆续启动, 那么 group 状态先转化为 InitialReblance, 待 group.initial.rebalance.delay.ms 时间后, 再转换为 PreparingRebalance(开启 reblance)
Broker 设计原理
Broker 是 Kafka 集群中的节点. 负责处理生产者发送过来的消息, 消费者消费的请求. 以及集群节点的管理等. 由于涉及内容较多, 先简单介绍, 后续专门抽出一篇文章分享
broker zk 注册
broker 消息存储
Kafka 的消息以二进制的方式紧凑地存储, 节省了很大空间
此外消息存在 ByteBuffer 而不是堆, 这样 broker 进程挂掉时, 数据不会丢失, 同时避免了 gc 问题
通过零拷贝和顺序寻址, 让消息存储和读取速度都非常快
处理 fetch 请求的时候通过 zero-copy 加快速度
broker 状态数据
broker 设计中, 每台机器都保存了相同的状态数据. 主要包括以下:
controller 所在的 broker ID, 即保存了当前集群中 controller 是哪台 broker
集群中所有 broker 的信息: 比如每台 broker 的 ID, 机架信息以及配置的若干组连接信息
集群中所有节点的信息: 严格来说, 它和上一个有些重复, 不过此项是按照 broker ID 和监听器类型进行分组的. 对于超大集群来说, 使用这一项缓存可以快速地定位和查找给定节点信息, 而无需遍历上一项中的内容, 算是一个优化吧
集群中所有分区的信息: 所谓分区信息指的是分区的 leader,ISR 和 AR 信息以及当前处于 offline 状态的副本集合. 这部分数据按照 topic-partitionID 进行分组, 可以快速地查找到每个分区的当前状态.(注: AR 表示 assigned replicas, 即创建 topic 时为该分区分配的副本集合)
broker 负载均衡
分区数量负载: 各台 broker 的 partition 数量应该均匀
partition Replica 分配算法如下:
将所有 Broker(假设共 n 个 Broker)和待分配的 Partition 排序
将第 i 个 Partition 分配到第 (i mod n) 个 Broker 上
将第 i 个 Partition 的第 j 个 Replica 分配到第 ((i + j) mod n) 个 Broker 上
容量大小负载: 每台 broker 的硬盘占用大小应该均匀
在 kafka1.1 之前, Kafka 能够保证各台 broker 上 partition 数量均匀, 但由于每个 partition 内的消息数不同, 可能存在不同硬盘之间内存占用差异大的情况. 在 Kafka1.1 中增加了副本跨路径迁移功能 kafka-reassign-partitions.sh, 我们可以结合它和监控系统, 实现自动化的负载均衡
Kafka 高可用
在介绍 kafka 高可用之前先介绍几个概念
同步复制: 要求所有能工作的 Follower 都复制完, 这条消息才会被认为 commit, 这种复制方式极大的影响了吞吐率
异步复制: Follower 异步的从 Leader pull 数据, data 只要被 Leader 写入 log 认为已经 commit, 这种情况下如果 Follower 落后于 Leader 的比较多, 如果 Leader 突然宕机, 会丢失数据
Isr
Kafka 结合同步复制和异步复制, 使用 ISR(与 Partition Leader 保持同步的 Replica 列表)的方式在确保数据不丢失和吞吐率之间做了平衡. Producer 只需把消息发送到 Partition Leader,Leader 将消息写入本地 Log.Follower 则从 Leader pull 数据. Follower 在收到该消息向 Leader 发送 ACK. 一旦 Leader 收到了 ISR 中所有 Replica 的 ACK, 该消息就被认为已经 commit 了, Leader 将增加 HW 并且向 Producer 发送 ACK. 这样如果 leader 挂了, 只要 Isr 中有一个 replica 存活, 就不会丢数据.
Isr 动态更新
Leader 会跟踪 ISR, 如果 ISR 中一个 Follower 宕机, 或者落后太多, Leader 将把它从 ISR 中移除. 这里所描述的 "落后太多" 指 Follower 复制的消息落后于 Leader 后的条数超过预定值 (replica.lag.max.messages) 或者 Follower 超过一定时间 (replica.lag.time.max.ms) 未向 Leader 发送 fetch 请求.
broker Nodes In Zookeeper
/brokers/topics/[topic]/partitions/[partition]/state 保存了 topic-partition 的 leader 和 Isr 等信息
Controller 负责 broker 故障检查 && 故障转移(fail/recover)
Controller 在 Zookeeper 上注册 Watch, 一旦有 Broker 宕机, 其在 Zookeeper 对应的 znode 会自动被删除, Zookeeper 会触发 Controller 注册的 watch,Controller 读取最新的 Broker 信息
Controller 确定 set_p, 该集合包含了宕机的所有 Broker 上的所有 Partition
对 set_p 中的每一个 Partition, 选举出新的 leader,Isr, 并更新结果.
3.1 从 / brokers/topics/[topic]/partitions/[partition]/state 读取该 Partition 当前的 ISR
3.2 决定该 Partition 的新 Leader 和 Isr. 如果当前 ISR 中有至少一个 Replica 还幸存, 则选择其中一个作为新 Leader, 新的 ISR 则包含当前 ISR 中所有幸存的 Replica. 否则选择该 Partition 中任意一个幸存的 Replica 作为新的 Leader 以及 ISR(该场景下可能会有潜在的数据丢失)
3.3 更新 Leader,ISR,leader_epoch,controller_epoch: 写入 / brokers/topics/[topic]/partitions/[partition]/state
直接通过 RPC 向 set_p 相关的 Broker 发送 LeaderAndISRRequest 命令. Controller 可以在一个 RPC 操作中发送多个命令从而提高效率.
Controller 挂掉
每个 broker 都会在 zookeeper 的临时节点 "/controller" 注册 watcher, 当 controller 宕机时 "/controller" 会消失, 触发 broker 的 watch, 每个 broker 都尝试创建新的 controller path, 只有一个竞选成功并当选为 controller.
使用 Kafka 如何保证幂等性
不丢消息
首先 kafka 保证了对已提交消息的 at least 保证
Sender 有重试机制
producer 业务方在使用 producer 发送消息时, 注册回调函数. 在 onError 方法中重发消息
consumer 拉取到消息后, 处理完毕再 commit, 保证 commit 的消息一定被处理完毕
不重复
consumer 拉取到消息先保存, commit 成功后删除缓存数据
Kafka 高性能
partition 提升了并发
zero-copy
顺序写入
消息聚集 batch
页缓存
业务方对 Kafka producer 的优化
增大 producer 数量
ack 配置
batch
来源: http://developer.51cto.com/art/201909/603365.htm