Apache Flume 是一个分布式的、可靠的、可用的系统,可用于从不同的数据源中高效地收集、聚合和移动海量日志数据到集中式数据存储系统,支持多种 Source 和 Sink 插件。本文将介绍如何使用 Apache Flume 的将日志数据实时上传到 Datahub。
- $ wget https: //github.com/aliyun/aliyun-odps-flume-plugin/releases/download/1.1.0/flume-datahub-sink-1.1.0.tar.gz
- $ tar zxvf flume - datahub - sink - 1.1.0.tar.gz $ ls flume - datahub - sink lib libext
将解压后的插件文件夹 flume-datahub-sink 移动到 Apache Flume 安装目录下
- $ mkdir {
- YOUR_FLUME_DIRECTORY
- }
- /plugins.d
- $ mv flume-datahub-sink {YOUR_FLUME_DIRECTORY}/plugins.d /
移动后,核验 Datahub Sink 插件是否已经在相应目录:
- $ ls {
- YOUR_APACHE_FLUME_DIR
- }
- /plugins.d
- flume-datahub-sink
- /
Flume 的原理、架构,以及核心组件的介绍请参考 。本文将构建一个使用 Datahub Sink 的 Flume 实例,对日志文件中的结构化数据进行解析,并上传到 Datahub Topic 中。
需要上传的日志文件格式如下(每行为一条记录,字段之间逗号分隔):
- #test_basic.log some,
- log,
- line1 some,
- log,
- line2...
下面将创建 Datahub Topic,并把每行日志的第一列和第三列作为一条记录写入 Topic 中。
使用创建好 Topic,schema 为 (string c1, string c2),下面假设建好的 Topic 名为 test_topic。
在 Flume 安装目录的 conf / 文件夹下创建名为 datahub_basic.conf 的文件,并输入内容如下:
- #A single - node Flume configuration
- for Datahub#Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1#Describe / configure the source a1.sources.r1.type = exec a1.sources.r1.command = cat {
- YOUR_LOG_DIRECTORY
- }
- /test_basic.log
- # Describe the sink
- a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
- a1.sinks.k1.datahub.accessID = {YOUR_ALIYUN_DATAHUB_ACCESS_ID}
- a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
- a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY}
- a1.sinks.k1.datahub.project = test_project
- a1.sinks.k1.datahub.topic = test_topic
- a1.sinks.k1.batchSize = 1
- a1.sinks.k1.serializer = DELIMITED
- a1.sinks.k1.serializer.delimiter = ,
- a1.sinks.k1.serializer.fieldnames = c1,c2,
- a1.sinks.k1.serializer.charset = UTF-8
- a1.sinks.k1.shard.number = 1
- a1.sinks.k1.shard.maxTimeOut = 60
- # Use a channel which buffers events in memory
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 1000
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
- /
这里 serializer 配置指定了以逗号分隔的形式将输入源解析成三个字段,并忽略第三个字段。
配置完成后,启动 Flume 并指定 agent 的名称和配置文件路径,添加 **-Dflume.root.logger=INFO,console** 选项可以将日志实时输出到控制台。
- $ cd {
- YOUR_FLUME_DIRECTORY
- }
- $ bin / flume - ng agent - n a1 - c conf - f conf / datahub_basic.conf - Dflume.root.logger = INFO,
- console
写入成功,显示日志如下:
- ...Write success.Event count: 2...
日志数据通过 Flume 上传到 Datahub 后,可以使用 StreamCompute 流计算来进行实时分析,例如对于一些 web 网站的日志,可以实时统计各个页面的 PV/UV 等。另外,导入 Datahub 的数据也可以将数据归档至 MaxCompute 中,方便后续的离线分析。
来源: https://yq.aliyun.com/articles/66112