1.1 概述
Kafka 是最初由 Linkedin 公司开发, 是一个分布式, 分区的, 多副本的, 多订阅者, 基于 zookeeper 协调的分布式日志系统(也可以当做 MQ 系统), 常见可以用于 web/nginx 日志, 访问日志, 消息服务等等, Linkedin 于 2010 年贡献给了 Apache 基金会并成为顶级开源项目.
主要应用场景是: 日志收集系统和消息系统.
Kafka 主要设计目标如下:
以时间复杂度为 O(1)的方式提供消息持久化能力, 即使对 TB 级以上数据也能保证常数时间的访问性能.
高吞吐率. 即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条消息的传输.
支持 Kafka Server 间的消息分区, 及分布式消费, 同时保证每个 partition 内的消息顺序传输.
同时支持离线数据处理和实时数据处理.
支持在线水平扩展
1.2 消息系统介绍
一个消息系统负责将数据从一个应用传递到另外一个应用, 应用只需关注于数据, 无需关注数据在两个或多个应用间是如何传递的. 分布式消息传递基于可靠的消息队列, 在客户端应用和消息系统之间异步传递消息. 有两种主要的消息传递模式: 点对点传递模式, 发布 - 订阅模式. 大部分的消息系统选用发布 - 订阅模式. Kafka 就是一种发布 - 订阅模式.
1.3 点对点消息传递
在点对点消息系统中, 消息持久化到一个队列中. 此时, 将有一个或多个消费者消费队列中的数据. 但是一条消息只能被消费一次. 当一个消费者消费了队列中的某条数据之后, 该条数据则从消息队列中删除. 该模式即使有多个消费者同时消费数据, 也能保证数据处理的顺序. 这种架构描述示意图如下:
生产者发送一条消息到 queue, 只有一个消费者能收到.
1.4 发布 - 订阅消息传递
在发布 - 订阅消息系统中, 消息被持久化到一个 topic 中. 与点对点消息系统不同的是, 消费者可以订阅一个或多个 topic, 消费者可以消费该 topic 中所有的数据, 同一条数据可以被多个消费者消费, 数据被消费后不会立马删除. 在发布 - 订阅消息系统中, 消息的生产者称为发布者, 消费者称为订阅者. 该模式的示例图如下:
发布者发送到 topic 的消息, 只有订阅了 topic 的订阅者才会收到消息.
1.5 Kafka 的优点
1)解耦:
在项目启动之初来预测将来项目会碰到什么需求, 是极其困难的. 消息系统在处理过程中间插入了一个隐含的, 基于数据的接口层, 两边的处理过程都要实现这一接口. 这允许你独立的扩展或修改两边的处理过程, 只要确保它们遵守同样的接口约束.
2)冗余:(副本)
有些情况下, 处理数据的过程会失败. 除非数据被持久化, 否则将造成丢失. 消息队列把数据进行持久化直到它们已经被完全处理, 通过这一方式规避了数据丢失风险. 许多消息队列所采用的 "插入 - 获取 - 删除" 范式中, 在把一个消息从队列中删除之前, 需要你的处理系统明确的指出该消息已经被处理完毕, 从而确保你的数据被安全的保存直到你使用完毕.
3)扩展性
因为消息队列解耦了你的处理过程, 所以增大消息入队和处理的频率是很容易的, 只要另外增加处理过程即可. 不需要改变代码, 不需要调节参数. 扩展就像调大电力按钮一样简单.
4)灵活性 & 峰值处理能力
在访问量剧增的情况下, 应用仍然需要继续发挥作用, 但是这样的突发流量并不常见; 如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费. 使用消息队列能够使关键组件顶住突发的访问压力, 而不会因为突发的超负荷的请求而完全崩溃.
5)可恢复性
系统的一部分组件失效时, 不会影响到整个系统. 消息队列降低了进程间的耦合度, 所以即使一个处理消息的进程挂掉, 加入队列中的消息仍然可以在系统恢复后被处理.
6)顺序保证
在大多使用场景下, 数据处理的顺序都很重要. 大部分消息队列本来就是排序的, 并且能保证数据会按照特定的顺序来处理. Kafka 保证一个 Partition 内的消息的有序性.
7)缓冲
在任何重要的系统中, 都会有需要不同的处理时间的元素. 例如, 加载一张图片比应用过滤器花费更少的时间. 消息队列通过一个缓冲层来帮助任务最高效率的执行 --- 写入队列的处理会尽可能的快速. 该缓冲有助于控制和优化数据流经过系统的速度.
8)异步通信
很多时候, 用户不想也不需要立即处理消息. 消息队列提供了异步处理机制, 允许用户把一个消息放入队列, 但并不立即处理它. 想向队列中放入多少消息就放多少, 然后在需要的时候再去处理它们.
1.6 常用 MQ 对比
1)RabbitMQ
RabbitMQ 是使用 Erlang 编写的一个开源的消息队列, 本身支持很多的协议: AMQP,XMPP, SMTP, STOMP, 也正因如此, 它非常重量级, 更适合于企业级的开发. 同时实现了 Broker 构架, 这意味着消息在发送给客户端时先在中心队列排队. 对路由, 负载均衡或者数据持久化都有很好的支持.
2)Redis
Redis 是一个基于 Key-Value 对的 NoSQL 数据库, 开发维护很活跃. 虽然它是一个 Key-Value 数据库存储系统, 但它本身支持 MQ 功能, 所以完全可以当做一个轻量级的队列服务来使用. 对于 RabbitMQ 和 Redis 的入队和出队操作, 各执行 100 万次, 每 10 万次记录一次执行时间. 测试数据分为 128Bytes,512Bytes,1K 和 10K 四个不同大小的数据. 实验表明: 入队时, 当数据比较小时 Redis 的性能要高于 RabbitMQ, 而如果数据大小超过了 10K,Redis 则慢的无法忍受; 出队时, 无论数据大小, Redis 都表现出非常好的性能, 而 RabbitMQ 的出队性能则远低于 Redis.
3)ZeroMQ
ZeroMQ 号称最快的消息队列系统, 尤其针对大吞吐量的需求场景. ZeroMQ 能够实现 RabbitMQ 不擅长的高级 / 复杂的队列, 但是开发人员需要自己组合多种技术框架, 技术上的复杂度是对这 MQ 能够应用成功的挑战. ZeroMQ 具有一个独特的非中间件的模式, 你不需要安装和运行一个消息服务器或中间件, 因为你的应用程序将扮演这个服务器角色. 你只需要简单的引用 ZeroMQ 程序库, 可以使用 NuGet 安装, 然后你就可以愉快的在应用程序之间发送消息了. 但是 ZeroMQ 仅提供非持久性的队列, 也就是说如果宕机, 数据将会丢失. 其中, Twitter 的 Storm 0.9.0 以前的版本中默认使用 ZeroMQ 作为数据流的传输(Storm 从 0.9 版本开始同时支持 ZeroMQ 和 Netty 作为传输模块).
4)ActiveMQ
ActiveMQ 是 Apache 下的一个子项目. 类似于 ZeroMQ, 它能够以代理人和点对点的技术实现队列. 同时类似于 RabbitMQ, 它少量代码就可以高效地实现高级应用场景.
5)Kafka/Jafka
Kafka 是 Apache 下的一个子项目, 是一个高性能跨语言分布式发布 / 订阅消息队列系统, 而 Jafka 是在 Kafka 之上孵化而来的, 即 Kafka 的一个升级版. 具有以下特性: 快速持久化, 可以在 O(1)的系统开销下进行消息持久化; 高吞吐, 在一台普通的服务器上既可以达到 10W/s 的吞吐速率; 完全的分布式系统, Broker,Producer,Consumer 都原生自动支持分布式, 自动实现负载均衡; 支持 Hadoop 数据并行加载, 对于像 Hadoop 的一样的日志数据和离线分析系统, 但又要求实时处理的限制, 这是一个可行的解决方案. Kafka 通过 Hadoop 的并行加载机制统一了在线和离线的消息处理. Apache Kafka 相对于 ActiveMQ 是一个非常轻量级的消息系统, 除了性能非常好之外, 还是一个工作良好的分布式系统.
1.7 Kafka 中的术语解释
概述
在深入理解 Kafka 之前, 先介绍一下 Kafka 中的术语. 下图展示了 Kafka 的相关术语以及之间的关系:
一条消息在 kafka 中有唯一的 Offset, 即图中的 0 1 2 3 4
写操作都是通过 Leader
Kafka 存储规则 --- 一个主题有多个分区 分区由 log 文件类型来保存消息, 每个分区有一个 segment, 一个 segement 对应一个或多个 log 文件, 同时保存有 log 文件具体的消息索引, log 文件大小由 kafka 配置决定, 达到限制值即创建下一个 log 文件
kafka log 下相应主题分区的文件夹内容 --.log 和. index 说明 Message 限定成指定的大小 方便查找 同时索引文件还会存取具体消息的位置
上图中一个 topic 配置了 3 个 partition.Partition1 有两个 offset:0 和 1.Partition2 有 4 个 offset.Partition3 有 1 个 offset. 副本的 id 和副本所在的机器的 id 恰好相同.
如果一个 topic 的副本数为 3, 那么 Kafka 将在集群中为每个 partition 创建 3 个相同的副本. 集群中的每个 broker 存储一个或多个 partition. 多个 producer 和 consumer 可同时生产和消费数据.
1 broker
Kafka 集群包含一个或多个服务器, 服务器节点称为 broker.
broker 存储 topic 的数据. 如果某 topic 有 N 个 partition, 集群有 N 个 broker, 那么每个 broker 存储该 topic 的一个 partition.
如果某 topic 有 N 个 partition, 集群有 (N+M) 个 broker, 那么其中有 N 个 broker 存储该 topic 的一个 partition, 剩下的 M 个 broker 不存储该 topic 的 partition 数据.
如果某 topic 有 N 个 partition, 集群中 broker 数目少于 N 个, 那么一个 broker 存储该 topic 的一个或多个 partition. 在实际生产环境中, 尽量避免这种情况的发生, 这种情况容易导致 Kafka 集群数据不均衡.(通常不这么干)
2 Topic
每条发布到 Kafka 集群的消息都有一个类别, 这个类别被称为 Topic.(物理上不同 Topic 的消息分开存储, 逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)
类似于数据库的表名
3 Partition
topic 中的数据分割为一个或多个 partition. 每个 topic 至少有一个 partition. 每个 partition 中的数据使用多个 segment 文件存储. partition 中的数据是有序的, 不同 partition 间的数据丢失了数据的顺序. 如果 topic 有多个 partition, 消费数据时就不能保证数据的顺序. 在需要严格保证消息的消费顺序的场景下, 需要将 partition 数目设为 1.(不需要严格实时性的 可自行在消息中加标识 )
4 Producer
生产者即数据的发布者, 该角色将消息发布到 Kafka 的 topic 中. broker 接收到生产者发送的消息后, broker 将该消息追加到当前用于追加数据的 segment 文件中. 生产者发送的消息, 存储到一个 partition 中, 生产者也可以指定数据存储的 partition.
5 Consumer
消费者可以从 broker 中读取数据. 消费者可以消费多个 topic 中的数据.
6 Consumer Group
每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name, 若不指定 group name 则属于默认的 group). 这是 kafka 用来实现一个 topic 消息的广播 (发给所有的 consumer) 和单播 (发给任意一个 consumer) 的手段. 一个 topic 可以有多个 CG.topic 的消息会复制 - 给 consumer. 如果需要实现广播, 只要每个 consumer 有一个独立的 CG 就可以了. 要实现单播只要所有的 consumer 在同一个 CG. 用 CG 还可以将 consumer 进行自由的分组而不需要多次发送消息到不同的 topic.
7 Leader
每个 partition 有多个副本, 其中有且仅有一个作为 Leader,Leader 是当前负责数据的读写的 partition.
8 Follower
Follower 跟随 Leader, 所有写请求都通过 Leader 路由, 数据变更会广播给所有 Follower,Follower 与 Leader 保持数据同步. 如果 Leader 失效, 则从 Follower 中选举出一个新的 Leader. 当 Follower 与 Leader 挂掉, 卡住或者同步太慢, leader 会把这个 follower 从 "in sync replicas"(ISR)列表中删除, 重新创建一个 Follower.
9 Offset
kafka 的存储文件都是按照 offset.kafka 来命名, 用 offset 做名字的好处是方便查找. 例如你想找位于 2049 的位置, 只要找到 2048.kafka 的文件即可. 当然 the first offset 就是 00000000000.kafka
一, Kafka 的架构(某马老师的图)
(特别提醒: kafka0.9 版本以前 --- 消费者的 Offset 信息存放在 zookeeper 内 0.9 版本以后, 消费者的 offset 信息存放在 kafka 中, kafka 中消息的存取需要限定消息的最大大小, 存取的时候是按相同的大小空间存取的, 同时保存有相应的索引检索到某条消息的具体位置范围, 所以 kafka 虽然是磁盘存取, 但是效率还是挺高的)
如上图所示, 一个典型的 Kafka 集群中包含若干 Producer(可以是 Web 前端产生的 Page View, 或者是服务器日志, 系统 CPU,Memory 等), 若干 broker(Kafka 支持水平扩展, 一般 broker 数量越多, 集群吞吐率越高), 若干 Consumer Group, 以及一个 Zookeeper 集群. Kafka 通过 Zookeeper 管理集群配置, 选举 leader, 以及在 Consumer Group 发生变化时进行 rebalance.Producer 使用 push 模式将消息发布到 broker,Consumer 使用 pull 模式从 broker 订阅并消费消息.
2.1 分布式模型
Kafka 每个主题的多个分区日志分布式地存储在 Kafka 集群上, 同时为了故障容错, 每个分区都会以副本的方式复制到多个消息代理节点上. 其中一个节点会作为主副本(Leader), 其他节点作为备份副本(Follower, 也叫作从副本). 主副本会负责所有的客户端读写操作, 备份副本仅仅从主副本同步数据. 当主副本出现故障时, 备份副本中的一个副本会被选择为新的主副本. 因为每个分区的副本中只有主副本接受读写, 所以每个服务器端都会作为某些分区的主副本, 以及另外一些分区的备份副本, 这样 Kafka 集群的所有服务端整体上对客户端是负载均衡的.
Kafka 的生产者和消费者相对于服务器端而言都是客户端.
Kafka 生产者客户端发布消息到服务端的指定主题, 会指定消息所属的分区. 生产者发布消息时根据消息是否有键, 采用不同的分区策略. 消息没有键时, 通过轮询方式进行客户端负载均衡; 消息有键时, 根据分区语义 (例如 hash) 确保相同键的消息总是发送到同一分区.
Kafka 的消费者通过订阅主题来消费消息, 并且每个消费者都会设置一个消费组名称. 因为生产者发布到主题的每一条消息都只会发送给消费者组的一个消费者. 所以, 如果要实现传统消息系统的 "队列" 模型, 可以让每个消费者都拥有相同的消费组名称, 这样消息就会负责均衡到所有的消费者; 如果要实现 "发布 - 订阅" 模型, 则每个消费者的消费者组名称都不相同, 这样每条消息就会广播给所有的消费者.
分区是消费者现场模型的最小并行单位. 如下图 (图 1) 所示, 生产者发布消息到一台服务器的 3 个分区时, 只有一个消费者消费所有的 3 个分区. 在下图 (图 2) 中, 3 个分区分布在 3 台服务器上, 同时有 3 个消费者分别消费不同的分区. 假设每个服务器的吞吐量时 300MB, 在下图 (图 1) 中分摊到每个分区只有 100MB, 而在下图 (图 2) 中, 集群整体的吞吐量有 900MB. 可以看到, 增加服务器节点会提升集群的性能, 增加消费者数量会提升处理性能.
同一个消费组下多个消费者互相协调消费工作, Kafka 会将所有的分区平均地分配给所有的消费者实例, 这样每个消费者都可以分配到数量均等的分区. Kafka 的消费组管理协议会动态地维护消费组的成员列表, 当一个新消费者加入消费者组, 或者有消费者离开消费组, 都会触发再平衡操作.
Kafka 的消费者消费消息时, 只保证在一个分区内的消息的完全有序性, 并不保证同一个主题汇中多个分区的消息顺序. 而且, 消费者读取一个分区消息的顺序和生产者写入到这个分区的顺序是一致的. 比如, 生产者写入 "hello" 和 "Kafka" 两条消息到分区 P1, 则消费者读取到的顺序也一定是 "hello" 和 "Kafka". 如果业务上需要保证所有消息完全一致, 只能通过设置一个分区完成, 但这种做法的缺点是最多只能有一个消费者进行消费. 一般来说, 只需要保证每个分区的有序性, 再对消息假设键来保证相同键的所有消息落入同一分区, 就可以满足绝大多数的应用.
二, Topics 和 Partition
Topic 在逻辑上可以被认为是一个 queue, 每条消费都必须指定它的 Topic, 可以简单理解为必须指明把这条消息放进哪个 queue 里. 为了使得 Kafka 的吞吐率可以线性提高, 物理上把 Topic 分成一个或多个 Partition, 每个 Partition 在物理上对应一个文件夹, 该文件夹下存储这个 Partition 的所有消息和索引文件. 创建一个 topic 时, 同时可以指定分区数目, 分区数越多, 其吞吐量也越大, 但是需要的资源也越多, 同时也会导致更高的不可用性, kafka 在接收到生产者发送的消息之后, 会根据均衡策略将消息存储到不同的分区中. 因为每条消息都被 append 到该 Partition 中, 属于顺序写磁盘, 因此效率非常高(经验证, 顺序写磁盘效率比随机写内存还要高, 这是 Kafka 高吞吐率的一个很重要的保证).
对于传统的 message queue 而言, 一般会删除已经被消费的消息, 而 Kafka 集群会保留所有的消息, 无论其被消费与否. 当然, 因为磁盘限制, 不可能永久保留所有数据(实际上也没必要), 因此 Kafka 提供两种策略删除旧数据. 一是基于时间, 二是基于 Partition 文件大小. 例如可以通过配置 $KAFKA_HOME/config/server.properties, 让 Kafka 删除一周前的数据, 也可在 Partition 文件超过 1GB 时删除旧数据, 配置如下所示:
- # The minimum age of a log file to be eligible for deletion
- log.retention.hours=168
- # The maximum size of a log segment file. When this size is reached a new log segment will be created.
- log.segment.bytes=1073741824
- # The interval at which log segments are checked to see if they can be deleted according to the retention policies
- log.retention.check.interval.ms=300000
- # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
- log.cleaner.enable=false
因为 Kafka 读取特定消息的时间复杂度为 O(1), 即与文件大小无关, 所以这里删除过期文件与提高 Kafka 性能无关. 选择怎样的删除策略只与磁盘以及具体的需求有关. 另外, Kafka 会为每一个 Consumer Group 保留一些 metadata 信息 -- 当前消费的消息的 position, 也即 offset. 这个 offset 由 Consumer 控制. 正常情况下 Consumer 会在消费完一条消息后递增该 offset. 当然, Consumer 也可将 offset 设成一个较小的值, 重新消费一些消息. 因为 offet 由 Consumer 控制, 所以 Kafka broker 是无状态的, 它不需要标记哪些消息被哪些消费过, 也不需要通过 broker 去保证同一个 Consumer Group 只有一个 Consumer 能消费某一条消息, 因此也就不需要锁机制, 这也为 Kafka 的高吞吐率提供了有力保障.
三, Producer 消息路由
Producer 发送消息到 broker 时, 会根据 Paritition 机制选择将其存储到哪一个 Partition. 如果 Partition 机制设置合理, 所有消息可以均匀分布到不同的 Partition 里, 这样就实现了负载均衡. 如果一个 Topic 对应一个文件, 那这个文件所在的机器 I/O 将会成为这个 Topic 的性能瓶颈, 而有了 Partition 后, 不同的消息可以并行写入不同 broker 的不同 Partition 里, 极大的提高了吞吐率. 可以在 $KAFKA_HOME/config/server.properties 中通过配置项 num.partitions 来指定新建 Topic 的默认 Partition 数量, 也可在创建 Topic 时通过参数指定, 同时也可以在 Topic 创建之后通过 Kafka 提供的工具修改.
在发送一条消息时, 可以指定这条消息的 key,Producer 根据这个 key 和 Partition 机制来判断应该将这条消息发送到哪个 Parition.Paritition 机制可以通过指定 Producer 的 paritition. class 这一参数来指定, 该 class 必须实现 kafka.producer.Partitioner 接口.
四, Consumer Group
使用 Consumer high level API 时, 同一 Topic 的一条消息只能被同一个 Consumer Group 内的一个 Consumer 消费, 但多个 Consumer Group 可同时消费这一消息.
这是 Kafka 用来实现一个 Topic 消息的广播 (发给所有的 Consumer) 和单播 (发给某一个 Consumer) 的手段. 一个 Topic 可以对应多个 Consumer Group. 如果需要实现广播, 只要每个 Consumer 有一个独立的 Group 就可以了. 要实现单播只要所有的 Consumer 在同一个 Group 里. 用 Consumer Group 还可以将 Consumer 进行自由的分组而不需要多次发送消息到不同的 Topic.
实际上, Kafka 的设计理念之一就是同时提供离线处理和实时处理. 根据这一特性, 可以使用 Storm 这种实时流处理系统对消息进行实时在线处理, 同时使用 Hadoop 这种批处理系统进行离线处理, 还可以同时将数据实时备份到另一个数据中心, 只需要保证这三个操作所使用的 Consumer 属于不同的 Consumer Group 即可.
五, Push vs. Pull
作为一个消息系统, Kafka 遵循了传统的方式, 选择由 Producer 向 broker push 消息并由 Consumer 从 broker pull 消息. 一些 logging-centric system, 比如 Facebook 的 Scribe 和 Cloudera 的 Flume, 采用 push 模式. 事实上, push 模式和 pull 模式各有优劣.
push 模式很难适应消费速率不同的消费者, 因为消息发送速率是由 broker 决定的. push 模式的目标是尽可能以最快速度传递消息, 但是这样很容易造成 Consumer 来不及处理消息, 典型的表现就是拒绝服务以及网络拥塞. 而 pull 模式则可以根据 Consumer 的消费能力以适当的速率消费消息.
对于 Kafka 而言, pull 模式更合适. pull 模式可简化 broker 的设计, Consumer 可自主控制消费消息的速率, 同时 Consumer 可以自己控制消费方式 -- 即可批量消费也可逐条消费, 同时还能选择不同的提交方式从而实现不同的传输语义.
六, Kafka delivery guarantee
有这么几种可能的 delivery guarantee:
At most once 消息可能会丢, 但绝不会重复传输
At least one 消息绝不会丢, 但可能会重复传输
Exactly once 每条消息肯定会被传输一次且仅传输一次, 很多时候这是用户所想要的.
当 Producer 向 broker 发送消息时, 一旦这条消息被 commit, 因数 replication 的存在, 它就不会丢. 但是如果 Producer 发送数据给 broker 后, 遇到网络问题而造成通信中断, 那 Producer 就无法判断该条消息是否已经 commit. 虽然 Kafka 无法确定网络故障期间发生了什么, 但是 Producer 可以生成一种类似于主键的东西, 发生故障时幂等性的重试多次, 这样就做到了 Exactly once.
接下来讨论的是消息从 broker 到 Consumer 的 delivery guarantee 语义.(仅针对 Kafka consumer high level API).Consumer 在从 broker 读取消息后, 可以选择 commit, 该操作会在 Zookeeper 中保存该 Consumer 在该 Partition 中读取的消息的 offset. 该 Consumer 下一次再读该 Partition 时会从下一条开始读取. 如未 commit, 下一次读取的开始位置会跟上一次 commit 之后的开始位置相同. 当然可以将 Consumer 设置为 autocommit, 即 Consumer 一旦读到数据立即自动 commit. 如果只讨论这一读取消息的过程, 那 Kafka 是确保了 Exactly once. 但实际使用中应用程序并非在 Consumer 读取完数据就结束了, 而是要进行进一步处理, 而数据处理与 commit 的顺序在很大程度上决定了消息从 broker 和 consumer 的 delivery guarantee semantic.
Kafka 默认保证 At least once, 并且允许通过设置 Producer 异步提交来实现 At most once. 而 Exactly once 要求与外部存储系统协作, 幸运的是 Kafka 提供的 offset 可以非常直接非常容易得使用这种方式.
来源: http://www.bubuko.com/infodetail-3433900.html