一. 监控端口数据
首先启动 Flume 任务, 监控本机 44444 端口, 服务端;
然后通过 netcat 工具向本机 44444 端口发送消息, 客户端;
最后 Flume 将监听的数据实时显示在控制台.
1. 安装 netcat
sudo yum install -y nc
功能描述: netstat 命令是一个监控 TCP/IP 网络的非常有用的工具, 它可以显示路由表, 实际的网络连接以及每一个网络接口设备的状态信息.
基本语法: netstat [选项]
选项参数:
-t 或 --tcp: 显示 TCP 传输协议的连线状况;
-u 或 --udp: 显示 UDP 传输协议的连线状况;
-n 或 --numeric: 直接使用 ip 地址, 而不通过域名服务器;
-l 或 --listening: 显示监控中的服务器的 Socket;
-p 或 --programs: 显示正在使用 Socket 的程序识别码 (PID) 和程序名称;
2. 判断端口是否被占用
sudo netstat -tunlp | grep 44444
3. 创建 Flume Agent 配置文件 flume-netcat-logger.conf
- # 在 flume 目录下创建 job 文件夹并进入 job 文件夹.
- mkdir job
- cd job/
- # 在 job 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf
- touch flume-netcat-logger.conf
在 flume-netcat-logger.conf 文件中添加如下内容.
- # Name the components on this agent
- #a1 表示 agent 的名称
- a1.sources = r1 #r1 表示 a1 的输入源 source
- a1.sinks = k1 #k1 表示 a1 的输出目的地 sink
- a1.channels = c1 #c1 表示 a1 的缓冲区 channel
- # Describe/configure the source
- a1.sources.r1.type = netcat #表示 a1 的输入源为 netcat 端口类型
- a1.sources.r1.bind = localhost #表示 a1 监听的主机地址
- a1.sources.r1.port = 44444 #表示 a1 监听的端口
- # Describe the sink
- a1.sinks.k1.type = logger #表示 a1 的输出目的地是控制台的 logger 类型
- # Use a channel which buffers events in memory
- a1.channels.c1.type = memory #表示 a1 的 channel 类型为 memory 类型
- a1.channels.c1.capacity = 1000 #表示 a1 的 channel 总容量是 1000 个 event
- a1.channels.c1.transactionCapacity = 100 #表示 a1 的 channel 传输时收集到 100 条 event 后再去提交事务
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1 #表示将 r1 和 c1 连接起来
- a1.sinks.k1.channel = c1 #表示将 k1 和 c1 连接起来
其他参数或参数详解, 请参阅官方手册 http://flume.apache.org/FlumeUserGuide.html
4. 开启 Flume 监听端口
- # 第一种写法:
- bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
- # 第二种写法:
- bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
开启后会阻塞
参数说明:
--conf conf/ : 表示配置文件存储在 conf / 目录
--name a1 : 表示给 agent 起名为 a1
--conf-file job/flume-netcat.conf :flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf 文件.
-Dflume.root.logger==INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger 参数属性值, 并将控制台日志打印级别设置为 INFO 级别. 日志级别包括: log,info,warn,error.
5. 使用 netcat 工具向本机的 44444 端口发送内容
6. 在 Flume 监听页查看接收数据
二. 实时读取本地文件到 HDFS
1. 让 Flume 持有 Hadoop 相关 jar 包
将 commons-configuration-1.6.jar,
- hadoop-auth-2.7.2.jar,
- hadoop-common-2.7.2.jar,
- hadoop-hdfs-2.7.2.jar,
- commons-io-2.4.jar,
- htrace-core-3.1.0-incubating.jar
拷贝到 / opt/module/flume/lib 文件夹下(如果已经持有的话, 略过).
2. 创建 flume-file-hdfs.conf 文件
- # 在 hob 目录下创建文件
- touch flume-file-hdfs.conf
要想读取 Linux 系统中的文件, 就得按照 Linux 命令的规则执行命令. 由于 Hive 日志在 Linux 系统中所以读取文件的类型选择: exec 即 execute 执行的意思. 表示执行 Linux 命令来读取文件
在 flume-file-hdfs.conf 中添加如下内容
- # Name the components on this agent
- a2.sources = r2
- a2.sinks = k2
- a2.channels = c2
- # Describe/configure the source
- a2.sources.r2.type = exec #定义 source 类型为 exec 可执行文件
- a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log #要执行的 Linux 命令
- a2.sources.r2.shell = /bin/bash -c #执行 shell 脚本的绝对路径
- # Describe the sink
- a2.sinks.k2.type = hdfs #sink 类型为 hdfs
- a2.sinks.k2.hdfs.path = hdfs://hadoop100:9000/flume/%Y%m%d/%H #上传文件再 hdfs 上的路径 转义序列的详解见下表
- # 上传文件的前缀
- a2.sinks.k2.hdfs.filePrefix = logs-
- # 是否按照时间滚动文件夹
- a2.sinks.k2.hdfs.round = true
- # 多少时间单位创建一个新的文件夹
- a2.sinks.k2.hdfs.roundValue = 1
- # 重新定义时间单位
- a2.sinks.k2.hdfs.roundUnit = hour
- # 是否使用本地时间戳
- a2.sinks.k2.hdfs.useLocalTimeStamp = true
- # 积攒多少个 Event 才 flush 到 HDFS 一次
- a2.sinks.k2.hdfs.batchSize = 1000
- # 设置文件类型, 可支持压缩
- a2.sinks.k2.hdfs.fileType = DataStream
- # 多久生成一个新的文件
- a2.sinks.k2.hdfs.rollInterval = 60
- # 设置每个文件的滚动大小
- a2.sinks.k2.hdfs.rollSize = 134217700
- # 文件的滚动与 Event 数量无关
- a2.sinks.k2.hdfs.rollCount = 0
- # Use a channel which buffers events in memory
- a2.channels.c2.type = memory
- a2.channels.c2.capacity = 1000
- a2.channels.c2.transactionCapacity = 100
- # Bind the source and sink to the channel
- a2.sources.r2.channels = c2
- a2.sinks.k2.channel = c2
注意 : 对于所有与时间相关的转义序列, Event Header 中必须存在以 "timestamp" 的 key(除非 hdfs.useLocalTimeStamp 设置为 true, 此方法会使用 TimestampInterceptor 自动添加 timestamp).
3. 开启 Flume 监控
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
4. 开启 hdfs 和 hive, 操作 hive 产生日志
- # 开启 hdfs
- sbin/start-dfs.sh
- # 开启 hive 产生日志
- bin/hive
5. 在 HDFS 上查看文件
三. 实时读取目录文件到 HDFS
1. 创建配置文件 flume-dir-hdfs.conf
- # 再 job 目录下创建文件
- touch flume-dir-hdfs.conf
添加以下内容
- a3.sources = r3
- a3.sinks = k3
- a3.channels = c3
- # Describe/configure the source
- #source 类型为 spooldir
- a3.sources.r3.type = spooldir
- # 监控的目录
- a3.sources.r3.spoolDir = /opt/module/flume/upload
- # 文件上传完后的文件后缀
- a3.sources.r3.fileSuffix = .COMPLETED
- # 是否有文件头
- a3.sources.r3.fileHeader = true
- # 忽略所有以. tmp 结尾的文件, 不上传
- a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
- # Describe the sink
- a3.sinks.k3.type = hdfs
- a3.sinks.k3.hdfs.path = hdfs://hadoop100:9000/flume/upload/%Y%m%d/%H
- # 上传文件的前缀
- a3.sinks.k3.hdfs.filePrefix = upload-
- # 是否按照时间滚动文件夹
- a3.sinks.k3.hdfs.round = true
- # 多少时间单位创建一个新的文件夹
- a3.sinks.k3.hdfs.roundValue = 1
- # 重新定义时间单位
- a3.sinks.k3.hdfs.roundUnit = hour
- # 是否使用本地时间戳
- a3.sinks.k3.hdfs.useLocalTimeStamp = true
- # 积攒多少个 Event 才 flush 到 HDFS 一次
- a3.sinks.k3.hdfs.batchSize = 100
- # 设置文件类型, 可支持压缩
- a3.sinks.k3.hdfs.fileType = DataStream
- # 多久生成一个新的文件
- a3.sinks.k3.hdfs.rollInterval = 60
- # 设置每个文件的滚动大小大概是 128M
- a3.sinks.k3.hdfs.rollSize = 134217700
- # 文件的滚动与 Event 数量无关
- a3.sinks.k3.hdfs.rollCount = 0
- # Use a channel which buffers events in memory
- a3.channels.c3.type = memory
- a3.channels.c3.capacity = 1000
- a3.channels.c3.transactionCapacity = 100
- # Bind the source and sink to the channel
- a3.sources.r3.channels = c3
- a3.sinks.k3.channel = c3
2. 启动监控
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
说明: 在使用 Spooling Directory Source 时, 不要在监控目录中创建并持续修改文件; 上传完成的文件会以. COMPLETED 结尾; 被监控文件夹每 500 毫秒扫描一次文件变动
3. 向 upload 文件夹中添加文件
4. 查看 HDFS
5. 查看 upload 文件夹
四. 单数据源多出口(选择器)
使用 Flume-1 监控文件变动, Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS.
同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem.
1. 准备工作
- # 在 / opt/module/flume/job 目录下创建 group1 文件夹
- mkdir group1
- # 在 / opt/module/datas / 目录下创建 flume3 文件夹
- mkdir flume3
2. 创建 flume-file-flume.conf
配置 1 个接收日志文件的 source 和两个 channel, 两个 sink, 分别输送给 flume-flume-hdfs 和 flume-flume-dir.
进入 group1 文件夹, 创建 flume-file-flume.conf, 添加如下内容
- # Name the components on this agent
- a1.sources = r1
- a1.sinks = k1 k2
- a1.channels = c1 c2
- # 将数据流复制给所有 channel
- a1.sources.r1.selector.type = replicating
- # Describe/configure the source
- a1.sources.r1.type = exec
- a1.sources.r1.command = tail -F /opt/module/hive-1.2.1/logs/hive.log
- a1.sources.r1.shell = /bin/bash -c
- # Describe the sink
- # sink 端的 avro 是一个数据发送者
- a1.sinks.k1.type = avro
- a1.sinks.k1.hostname = hadoop100
- a1.sinks.k1.port = 4141
- a1.sinks.k2.type = avro
- a1.sinks.k2.hostname = hadoop100
- a1.sinks.k2.port = 4142
- # Describe the channel
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
- a1.channels.c2.type = memory
- a1.channels.c2.capacity = 1000
- a1.channels.c2.transactionCapacity = 100
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1 c2
- a1.sinks.k1.channel = c1
- a1.sinks.k2.channel = c2
3. 创建 flume-flume-hdfs.conf
配置上级 Flume 输出的 Source, 输出是到 HDFS 的 Sink. 在 group1 目录下创建 flume-flume-hdfs.conf, 添加以下内容
- # Name the components on this agent
- a2.sources = r1
- a2.sinks = k1
- a2.channels = c1
- # Describe/configure the source
- # source 端的 avro 是一个数据接收服务
- a2.sources.r1.type = avro
- a2.sources.r1.bind = hadoop100
- a2.sources.r1.port = 4141
- # Describe the sink
- a2.sinks.k1.type = hdfs
- a2.sinks.k1.hdfs.path = hdfs://hadoop100:9000/flume2/%Y%m%d/%H
- # 上传文件的前缀
- a2.sinks.k1.hdfs.filePrefix = flume2-
- # 是否按照时间滚动文件夹
- a2.sinks.k1.hdfs.round = true
- # 多少时间单位创建一个新的文件夹
- a2.sinks.k1.hdfs.roundValue = 1
- # 重新定义时间单位
- a2.sinks.k1.hdfs.roundUnit = hour
- # 是否使用本地时间戳
- a2.sinks.k1.hdfs.useLocalTimeStamp = true
- # 积攒多少个 Event 才 flush 到 HDFS 一次
- a2.sinks.k1.hdfs.batchSize = 100
- # 设置文件类型, 可支持压缩
- a2.sinks.k1.hdfs.fileType = DataStream
- # 多久生成一个新的文件
- a2.sinks.k1.hdfs.rollInterval = 600
- # 设置每个文件的滚动大小大概是 128M
- a2.sinks.k1.hdfs.rollSize = 134217700
- # 文件的滚动与 Event 数量无关
- a2.sinks.k1.hdfs.rollCount = 0
- # Describe the channel
- a2.channels.c1.type = memory
- a2.channels.c1.capacity = 1000
- a2.channels.c1.transactionCapacity = 100
- # Bind the source and sink to the channel
- a2.sources.r1.channels = c1
- a2.sinks.k1.channel = c1
4. 创建 flume-flume-dir.conf
配置上级 Flume 输出的 Source, 输出是到本地目录的 Sink. 在 group1 目录下, 创建 flume-flume-dir.conf, 添加以下内容
- # Name the components on this agent
- a3.sources = r1
- a3.sinks = k1
- a3.channels = c2
- # Describe/configure the source
- a3.sources.r1.type = avro
- a3.sources.r1.bind = hadoop100
- a3.sources.r1.port = 4142
- # Describe the sink
- a3.sinks.k1.type = file_roll
- a3.sinks.k1.sink.directory = /opt/module/datas/flume3
- # Describe the channel
- a3.channels.c2.type = memory
- a3.channels.c2.capacity = 1000
- a3.channels.c2.transactionCapacity = 100
- # Bind the source and sink to the channel
- a3.sources.r1.channels = c2
- a3.sinks.k1.channel = c2
注: 输出的本地目录必须是已经存在的目录, 如果该目录不存在, 并不会创建新的目录.
5. 执行配置文件
分别开启对应配置文件: flume-flume-dir,flume-flume-hdfs,flume-file-flume.
- bin/flume-ng agent --conf conf/ --name a3 --conf-file jobs/group1/flume-flume-dir.conf
- bin/flume-ng agent --conf conf/ --name a2 --conf-file jobs/group1/flume-flume-hdfs.conf
- bin/flume-ng agent --conf conf/ --name a1 --conf-file jobs/group1/flume-file-flume.conf
6. 启动 Hadoop 和 Hive
- # 启动 hdfs
- start-dfs.sh
- # 进入到 hive 目录下, 启动 hive
- bin/hive
7. 检查 HDFS 上数据和 / opt/module/datas/flume3 目录中数据
为什么会有 6 个文件?
file_roll 的默认配置是每 30 秒滚动一次文件. 只要没有停止监控, 隔 30 秒去 ll 一下, 就会看到文件又多了
五. 单数据源多出口(Sink 组)
使用 Flume-1 监控文件变动, Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS. 同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 也负责存储到 HDFS
1. 准备工作
- # 在 / opt/module/flume/jobs 目录下创建 group2 文件夹
- mkdir group2
2. 创建 flume-netcat-flume.conf
配置 1 个接收日志文件的 source 和 1 个 channel, 两个 sink, 分别输送给 flume-flume-console1 和 flume-flume-console2.
进入 group2 文件夹, 创建 flume-netcat-flume.conf, 添加以下内容
- # Name the components on this agent
- a1.sources = r1
- a1.channels = c1
- a1.sinkgroups = g1
- a1.sinks = k1 k2
- # Describe/configure the source
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = localhost
- a1.sources.r1.port = 44444
- #The component type name, needs to be default, failover or load_balance
- a1.sinkgroups.g1.processor.type = load_balance
- a1.sinkgroups.g1.processor.backoff = true
- # Must be either round_robin, random or FQCN of custom class that inherits from AbstractSinkSelector
- a1.sinkgroups.g1.processor.selector = round_robin
- a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
- # Describe the sink
- a1.sinks.k1.type = avro
- a1.sinks.k1.hostname = hadoop100
- a1.sinks.k1.port = 4141
- a1.sinks.k2.type = avro
- a1.sinks.k2.hostname = hadoop100
- a1.sinks.k2.port = 4142
- # Describe the channel
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinkgroups.g1.sinks = k1 k2
- a1.sinks.k1.channel = c1
- a1.sinks.k2.channel = c1
3. 创建 flume-flume-console1.conf
配置上级 Flume 输出的 Source, 输出是到本地控制台.
在 group2 目录下, 创建 flume-flume-console1.conf, 添加以下内容
- # Name the components on this agent
- a2.sources = r1
- a2.sinks = k1
- a2.channels = c1
- # Describe/configure the source
- a2.sources.r1.type = avro
- a2.sources.r1.bind = hadoop100
- a2.sources.r1.port = 4141
- # Describe the sink
- a2.sinks.k1.type = logger
- # Describe the channel
- a2.channels.c1.type = memory
- a2.channels.c1.capacity = 1000
- a2.channels.c1.transactionCapacity = 100
- # Bind the source and sink to the channel
- a2.sources.r1.channels = c1
- a2.sinks.k1.channel = c1
4. 创建 flume-flume-console2.conf
配置上级 Flume 输出的 Source, 输出是到本地控制台.
在 group2 目录下. 创建 flume-flume-console2.conf, 添加以下内容
- # Name the components on this agent
- a3.sources = r1
- a3.sinks = k1
- a3.channels = c2
- # Describe/configure the source
- a3.sources.r1.type = avro
- a3.sources.r1.bind = hadoop100
- a3.sources.r1.port = 4142
- # Describe the sink
- a3.sinks.k1.type = logger
- # Describe the channel
- a3.channels.c2.type = memory
- a3.channels.c2.capacity = 1000
- a3.channels.c2.transactionCapacity = 100
- # Bind the source and sink to the channel
- a3.sources.r1.channels = c2
- a3.sinks.k1.channel = c2
5. 执行配置文件
分别开启对应配置文件: flume-flume-console2,flume-flume-console1,flume-netcat-flume.
- bin/flume-ng agent --conf conf/ --name a3 --conf-file jobs/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
- bin/flume-ng agent --conf conf/ --name a2 --conf-file jobs/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
- bin/flume-ng agent --conf conf/ --name a1 --conf-file jobs/group2/flume-netcat-flume.conf
6. 使用 netcat 工具向本机的 44444 端口发送内容
nc localhost 44444
7. 查看 Flume2 及 Flume3 的控制台打印日志
六. 多数据源汇总(常用)
hadoop101 上的 Flume-1 监控文件 / opt/module/group.log,
hadoop100 上的 Flume-2 监控某一个端口的数据流,
Flume-1 与 Flume-2 将数据发送给 hadoop102 上的 Flume-3,Flume-3 将最终数据打印到控制台
1. 准备工作
如果 hadoop101 和 hadoop102 没有安装 flume, 用分发脚本将 flume 分发一下
xsync flume-1.7.0/
在 hadoop100,hadoop101 以及 hadoop102 的 / opt/module/flume/jobs 目录下创建一个 group3 文件夹.
2. 创建 flume1-logger-flume.conf
配置 Source 用于监控 hive.log 文件, 配置 Sink 输出数据到下一级 Flume.
在 hadoop101 上创建配置文件 flume1-logger-flume.conf, 并添加以下内容
- # Name the components on this agent
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
- # Describe/configure the source
- a1.sources.r1.type = exec
- a1.sources.r1.command = tail -F /opt/module/group.log
- a1.sources.r1.shell = /bin/bash -c
- # Describe the sink
- a1.sinks.k1.type = avro
- a1.sinks.k1.hostname = hadoop102
- a1.sinks.k1.port = 4141
- # Describe the channel
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
3. 创建创建 flume2-netcat-flume.conf
配置 Source 监控端口 44444 数据流, 配置 Sink 数据到下一级 Flume:
在 hadoop100 上创建配置文件 flume2-netcat-flume.conf, 并添加以下内容
- # Name the components on this agent
- a2.sources = r1
- a2.sinks = k1
- a2.channels = c1
- # Describe/configure the source
- a2.sources.r1.type = netcat
- a2.sources.r1.bind = hadoop100
- a2.sources.r1.port = 44444
- # Describe the sink
- a2.sinks.k1.type = avro
- a2.sinks.k1.hostname = hadoop102
- a2.sinks.k1.port = 4141
- # Use a channel which buffers events in memory
- a2.channels.c1.type = memory
- a2.channels.c1.capacity = 1000
- a2.channels.c1.transactionCapacity = 100
- # Bind the source and sink to the channel
- a2.sources.r1.channels = c1
- a2.sinks.k1.channel = c1
4. 创建 flume3-flume-logger.conf
配置 source 用于接收 flume1 与 flume2 发送过来的数据流, 最终合并后 sink 到控制台.
在 hadoop102 上创建配置文件 flume3-flume-logger.conf, 并添加以下内容
- # Name the components on this agent
- a3.sources = r1
- a3.sinks = k1
- a3.channels = c1
- # Describe/configure the source
- a3.sources.r1.type = avro
- a3.sources.r1.bind = hadoop102
- a3.sources.r1.port = 4141
- # Describe the sink
- # Describe the sink
- a3.sinks.k1.type = logger
- # Describe the channel
- a3.channels.c1.type = memory
- a3.channels.c1.capacity = 1000
- a3.channels.c1.transactionCapacity = 100
- # Bind the source and sink to the channel
- a3.sources.r1.channels = c1
- a3.sinks.k1.channel = c1
5. 执行配置文件
分别开启对应配置文件: flume3-flume-logger.conf,flume2-netcat-flume.conf,flume1-logger-flume.conf.
- #hadoop102
- bin/flume-ng agent --conf conf/ --name a3 --conf-file jobs/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
- #hadoop100
- bin/flume-ng agent --conf conf/ --name a2 --conf-file jobs/group3/flume2-netcat-flume.conf
- #hadoop101
- bin/flume-ng agent --conf conf/ --name a1 --conf-file jobs/group3/flume1-logger-flume.conf
6. 在 hadoop101 上向 / opt/module 目录下的 group.log 追加内容
7. 在 hadoop100 上向 44444 端口发送数据
8. 观察 hadoop102 上的数据
来源: https://www.cnblogs.com/duoduotouhenying/p/10210696.html