Elasticsearch 作为当前主流的全文检索引擎, 除了强大的全文检索能力和高扩展性之外, 对多种数据源的兼容能力也是其成功的秘诀之一. 而 Elasticsearch 强大的数据源兼容能力, 主要来源于其核心组件之一的 Logstash, Logstash 通过插件的形式实现了对多种数据源的输入和输出. Kafka 是一种高吞吐量的分布式发布订阅消息系统, 是一种常见的数据源, 也是 Logstash 支持的众多输入输出源的其中一个. 本文将从实践的角度, 研究使用 Logstash Kafka Input 插件实现将 Kafka 中数据导入到 Elasticsearch 的过程.
使用 Logstash Kafka 插件连接 Kafka 和 Elasticsearch
1 Logstash Kafka input 插件简介
Logstash Kafka Input 插件使用 Kafka API 从 Kafka topic 中读取数据信息, 使用时需要注意 Kafka 的版本及对应的插件版本是否一致. 该插件支持通过 SSL 和 Kerveros SASL 方式连接 Kafka. 另外该插件提供了 group 管理, 并使用默认的 offset 管理策略来操作 Kafka topic.
Logstash 默认情况下会使用一个单独的 group 来订阅 Kafka 消息, 每个 Logstash Kafka Consumer 会使用多个线程来增加吞吐量. 当然也可以多个 Logstash 实例使用同一个 group_id, 来均衡负载. 另外建议把 Consumer 的个数设置为 Kafka 分区的大小, 以提供更好的性能.
2 测试环境准备
2.1 创建 Elasticsearch 集群
为了简化搭建过程, 本文使用了腾讯云 Elasticsearch service https://cloud.tencent.com/product/es . 腾讯云 Elasticsearch service 不仅可以实现 Elasticsearch 集群的快速搭建, 还提供了内置 Kibana, 集群监控, 专用主节点, Ik 分词插件等功能, 极大的简化了 Elasticsearch 集群的创建和管理工作.
2.2 创建 Kafka 服务
Kafka 服务的搭建采用腾讯云 CKafka https://cloud.tencent.com/product/CKafka 来完成. 与 Elasticsearch Service 一样, 腾讯云 CKafka 可以实现 Kafka 服务的快速创建, 100% 兼容开源 Kafka API(0.9 版本).
2.3 服务器
除了准备 Elasticsearch 和 Kafka, 另外还需要准备一台服务器, 用于运行 Logstash 以连接 Elasticsearch 和 Kafka. 本文采用腾讯云 CVM https://cloud.tencent.com/product/cvm 服务器
2.4 注意事项
1) 需要将 Elasticsearch,Kafka 和服务器创建在同一个网络下, 以便实现网络互通. 由于本文采用的是腾讯云相关的技术服务, 因此只需要将 Elasticsearch service,CKafka 和 CVM 创建在同一个私有网路 (VPC) 下即可.
2) 注意获取 Elasticsearch serivce,CKafka 和 CVM 的内网地址和端口, 以便后续服务使用
本次测试中:
服务 | ip | port |
---|---|---|
Elasticsearch service | 192.168.0.8 | 9200 |
Ckafka | 192.168.13.10 | 9092 |
CVM | 192.168.0.13 | - |
3 使用 Logstash 连接 Elasticsearch 和 Kafka
3.1 Kafka 准备
可以参考[CKafka 使用入门]
按照上面的教程
1) 创建名为 kafka_es_test 的 topic
2) 安装 JDK
3) 安装 Kafka 工具包
4) 创建 producer 和 consumer 验证 kafka 功能
3.2 安装 Logstash
Logstash 的安装和使用可以参考[一文快速上手 Logstash]
3.3 配置 Logstash Kafka input 插件
创建 kafka_test_pipeline.conf 文件内容如下:
- input{
- kafka{
- bootstrap_servers=>"192.168.13.10:9092"
- topics=>["kafka_es_test"]
- group_id=>"logstash_kafka_test"
- }
- }
- output{
- Elasticsearch{
- hosts=>["192.168.0.8:9200"]
- }
- }
其中定义了一个 kafka 的 input 和一个 Elasticsearch 的 output
对于 Kafka input 插件上述三个参数为必填参数, 除此之外还有一些对插件行为进行调整的一些参数如:
auto_commit_interval_ms 用于设置 Consumer 提交 offset 给 Kafka 的时间间隔
consumer_threads 用于设置 Consumer 的线程数, 默认为 1, 实际中应设置与 Kafka Topic 分区数一致
fetch_max_wait_ms 用于指定 Consumer 等待一个 fetch 请求达到 fetch_min_bytes 的最长时间
fetch_min_bytes 用于指定 Consumer fetch 请求应返回的最小数据量
topics_pattern 用于通过正则订阅符合某一规则的一组 topic
更多参数参考:[Kafka Input Configuration Options]
3.4 启动 Logstash
以下操作在 Logstash 根目录中进行
1) 验证配置
./bin/logstash -f kafka_test_pipeline.conf --config.test_and_exit
如有错误, 根据提示修改配置文件. 若配置正确会得到如下结果
- Sending Logstash's logs to /root/logstash-5.6.13/logs which is now configured via log4j2.properties
- [2018-11-11T15:24:01,598][INFO ][logstash.modules.scaffold] Initializing module {
- :module_name=>"netflow", :directory=>"/root/logstash-5.6.13/modules/netflow/configuration"
- }
- [2018-11-11T15:24:01,603][INFO ][logstash.modules.scaffold] Initializing module {
- :module_name=>"fb_apache", :directory=>"/root/logstash-5.6.13/modules/fb_apache/configuration"
- }
- Configuration OK
- [2018-11-11T15:24:01,746][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
2) 启动 Logstash
./bin/logstash -f kafka_test_pipeline.conf --config.reload.automatic
观察日志是否有错误提示, 并及时处理
3.4 启动 Kafka Producer
以下操作在 Kafka 工具包根目录下进行
./bin/kafka-console-producer.sh --broker-list 192.168.13.10:9092 --topic kafka_es_test
写入测试数据
This is a message
3.5 Kibana 验证结果
登录 Elasticsearch 对应 Kibana, 在 Dev Tools 中进行如下操作
1) 查看索引
GET _cat/indices
可以看到一个名为 logstash-xxx.xx.xx 的索引被创建成功
- green open .kibana QUw45tN0SHqeHbF9-QVU6A 1 1 1 0 5.5kb 2.7kb
- green open logstash-2018.11.11 DejRdNJVQ1e1MwbyJjJjLw 5 1 1 0 8.7kb 4.3kb
2) 查看写入的数据
GET logstash-2018.11.11/_search
可以看到数据已经被成功写入
- {
- "took": 0,
- "timed_out": false,
- "_shards": {
- "total": 5,
- "successful": 5,
- "skipped": 0,
- "failed": 0
- },
- "hits": {
- "total": 1,
- "max_score": 1,
- "hits": [
- {
- "_index": "logstash-2018.11.11",
- "_type": "logs",
- "_id": "AWcBsEegMu-Dkjm1ap3H",
- "_score": 1,
- "_source": {
- "message": "This is a message",
- "@version": "1",
- "@timestamp": "2018-11-11T07:33:09.079Z"
- }
- }
- ]
- }
- }
4 总结
Logstash 作为 Elastic Stack 中数据采集和处理的核心组件, 为 Elasticsearch 提供了强大的数据源兼容能力. 从测试过程可以看出, 使用 Logstash 实现 kafka 和 Elaticsearch 的连接过程相当简单方便. 另外 Logstash 的数据处理功能, 也使得采用该架构的系统对数据映射和处理有天然的优势.
然而, 使用 Logstash 实现 Kafka 和 Elasticsearch 的连接, 并不是连接 Kafka 和 Elasticsearch 的唯一方案, 另一种常见的方案是使用 Kafka Connect, 可以参考 "当 Elasticsearch 遇见 Kafka--Kafka Connect"
来源: https://www.qcloud.com/developer/article/1362320