Recently I have been responsible for product log instrumentation and collection at my company, and built a log collection system based on Flume + HDFS + Hive.
I. Log collection system architecture
I sketched a simple architecture diagram of the log collection system. Flume plays both the agent and the collector role, while HDFS provides persistent storage for the data.
The setup described here is a demo: it uses a single flume collector and stores data only in HDFS. A highly available log collection architecture would of course need multiple flume collectors for load balancing and fault tolerance; a hedged sketch of that follows.
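For reference only, here is a rough sketch of what the agent side of such an HA setup might look like. The second collector host centos3 is hypothetical and not part of this demo; the sink group and processor settings are standard flume options, not taken from the original configuration.

# hypothetical HA agent config: one avro sink per collector
a1.sinks = collectorSink1 collectorSink2
a1.sinks.collectorSink1.type = avro
a1.sinks.collectorSink1.channel = skydataChannel
a1.sinks.collectorSink1.hostname = centos2
a1.sinks.collectorSink1.port = 4545
a1.sinks.collectorSink2.type = avro
a1.sinks.collectorSink2.channel = skydataChannel
a1.sinks.collectorSink2.hostname = centos3
a1.sinks.collectorSink2.port = 4545
# sink group: round-robin load balancing with backoff
# (use processor.type = failover instead for an active/standby pair)
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = collectorSink1 collectorSink2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin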
II. Log generation
1. log4j configuration: roll to a new file every minute; if a file grows beyond 5 MB within the same minute, an additional file is created.
<!-- Product analytics log, rolled per minute -->
<RollingRandomAccessFile name="RollingFile_product_minute"
    fileName="${STAT_LOG_HOME}/${SERVER_NAME}_product.log"
    filePattern="${STAT_LOG_HOME}/${SERVER_NAME}_product.log.%d{yyyy-MM-dd-HH-mm}-%i">
  <PatternLayout charset="UTF-8" pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %level - %msg%xEx%n" />
  <Policies>
    <TimeBasedTriggeringPolicy interval="1" modulate="true" />
    <SizeBasedTriggeringPolicy size="${EVERY_FILE_SIZE}" />
  </Policies>
  <Filters>
    <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="NEUTRAL" />
  </Filters>
</RollingRandomAccessFile>
The rolled files are named according to the filePattern above.
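For illustration (hypothetical file names, assuming SERVER_NAME is skydata), the log directory would contain entries like:

skydata_product.log                        # file currently being written
skydata_product.log.2016-11-30-09-17-1     # rolled, one file per minute
skydata_product.log.2016-11-30-09-18-1
skydata_product.log.2016-11-30-09-18-2     # second file for the same minute, size exceeded 5 MB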
2. Log content
Each entry is JSON; the top-level fields, in order, are: tableName, logRequest, timestamp, statBody, logResponse, resultCode, resultFailMsg.
2016-11-30 09:18:21.916 INFO - {
    "tableName": "ReportView",
    "logRequest": { ... },
    "timestamp": 1480468701432,
    "statBody": { ... },
    "logResponse": { ... },
    "resultCode": 1,
    "resultFailMsg": ""
}
III. Flume configuration
For the virtual machine environment, see my blog.
For the hadoop environment, see another post of mine.
The flume environment here is:
centos1:flume-agent
centos2:flume-collector
1. Flume agent configuration (conf file)
a1.sources = skydataSource
a1.channels = skydataChannel
a1.sinks = skydataSink

a1.sources.skydataSource.type = spooldir
a1.sources.skydataSource.channels = skydataChannel
# log directory
a1.sources.skydataSource.spoolDir = /opt/flumeSpool
a1.sources.skydataSource.fileHeader = true
# After a file is processed it gets a .COMPLETED suffix; .log files are still being rolled once a minute,
# so ignore both .log and .COMPLETED files here
a1.sources.skydataSource.ignorePattern = ([^_]+)|(.*(\.log)$)|(.*(\.COMPLETED)$)
a1.sources.skydataSource.basenameHeader = true
a1.sources.skydataSource.deserializer.maxLineLength = 102400
# Custom interceptor: splits the JSON log into fields and adds a timestamp header for the hdfs sink; code is shown later
a1.sources.skydataSource.interceptors = i1
a1.sources.skydataSource.interceptors.i1.type = com.skydata.flume_interceptor.HiveLogInterceptor2$Builder

a1.sinks.skydataSink.type = avro
a1.sinks.skydataSink.channel = skydataChannel
a1.sinks.skydataSink.hostname = centos2
a1.sinks.skydataSink.port = 4545
# If deflate compression is configured here, the collector side must configure matching decompression
a1.sinks.skydataSink.compression-type = deflate

a1.channels.skydataChannel.type = memory
a1.channels.skydataChannel.capacity = 10000
a1.channels.skydataChannel.transactionCapacity = 1000
2. Flume collector configuration
a1.sources = avroSource
a1.channels = memChannel
a1.sinks = hdfsSink

a1.sources.avroSource.type = avro
a1.sources.avroSource.channels = memChannel
a1.sources.avroSource.bind = centos2
a1.sources.avroSource.port = 4545
# must match the flume agent configuration
a1.sources.avroSource.compression-type = deflate

a1.sinks.hdfsSink.type = hdfs
a1.sinks.hdfsSink.channel = memChannel
# skydata_hive_log is the hive table; data is partitioned by day (year-month-day)
a1.sinks.hdfsSink.hdfs.path = hdfs://centos1:9000/flume/skydata_hive_log/dt=%Y-%m-%d
a1.sinks.hdfsSink.hdfs.batchSize = 10000
a1.sinks.hdfsSink.hdfs.fileType = DataStream
a1.sinks.hdfsSink.hdfs.writeFormat = Text
a1.sinks.hdfsSink.hdfs.rollSize = 10240000
a1.sinks.hdfsSink.hdfs.rollCount = 0
a1.sinks.hdfsSink.hdfs.rollInterval = 300

a1.channels.memChannel.type = memory
a1.channels.memChannel.capacity = 100000
a1.channels.memChannel.transactionCapacity = 10000
IV. Hive table creation and partitioning
1. Creating the hive table
After the CREATE TABLE statement is executed in hive, a new skydata_hive_log directory appears under hdfs://centos1:9000/flume/ (the statement includes the LOCATION keyword).
\u0001 is the field delimiter hive uses to split columns; opened with vim on linux, this character displays as ^A.
Since the source logs are JSON, they have to be converted into \u0001-delimited records and partitioned by dt. This step is handled by the custom flume interceptor (section V).
CREATE TABLE `skydata_hive_log` (
  `tableName`     string,
  `logRequest`    string,
  `timestamp`     bigint,
  `statBody`      string,
  `logResponse`   string,
  `resultCode`    int,
  `resultFailMsg` string
)
PARTITIONED BY (`dt` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\u0001'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 'hdfs://centos1:9000/flume/skydata_hive_log';
2. Hive table partitioning
When flume sinks the logs to hdfs, if the hive table partitions have not been created in advance, the logs land in the hdfs directory but the hive table cannot load the data, because the corresponding partitions do not exist. Partitions therefore have to be added up front; here roughly one year of partitions is created with the script init_flume_hive_table.sh:
for ((i = -1; i <= 365; i++))
do
    dt=$(date -d "$(date +%F) ${i} days" +%Y-%m-%d)
    echo date=$dt
    hive -e "ALTER TABLE skydata_hive_log ADD PARTITION(dt='${dt}')" >> logs/init_skydata_hive_log.out 2>> logs/init_skydata_hive_log.err
done
V. Custom flume interceptor
Create a new maven project; the interceptor HiveLogInterceptor2 code is as follows.
package com.skydata.flume_interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.interceptor.TimestampInterceptor.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;

public class HiveLogInterceptor2 implements Interceptor {

    private static Logger logger = LoggerFactory.getLogger(HiveLogInterceptor2.class);

    // Field separator expected by the hive table (ROW FORMAT DELIMITED FIELDS TERMINATED BY '\u0001')
    public static final String HIVE_SEPARATOR = "\001";

    public void close() {
        // nothing to release
    }

    public void initialize() {
        // nothing to initialize
    }

    public Event intercept(Event event) {
        String originalLog = new String(event.getBody(), Charsets.UTF_8);
        try {
            String log = parseLog(originalLog);
            // Set the timestamp header so the hdfs sink can expand %Y-%m-%d in its path
            long now = System.currentTimeMillis();
            Map<String, String> headers = event.getHeaders();
            headers.put(Constants.TIMESTAMP, Long.toString(now));
            event.setBody(log.getBytes(Charsets.UTF_8));
        } catch (Throwable throwable) {
            logger.error("error when intercepting log [" + originalLog + "]", throwable);
            return null;
        }
        return event;
    }

    public List<Event> intercept(List<Event> list) {
        List<Event> events = new ArrayList<Event>();
        for (Event event : list) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                events.add(interceptedEvent);
            }
        }
        return events;
    }

    private static String parseLog(String log) {
        List<String> logFields = new ArrayList<String>();
        // The line starts with "yyyy-MM-dd HH:mm:ss.SSS", so the first 10 characters give the partition date
        String dt = log.substring(0, 10);
        String keyStr = "INFO - ";
        int index = log.indexOf(keyStr);
        String content = "";
        if (index != -1) {
            content = log.substring(index + keyStr.length(), log.length());
        }
        // Different operating systems use different line endings; normalize them so one record stays on one line
        content = content.replaceAll("\r", "");
        content = content.replaceAll("\n", "\\\\" + System.getProperty("line.separator"));

        JSONObject jsonObj = JSONObject.parseObject(content);
        String tableName = jsonObj.getString("tableName");
        String logRequest = jsonObj.getString("logRequest");
        String timestamp = jsonObj.getString("timestamp");
        String statBody = jsonObj.getString("statBody");
        String logResponse = jsonObj.getString("logResponse");
        String resultCode = jsonObj.getString("resultCode");
        String resultFailMsg = jsonObj.getString("resultFailMsg");

        // Assemble the fields in the hive column order, with dt last
        logFields.add(tableName);
        logFields.add(logRequest);
        logFields.add(timestamp);
        logFields.add(statBody);
        logFields.add(logResponse);
        logFields.add(resultCode);
        logFields.add(resultFailMsg);
        logFields.add(dt);
        return Joiner.on(HIVE_SEPARATOR).join(logFields);
    }

    public static class Builder implements Interceptor.Builder {
        public Interceptor build() {
            return new HiveLogInterceptor2();
        }

        public void configure(Context context) {
        }
    }
}
Add the following to pom.xml, then build the interceptor project with maven and copy both the project jar and the dependency jars to the ${flume-agent}/lib directory (example commands follow the pom snippet).
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-dependency-plugin</artifactId>
      <configuration>
        <outputDirectory>${project.build.directory}</outputDirectory>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-dependency-plugin</artifactId>
      <executions>
        <execution>
          <id>copy-dependencies</id>
          <phase>prepare-package</phase>
          <goals>
            <goal>copy-dependencies</goal>
          </goals>
          <configuration>
            <outputDirectory>${project.build.directory}/lib</outputDirectory>
            <overWriteReleases>true</overWriteReleases>
            <overWriteSnapshots>true</overWriteSnapshots>
            <overWriteIfNewer>true</overWriteIfNewer>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
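A sketch of the packaging and copy step; the jar name and the flume install path are placeholders, adjust them to your project:

mvn clean package
# the project jar ends up in target/, its dependencies in target/lib/ thanks to copy-dependencies
cp target/flume-hive-interceptor-1.0.jar $FLUME_HOME/lib/
cp target/lib/*.jar $FLUME_HOME/lib/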
The interceptor splits the log with the "\001" separator. A processed record looks like the following, where ^A is the "\001" character:
ReportView^A{
  "request": {},
  "requestBody": {
    "detailInfos": [],
    "flag": "",
    "reportId": 7092,
    "pageSize": 0,
    "searchs": [],
    "orders": [],
    "pageNum": 1
  }
}^A1480468701432^A{
  "sourceId": 22745,
  "reportId": 7092,
  "projectId": 29355,
  "userId": 2532
}^A{
  "responseBody": {
    "statusCodeValue": 200,
    "httpHeaders": {},
    "body": {
      "msg": "请求成功",
      "httpCode": 200,
      "timestamp": 1480468701849
    },
    "statusCode": "OK"
  },
  "response": {}
}^A1^A^A2016-11-30
At this point the flume + HDFS + Hive configuration is complete.
VI. Startup and results
1. Start hadoop hdfs
See my previous article: http://www.cnblogs.com/xckk/p/6124553.html
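As a quick reminder (the detailed steps are in the linked post; the path assumes a standard Hadoop 2.x layout), starting HDFS is roughly:

$HADOOP_HOME/sbin/start-dfs.sh
jps    # NameNode / DataNode / SecondaryNameNode should be running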
2. Start flume_collector and flume_agent. Since the flume startup command takes quite a few parameters, I wrote a small start script:
start-Flume.sh
#!/bin/bash
# kill any running flume process, then start the agent defined in conf/flume-conf.properties
jps -l | grep org.apache.flume.node.Application | awk '{print $1}' | xargs kill -9 2>&1 > /dev/null
cd "$(dirname "$0")"
cd ..
nohup bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name a1 2>&1 > /dev/null &
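One ordering note of my own (not from the original script): start the collector on centos2 before the agent on centos1, so the agent's avro sink has a live endpoint to connect to.

# on centos2 (collector) first
./start-Flume.sh
# then on centos1 (agent)
./start-Flume.sh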
3. Check the data on hdfs
The collected logs have been uploaded to HDFS:
[root@centos1 bin]# rm -rf FlumeData.1480587273016.tmp
[root@centos1 bin]# hadoop fs -ls /flume/skydata_hive_log/dt=2016-12-01/
Found 3 items
-rw-r--r--   3 root supergroup       5517 2016-12-01 08:12 /flume/skydata_hive_log/dt=2016-12-01/FlumeData.1480608753042.tmp
-rw-r--r--   3 root supergroup       5517 2016-12-01 08:40 /flume/skydata_hive_log/dt=2016-12-01/FlumeData.1480610453116
-rw-r--r--   3 root supergroup       5517 2016-12-01 08:44 /flume/skydata_hive_log/dt=2016-12-01/FlumeData.1480610453117
[root@centos1 bin]#
4. Start hive and query the data; hive can now load the hdfs data:
[root@centos1 lib]# hive
Logging initialized using configuration in file:/root/apache-hive-1.2.1-bin/conf/hive-log4j.properties
hive> select * from skydata_hive_log limit 2;
OK
ReportView    {
  "request": {},
  "requestBody": {
    "detailInfos": [],
    "flag": "",
    "reportId": 7092,
    "pageSize": 0,
    "searchs": [],
    "orders": [],
    "pageNum": 1
  }
}
1480468701432    {
  "sourceId": 22745,
  "reportId": 7092,
  "projectId": 29355,
  "userId": 2532
}    {
  "responseBody": {
    "statusCodeValue": 200,
    "httpHeaders": {},
    "body": {
      "msg": "请求成功",
      "httpCode": 200,
      "timestamp": 1480468701849
    },
    "statusCode": "OK"
  },
  "response": {}
}
1        2016-12-01
ReportDesignResult    {
  "request": {},
  "requestBody": {
    "sourceId": 22745,
    "detailInfos": [{
      "colName": "月份",
      "flag": "0",
      "reportId": 7092,
      "colCode": "col_2_22745",
      "pageSize": 20,
      "type": "1",
      "pageNum": 1,
      "rcolCode": "col_25538",
      "colType": "string",
      "formula": "",
      "id": 25538,
      "position": "row",
      "colId": 181664,
      "dorder": 1,
      "pColName": "月份",
      "pRcolCode": "col_25538"
    },
    {
      "colName": "综合利率(合计)",
      "flag": "1",
      "reportId": 7092,
      "colCode": "col_11_22745",
      "pageSize": 20,
      "type": "1",
      "pageNum": 1,
      "rcolCode": "sum_col_25539",
      "colType": "number",
      "formula": "sum",
      "id": 25539,
      "position": "group",
      "colId": 181673,
      "dorder": 1,
      "pColName": "综合利率",
      "pRcolCode": "col_25539"
    }],
    "flag": "bar1",
    "reportId": 7092,
    "reportName": "iiiissszzzV",
    "pageSize": 100,
    "searchs": [],
    "orders": [],
    "pageNum": 1,
    "projectId": 29355
  }
}
1480468703586    {
  "reportType": "bar1",
  "sourceId": 22745,
  "reportId": 7092,
  "num": 5,
  "usedFields": "月份$$综合利率(合计)$$",
  "projectId": 29355,
  "userId": 2532
}    {
  "responseBody": {
    "statusCodeValue": 200,
    "httpHeaders": {},
    "body": {
      "msg": "请求成功",
      "reportId": 7092,
      "httpCode": 200,
      "timestamp": 1480468703774
    },
    "statusCode": "OK"
  },
  "response": {}
}
1        2016-12-01
Time taken: 2.212 seconds, Fetched: 2 row(s)
hive>
VII. Common exceptions
1. FATAL: Spool Directory source skydataSource: {spoolDir: /opt/flumeSpool}: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.nio.charset.MalformedInputException: Input length = 1
Possible causes:
1. Character encoding: the log files under spoolDir must be UTF-8.
2. When using the Spooling Directory Source, never write to a file while it is being read. Add the following to the conf file:
a1.sources.skydataSource.ignorePattern=([^_]+)|(.*(\.log)$)|(.*(\.COMPLETED)$)
2. Logs arrive in the hadoop directory, but the hive table query returns no data. For example, hdfs://centos1:9000/flume/skydata_hive_log/dt=2016-12-01/ contains data,
yet select * from skydata_hive_log returns nothing.
Possible causes:
1. No partitions were created when the table was built. Even though flume is configured with a1.sinks.hdfsSink.hdfs.path=hdfs://centos1:9000/flume/skydata_hive_log/dt=%Y-%m-%d, the table's partition structure does not exist, so hive cannot read the files after they land on HDFS.
Fix: create the partitions first. Put the following in an executable shell script to pre-create the partitions for this table:
for ((i = -10; i <= 365; i++))
do
    dt=$(date -d "$(date +%F) ${i} days" +%Y-%m-%d)
    echo date=$dt
    hive -e "ALTER TABLE skydata_hive_log ADD PARTITION(dt='${dt}')" >> logs/init_skydata_hive_log.out 2>> logs/init_skydata_hive_log.err
done
2. The file on hdfs may still be a .tmp file that hdfs is writing to. Hive cannot read .tmp files; it only reads finalized files.
Fix: wait for the configured roll interval to elapse (or the roll size to be reached) so that the .tmp file is renamed to a normal log file on hdfs, then query hive again and the data will show up.
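Alternatively, a hedged option not used in the configuration above: the hdfs sink's hdfs.idleTimeout setting closes (and renames) a file once it has been idle for the given number of seconds, which shortens the wait when traffic is sparse.

# close and rename a file after 60 seconds without new events (suggestion, not part of the original config)
a1.sinks.hdfsSink.hdfs.idleTimeout = 60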
Produced by 秀才坤坤 (xckk). Please credit when reposting.
Source: http://www.cnblogs.com/xckk/p/6125838.html