1 Kafka 介绍
Kafka 是最初由 Linkedin 公司开发, 是一个分布式, 支持分区的(partition), 多副本的(replica), 基于 zookeeper 协调的分布式消息系统, 它的最大的特性就是可以实时的处理大量数据以满足各种需求场景: 比如基于 hadoop 的批处理系统, 低延迟的实时系统, storm/Spark 流式处理引擎, web/nginx 日志, 访问日志, 消息服务等等, 用 scala 语言编写, Linkedin 于 2010 年贡献给了 Apache 基金会并成为顶级开源 项目.
2 为什么使用 Kafka
2.1 解耦
允许你独立的扩展或修改两边的处理过程, 只要确保它们遵守同样的接口约束.
2.2 冗余
消息队列把数据进行持久化直到它们已经被完全处理, 通过这一方式规避了数据丢失风险. 许多消息队列所采用的 "插入 - 获取 - 删除" 范式中, 在把一个消息从队列中删除之前, 需要你的处理系统明确的指出该消息已经被处理完毕, 从而确保你的数据被安全的保存直到你使用完毕.
2.3 扩展性
因为消息队列解耦了你的处理过程, 所以增大消息入队和处理的频率是很容易的, 只要另外增加处理过程即可.
2.4 灵活性 & 峰值处理能力
在访问量剧增的情况下, 应用仍然需要继续发挥作用, 但是这样的突发流量并不常见. 如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费. 使用消息队列能够使关键组件顶住突发的访问压力, 而不会因为突发的超负荷的请求而完全崩溃.
2.5 可恢复性
系统的一部分组件失效时, 不会影响到整个系统. 消息队列降低了进程间的耦合度, 所以即使一个处理消息的进程挂掉, 加入队列中的消息仍然可以在系统恢复后被处理.
2.6 顺序保证
在大多使用场景下, 数据处理的顺序都很重要. 大部分消息队列本来就是排序的, 并且能保证数据会按照特定的顺序来处理.(Kafka 保证一个 Partition 内的消息的有序性)
2.7 缓冲
有助于控制和优化数据流经过系统的速度, 解决生产消息和消费消息的处理速度不一致的情况.
2.8 异步通信
很多时候, 用户不想也不需要立即处理消息. 消息队列提供了异步处理机制, 允许用户把一个消息放入队列, 但并不立即处理它. 想向队列中放入多少消息就放多少, 然后在需要的时候再去处理它们.
3 Kafka 架构
image
3.1 Broker
kafka 集群中包含的服务器. 一个单独的 Kafka server 就是一个 Broker.Broker 的主要工作就是接受生产者发过来的消息, 分配 offset, 之后保存到磁盘中, 同时, 接收消费者, 其他 Borker 的请求, 根据请求类型进行相应处理并返回响应. 多个 Broker 可以做成一个 Cluster 对外提供服务, 每个 Cluster 当中会选举出一个 Broker 来担任 Controller,Controller 是 Kafka 集群的指挥中心, 而其他 Broker 则听从 Controller 指挥实现相应的功能. Controller 负责管理分区的状态, 管理每个分区的副本的状态, 监听 Zookeeper 中数据的变化等工作. Controller 也是一主多从的实现, 所有的 Broker 都会监听 Controller Leader 的状态, 当 Controller Leader 出现故障时则重新选举新的 Controller Leader.
3.2 Producer
producer 发送消息到 broker 时, 会根据分区算法选择将其存储到哪一个 partition. 其路由机制为:
1 指定了 patition, 则直接使用.
2 未指定 patition 但指定 key, 通过对 key 的 value 进行 hash 选出一个 patition.
3 patition 和 key 都未指定, 使用轮询选出一个 patition.
image
流程说明:
1 producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader.
2 producer 将消息发送给该 leader.
3 leader 将消息写入本地 log.
4 followers 从 leader pull 消息, 写入本地 log 后 leader 发送 ACK.
5 leader 收到所有 ISR 中的 replica 的 ACK 后, 增加 HW(high watermark, 最后 commit 的 offset) 并向 producer 发送 ACK.
下面来介绍消息传递的保证 (Delivery guarantee semantic) 的相关内容, 传递保证语义有以下三个级别.
1 At most once: 消息可能会丢, 但绝不会重复传递.
2 At least once: 消息绝不会丢, 但可能会重复传递.
3 Exactly once: 每条消息都只被传递一次.
当 producer 向 broker 发送消息时, 一旦这条消息被 commit, 由于 replication 的存在, 它就不会丢. 但是如果 producer 发送数据给 broker 后, 遇到网络问题而造成通信中断, 那 Producer 就无法判断该条消息是否已经 commit. 虽然 Kafka 无法确定网络故障期间发生了什么. 为了实现 Exactly once 语义, 这里提供两种可选方案:
1 每个分区只有一个生产者写入消息, 当出现异常或超时的情况时, 生产者就要查询此分区的最后一个消息, 用来决定后续操作的消息重传还是继续发送.
2 为每个消息添加一个全局唯一主键, 生产者不做其他特殊处理, 按照之前分析的方式进行重传, 由消费者进行去重.
3.3 Topic&Log
每条发布到 kafka 集群的消息属于的类别, 即 kafka 是面向 Topic 的. 每个 Topic 可以划分成多个分区, 同一个 Topic 下的不同分区包含的消息是不同的. 每个消息在被添加到分区时, 都会被分配到一个 offset, 它是消息在此分区中的唯一编号, Kafka 通过 offset 保证消息的分区内的顺序, offset 的顺序性不夸分区, 即 Kafka 只保证在同一个分区内的消息是有序的.
image
同一 Topic 的不同分区会分配在不同的 Broker 上. 分区是 Kafka 水平扩展性的基础, 我们可以通过增加服务器并在其上分配 Partition 的方式来增加 Kafka 的并行处理能力.
分区在逻辑上对应着一个 Log, 当生产者将消息写入分区时, 实际上是写入到了分区对应的 Log 中. Log 是一个逻辑概念, 可以对应到磁盘上的一个文件夹. Log 有多个 Segment 组成, 每个 Segment 对应一个日志文件和索引文件. 在面对海量数据时, 为避免 Segment 出现超大文件, 每个日志文件的大小是有限制的, 当超出限制后会创建新的 Segment 继续对外提供服务. 这里要注意, 因为 Kafka 采用顺序 IO, 所以只向最新的 Segment 追加数据. 为了权衡文件大小, 索引速度, 占用内存大小等多方面因素, 索引文件采用稀疏索引的方式, 大小并不会很大, 在运行时会将其内容映射到内存, 提高索引速度.
3.4 Partition
每一个 Topic 都可以划分成多个 Partition(每一个 Topic 都至少有一个 Partition), 不同的 Partition 会分配在不同的 Broker 上以对 Kafka 进行水平扩展从而增加 Kafka 的并行处理能力. 同一个 Topic 下的不同 Partition 包含的消息是不同的. 每一个消息在被添加到 Partition 的时候, 都会被分配一个 offset, 他是消息在此分区中的唯一编号, 此外, Kafka 通过 offset 保证消息在 Partition 中的顺序, offset 的顺序性不跨 Partition, 也就是说在 Kafka 的同一个 Partition 中的消息是有序的, 不同 Partition 的消息可能不是有序的.
3.5 Consumer
消费者 (Consumer) 的主要工作是从 Topic 中拉取消息, 并对消息进行消费. 某个消息消费到 Partition 的哪个位置 (offset) 的相关信息, 是 Consumer 自己维护的.
image
这样设计非常巧妙. 避免了 Kafka Server 端维护消费者消费位置的开销, 尤其是在消费数量较多的情况下. 另一方面, 如果是由 Kafka Server 端管理每个 Consumer 消费状态, 一旦 Kafka Server 端出现延时或是消费状态丢失, 将会影响大量的 Consumer. 同时, 这一设计也提高了 Consumer 的灵活性, Consumer 可以按照自己需要的顺序和模式拉取消息进行消费. 例如: Consumer 可以通过修改其消费者的位置实现针对某些特殊 key 的消息进行反复消费, 或是跳过某些消息的需求.
3.6 Consumer group
high-level consumer API 中, 每个 consumer 都属于一个 consumer group, 每条消息只能被 consumer group 中的一个 Consumer 消费, 但可以被多个 consumer group 消费.
3.7 Replica
Kafka 对消息进行了冗余备份, 每个 Partition 分区都可以有多个副本, 每一个副本中包含的消息是相同的(但不保证同一时刻下完全相同). 每个分区至少有一个副本, 当分区只有一个副本的时候, 就只有 Leader 副本, 没有 Follower. 在每个副本集合中, 都会选举出一个副本作为 Leader 副本, Kafka 在不同的场景中会采用不同的选举策略. Kafka 中所有的读写请求都由选举出的 Leader 副本处理, 其他的都作为 Follower 副本, Follower 副本仅仅是从 Leader 副本中把数据拉取到本地之后, 同步更新到自己的 Log 中.
3.8 ISR
ISR(In-Sync-Replica)集合表示的是目前可用 (alive) 且消息量与 Leader 相差不多的副本集合, 这是整个副本集合的一个子集. ISR 集合中的副本必须满足下面两个条件:
(1)副本所在节点必须维持着与 ZooKeeper 的链接.
(2)副本最后一条消息的 offset 与 Leader 副本的最后一条消息的 offset 之间的差值不能超出指定的阈值.
每个分区中的 Leader 都会维护此分区的 ISR 集合, 写请求首先由 Leader 副本处理, 之后 Follower 副本都会从 Leader 上拉取写入的消息, 这个过程会有一定的延迟, 导致 Follower 副本中保存的消息略少于 Leader 副本, 只要未超出阈值都是可以容忍的. 如果一个 Follower 副本出现异常, 比如: 宕机, 发生长时间 GC 而导致 Kafka 僵死或是网络断开连接导致长时间没有拉取消息进行同步, 就回违反上面两个条件, 从而被 Leader 副本踢出 ISR 集合. 当 Follower 副本从异常中恢复之后, 会继续与 Leader 副本进行同步, 当 Follower 副本追上(即最后一条消息的 offset 的差值小于指定阈值)Leader 副本的时候, 此 Follower 副本会被 Leader 副本重新加入到 ISR 中.
3.9 HW&LEO
HW(High Watermark)和 LEO 与 ISR 集合紧密相关. HW 标记了一个特殊的 offset, 当消费者处理消息的时候, 只能拉取到 HW 之前的消息, HW 之后的消息对消费者来说是不可见的. 与 ISR 集合类似, HW 也是由 Leader 副本管理的. 当 ISR 集合中全部的 Follower 副本都拉取 HW 指定消息进行同步后, Leader 副本会递增 HW 的值. Kafka 官方网站将 HW 之前的消息的状态称为 commit, 其含义是这些消息在多个副本中同时存在, 即使 Leader 副本损坏, 也不会出现数据丢失.
LEO(Log End Offset)是所有的副本都会有的一个 offset 标记, 它指向追加到当前副本的最后一个消息的 offset. 当生产者向 Leader 副本追加消息的时候, Leader 副本的 LEO 标记会递增; 当 Follower 副本成功从 Leader 副本拉取消息并更新到本地的时候, Follower 副本的 LEO 就会增加.
为了让读者更好的理解 HW 和 LEO 之间的关系, 下面通过一个示例进行分析 ISR 集合, HW 和 LEO 是如何协调工作的.
image
1 Poducer 向此 Partition 推送消息.
2 Leader 副本将消息追加到 Log 中, 并递增其 LEO.
3 Follower 副本从 Leader 副本拉取消息并进行同步.
4 Follower 副本将拉取到的消息更新到本地 Log 中, 并递增其 LEO.
5 当 ISR 集合中所有副本都完成了对 offset 消息的同步, Leader 副本会递增 HW.
3.10 ZooKeeper
kafka 通过 zookeeper 来存储集群的 meta 信息.
来源: http://www.jianshu.com/p/2d1fea813ee3