1, 搭建部署好 zookeeper 集群和 kafka 集群, 这里省略.
启动 zk:
bin/zkServer.sh start conf/zoo.cfg.
验证 zk 是否启动成功:
bin/zkServer.sh status conf/zoo.cfg.
启动 kafka:
bin/kafka-server-start.sh -daemon config/server.properties.
2, 生产者和消费者代码如下所示:
- package com.bie.kafka.producer;
- import java.util.Properties;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.Producer;
- //import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- /**
- *
- * @Description TODO
- * @author biehl
- * @Date 2019 年 4 月 6 日 上午 11:27:34
- *
- */
- public class ProducerTest {
- public static void main(String[] args) {
- // 构造一个 java.util.Properties 对象
- Properties props = new Properties();
- // 指定 Bootstrap.servers 属性. 必填, 无默认值. 用于创建向 kafka broker 服务器的连接.
- props.put("bootstrap.servers", "192.168.110.130:9092,192.168.110.131:9092,192.168.110.132:9092");
- // 指定 key.serializer 属性. 必填, 无默认值. 被发送到 broker 端的任何消息的格式都必须是字节数组.
- // 因此消息的各个组件都必须首先做序列化, 然后才能发送到 broker. 该参数就是为消息的 key 做序列化只用的.
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- // 指定 value.serializer 属性. 必填, 无默认值. 和 key.serializer 类似. 此被用来对消息体即消息 value 部分做序列化.
- // 将消息 value 部分转换成字节数组.
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- //acks 参数用于控制 producer 生产消息的持久性 (durability). 参数可选值, 0,1,-1(all).
- props.put("acks", "-1");
- //props.put(ProducerConfig.ACKS_CONFIG, "1");
- // 在 producer 内部自动实现了消息重新发送. 默认值 0 代表不进行重试.
- props.put("retries", 3);
- //props.put(ProducerConfig.RETRIES_CONFIG, 3);
- // 调优 producer 吞吐量和延时性能指标都有非常重要作用. 默认值 16384 即 16KB.
- props.put("batch.size", 323840);
- //props.put(ProducerConfig.BATCH_SIZE_CONFIG, 323840);
- // 控制消息发送延时行为的, 该参数默认值是 0. 表示消息需要被立即发送, 无须关系 batch 是否被填满.
- props.put("linger.ms", 10);
- //props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
- // 指定了 producer 端用于缓存消息的缓冲区的大小, 单位是字节, 默认值是 33554432 即 32M.
- props.put("buffer.memory", 33554432);
- //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
- props.put("max.block.ms", 3000);
- //props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);
- // 设置 producer 段是否压缩消息, 默认值是 none. 即不压缩消息. GZIP,Snappy,LZ4
- //props.put("compression.type", "none");
- //props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
- // 该参数用于控制 producer 发送请求的大小. producer 端能够发送的最大消息大小.
- //props.put("max.request.size", 10485760);
- //props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
- //producer 发送请求给 broker 后, broker 需要在规定时间范围内将处理结果返还给 producer. 默认 30s
- //props.put("request.timeout.ms", 60000);
- //props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
- // 使用上面创建的 Properties 对象构造 KafkaProducer 对象
- // 如果采用这种方式创建 producer, 那么就不需要显示的在 Properties 中指定 key 和 value 序列化类了呢.
- // Serializer<String> keySerializer = new StringSerializer();
- // Serializer<String> valueSerializer = new StringSerializer();
- // Producer<String, String> producer = new KafkaProducer<String, String>(props,
- // keySerializer, valueSerializer);
- Producer<String, String> producer = new KafkaProducer<>(props);
- for (int i = 0; i <100; i++) {
- // 构造好 kafkaProducer 实例以后, 下一步就是构造消息实例.
- producer.send(new ProducerRecord<>("topic1", Integer.toString(i), Integer.toString(i)));
- // 构造待发送的消息对象 ProduceRecord 的对象, 指定消息要发送到的 topic 主题, 分区以及对应的 key 和 value 键值对.
- // 注意, 分区和 key 信息可以不用指定, 由 kafka 自行确定目标分区.
- //ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("my-topic",
- // Integer.toString(i), Integer.toString(i));
- // 调用 kafkaProduce 的 send 方法发送消息
- //producer.send(producerRecord);
- }
- System.out.println("消息生产结束......");
- // 关闭 kafkaProduce 对象
- producer.close();
- System.out.println("关闭生产者......");
- }
- }
消费者代码如下所示:
- package com.bie.kafka.consumer;
- import java.util.Arrays;
- import java.util.Properties;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- /**
- *
- * @Description TODO
- * @author biehl
- * @Date 2019 年 4 月 6 日 下午 8:12:28
- *
- */
- public class ConsumerTest {
- public static void main(String[] args) {
- String topicName = "topic1";
- String groupId = "group1";
- // 构造 java.util.Properties 对象
- Properties props = new Properties();
- // 必须指定属性.
- props.put("bootstrap.servers", "192.168.110.130:9092,192.168.110.131:9092,192.168.110.132:9092");
- // 必须指定属性.
- props.put("group.id", groupId);
- props.put("enable.auto.commit", "true");
- props.put("auto.commit.interval.ms", "1000");
- // 从最早的消息开始读取
- props.put("auto.offset.reset", "earliest");
- // 必须指定
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- // 必须指定
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- // 使用创建的 Properties 实例构造 consumer 实例
- KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
- // 订阅 topic. 调用 kafkaConsumer.subscribe 方法订阅 consumer group 所需的 topic 列表
- consumer.subscribe(Arrays.asList(topicName));
- try {
- while (true) {
- // 循环调用 kafkaConsumer.poll 方法获取封装在 ConsumerRecord 的 topic 消息.
- ConsumerRecords<String, String> records = consumer.poll(1000);
- // 获取到封装在 ConsumerRecords 消息以后, 处理获取到 ConsumerRecord 对象.
- for (ConsumerRecord<String, String> record : records) {
- // 简单的打印输出
- System.out.println(
- "offset =" + record.offset()
- + ",key =" + record.key()
- + ",value =" + record.value());
- }
- }
- } catch (Exception e) {
- // 关闭 kafkaConsumer
- System.out.println("消息消费结束......");
- consumer.close();
- }
- System.out.println("关闭消费者......");
- }
- }
遇到的坑, 一开始报的错误莫名其妙, 一开始以为使用的 jar 包版本问题, 又是报 slf4j 的错误, 又是报 log4j 的错误, 又是报空指针的异常. 最后百度意外遇到了可能是本地没有将 ip 地址放到 hosts 文件里面, 果然是这个问题.
添加如下所示即可:
然后就可以开心的生产消息和消费消息了啊. 开心.
项目结构如下所示:
待续.....
来源: http://www.bubuko.com/infodetail-3061412.html