当我们使用 kafka 向指定 Topic 发送消息时, 如果该 Topic 具有多个 partition, 无论消费者有多少, 最终都会保证一个 partition 内的消息只会被一个 Consumer group 中的一个 Consumer 消费, 也就是说同一 Consumer group 中的多个 Consumer 自动会起到负载均衡的效果.
1, 消息构造
下面我们就针对调用 kafka API 发送消息到 Topic 时 partition 的分配策略, 分析下其内部具体的源码码实现.
首先看下 kafka API 中消息体 ProducerRecord 类的构造函数, 可以看到构造消息时可指定该消息要发送的 Topic,partition,key,value 等关键信息.
- /**
- * Creates a record to be sent to a specified topic and partition
- *
- * @param topic The topic the record will be appended to
- * @param partition The partition to which the record should be sent
- * @param key The key that will be included in the record
- * @param value The record contents
- * @param headers The headers that will be included in the record
- */
- public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
- this(topic, partition, null, key, value, headers);
- }
- /**
- * Creates a record to be sent to a specified topic and partition
- *
- * @param topic The topic the record will be appended to
- * @param partition The partition to which the record should be sent
- * @param key The key that will be included in the record
- * @param value The record contents
- */
- public ProducerRecord(String topic, Integer partition, K key, V value) {
- this(topic, partition, null, key, value, null);
- }
- /**
- * Create a record to be sent to Kafka
- *
- * @param topic The topic the record will be appended to
- * @param key The key that will be included in the record
- * @param value The record contents
- */
- public ProducerRecord(String topic, K key, V value) {
- this(topic, null, null, key, value, null);
- }
2, 分发策略
在实际使用中, 我们一般不会指定消息发送的具体 partition, 最多只会传入 key 值, 类似下面这种方式:
producer.send(new ProducerRecord<Object, Object>(topic, key, data));
而 kafka 也会根据你传入 key 的 hash 值, 通过取余的方法, 尽可能保证消息能够相对均匀的分摊到每个可用的 partition 上;
下面是 kafka 内部默认的分发策略:
- public class DefaultPartitioner implements Partitioner {
- private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
- public void configure(Map<String, ?> configs) {}
- /**
- * Compute the partition for the given record.
- *
- * @param topic The topic name
- * @param key The key to partition on (or null if no key)
- * @param keyBytes serialized key to partition on (or null if no key)
- * @param value The value to partition on or null
- * @param valueBytes serialized value to partition on or null
- * @param cluster The current cluster metadata
- */
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- // 获取该 topic 的分区列表
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- int numPartitions = partitions.size();
- // 如果 key 值为 null
- if (keyBytes == null) {
- // 维护一个 key 为 topic 的 ConcurrentHashMap, 并通过 CAS 操作的方式对 value 值执行递增 + 1 操作
- int nextValue = nextValue(topic);
- // 获取该 topic 的可用分区列表
- List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
- if (availablePartitions.size()> 0) {// 如果可用分区大于 0
- // 执行求余操作, 保证消息落在可用分区上
- int part = Utils.toPositive(nextValue) % availablePartitions.size();
- return availablePartitions.get(part).partition();
- } else {
- // 没有可用分区的话, 就给出一个不可用分区
- return Utils.toPositive(nextValue) % numPartitions;
- }
- } else {
- // 通过计算 key 的 hash, 确定消息分区
- return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
- }
- }
- private int nextValue(String topic) {
- // 获取一个 AtomicInteger 对象
- AtomicInteger counter = topicCounterMap.get(topic);
- if (null == counter) {// 如果为空
- // 生成一个随机数
- counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
- // 维护到 topicCounterMap 中
- AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
- if (currentCounter != null) {
- counter = currentCounter;
- }
- }
- // 返回值并执行递增
- return counter.getAndIncrement();
- }
- public void close() {}
- }
3, 自定义负载策略
我们也可以通过实现 Partitioner 接口, 自定义分发策略, 看下具体实现
自定义实现 Partitioner 接口
- /**
- * 自定义实现 Partitioner 接口
- *
- */
- public class KeyPartitioner implements Partitioner {
- /**
- * 实现具体分发策略
- */
- @Override
- public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
- List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);// 拉取可用的 partition
- if (key == null||key.equals("")) {
- int random = (int) (Math.random() * 10);
- int part = random % availablePartitions.size();
- return availablePartitions.get(part).partition();
- }
- return Math.abs(key.toString().hashCode() % 6);
- }
- @Override
- public void configure(Map<String, ?> configs) {
- // TODO Auto-generated method stub
- }
- @Override
- public void close() {
- // TODO Auto-generated method stub
- }
- }
同时在初始化 kafka 生产者时, 增加自定义配置
- Properties properties = new Properties();
- properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,KeyPartitioner.class); // 加入自定义的配置
- producer = new KafkaProducer<Object, Object>(properties);
4, 总结
以上是对 kafka 消息分发的策略进行一定的分析与自定义扩展, 希望对大家在使用 kafka 时有所帮助, 其中如有不足与不正确的地方还望指出与海涵.
来源: https://www.cnblogs.com/dafanjoy/p/11394485.html