IDE shortcuts (IntelliJ):
- Ctrl+H: browse the type hierarchy / find an interface's implementation classes
- Ctrl+R: find and replace
- Ctrl+Alt+L: reformat code
- Ctrl+F: find
- Ctrl+Alt+V: extract variable
Kafka Producer Java API
Creating a producer
Without a callback
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.Producer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import java.util.Properties;
- public class CustomProducer {
-     public static void main(String[] args) throws InterruptedException {
-         Properties properties = new Properties();
-         // Kafka broker addresses
-         properties.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
-         // acks=all is equivalent to acks=-1
-         properties.put("acks", "all");
-         properties.put("retries", 0);
-         // size-based batching
-         properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
-         // time-based batching
-         properties.put("linger.ms", 1);
-         // client-side buffer size
-         properties.put("buffer.memory", 33554432);
-         // key/value serializers
-         properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-         properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-         Producer<String, String> producer = new KafkaProducer<String, String>(properties);
-         for (int i = 0; i < 9; i++) {
-             producer.send(new ProducerRecord<String, String>("first", "" + i, "Hello" + i));
-         }
-         //Thread.sleep(1000);
-         // If close() is forgotten, records are only sent once a batching condition is met
-         // (batch.size or linger.ms); records that never reach a threshold may never be sent.
-         producer.close();
-     }
- }
- new ProducerRecord<String, String>("topic", partition (Integer), "key", "value")
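For reference, a quick sketch of the common ProducerRecord constructor overloads (the topic "first" and the keys/values here are just illustrative):
- // topic + value: no key, the partition is chosen by the configured partitioner
- new ProducerRecord<String, String>("first", "Hello");
- // topic + key + value: the default partitioner hashes the key to pick a partition
- new ProducerRecord<String, String>("first", "key1", "Hello");
- // topic + partition + key + value: the target partition is fixed explicitly
- new ProducerRecord<String, String>("first", 0, "key1", "Hello");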
With a callback
A producer with a callback: the callback is invoked once for every record sent,
whether or not the send succeeded.
- import org.apache.kafka.clients.producer.Callback;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
- import java.util.Arrays;
- import java.util.Properties;
- public class CustomProducerCompletion {
-     public static void main(String[] args) {
-         Properties properties = new Properties();
-         properties.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
-         properties.put("acks", "all");
-         properties.put("retries", 2);
-         properties.put("batch.size", 16384);
-         properties.put("linger.ms", 1);
-         properties.put("buffer.memory", 33554432);
-         properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-         properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-         // custom partitioner: ProducerConfig.PARTITIONER_CLASS_CONFIG
-         //properties.put("partitioner.class", "com.atguigu.kafka.producer.CustomPartitioner");
-         // interceptor chain
-         properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
-                 Arrays.asList("com.atguigu.kafka.interceptor.TimeStampInterceptor", "com.atguigu.kafka.interceptor.CountInterceptor"));
-         KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
-         for (int i = 0; i < 9; i++) {
-             kafkaProducer.send(new ProducerRecord<String, String>("first", "1", "Hi" + i), new Callback() {
-                 public void onCompletion(RecordMetadata recordMetadata, Exception e) {
-                     // recordMetadata is non-null on success; e is non-null on failure
-                     if (recordMetadata != null) {
-                         System.out.println("Topic:" + recordMetadata.topic() + "\t" +
-                                 "Partition:" + recordMetadata.partition() + "\t" + "Offset:" + recordMetadata.offset());
-                     }
-                 }
-             });
-         }
-         kafkaProducer.close();
-     }
- }
Custom partitioner
Override the default key-based rule to control which partition each record goes to.
- import org.apache.kafka.clients.producer.Partitioner;
- import org.apache.kafka.common.Cluster;
- import java.util.Map;
- public class CustomPartitioner implements Partitioner {
-     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
-         return 0; // controls the target partition
-     }
-     public void close() {
-     }
-     /**
-      * Configuration properties can be read here
-      * @param config
-      */
-     public void configure(Map<String, ?> config) {
-     }
- }
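For illustration, a minimal sketch of a partitioner that routes by key hash instead of always returning 0; the class name KeyHashPartitioner is hypothetical, and registration mirrors the commented-out partitioner.class line in the producer above:
- import org.apache.kafka.clients.producer.Partitioner;
- import org.apache.kafka.common.Cluster;
- import java.util.Map;
- public class KeyHashPartitioner implements Partitioner {
-     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
-         int numPartitions = cluster.partitionCountForTopic(topic);
-         if (keyBytes == null) {
-             return 0; // records without a key all go to partition 0 in this sketch
-         }
-         // the same key always maps to the same partition
-         return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
-     }
-     public void close() {
-     }
-     public void configure(Map<String, ?> config) {
-     }
- }
- // register it on the producer (package name is illustrative):
- properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.producer.KeyHashPartitioner");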
Kafka Consumer Java API
High-level API
Offsets do not need to be managed manually
poll(timeout): how long poll blocks waiting for data
subscribe: subscribe to topics
Several topics can be consumed at once (see the one-line example after the code below)
subscribe takes a collection: array -> Arrays.asList -> collection
- // High-level API
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import java.util.Arrays;
- import java.util.Properties;
- public class CustomConsumer {
-     public static void main(String[] args) {
-         Properties properties = new Properties();
-         // Kafka cluster addresses
-         properties.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
-         // consumer group id
-         properties.put(ConsumerConfig.GROUP_ID_CONFIG, "kris");
-         // automatically commit offsets (managed by the Kafka cluster)
-         properties.put("enable.auto.commit", "true");
-         // how often to commit offsets
-         properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
-         // key/value deserializers
-         properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-         properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-         KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
-         kafkaConsumer.subscribe(Arrays.asList("first"));
-         while (true) {
-             // poll blocks for at most 100 ms waiting for data
-             ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
-             for (ConsumerRecord<String, String> record : records) {
-                 System.out.println("Topic:" + record.topic() + "\t" +
-                         "Partition:" + record.partition() + "\t" + "Offset:" + record.offset()
-                         + "\t" + "key:" + record.key() + "\t" + "value:" + record.value());
-             }
-         }
-     }
- }
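As noted above, a single consumer can subscribe to several topics at once; a one-line sketch (the topic name "second" is just illustrative):
- kafkaConsumer.subscribe(Arrays.asList("first", "second"));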
Low-level API
leader: find the partition leader yourself
offset: decide where to start reading
save the offset yourself
message: fetch and parse the messages
- import kafka.api.FetchRequest;
- import kafka.api.FetchRequestBuilder;
- import kafka.cluster.BrokerEndPoint;
- import kafka.javaapi.FetchResponse;
- import kafka.javaapi.PartitionMetadata;
- import kafka.javaapi.TopicMetadata;
- import kafka.javaapi.TopicMetadataRequest;
- import kafka.javaapi.TopicMetadataResponse;
- import kafka.javaapi.consumer.SimpleConsumer;
- import kafka.javaapi.message.ByteBufferMessageSet;
- import kafka.message.Message;
- import kafka.message.MessageAndOffset;
- import java.nio.ByteBuffer;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.List;
- public class LowLevelConsumer {
-     public static void main(String[] args) {
-         //1. cluster brokers
-         ArrayList<String> list = new ArrayList<>();
-         list.add("hadoop101");
-         list.add("hadoop102");
-         list.add("hadoop103");
-         //2. topic
-         String topic = "first";
-         //3. partition
-         int partition = 2;
-         //4. offset
-         long offset = 10;
-         //5. find the leader of the partition
-         String leader = getLeader(list, topic, partition);
-         //6. connect to the leader and fetch the data
-         getData(leader, topic, partition, offset);
-     }
-     private static void getData(String leader, String topic, int partition, long offset) {
-         //1. create a SimpleConsumer
-         SimpleConsumer consumer = new SimpleConsumer(leader, 9092, 2000, 1024 * 1024 * 2, "getData");
-         //2. send the fetch request
-         //3. build the request with FetchRequestBuilder
-         FetchRequestBuilder builder = new FetchRequestBuilder();
-         FetchRequestBuilder requestBuilder = builder.addFetch(topic, partition, offset, 1024 * 1024);
-         FetchRequest fetchRequest = requestBuilder.build();
-         //4. get the response
-         FetchResponse fetchResponse = consumer.fetch(fetchRequest);
-         //5. parse the response
-         ByteBufferMessageSet messageAndOffsets = fetchResponse.messageSet(topic, partition);
-         //6. iterate over the message set
-         for (MessageAndOffset messageAndOffset : messageAndOffsets) {
-             long message_offset = messageAndOffset.offset();
-             Message message = messageAndOffset.message();
-             //7. parse the message; payload() returns the message value
-             ByteBuffer byteBuffer = message.payload();
-             byte[] bytes = new byte[byteBuffer.limit()];
-             byteBuffer.get(bytes);
-             //8. print the data
-             System.out.println("offset:" + message_offset + "\t" + "value:" + new String(bytes));
-         }
-     }
-     private static String getLeader(ArrayList<String> list, String topic, int partition) {
-         //1. ask each broker in turn until the leader is found
-         for (String host : list) {
-             //2. create a SimpleConsumer
-             SimpleConsumer consumer = new SimpleConsumer(
-                     host,
-                     9092,
-                     2000,
-                     1024 * 1024,
-                     "getLeader"
-             );
-             //3. send a metadata request to find the leader
-             //4. build the TopicMetadataRequest
-             TopicMetadataRequest request = new TopicMetadataRequest(Arrays.asList(topic));
-             //5. get the TopicMetadataResponse
-             TopicMetadataResponse response = consumer.send(request);
-             //6. parse the response
-             List<TopicMetadata> topicsMetadata = response.topicsMetadata();
-             //7. iterate over the topic metadata
-             for (TopicMetadata topicMetadata : topicsMetadata) {
-                 List<PartitionMetadata> partitionsMetadata = topicMetadata.partitionsMetadata();
-                 //8. iterate over the partition metadata
-                 for (PartitionMetadata partitionMetadata : partitionsMetadata) {
-                     //9. check whether this is the requested partition
-                     if (partitionMetadata.partitionId() == partition) {
-                         BrokerEndPoint endPoint = partitionMetadata.leader();
-                         return endPoint.host();
-                     }
-                 }
-             }
-         }
-         return null;
-     }
- }
Kafka producer interceptors
Flume intercepts events (Flume has an interceptor chain);
Kafka producer interceptors work on messages.
onSend is called once for every record that is sent;
onAcknowledgement is called when the record is acknowledged, before the user callback is invoked.
https://blog.csdn.net/stark_summer/article/details/50144591
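As a rough sketch of the ProducerInterceptor contract: the TimeStampInterceptor referenced in the producer config above is assumed here to prepend a timestamp to the record value; the exact behaviour of the original course class may differ.
- import org.apache.kafka.clients.producer.ProducerInterceptor;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
- import java.util.Map;
- public class TimeStampInterceptor implements ProducerInterceptor<String, String> {
-     // called once per record, before it is serialized and sent
-     public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
-         // assumed behaviour: prefix the value with the current timestamp
-         return new ProducerRecord<String, String>(record.topic(), record.partition(), record.key(),
-                 System.currentTimeMillis() + "," + record.value());
-     }
-     // called when the broker acknowledges the record (or the send fails), before the user callback
-     public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
-     }
-     public void close() {
-     }
-     public void configure(Map<String, ?> configs) {
-     }
- }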
Kafka vs. Flume
In practice you must be clear about how the streaming data-collection frameworks Flume and Kafka are positioned:
Flume (developed by Cloudera):
suited to many producers;
suited to cases with few downstream consumers;
suited to data with low safety/durability requirements;
suited to integration with the Hadoop ecosystem.
Kafka (developed by LinkedIn):
suited to cases with many downstream consumers;
suited to data with higher safety/durability requirements, since it supports replication.
A commonly used model is therefore:
online data --> Flume --> Kafka --> Flume (add or drop this hop depending on the scenario) --> HDFS
- vim flume-kafka.conf
- # define
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
- # source
- a1.sources.r1.type = exec
- a1.sources.r1.command = tail -F -c +0 /opt/module/datas/flume.log
- a1.sources.r1.shell = /bin/bash -c
- # sink
- a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
- a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
- a1.sinks.k1.kafka.topic = first
- a1.sinks.k1.kafka.flumeBatchSize = 20
- a1.sinks.k1.kafka.producer.acks = 1
- a1.sinks.k1.kafka.producer.linger.ms = 1
- # channel
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
- # bind
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
tail -F follows the file as it grows; -c +0 starts reading from the beginning (byte 0) of the file
- [kris@hadoop101 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-kafka.conf
- [kris@hadoop101 datas]$ cat > flume.log
- Hello
- [kris@hadoop101 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic first
- Hello
Source: http://www.bubuko.com/infodetail-2974878.html