目录
1,KafkaProducer 概述
2,KafkaProducer 类图
3,KafkaProducer 简单示例
温馨提示: 整个 Kafka Client 专栏基于 kafka-2.3.0 版本.
@(本节目录)
1,KafkaProducer 概述
根据 KafkaProducer 类上的注释上来看 KafkaProducer 具有如下特征:
KafkaProducer 是线程安全的, 可以被多个线程交叉使用.
KafkaProducer 内部包含一个缓存池, 存放待发送消息, 即 ProducerRecord 队列, 与此同时会开启一个 IO 线程将 ProducerRecord 对象发送到 Kafka 集群.
KafkaProducer 的消息发送 API send 方法是异步, 只负责将待发送消息 ProducerRecord 发送到缓存区中, 立即返回, 并返回一个结果凭证 Future.
acks
KafkaProducer 提供了一个核心参数 acks 用来定义消息 "已提交" 的条件(标准), 就是 Broker 端向客户端承偌已提交的条件, 可选值如下:
0
表示生产者不关系该条消息在 broker 端的处理结果, 只要调用 KafkaProducer 的 send 方法返回后即认为成功, 显然这种方式是最不安全的, 因为 Broker 端可能压根都没有收到该条消息或存储失败.
all 或 -1
表示消息不仅需要 Leader 节点已存储该消息, 并且要求其副本 (准确的来说是 ISR 中的节点) 全部存储才认为已提交, 才向客户端返回提交成功. 这是最严格的持久化保障, 当然性能也最低.
1
表示消息只需要写入 Leader 节点后就可以向客户端返回提交成功.
retries
kafka 在生产端提供的另外一个核心属性, 用来控制消息在发送失败后的重试次数, 设置为 0 表示不重试, 重试就有可能造成消息在发送端的重复.
batch.size
kafka 消息发送者为每一个分区维护一个未发送消息积压缓存区, 其内存大小由 batch.size 指定, 默认为 16K.
但如果缓存区中不足 100 条, 但发送线程此时空闲, 是需要等到缓存区中积满 100 条才能发送还是可以立即发送呢? 默认是立即发送, 即 batch.size 的作用其实是客户端一次发送到 broker 的最大消息数量.
linger.ms
为了提高 kafka 消息发送的高吞吐量, 即控制在缓存区中未积满 batch.size 时来控制 消息发送线程的行为, 是立即发送还是等待一定时间, 如果 linger.ms 设置为 0 表示立即发送, 如果设置为大于 0, 则消息发送线程会等待这个值后才会向 broker 发送. 该参数者会增加响应时间, 但有利于增加吞吐量. 有点类似于 TCP 领域的 Nagle 算法.
buffer.memory
用于控制消息发送者缓存的总内存大小, 如果超过该值, 往缓存区中添加消息会被阻塞, 具体会在下文的消息发送流程中详细介绍, 阻塞的最大时间可通过参数 max.block.ms 设置, 阻塞超过该值会抛出超时异常.
key.serializer
指定 key 的序列化处理器.
value.serializer
指定 消息体的序列化处理器.
enable.idempotence
从 kafka0.11 版本开始, 支持消息传递幂等, 可以做到消息只会被传递一次, 通过 enable.idempotence 为 true 来开启. 如果该值设置为 true, 其 retries 将设置为 Integer.MAX_VALUE,acks 将被设置为 all. 为了确保消息发送幂等性, 必须避免应用程序端的任何重试, 并且如果消息发送 API 如果返回错误, 应用端应该记录最后成功发送的消息, 避免消息的重复发送.
从 Kafka 0.11 开始, kafka 也支持事务消息.
2,KafkaProducer 类图
在 Kafka 中, 生产者通过接口 Producer 定义, 通过该接口的方法, 我们基本可以得知 KafkaProducer 将具备如下基本能力:
void initTransactions()
初始化事务, 如果需要使用事务方法, 该方法必须首先被调用.
void beginTransaction()
开启事务.
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId)
- package persistent.prestige.demo.kafka;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.Producer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
- import java.util.Properties;
- import java.util.concurrent.Future;
- public class KafkaProducerTest {
- public static void main(String[] args){
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092,localhost:9082,localhost:9072,");
- props.put("acks", "all");
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- Producer<String, String> producer = new KafkaProducer<>(props);
- try {
- for (int i = 0; i <100; i++) {
- Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("TOPIC_ORDER", Integer.toString(i), Integer.toString(i)));
- RecordMetadata recordMetadata = future.get();
- System.out.printf("offset:" + recordMetadata.offset());
- }
- } catch (Throwable e) {
- e.printStackTrace();
- } finally {
- producer.close();
- }
- }
- }
来源: https://www.cnblogs.com/dingwpmz/p/12127451.html