I. Create a table and load the log data: surfacing a problem
- ## Create the table
- hive (default)> create table IF NOT EXISTS default.bf_log_src(
- > remote_addr string,
- > remote_user string,
- > time_local string,
- > request string,
- > status string,
- > body_bytes_sent string,
- > request_body string,
- > http_referer string,
- > http_user_agent string,
- > http_x_forwarded_for string,
- > host string
- > )
- > ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
- > stored as textfile;
- OK
- Time taken: 0.037 seconds
- ## Load the data
- hive (default)> load data local inpath '/opt/datas/moodle.ibeifeng.access.log' into table default.bf_log_src ;
- ## Select
- hive (default)> select * from bf_log_src limit 5 ;
- ## A problem appears: the source file has 11 columns, but this query returns only 8. Quoted fields such as the time, request, and user agent contain spaces, so FIELDS TERMINATED BY ' ' mis-aligns the columns
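Why a plain space delimiter cannot parse this format: the quoted fields themselves contain spaces. A minimal sketch in Python (the log line below is invented to mimic the course's log layout, so treat it as an assumption):
- line = '"27.38.5.159" "-" "31/Aug/2015:00:04:37 +0800" "GET /course/view.php?id=27 HTTP/1.1" "303" "440" - "http://www.ibeifeng.com/user.php?act=mycourse" "Mozilla/5.0 (Windows NT 6.1)" "-" "learn.ibeifeng.com"'
- # FIELDS TERMINATED BY ' ' splits on every space, including the spaces
- # inside the quoted time, request, and user-agent fields.
- print(len(line.split(' ')))  # 17 tokens, not the 11 fields we declared
Section II fixes this by matching whole fields with a regex instead of splitting on a delimiter.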
II. Use RegexSerDe to parse Apache or Nginx log files
Regex testing site: http://tool.chinaz.com/regex/
- # Drop the old table and recreate it
- hive (default)> drop table IF EXISTS default.bf_log_src;
- hive (default)> create table IF NOT EXISTS default.bf_log_src(
- > remote_addr string,
- > remote_user string,
- > time_local string,
- > request string,
- > status string,
- > body_bytes_sent string,
- > request_body string,
- > http_referer string,
- > http_user_agent string,
- > http_x_forwarded_for string,
- > host string
- > )
- > ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
- > WITH SERDEPROPERTIES (
- > "input.regex" = "(\"[^ ]*\") (\"-|[^ ]*\") (\"[^\]]*\") (\"[^\"]*\") (\"[0-9]*\") (\"[0-9]*\") (-|[^ ]*) (\"[^ ]*\") (\"[^\"]*\") (-|[^ ]*) (\"[^ ]*\")"
- > )
- > STORED AS TEXTFILE;
- OK
- Time taken: 0.056 seconds
- # Load the data
- hive (default)> load data local inpath '/opt/datas/moodle.ibeifeng.access.log' into table default.bf_log_src ;
- # Query
- hive (default)> select * from bf_log_src limit 5 ;
- # Now the query returns the same number of fields as the source file;
- # with this raw table in place, we can process the data from it.
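Before trusting the SerDe, it is worth checking that input.regex really captures 11 groups, one per declared column. A quick sketch in Python (the pattern uses no Java-specific regex syntax, and the sample line is again an invented one in the course log layout):
- import re
- # the unescaped form of the input.regex value above
- pattern = re.compile(
-     r'("[^ ]*") ("-|[^ ]*") ("[^\]]*") ("[^"]*") ("[0-9]*") ("[0-9]*")'
-     r' (-|[^ ]*) ("[^ ]*") ("[^"]*") (-|[^ ]*) ("[^ ]*")')
- line = ('"27.38.5.159" "-" "31/Aug/2015:00:04:37 +0800" '
-         '"GET /course/view.php?id=27 HTTP/1.1" "303" "440" - '
-         '"http://www.ibeifeng.com/user.php?act=mycourse" '
-         '"Mozilla/5.0 (Windows NT 6.1)" "-" "learn.ibeifeng.com"')
- m = pattern.fullmatch(line)  # RegexSerDe likewise requires a full-line match
- print(len(m.groups()))       # 11 -- one capture group per table column
- print(m.group(3))            # "31/Aug/2015:00:04:37 +0800" -> time_local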
III. Create a subtable from the raw table, stored as ORC with Snappy compression
Suppose we only want to analyze a few of the raw table's fields: the IP, the access time, the request, and the referer.
We create a subtable, select just those fields from the raw table, and insert them into it; ORC with Snappy compression gives compact columnar storage that is fast to scan.
- # Create the table
- hive (default)> drop table if exists default.bf_log_comm ;
- OK
- Time taken: 0.011 seconds
- hive (default)> create table IF NOT EXISTS default.bf_log_comm (
- > remote_addr string,
- > time_local string,
- > request string,
- > http_referer string
- > )
- > ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
- > STORED AS orc tblproperties ("orc.compress"="SNAPPY");
- OK
- Time taken: 0.034 seconds
- # Insert the data
- hive (default)> insert into table default.bf_log_comm select remote_addr, time_local, request, http_referer from default.bf_log_src ;
- ## Query
- hive (default)> select * from bf_log_comm limit 5 ;
- # The fields we need are now in the subtable
IV. Data cleaning: a custom UDF to strip the double quotes
Source:
- package com.beifeng.senior.hive.udf;
- import org.apache.hadoop.hive.ql.exec.UDF;
- import org.apache.hadoop.io.Text;
- /**
-  * 1. Implement one or more methods named "evaluate" which will be called by Hive.
-  *
-  * 2. "evaluate" should never be a void method. However it can return "null" if needed.
-  *
-  * @author root
-  */
- public class RemoveQuotesUDF extends UDF {
-     public Text evaluate(Text str) {
-         // validate: Hive may pass in NULL
-         if (null == str || null == str.toString()) {
-             return null;
-         }
-         // strip every double quote from the value
-         return new Text(str.toString().replaceAll("\"", ""));
-     }
-     public static void main(String[] args) {
-         System.out.println(new RemoveQuotesUDF().evaluate(new Text("\"31/Aug/2015:23:57:46 +0800\"")));
-     }
- }
Register it as a function:
- hive (default)> add jar /opt/datas/jars/hiveudf2.jar ;
- Added /opt/datas/jars/hiveudf2.jar to class path
- Added resource: /opt/datas/jars/hiveudf2.jar
- hive (default)> create temporary function my_removequotes as "com.beifeng.senior.hive.udf.RemoveQuotesUDF" ;
- OK
- Time taken: 0.013 seconds
Re-insert the data:
- ## Insert
- hive (default)> insert overwrite table default.bf_log_comm select my_removequotes(remote_addr), my_removequotes(time_local),
- > my_removequotes(request), my_removequotes(http_referer) from default.bf_log_src ;
- ## Query: the quotes are gone
- hive (default)> select * from bf_log_comm limit 5 ;
V. A custom UDF to transform the date and time field
Source:
- package com.beifeng.senior.hive.udf;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.Locale;
- import org.apache.hadoop.hive.ql.exec.UDF;
- import org.apache.hadoop.io.Text;
- /**
-  * 1. Implement one or more methods named "evaluate" which will be called by Hive.
-  *
-  * 2. "evaluate" should never be a void method. However it can return "null" if needed.
-  *
-  * @author root
-  */
- public class DateTransformUDF extends UDF {
-     private final SimpleDateFormat inputFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
-     private final SimpleDateFormat outputFormat = new SimpleDateFormat("yyyyMMddHHmmss");
-     /**
-      * Converts "31/Aug/2015:00:04:37 +0800" to "20150831000437".
-      *
-      * @param input the raw time_local value
-      * @return the reformatted timestamp, or an empty Text if parsing fails
-      */
-     public Text evaluate(Text input) {
-         Text output = new Text();
-         // validate: Hive may pass in NULL
-         if (null == input || null == input.toString()) {
-             return null;
-         }
-         String inputDate = input.toString().trim();
-         try {
-             // parse the source format, then reformat to yyyyMMddHHmmss
-             Date parseDate = inputFormat.parse(inputDate);
-             output.set(outputFormat.format(parseDate));
-         } catch (Exception e) {
-             e.printStackTrace();
-         }
-         return output;
-     }
-     public static void main(String[] args) {
-         System.out.println(new DateTransformUDF().evaluate(new Text("31/Aug/2015:00:04:37 +0800")));
-     }
- }
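The format conversion itself is easy to verify outside Hive. A Python sketch of the same mapping (it assumes an English locale for the month abbreviation; unlike SimpleDateFormat, strptime rejects trailing text, so the timezone offset is dropped before parsing):
- from datetime import datetime
- raw = '31/Aug/2015:00:04:37 +0800'
- parsed = datetime.strptime(raw.split(' ')[0], '%d/%b/%Y:%H:%M:%S')
- print(parsed.strftime('%Y%m%d%H%M%S'))  # 20150831000437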
Register the function:
- hive (default)> add jar /opt/datas/jars/hiveudf3.jar ;
- Added /opt/datas/jars/hiveudf3.jar to class path
- Added resource: /opt/datas/jars/hiveudf3.jar
- hive (default)> create temporary function my_datetransform as "com.beifeng.senior.hive.udf.DateTransformUDF" ;
- OK
- Time taken: 0.013 seconds
Re-insert the data:
- ## Insert
- hive (default)> insert overwrite table default.bf_log_comm select my_removequotes(remote_addr), my_datetransform(my_removequotes(time_local)),
- > my_removequotes(request), my_removequotes(http_referer) from default.bf_log_src ;
- ## Query: the time is now formatted
- hive (default)> select * from bf_log_comm limit 5 ;
VI. MovieLens data analysis: cleaning and counting with a Python script
1. Preparation
Download the sample dataset: wget http://files.grouplens.org/datasets/movielens/ml-100k.zip
Unzip it: unzip ml-100k.zip
- [root@hadoop-senior datas]# cd ml-100k
- [root@hadoop-senior ml-100k]# ls
- allbut.pl README u1.test u2.test u3.test u4.test u5.test ua.test ub.test u.genre u.item u.user
- mku.sh u1.base u2.base u3.base u4.base u5.base ua.base ub.base u.data u.info u.occupation
- [root@hadoop-senior ml-100k]# head u.data
- # columns: userid  movieid  rating  unixtime
- 196 242 3 881250949
- 186 302 3 891717742
- 22 377 1 878887116
- 244 51 2 880606923
- 166 346 1 886397596
- 298 474 4 884182806
- 115 265 2 881171488
- 253 465 5 891628467
- 305 451 3 886324817
- 6 86 3 883603013
2. Prepare the raw table
- ## Create the table
- hive (default)> CREATE TABLE u_data (
- > userid INT,
- > movieid INT,
- > rating INT,
- > unixtime STRING)
- > ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
- > STORED AS TEXTFILE;
- OK
- Time taken: 0.073 seconds
- ## Load the data
- hive (default)> LOAD DATA LOCAL INPATH '/opt/datas/ml-100k/u.data' OVERWRITE INTO TABLE u_data;
3. Process the data with a Python script
- ## vim weekday_mapper.py
- import sys
- import datetime
- # Hive streams the selected columns to stdin, tab-separated, one row per line
- for line in sys.stdin:
-     line = line.strip()
-     userid, movieid, rating, unixtime = line.split('\t')
-     # convert the unix timestamp to an ISO weekday (1 = Monday .. 7 = Sunday)
-     weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
-     print('\t'.join([userid, movieid, rating, str(weekday)]))
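As a quick local sanity check of the conversion (a sketch using the first row of u.data; the result depends on the machine's timezone, just as it does inside the Hive job):
- import datetime
- # first sample row: userid 196, movieid 242, rating 3, unixtime 881250949
- print(datetime.datetime.fromtimestamp(881250949.0).isoweekday())  # 1 = Monday .. 7 = Sunday
You can also pipe a sample row through the script directly: echo -e '196\t242\t3\t881250949' | python weekday_mapper.py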
- ## Create the new table
- hive (default)> CREATE TABLE u_data_new (
- > userid INT,
- > movieid INT,
- > rating INT,
- > weekday INT)
- > ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
- OK
- Time taken: 0.027 seconds
- ## Add the script
- hive (default)> add FILE /opt/datas/ml-100k/weekday_mapper.py;
- Added resource: /opt/datas/ml-100k/weekday_mapper.py
- ## Insert the data: TRANSFORM streams the selected columns through the script, and AS names the columns the script emits
- hive (default)> INSERT OVERWRITE TABLE u_data_new
- > SELECT
- > TRANSFORM (userid, movieid, rating, unixtime)
- > USING 'python weekday_mapper.py'
- > AS (userid, movieid, rating, weekday)
- > FROM u_data;
- ## Select
- hive (default)> SELECT weekday, COUNT(*) FROM u_data_new GROUP BY weekday;
Source: http://www.bubuko.com/infodetail-3047556.html