架构分布式日志队列, 标题自己都看着唬人, 其实就是一个日志收集的功能, 只不过中间加了一个 Kafka 做消息队列罢了
kafka 介绍
Kafka 是由 Apache 软件基金会开发的一个开源流处理平台, 由 Scala 和 Java 编写 Kafka 是一种高吞吐量的分布式发布订阅消息系统, 它可以处理消费者规模的网站中的所有动作流数据 这种动作 (网页浏览, 搜索和其他用户的行动) 是在现代网络上的许多社会功能的一个关键因素 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决
特性
Kafka 是一种高吞吐量的分布式发布订阅消息系统, 有如下特性:
通过 O(1)的磁盘数据结构提供消息的持久化, 这种结构对于即使数以 TB 的消息存储也能够保持长时间的稳定性能
高吞吐量: 即使是非常普通的硬件 Kafka 也可以支持每秒数百万的消息
支持通过 Kafka 服务器和消费机集群来分区消息
支持 Hadoop 并行数据加载
主要功能
发布和订阅消息流, 这个功能类似于消息队列, 这也是 kafka 归类为消息队列框架的原因
以容错的方式记录消息流, kafka 以文件的方式来存储消息流
可以再消息发布的时候进行处理
使用场景
在系统或应用程序之间构建可靠的用于传输实时数据的管道, 消息队列功能
构建实时的流数据处理程序来变换或处理数据流, 数据处理功能
消息传输流程
相关术语介绍
Broker
Kafka 集群包含一个或多个服务器, 这种服务器被称为 broker
Topic
每条发布到 Kafka 集群的消息都有一个类别, 这个类别被称为 Topic(物理上不同 Topic 的消息分开存储, 逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)
Partition
Partition 是物理上的概念, 每个 Topic 包含一个或多个 Partition.
Producer
负责发布消息到 Kafka broker
Consumer
消息消费者, 向 Kafka broker 读取消息的客户端
Consumer Group
每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name, 若不指定 group name 则属于默认的 group)
Kafka 安装
环境
LinuxJDKZookeeper
下载二进制程序
wget https://archive.apache.org/dist/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz
安装
- tar -zxvf kafka_2.11-0.10.0.1.tgz
- cd kafka_2.11-0.10.0.1
目录说明
bin 启动, 停止等命令
config 配置文件
libs 类库
参数说明
- ######################### 参数解释 ##############################
- broker.id=0 #当前机器在集群中的唯一标识, 和 zookeeper 的 myid 性质一样
- port=9092 #当前 kafka 对外提供服务的端口默认是 9092
- host.name=192.168.1.170 #这个参数默认是关闭的
- num.network.threads=3 #这个是 borker 进行网络处理的线程数
- num.io.threads=8 #这个是 borker 进行 I/O 处理的线程数
- log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录, 这个目录可以配置为, 逗号分割的表达式, 上面的 num.io.threads 要大于这个目录的个数这个目录, 如果配置多个目录, 新创建的 topic 他把消息持久化的地方是, 当前以逗号分割的目录中, 那个分区数最少就放那一个
- socket.send.buffer.bytes=102400 #发送缓冲区 buffer 大小, 数据不是一下子就发送的, 先回存储到缓冲区了到达一定的大小后在发送, 能提高性能
- socket.receive.buffer.bytes=102400 #kafka 接收缓冲区大小, 当数据到达一定大小后在序列化到磁盘
- socket.request.max.bytes=104857600 #这个参数是向 kafka 请求消息或者向 kafka 发送消息的请请求的最大数, 这个值不能超过 java 的堆栈大小
- num.partitions=1 #默认的分区数, 一个 topic 默认 1 个分区数
- log.retention.hours=168 #默认消息的最大持久化时间, 168 小时, 7 天
- message.max.byte=5242880 #消息保存的最大值 5M
- default.replication.factor=2 #kafka 保存消息的副本数, 如果一个副本失效了, 另一个还可以继续提供服务
- replica.fetch.max.bytes=5242880 #取消息的最大直接数
- log.segment.bytes=1073741824 #这个参数是: 因为 kafka 的消息是以追加的形式落地到文件, 当超过这个值的时候, kafka 会新起一个文件
- log.retention.check.interval.ms=300000 #每隔 300000 毫秒去检查上面配置的 log 失效时间(log.retention.hours=168 ), 到目录查看是否有过期的消息如果有, 删除
- log.cleaner.enable=false #是否启用 log 压缩, 一般不用启用, 启用的话可以提高性能
- zookeeper.connect=192.168.1.180:12181,192.168.1.181:12181,192.168.1.182:1218 #设置 zookeeper 的连接端口如果非集群配置一个地址即可
- ######################### 参数解释 ##############################
启动 kafka
启动 kafka 之前要启动相应的 zookeeper 集群自行安装, 这里不做说明
- # 进入到 kafka 的 bin 目录
- ./kafka-server-start.sh -daemon ../config/server.properties
Kafka 集成
环境
spring-bootelasticsearchkafka
pom.xml 引入:
- <!-- kafka 消息队列 -->
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <version>1.1.1.RELEASE</version>
- </dependency>
生产者
- import java.util.HashMap;
- import java.util.Map;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.common.serialization.StringSerializer;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.kafka.annotation.EnableKafka;
- import org.springframework.kafka.core.DefaultKafkaProducerFactory;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.kafka.core.ProducerFactory;
- /**
- * 生产者
- * 创建者 科帮网
- * 创建时间 2018 年 2 月 4 日
- */
- @Configuration
- @EnableKafka
- public class KafkaProducerConfig {
- @Value("${kafka.producer.servers}")
- private String servers;
- @Value("${kafka.producer.retries}")
- private int retries;
- @Value("${kafka.producer.batch.size}")
- private int batchSize;
- @Value("${kafka.producer.linger}")
- private int linger;
- @Value("${kafka.producer.buffer.memory}")
- private int bufferMemory;
- public Map<String, Object> producerConfigs() {
- Map<String, Object> props = new HashMap<>();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
- props.put(ProducerConfig.RETRIES_CONFIG, retries);
- props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
- props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
- props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- return props;
- }
- public ProducerFactory<String, String> producerFactory() {
- return new DefaultKafkaProducerFactory<>(producerConfigs());
- }
- @Bean
- public KafkaTemplate<String, String> kafkaTemplate() {
- return new KafkaTemplate<String, String>(producerFactory());
- }
- }
消费者
- mport java.util.HashMap;
- import java.util.Map;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.kafka.annotation.EnableKafka;
- import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
- import org.springframework.kafka.config.KafkaListenerContainerFactory;
- import org.springframework.kafka.core.ConsumerFactory;
- import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
- import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
- /**
- * 消费者
- * 创建者 科帮网
- * 创建时间 2018 年 2 月 4 日
- */
- @Configuration
- @EnableKafka
- public class KafkaConsumerConfig {
- @Value("${kafka.consumer.servers}")
- private String servers;
- @Value("${kafka.consumer.enable.auto.commit}")
- private boolean enableAutoCommit;
- @Value("${kafka.consumer.session.timeout}")
- private String sessionTimeout;
- @Value("${kafka.consumer.auto.commit.interval}")
- private String autoCommitInterval;
- @Value("${kafka.consumer.group.id}")
- private String groupId;
- @Value("${kafka.consumer.auto.offset.reset}")
- private String autoOffsetReset;
- @Value("${kafka.consumer.concurrency}")
- private int concurrency;
- @Bean
- public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
- ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory());
- factory.setConcurrency(concurrency);
- factory.getContainerProperties().setPollTimeout(1500);
- return factory;
- }
- public ConsumerFactory<String, String> consumerFactory() {
- return new DefaultKafkaConsumerFactory<>(consumerConfigs());
- }
- public Map<String, Object> consumerConfigs() {
- Map<String, Object> propsMap = new HashMap<>();
- propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
- propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
- propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
- propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
- propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
- return propsMap;
- }
- @Bean
- public Listener listener() {
- return new Listener();
- }
- }
日志监听
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Component;
- import com.itstyle.es.common.utils.JsonMapper;
- import com.itstyle.es.log.entity.SysLogs;
- import com.itstyle.es.log.repository.ElasticLogRepository;
- /**
- * 扫描监听
- * 创建者 科帮网
- * 创建时间 2018 年 2 月 4 日
- */
- @Component public class Listener {
- protected final Logger logger = LoggerFactory.getLogger(this.getClass());@Autowired private ElasticLogRepository elasticLogRepository;@KafkaListener(topics = {
- "itstyle"
- }) public void listen(ConsumerRecord < ?, ?>record) {
- logger.info("kafka 的 key:" + record.key());
- logger.info("kafka 的 value:" + record.value());
- if (record.key().equals("itstyle_log")) {
- try {
- SysLogs log = JsonMapper.fromJsonString(record.value().toString(), SysLogs.class);
- logger.info("kafka 保存日志:" + log.getUsername());
- elasticLogRepository.save(log);
- } catch(Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
测试日志传输
- /**
- * kafka 日志队列测试接口
- */
- @GetMapping(value = "kafkaLog") public@ResponseBody String kafkaLog() {
- SysLogs log = new SysLogs();
- log.setUsername("红薯");
- log.setOperation("开源中国社区");
- log.setMethod("com.itstyle.es.log.controller.kafkaLog()");
- log.setIp("192.168.1.80");
- log.setGmtCreate(new Timestamp(new Date().getTime()));
- log.setExceptionDetail("开源中国社区");
- log.setParams("{'name':'码云','type':'开源'}");
- log.setDeviceType((short) 1);
- log.setPlatFrom((short) 1);
- log.setLogType((short) 1);
- log.setDeviceType((short) 1);
- log.setId((long) 200000);
- log.setUserId((long) 1);
- log.setTime((long) 1);
- // 模拟日志队列实现
- String json = JsonMapper.toJsonString(log);
- kafkaTemplate.send("itstyle", "itstyle_log", json);
- return "success";
- }
Kafka 与 Redis
之前简单的介绍过, Javaweb 项目架构之 Redis 分布式日志队列, 有小伙伴们聊到, Redis PUB/SUB 没有任何可靠性保障, 也不会持久化当然了, 原项目中仅仅是记录日志, 并不是十分重要的信息, 可以有一定程度上的丢失
Kafka 与 Redis PUB/SUB 之间最大的区别在于 Kafka 是一个完整的分布式发布订阅消息系统, 而 Redis PUB/SUB 只是一个组件而已
使用场景
Redis PUB/SUB
消息持久性需求不高吞吐量要求不高可以忍受数据丢失
Kafka
高可用高吞吐持久性多样化的消费处理模型
开源项目源码(参考):https://gitee.com/52itstyle/spring-boot-elasticsearch
作者: 小柒
来源: https://yq.aliyun.com/articles/435779