简介
Apache Kafka 是分布式发布 - 订阅消息系统. 它最初由 LinkedIn 公司开发, 之后成为 Apache 项目的一部分. Kafka 是一种快速, 可扩展的, 设计内在就是分布式的, 分区的和可复制的提交日志服务.
Kafka 架构
它的架构包括以下组件:
话题 (Topic): 是特定类型的消息流. 消息是字节的有效负载(Payload), 话题是消息的分类名或种子(Feed) 名.
生产者(Producer): 是能够发布消息到话题的任何对象.
服务代理 (Broker): 已发布的消息保存在一组服务器中, 它们被称为代理(Broker) 或 Kafka 集群.
消费者(Consumer): 可以订阅一个或多个话题, 并从 Broker 拉数据, 从而消费这些已发布的消息.
Kafka 存储策略
1)kafka 以 topic 来进行消息管理, 每个 topic 包含多个 partition, 每个 partition 对应一个逻辑 log, 有多个 segment 组成.
2)每个 segment 中存储多条消息(见下图), 消息 id 由其逻辑位置决定, 即从消息 id 可直接定位到消息的存储位置, 避免 id 到位置的额外映射.
3)每个 part 在内存中对应一个 index, 记录每个 segment 中的第一条消息偏移.
4)发布者发到某个 topic 的消息会被均匀的分布到多个 partition 上(或根据用户指定的路由规则进行分布),broker 收到发布消息往对应 partition 的最后一个 segment 上添加该消息, 当某个 segment 上的消息条数达到配置值或消息发布时间超过阈值时, segment 上的消息会被 flush 到磁盘, 只有 flush 到磁盘上的消息订阅者才能订阅到, segment 达到一定的大小后将不会再往该 segment 写数据, broker 会创建新的 segment.
Kafka 数据保留策略
1)N 天前的删除.
2)保留最近的多少 Size 数据.
Kafka broker
与其它消息系统不同, Kafka broker 是无状态的. 这意味着消费者必须维护已消费的状态信息. 这些信息由消费者自己维护, broker 完全不管(有 offset managerbroker 管理).
从代理删除消息变得很棘手, 因为代理并不知道消费者是否已经使用了该消息. Kafka 创新性地解决了这个问题, 它将一个简单的基于时间的 SLA 应用于保留策略. 当消息在代理中超过一定时间后, 将会被自动删除.
这种创新设计有很大的好处, 消费者可以故意倒回到老的偏移量再次消费数据. 这违反了队列的常见约定, 但被证明是许多消费者的基本特征.
以下摘抄自 kafka 官方文档:
Kafka Design
目标
1) 高吞吐量来支持高容量的事件流处理
2) 支持从离线系统加载数据
3) 低延迟的消息系统
持久化
1) 依赖文件系统, 持久化到本地
2) 数据持久化到 log
效率
1) 解决 "small IO problem":
使用 "message set" 组合消息.
server 使用 "chunks of messages" 写到 log.
consumer 一次获取大的消息块.
2)解决 "byte copying":
在 producer,broker 和 consumer 之间使用统一的 binary message format.
使用系统的 page cache.
使用 sendfile 传输 log, 避免拷贝.
端到端的批量压缩(End-to-end Batch Compression)
Kafka 支持 GZIP 和 Snappy 压缩协议.
复制(Replication)
1)一个 partition 的复制个数 (replication factor) 包括这个 partition 的 leader 本身.
2)所有对 partition 的读和写都通过 leader.
3)Followers 通过 pull 获取 leader 上 log(message 和 offset)
4)如果一个 follower 挂掉, 卡住或者同步太慢, leader 会把这个 follower 从 "in sync replicas"(ISR)列表中删除.
5)当所有的 "in sync replicas" 的 follower 把一个消息写入到自己的 log 中时, 这个消息才被认为是 "committed" 的.
6)如果针对某个 partition 的所有复制节点都挂了, Kafka 默认选择最先复活的那个节点作为 leader(这个节点不一定在 ISR 里).
Leader 选举
Kafka 在 Zookeeper 中为每一个 partition 动态的维护了一个 ISR, 这个 ISR 里的所有 replica 都跟上了 leader, 只有 ISR 里的成员才能有被选为 leader 的可能(unclean.leader.election.enable=false).
在这种模式下, 对于 f+1 个副本, 一个 Kafka topic 能在保证不丢失已经 commit 消息的前提下容忍 f 个副本的失败, 在大多数使用场景下, 这种模式是十分有利的. 事实上, 为了容忍 f 个副本的失败,"少数服从多数" 的方式和 ISR 在 commit 前需要等待的副本的数量是一样的, 但是 ISR 需要的总的副本的个数几乎是 "少数服从多数" 的方式的一半.
The Producer
发送确认
通过 request.required.acks 来设置, 选择是否等待消息 commit(是否等待所有的 "in sync replicas" 都成功复制了数据)
Producer 可以通过 acks 参数指定最少需要多少个 Replica 确认收到该消息才视为该消息发送成功. acks 的默认值是 1, 即 Leader 收到该消息后立即告诉 Producer 收到该消息, 此时如果在 ISR 中的消息复制完该消息前 Leader 宕机, 那该条消息会丢失.
推荐的做法是, 将 acks 设置为 all 或者 - 1, 此时只有 ISR 中的所有 Replica 都收到该数据(也即该消息被 Commit),Leader 才会告诉 Producer 该消息发送成功, 从而保证不会有未知的数据丢失.
负载均衡
1)producer 可以自定义发送到哪个 partition 的路由规则. 默认路由规则: hash(key)%numPartitions, 如果 key 为 null 则随机选择一个 partition.
2)自定义路由: 如果 key 是一个 user id, 可以把同一个 user 的消息发送到同一个 partition, 这时 consumer 就可以从同一个 partition 读取同一个 user 的消息.
异步批量发送
批量发送: 配置不多于固定消息数目一起发送并且等待时间小于一个固定延迟的数据.
The Consumer
consumer 控制消息的读取.
- Push vs Pull
- 1) producer push data to broker,consumer pull data from broker
2) consumer pull 的优点: consumer 自己控制消息的读取速度和数量.
3) consumer pull 的缺点: 如果 broker 没有数据, 则可能要 pull 多次忙等待, Kafka 可以配置 consumer long pull 一直等到有数据.
Consumer Position
1) 大部分消息系统由 broker 记录哪些消息被消费了, 但 Kafka 不是.
2) Kafka 由 consumer 控制消息的消费, consumer 甚至可以回到一个 old offset 的位置再次消费消息.
Consumer group
每一个 consumer 实例都属于一个 consumer group.
每一条消息只会被同一个 consumer group 里的一个 consumer 实例消费.
不同 consumer group 可以同时消费同一条消息.
- Consumer Rebalance
- Kafka consumer high level API:
如果某 consumer group 中 consumer 数量少于 partition 数量, 则至少有一个 consumer 会消费多个 partition 的数据.
如果 consumer 的数量与 partition 数量相同, 则正好一个 consumer 消费一个 partition 的数据.
如果 consumer 的数量多于 partition 的数量时, 会有部分 consumer 无法消费该 topic 下任何一条消息.
Message Delivery Semantics
三种:
- At most once-Messages may be lost but are never redelivered.
- At least once-Messages are never lost but may be redelivered.
- Exactly once-this is what people actually want, each message is delivered once and only once.
Producer: 有个 "acks" 配置可以控制接收的 leader 的在什么情况下就回应 producer 消息写入成功.
Consumer:
读取消息, 写 log, 处理消息. 如果处理消息失败, log 已经写入, 则无法再次处理失败的消息, 对应 "At most once".
读取消息, 处理消息, 写 log. 如果消息处理成功, 写 log 失败, 则消息会被处理两次, 对应 "At least once".
读取消息, 同时处理消息并把 result 和 log 同时写入. 这样保证 result 和 log 同时更新或同时失败, 对应 "Exactly once".
Kafka 默认保证 at-least-once delivery, 容许用户实现 at-most-once 语义, exactly-once 的实现取决于目的存储系统, kafka 提供了读取 offset, 实现也没有问题.
- Distribution
- Consumer Offset Tracking
1)High-level consumer 记录每个 partition 所消费的 maximum offset, 并定期 commit 到 offset manager(broker).
2)Simple consumer 需要手动管理 offset. 现在的 Simple consumer Java API 只支持 commit offset 到 zookeeper.
Consumers and Consumer Groups
1)consumer 注册到 zookeeper
2)属于同一个 group 的 consumer(group id 一样)平均分配 partition, 每个 partition 只会被一个 consumer 消费.
3)当 broker 或同一个 group 的其他 consumer 的状态发生变化的时候, consumer rebalance 就会发生.
Zookeeper 协调控制
1)管理 broker 与 consumer 的动态加入与离开.
2)触发负载均衡, 当 broker 或 consumer 加入或离开时会触发负载均衡算法, 使得一个 consumer group 内的多个 consumer 的订阅负载平衡.
3)维护消费关系及每个 partition 的消费信息.
日志压缩(Log Compaction)
1)针对一个 topic 的 partition, 压缩使得 Kafka 至少知道每个 key 对应的最后一个值.
2)压缩不会重排序消息.
3)消息的 offset 是不会变的.
4)消息的 offset 是顺序的.
5)压缩发送和接收能降低网络负载.
6)以压缩后的形式持久化到磁盘.
生产者代码示例:
Partitioning Code:
消费者代码示例:
关于 Consumer 的一个细节说明:
topicCountMap.put(topic, new Integer(a_numThreads));
这里, 如果提供的 thread 数目 (a_numThreads) 大于这个 topic 的 partition 的数目, 有些 thread 会永远读不到消息.
如果如果提供的 thread 数目 (a_numThreads) 小于这个 topic 的 partition 的数目, 有些 thread 会从多个 partition 读到消息.
如果一个线程从多个 partition 读取消息, 无法保证的消息的顺序, 只能保证从同一个 partition 读取到的消息是顺序的.
增加更多的进程 / 线程消费消息, 会导致 Kafka re-balance, 可能会改变 Partition 和消费 Thread 的对应关系.
来源: http://blog.51cto.com/13732225/2157293