前言
在上一篇 Kafka 使用 Java 实现数据的生产和消费 demo 中介绍如何简单的使用 kafka 进行数据传输本篇则重点介绍 kafka 中的 consumer 消费者的讲解
应用场景
在上一篇 kafka 的 consumer 消费者, 我们使用的是自动提交 offset 下标
但是 offset 下标自动提交其实在很多场景都不适用, 因为自动提交是在 kafka 拉取到数据之后就直接提交, 这样很容易丢失数据, 尤其是在需要事物控制的时候
很多情况下我们需要从 kafka 成功拉取数据之后, 对数据进行相应的处理之后再进行提交如拉取数据之后进行写入 mysql 这种 , 所以这时我们就需要进行手动提交 kafka 的 offset 下标
这里顺便说下 offset 具体是什么
offset: 指的是 kafka 的 topic 中的每个消费组消费的下标
简单的来说就是一条消息对应一个 offset 下标, 每次消费数据的时候如果提交 offset, 那么下次消费就会从提交的 offset 加一那里开始消费
比如一个 topic 中有 100 条数据, 我消费了 50 条并且提交了, 那么此时的 kafka 服务端记录提交的 offset 就是 49(offset 从 0 开始), 那么下次消费的时候 offset 就从 50 开始消费
测试
说了这么, 那么我们开始进行手动提交测试
首先, 使用 kafka 的 producer 程序往 kafka 集群发送了 100 条测试数据
程序打印中已经成功发送了, 这里我们在 kafka 服务器使用命令中来查看是否成功发送.
命令如下:
kafka - console - consumer.sh--zookeeper master: 2181--topic KAFKA_TEST2--from - beginning
注:
1.master 是我在 linux 中做了 IP 映射的关系, 实际可以换成 IP
2. 因为 kafka 是集群, 所以也可以在集群的其他机器进行消费
可以看到已经成功发送了 100 条
成功发送消息之后, 我们再使用 kafka 的 consumer 进行数据消费
因为是用来测试手动提交
所以 将 enable.auto.commit 改成 false 进行手动提交
并且设置每次拉取最大 10 条
- props.put("enable.auto.commit", "false");
- props.put("max.poll.records", 10);
将提交方式改成 false 之后
需要手动提交只需加上这段代码
consumer.commitSync();
那么首先尝试消费不提交, 测试能不能重复消费
右键运行 main 方法进行消费, 不提交 offset 下标
成功消费之后, 结束程序, 再次运行 main 方法进行消费, 也不提交 offset 下标
并未手动进行提交, 而且并未更改消费组名, 但是可以看到已经重复消费了!
接下来, 开始测试手动提交
测试目的:
1. 测试手动提交之后的 offset, 能不能再次消费
2. 测试未提交的 offset, 能不能再次进行消费
测试方法: 当消费到 50 条的时候, 进行手动提交, 然后剩下的 50 条不进行提交
希望达成的目的: 手动提交的 offset 不能再次消费, 未提交的可以再次进行消费
为了达到上述目的, 我们测试只需添加如下代码即可:
- if (list.size() == 50) {
- consumer.commitSync();
- }
更改代码之后, 开始运行程序
测试示例图如下:
简单的一看, 和之前未提交的一样, 貌似没有什么问题
但是正常来说, 未提交的下标不应该重复进行消费, 直到它提交为止吗?
因为要进行重复消费, 但是 messageNo 会一直累加, 只会手动的提交前 50 条 offset,
后面的 50 条 offset 会一直无法消费, 所以打印的条数不应该是 100, 而是应该一直打印
那么测试的结果和预想的为什么不一致呢?
之前不是已经测试过可以重复消费未提交的 offset 吗?
其实这点可以根据两次启动方式的不同而得出结论
开始测试未提交重复消费的时候, 实际我是启动 - 暂停 - 启动, 那么本地的 consumer 实际是被初始化过两次
而刚刚测试的实际 consumer 只有初始化一次
至于为什么初始化一次就不行呢?
因为 kafka 的 offset 下标的记录实际会有两份, 服务端会自己记录一份, 本地的消费者客户端也会记录一份, 提交的 offset 会告诉服务端已经消费到这了, 但是本地的并不会因此而改变 offset 进行再次消费
简单的来说假如有 10 条数据, 在第 5 条的时候进行提交了 offset 下标, 那么服务端就知道该组消费的下标到第 5 条了, 如果同组其他的 consumer 进行消费的时候就会从第 6 条开始进行消费但是本地的消费者客户端并不会因此而改变, 它还是会继续消费下去, 并不会再次从第 6 条开始消费, 所以会出现上图情况
但是项目中运行之后, 是不会因此而重启的, 所以这时我们可以换一种思路
就是如果触发某个条件, 所以导致 offset 未提交, 我们就可以关闭之前的 consumer, 然后新 new 一个 consumer, 这样就可以再次进行消费了! 当然配置要和之前的一样
那么将之前的提交代码更改如下:
- if (list.size() == 50) {
- consumer.commitSync();
- } else if (list.size() > 50) {
- consumer.close();
- init();
- list.clear();
- list2.clear();
- }
注: 这里因为是测试, 为了简单明了, 所以条件我写的很简单实际情况请根据个人的为准
示例图如下:
说明:
1. 因为每次是拉取 10 条, 所以在 60 条的时候 kafka 的配置初始化了, 然后又从新拉取了 50-60 条的数据, 但是没有提交, 所以并不会影响实际结果
2. 这里为了方便截图展示, 所以打印条件改了, 但是不影响程序!
从测试结果中, 我们达到了之前想要测试的目的, 未提交的 offset 可以重复进行消费
这种做法一般也可以满足大部分需求
例如从 kafka 获取数据入库, 如果一批数据入库成功, 就提交 offset, 否则不提交, 然后再次拉取
但是这种做法并不能最大的保证数据的完整性比如在运行的时候, 程序挂了之类的
所以还有一种方法是手动的指定 offset 下标进行获取数据, 直到 kafka 的数据处理成功之后, 将 offset 记录下来, 比如写在数据库中那么这种做法, 等到下一篇再进行尝试吧!
来源: https://www.cnblogs.com/xuwujing/p/8432984.html