在很多的流处理框架的介绍中, 都会说 kafka 是一个可靠的数据源, 并且推荐使用 Kafka 当作数据源来进行使用. 这是因为与其他消息引擎系统相比, kafka 提供了可靠的数据保存及备份机制. 并且通过消费者位移这一概念, 可以让消费者在因某些原因宕机而重启后, 可以轻易得回到宕机前的位置.
但其实 kafka 的可靠性也只能说是相对的, 在整条数据链条中, 总有可以让数据出现丢失的情况, 今天就来讨论如何避免 kafka 数据丢失, 以及实现精确一致处理的语义.
kafka 无消息丢失处理
在讨论如何实现 kafka 无消息丢失的时候, 首先要先清楚大部分情况下消息丢失是在什么情况下发生的. 为什么是大部分, 因为总有一些非常特殊的情况会被人忽略, 而我们只需要关注普遍的情况就足够了. 接下来我们来讨论如何较为普遍的数据丢失情况.
1.1 生产者丢失
前面介绍 Kafka 分区和副本的时候, 有提到过一个 producer 客户端有一个 acks 的配置, 这个配置为 0 的时候, producer 是发送之后不管的, 这个时候就很有可能因为网络等原因造成数据丢失, 所以应该尽量避免. 但是将 ack 设置为 1 就没问题了吗, 那也不一定, 因为有可能在 leader 副本接收到数据, 但还没同步给其他副本的时候就挂掉了, 这时候数据也是丢失了. 并且这种时候是客户端以为消息发送成功, 但 kafka 丢失了数据.
要达到最严格的无消息丢失配置, 应该是要将 acks 的参数设置为 - 1(也就是 all), 并且将 min.insync.replicas 配置项调高到大于 1, 这部分内容在上一篇副本机制有介绍详细解析 kafka 之 kafka 分区和副本.
同时还需要使用带有回调的 producer API, 来发送数据. 注意这里讨论的都是异步发送消息, 同步发送不在讨论范围.
- public class send{
- ......
- public static void main(){
- ...
- /*
- * 第一个参数是 ProducerRecord 类型的对象, 封装了目标 Topic, 消息的 kv
- * 第二个参数是一个 CallBack 对象, 当生产者接收到 Kafka 发来的 ACK 确认消息的时候,
- * 会调用此 CallBack 对象的 onCompletion() 方法, 实现回调功能
- */
- producer.send(new ProducerRecord<>(topic, messageNo, messageStr),
- new DemoCallBack(startTime, messageNo, messageStr));
- ...
- }
- ......
- }
- class DemoCallBack implements Callback {
- /* 开始发送消息的时间戳 */
- private final long startTime;
- private final int key;
- private final String message;
- public DemoCallBack(long startTime, int key, String message) {
- this.startTime = startTime;
- this.key = key;
- this.message = message;
- }
- /**
- * 生产者成功发送消息, 收到 Kafka 服务端发来的 ACK 确认消息后, 会调用此回调函数
- * @param metadata 生产者发送的消息的元数据, 如果发送过程中出现异常, 此参数为 null
- * @param exception 发送过程中出现的异常, 如果发送成功为 null
- */
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- long elapsedTime = System.currentTimeMillis() - startTime;
- if (metadata != null) {
- System.out.printf("message: (%d, %s) send to partition %d, offset: %d, in %d\n",
- key, message, metadata.partition(), metadata.offset(), elapsedTime);
- } else {
- exception.printStackTrace();
- }
- }
- }
更详细的代码可以参考这里: Kafka 生产者分析 --KafkaProducer.
我们之前提到过, producer 发送到 kafka broker 的时候, 是有多种可能会失败的, 而回调函数能准确告诉你是否确认发送成功, 当然这依托于 acks 和 min.insync.replicas 的配置. 而当数据发送丢失的时候, 就可以进行手动重发或其他操作, 从而确保生产者发送成功.
1.2 kafka 内部丢失
有些时候, kafka 内部因为一些不大好的配置, 可能会出现一些极为隐蔽的数据丢失情况, 那么我们分别讨论下大致都有哪几种情况.
首先是 replication.factor 配置参数, 这个配置决定了副本的数量, 默认是 1. 注意这个参数不能超过 broker 的数量. 说这个参数其实是因为如果使用默认的 1, 或者不在创建 topic 的时候指定副本数量(也就是副本数为 1), 那么当一台机器出现磁盘损坏等情况, 那么数据也就从 kafka 里面丢失了. 所以 replication.factor 这个参数最好是配置大于 1, 比如说 3.
接下来要说的还是和副本相关的, 也是上一篇副本中提到的 unclean.leader.election.enable 参数, 这个参数是在主副本挂掉, 然后在 ISR 集合中没有副本可以成为 leader 的时候, 要不要让进度比较慢的副本成为 leader 的. 不用多说, 让进度比较慢的副本成为 leader, 肯定是要丢数据的. 虽然可能会提高一些可用性, 但如果你的业务场景丢失数据更加不能忍受, 那还是将 unclean.leader.election.enable 设置为 false 吧.
1.3 消费者丢失
消费者丢失的情况, 其实跟消费者位移处理不当有关. 消费者位移提交有一个参数, enable.auto.commit, 默认是 true, 决定是否要让消费者自动提交位移. 如果开启, 那么 consumer 每次都是先提交位移, 再进行消费, 比如先跟 broker 说这 5 个数据我消费好了, 然后才开始慢慢消费这 5 个数据.
这样处理的话, 好处是简单, 坏处就是漏消费数据, 比如你说要消费 5 个数据, 消费了 2 个自己就挂了. 那下次该 consumer 重启后, 在 broker 的记录中这个 consumer 是已经消费了 5 个的.
所以最好的做法就是将 enable.auto.commit 设置为 false, 改为手动提交位移, 在每次消费完之后再手动提交位移信息. 当然这样又有可能会重复消费数据, 毕竟 exactly once 处理一直是一个问题呀(/ 摊手). 遗憾的是 kafka 目前没有保证 consumer 幂等消费的措施, 如果确实需要保证 consumer 的幂等, 可以对每条消息维持一个全局的 id, 每次消费进行去重, 当然耗费这么多的资源来实现 exactly once 的消费到底值不值, 那就得看具体业务了.
1.4 无消息丢失小结
那么到这里先来总结下无消息丢失的主要配置吧:
producer 的 acks 设置位 - 1, 同时 min.insync.replicas 设置大于 1. 并且使用带有回调的 producer API 发生消息.
默认副本数 replication.factor 设置为大于 1, 或者创建 topic 的时候指定大于 1 的副本数.
unclean.leader.election.enable 设置为 false, 防止定期副本 leader 重选举
消费者端, 自动提交位移 enable.auto.commit 设置为 false. 在消费完后手动提交位移.
那么接下来就来说说 kafka 实现精确一次 (exactly once) 处理的方法吧.
实现精确一次 (exactly once) 处理
在分布式环境下, 要实现消息一致与精确一次 (exactly once) 语义处理是很难的. 精确一次处理意味着一个消息只处理一次, 造成一次的效果, 不能多也不能少.
那么 kafka 如何能够实现这样的效果呢? 在介绍之前, 我们先来介绍其他两个语义, 至多一次 (at most once) 和至少一次(at least once).
最多一次和至少一次
最多一次就是保证一条消息只发送一次, 这个其实最简单, 异步发送一次然后不管就可以, 缺点是容易丢数据, 所以一般不采用.
至少一次语义是 kafka 默认提供的语义, 它保证每条消息都能至少接收并处理一次, 缺点是可能有重复数据.
前面有介绍过 acks 机制, 当设置 producer 客户端的 acks 是 1 的时候, broker 接收到消息就会跟 producer 确认. 但 producer 发送一条消息后, 可能因为网络原因消息超时未达, 这时候 producer 客户端会选择重发, broker 回应接收到消息, 但很可能最开始发送的消息延迟到达, 就会造成消息重复接收.
那么针对这些情况, 要如何实现精确一次处理的语义呢?
幂等的 producer
要介绍幂等的 producer 之前, 得先了解一下幂等这个词是什么意思. 幂等这个词最早起源于函数式编程, 意思是一个函数无论执行多少次都会返回一样的结果. 比如说让一个数加 1 就不是幂等的, 而让一个数取整就是幂等的. 因为这个特性所以幂等的函数适用于并发的场景下.
但幂等在分布式系统中含义又做了进一步的延申, 比如在 kafka 中, 幂等性意味着一个消息无论重复多少次, 都会被当作一个消息来持久化处理.
kafka 的 producer 默认是支持最少一次语义, 也就是说不是幂等的, 这样在一些比如支付等要求精确数据的场景会出现问题, 在 0.11.0 后, kafka 提供了让 producer 支持幂等的配置操作. 即:
props.put("enable.idempotence", ture)
在创建 producer 客户端的时候, 添加这一行配置, producer 就变成幂等的了. 注意开启幂等性的时候, acks 就自动是 "all" 了, 如果这时候手动将 ackss 设置为 0, 那么会报错.
而底层实现其实也很简单, 就是对每条消息生成一个 id 值, broker 会根据这个 id 值进行去重, 从而实现幂等, 这样一来就能够实现精确一次的语义了.
但是! 幂等的 producery 也并非万能. 有两个主要是缺陷:
幂等性的 producer 仅做到单分区上的幂等性, 即单分区消息不重复, 多分区无法保证幂等性.
只能保持单会话的幂等性, 无法实现跨会话的幂等性, 也就是说如果 producer 挂掉再重启, 无法保证两个会话间的幂等(新会话可能会重发). 因为 broker 端无法获取之前的状态信息, 所以无法实现跨会话的幂等.
事务的 producer
当遇到上述幂等性的缺陷无法解决的时候, 可以考虑使用事务了. 事务可以支持多分区的数据完整性, 原子性. 并且支持跨会话的 exactly once 处理语义, 也就是说如果 producer 宕机重启, 依旧能保证数据只处理一次.
开启事务也很简单, 首先需要开启幂等性, 即设置 enable.idempotence 为 true. 然后对 producer 发送代码做一些小小的修改.
- // 初始化事务
- producer.initTransactions();
- try {
- // 开启一个事务
- producer.beginTransaction();
- producer.send(record1);
- producer.send(record2);
- // 提交
- producer.commitTransaction();
- } catch (KafkaException e) {
- // 出现异常的时候, 终止事务
- producer.abortTransaction();
- }
但无论开启幂等还是事务的特性, 都会对性能有一定影响, 这是必然的. 所以 kafka 默认也并没有开启这两个特性, 而是交由开发者根据自身业务特点进行处理.
以上~
来源: http://www.bubuko.com/infodetail-3464630.html