在 Kafka0.9 版本之前消费者保存的偏移量是在 zookeeper 中 / consumers/GROUP.ID/offsets/TOPIC.NAME/PARTITION.ID. 新版消费者不在保存偏移量到 zookeeper 中, 而是保存在 Kafka 的一个内部主题中 "__consumer_offsets", 该主题默认有 50 个分区, 每个分区 3 个副本, 分区数量有参数 offset.topic.num.partition 设置. 通过消费者组 ID 的哈希值和该参数取模的方式来确定某个消费者组已消费的偏移量保存到__consumer_offsets 主题的哪个分区中.
Kafka 消费者 API 提供两种方法用来查询偏移量. 一个是 committed(TopicPartition partition) 方法, 这个方法返回一个 OffsetAndMetadata 对象, 通过这个对象可以获取指定分区已提交的偏移量; 另外一个方法 position(TopicPartition partition) 返回的是下一次拉取位置.
同时 Kafka 消费者还提供了重置消费偏移量的方法, seek(TopicPartition partition, long offset), 该方法用于指定消费起始位置, 另外还有 seekToBeginning() 和 seekToEnd(), 从名字就能看出来是干嘛的.
偏移量提交有自动和手动, 默认是自动 (enable.auto.commit = true). 自动提交的话每隔多久自动提交一次呢? 这个由消费者协调器参数 auto.commit.interval.ms 毫秒执行一次提交. 有些场景我们需要手动提交偏移量, 尤其是在一个长事务中并且保证消息不被重复消费以及消息不丢失, 比如生产者一个订单提交消息, 消费者拿到后要扣减库存, 扣减成功后该消息才能提交, 所以在这种场景下需要手动提交, 因为库存扣减失败这个消息就不能消费, 同时客户这个订单状态也不能是成功. 手动提交也有两种一个是同步提交一个是异步提交, 其区别就是消费者线程是否阻塞. 如果使用手动提交就要关闭自动提交, 因为自动提交默认是开启的.
来源: http://www.bubuko.com/infodetail-2695395.html