1. 简介
Kafka 是一个分布式消息系统, 由 LinkedIn 使用 Scala 语言编写的, 具有高水平扩展和高吞吐量.
目前流行的消息队列主要有三种: ActiveMQ,RabbitMQ,Kafka
* 其中 ActiveMQ,RabbitMQ 均支持 AMQP 协议, Kafka 有其自己的协议 (仿 AMQP, 并不通用), 但目前越来越多的开源分布式处理系统如 Flume(日志收集系统),Storm(实时数据处理系统),Spark(内存数据处理系统),Elasticsearch(全文检索系统) 都支持与 Kafka 的集成.
* 动态扩容: 在不需停止服务的前提下动态的增加或减少消息队列服务器, Kafka 的动态扩容是通过 zookeeper 实现的, zookeeper 上保存着 kafka 的相关状态信息(topic,partition 等).
Kafka 使用场景:
1. 网站活动追踪: 用户的活动追踪, 搜索, 浏览, 点击率等, 将操作信息发布到不同的主题中, 可对数据实时监控, 统计分析用户行为.
2. 日志聚合: 作为一种日志聚合的解决方案( 日志聚合的作用在于可以把来自不同服务器上不同应用程序产生的日志聚合起来, 存放在单一的服务器上, 方便进行搜索和分析)
3. 数据集中管理: 分布式应用产生的数据统一存放在 kafka 集群中, 集中式管理, 供其他程序使用或后续对数据统计分析.
2.AMQP
AMQP(Advanced Message Queuing Protocol), 高级消息队列协议是一个统一消息服务的应用层协议, 为面向消息的中间件所设计, 基于此协议的客户端与消息中间件可传递消息, 并不受客户端和中间件的产品以及开发语言不同所限制.
AMQP 协议模型
生产者(Producer): 往消息队列中发送消息的应用程序
消费者(Consumer): 从消息队列中获取消息的应用程序
AMQP 服务器(Broker): 用来接收生产者发送的消息并将这些消息路由给服务器中的队列
* 消息队列以 broker 为最小的运行单元, 一个 broker 的运行就代表着一个 Kafka 应用程序实例.
*Kafka 客户端支持的语言: C,C++,Erlang,Java,.net,perl,PHP,Python,Ruby,Go,Javascript, 可以使用任何一种语言和 Kafka 服务器进行通信, 编写自己的生产者与消费者客户端程序.
3.Kafka 的组件
1. 核心组件 Broker
*broker 中可以包含多个主题, 每个主题又可以包含多个分区.
主题(topic)
一个主题类似新闻中的体育, 娱乐, 教育等分类概念, 在实际工程中通常一个业务一个主题.
分区(partition)
一个 topic 中由一到多个分区组成, 分区是 Kafka 结构的最小单元, 一个分区就是一个 FIFO(First In First Out)的队列, 用于存放 topic 中的消息,
*Kafka 的分区是提高 Kafka 性能的关键手段, 当 Kafka 集群的性能不高时, 可以试着往 topic 中添加分区.
Kafka 的分区模型
,
* 可见每个分区都是一个先进先出的队列, 生产者往 broker 指定的 topic 发送消息时, 通常由负载均衡机制随机选择分区存储, 也可以通过特定的分区函数选择分区, 最终的消息会从队顶进行追加.
* 每个消息都有一个连续的序列号叫做 offset(偏移量), 是消息在分区中的唯一标识.
* 每个 consumer 都需要维护消息在分区中的位置(偏移量), 随着 consumer 不断的读取消息, offset 的值也会不断的增加, consumer 也可以以任意的顺序读取消息, 只需要设置偏移量即可.
* 每个 consumer 的偏移量都会同步给 Kafka, 在 Kafka 集群中同时会维护各个 consumer 消费的偏移量..
* 在一个可配置的时间段内, Kafka 集群将保留所有发布的消息, 不管这些消息是否被被消费.
Kafka 的分区备份
* 每个分区在 Kafka 集群的若干服务中都有副本( 数量可配置 ), 使 Kafka 具备了容错能力.
* 在逻辑上相关的一组分区中, 都由一个服务器作为 leader, 其余服务器作为 follower,leader 和 follwer 的选举是随机的, 当 follower 接收到请求首先会发送给 leader, 由 leader 负责消息的读和写并将消息同步给各个 follower, 如果 Leader 所在节点宕机, followers 中的一台则会自动成为 leader.
2.Producer 生产者
Producer 将消息发布到其指定的 topic 中并负责决定发布到哪个分区, 通常简单的由负载均衡机制随机选择分区, 但也可以通过特定的分区函数选择分区.
3.Consumer 消费者
Kafka 中提供了 Consumer 组的概念, 一个 Consumer 组中包含若干个 Consumer, 整体对外可看成是一个消费者.
传统的消息队列模式
传统的消息队列能顺序的保存同一个生产者发送的消息, 但尽管服务器保证了消息的顺序, 但消息最终是通过异步的方式发送给各个消费者, 当多个消费者并行消费时, 并不能保证队列中消息能按顺序到达各个消费者中.
*Kafka 采用的策略: 一个 topic 中的各个 partition 只能被 consumer 组下的唯一一个 consumer 消费, 用于保证消息到达的顺序, 因此同一个组下的 consumer 的数量不能超 topic 中的分区数, 否则其他 consumer 将会处于空闲状态.
队列模式
若所有的消费者都在同一个 consumer 组中则成为队列模式, topic 中各个分区的消息仅能被组中分区个的唯一 consumer 消费, 组下的 consumer 共同竞争 topic 中的分区.
发布 - 订阅模式
若所有的消费者都不在同一个 consumer 组中则成为发布 - 订阅模式, topic 中各个分区的消息都会广播给所有的 consumer 组.
4.Kafka 的使用
1. 安装
由于 Kafka 使用 scala 语言编写, scale 语言运行在 JVM 中, 因此需要先安装 JDK 并且配置好环境变量.
由于 Kafka 中的状态信息 (topic,partition) 都保存在 zk 上, 虽然 Kafka 中自带 zk, 但一般是使用外置的 zk 集群, 因此需要先安装 zk 服务并且配置好 zk 集群关系.
从 Kafka 官网中下载 http://kafka.apache.org/downloads 安装包并进行解压.
2. 配置文件
config 是 Kafka 配置文件的存放目录
server.proeperties(broker 的配置文件)
* 由于多个 Kafka 服务 (broker) 都使用同一个 zk 集群, 因此在同一个 zk 集群中的 Kafka 也就自动成为集群的关系, 因此 borker.id 在同一个集群中不能重复.
*Kafka 中的消息是缓存到本地磁盘的(log.dirs 目录下), 每个 topic 的分区在 broker 的日志路径下都对应一个目录, 目录下的. log 文件用于存放分区中的消息, 当有新消息进入分区时直接追加到文件中.
* 若创建的 topic 其备份数大于 1(状态保存在 zk), 则 Kafka 集群中备份数个 broker 也会创建此 topic, 因此在其日志路径下也会存在此 topic 各个分区的目录.
consumer.properties(消费者的配置文件)
* 在使用 Kafka 提供的消费者脚本文件时可以指定其使用的配置文件.
* 在程序中使用时需要手动设置配置项
producer.properties(生产者的配置文件)
* 在使用 Kafka 提供的生产者脚本文件时可以指定其使用的配置文件.
* 在程序中使用时需要手动设置配置项
3. 启动
1. 启动 zk 集群
逐一启动 zk 服务
2. 启动 Kafka 集群
逐一启动 kafka 服务并指定使用的配置文件( service.properties 文件中配置使用外置的 ZK 集群)
3. 创建主题
创建名为 chat 的 topic, 其备份数为 3 并且每个 topic 下存在 3 个分区(由于启动了 3 个 broker, 因此 topic 的备份数最多只能是 3)
创建 topic 时需指定 zk 服务地址, zk 中保存了 topic 的相关属性(备份数和分区数),Kafka 集群再从 zk 服务中获取 topic 属性信息并在 Kafka 集群中备份数个节点创建该 topic
查看各个 broker 中的日志目录, 可见目录下都生成了 chat-0,chat-1,chat2 分别表示 chat 主题中的第一个, 第二个, 第三个分区, 每个分区中都有. log 文件存放分区中的消息.
查看 Kafka 集群中 chat 主题下各个分区的状态
Topic: 主题名称
PartitionCount: 主题包含的分区数
ReplicationFactor:topic 的备份数
Partition: 分区号
Leader: 充当 leader 的 broker 节点
Replicas: 存在备份的 broker 节点(不管存活)
Isr: 存在备份并且存活的 broker 节点
4. 生产者发送消息
往 Kafka 集群中的 chat 主题发送消息
* 消息将会根据负载均衡机制随机进入分区
5. 消费者订阅消息
* 由于使用脚本文件启动消费者时没有指定使用的配置文件, 所以三个消费者都不是同一个消费者组中, 因此三个消费者都能够消费到 chat 主题中各个分区的消息.
* 启动了三个消费者并指定使用的配置文件, 默认的 group.id 是 test-consumer-group, 因此三个消费者都属于同一个消费者组中, topic 中各个分区仅能被组下的唯一 consumer 消费.
* 由于启动第一个消费者时, 消费者组下只有一个消费者, 因此消息都会被此消费者消费, 当往消费者组添加消费者并且生产者往主题添加消息, 此时消费者会重新竞争消息.
6.Java 操作 Kafka
导入 Kafka 依赖
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.12</artifactId>
- <version>0.11.0.1</version>
- </dependency>
1. 创建主题
- ZkUtils zkUtils = ZkUtils.apply("192.168.1.80:2181,192.168.1.81:2181,192.168.1.82:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());
- // 创建一个名为 chat 的主题其包含 2 个分区, 备份数是 3
AdminUtils.createTopic(zkUtils, "chat", 2, 3, new Properties(), RackAwareMode.Enforced$.MODULE$);
zkUtils.close();
2. 生产者
- // 创建 Properties 对象用于封装配置项
- Properties props = new Properties();
- props.put("bootstrap.servers", "192.168.1.80:9092,192.168.1.81:9092,192.168.1.82:9092");
- props.put("acks", "all");
- props.put("retries", 0);
- props.put("batch.size", 16384);
- props.put("linger.ms", 1);
- props.put("buffer.memory", 33554432);
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- // 创建 KafkaProducer 生产者实例
- Producer<String, String> producer = new KafkaProducer<>(props);
- // 创建生产端消息实体 ProducerRecord 并指定消息上传的 topic 名称, 消息的 Key, 消息的 Value(消息以 Key-Value 的形式存在)
- ProducerRecord<String,String> record = new ProducerRecord<String,String>("topic","key","value");
- // 发送消息
- producer.send(record);
- // 关闭连接
- producer.close();
*KafkaProducer 是线程安全的, 在线程之间可以共享单个生产者实例.
*send()方法是异步的, 一旦消息被保存在
待发送缓冲区中此方法就
立即返回, 其返回 Future<RecordMetadata > 实例, 当调用该实例的 get()方法时将会阻塞直到服务器对请求进行应答(阻塞时长跟 acks 配置项有关), 当服务器处理异常时将抛出异常.
* 使用 Kafka API 往 topic 发送消息时, 默认情况下将会根据消息的 Key 值进行散列来决定消息到达的分区(Key 相同则消息位于的分区必定相同).
* 生产者的缓冲区保留尚未发送到服务器的消息, 后台 I/O 线程负责将这些消息转换成请求发送到集群, 若使用后不关闭生产者则会泄露这些资源.
2. 消费者
- // 创建 Properties 对象用于封装配置项
- Properties props = new Properties();
- props.put("bootstrap.servers", "192.168.1.80:9092,192.168.1.81:9092,192.168.1.82:9092");
- props.put("group.id", "consumerA");
- // 自动提交 Consumer 的偏移量给 Kafka 服务
- props.put("enable.auto.commit", "true");
- props.put("auto.commit.interval.ms", "1000");
- props.put("auto.offset.reset", "earliest");
- props.put("session.timeout.ms", "30000");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- // 创建 KafkaConsumer 消费者实例
- Consumer<String, String> consumer = new KafkaConsumer<>(props);
- // 订阅主题, 一个消费者实例可以订阅多个主题
- consumer.subscribe(Arrays.asList("chat", "hello"));
- // 接收数据, 消息存放在 ConsumerRecords 消息集合中
- ConsumerRecords<String, String> records = consumer.poll(1000*5);
- // 遍历消费端消息集合获取 ConsumerRecord 消费端消息实体, 一个消费端消息实体包含偏移量, 消息 Key 值, 消息 Value 值
- for (ConsumerRecord<String, String> record : records){
- System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
- }
*poll(long blockTime)方法用于接收 topic 中的消息, 当没有消息时将会等待 blockTime 的时间(单位: 毫秒), 执行结果需结合 auto.offset.reset 配置项.
* 使用 commitSync()方法可以手动同步消费者的偏移量给 Kafka(若设置自动提交偏移量给 Kafka, 当消费消息后, 后续需要进行入库或其他操作失败了, 那么数据将会丢失, 需要重新去消费).
* 使用 seek(TopicPartition, long)方法手动设置消费者的偏移量.
来源: https://www.cnblogs.com/funyoung/p/8855714.html