1: order by, sort by, distribute by, cluster by
1.1 order by
Hive's order by performs a global sort over the entire query result: the output of every mapper is sent to a single reducer. Regardless of data volume, the job starts only one reducer, so a huge dataset can take a very long time. Note: in strict mode, order by must be followed by a limit clause; otherwise, on a huge dataset, the query may hang and produce no output. Related property: set hive.mapred.mode=nonstrict/strict
select * from company_info order by money desc;
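In strict mode the limit is mandatory; a minimal sketch (reusing company_info from above, the limit value itself is illustrative):
- set hive.mapred.mode=strict;
- -- rejected in strict mode: order by without limit
- -- select * from company_info order by money desc;
- select * from company_info order by money desc limit 100;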
1.2 sort by
Hive's sort by performs a local sort on each chunk of data: the rows handled by each reducer are sorted, but global order is not guaranteed.
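A minimal sketch: with three reducers, each reducer's output file is internally ordered while the overall result is not (the reducer count is illustrative):
- set mapreduce.job.reduces=3;
- -- each of the 3 reducers emits its own sorted chunk
- select * from company_info sort by money desc;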
1.3 distribute by
Hive's distribute by is normally used together with sort by: distribute by routes each row to a specific reducer, and sort by then orders the rows inside that reducer.
Note: distribute by must be written before sort by.
Note: related properties: mapreduce.job.reduces, hive.exec.reducers.bytes.per.reducer. Example: send each person (personId) to its own group, then sort each group by money:
select * from company_info distribute by personId sort by personId, money desc;
1.4 cluster by
Hive's cluster by is equivalent to distribute by combined with sort by when both use the same column. Note that cluster by supports only ascending order; you cannot attach asc or desc to its columns. Example: a pair of equivalent queries, one written with distribute by plus sort by and one with cluster by:
- select * from company_info distribute by personId sort by personId;
- select * from company_info cluster by personId;
2: Rows to columns, columns to rows (UDAF and UDTF)
2.1 Rows to columns (concat_ws)
- create table person_info(
- name string,
- constellation string,
- blood_type string)
- row format delimited fields terminated by "\t";
- load data local inpath "person_info.tsv" into table person_info;
- -- collect_set(t1.name) collapses the multiple rows in each group into a deduplicated set
- select
- t1.base,
- concat_ws('|', collect_set(t1.name)) name
- from
- (select
- name,
- concat(constellation, ",", blood_type) base
- from
- person_info) t1
- group by
- t1.base;
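collect_set deduplicates while it aggregates; when duplicates should be kept, Hive's built-in collect_list is the drop-in alternative, e.g.:
- select
- t1.base,
- concat_ws('|', collect_list(t1.name)) name -- keeps duplicate names
- from
- (select
- name,
- concat(constellation, ",", blood_type) base
- from
- person_info) t1
- group by
- t1.base;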
2.2 Columns to rows (array&lt;string&gt; type)
- create table movie_info(
- movie string,
- category array<string>)
- row format delimited fields terminated by "\t"
- collection items terminated by ",";
- load data local inpath "movie_info.tsv" into table movie_info;
Expand the array in the movie category column into rows:
- select
- movie,
- category_name
- from
- movie_info lateral view explode(category) table_tmp as category_name;
"fields terminated by": 字段与字段之间的分隔符.
"collection items terminated by": 一个字段中各个子元素 item 的分隔符
ORC, the Optimized Row Columnar file format, evolved from RCFile. It provides an efficient way to store data in Hive and improves read, write, and processing performance.
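A minimal sketch of declaring an ORC-backed table (the table name is hypothetical; the optional orc.compress table property picks the codec, SNAPPY here being illustrative):
- create table company_info_orc(
- name string,
- money float)
- stored as orc
- tblproperties ("orc.compress" = "SNAPPY");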
2.3 Bucketing
Bucketing directly
Before doing anything else, set the hive.enforce.bucketing property to true so that Hive enforces bucketing.
- create table music(
- id int,
- name string,
- size float)
- clustered by (id) into 4 buckets
- row format delimited
- fields terminated by "\t";
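A bucketed table should be filled with insert ... select so that rows are actually hashed into buckets (a plain load data just copies the file); music_staging below is a hypothetical unbucketed staging table with the same columns:
- set hive.enforce.bucketing = true;
- insert into table music
- select id, name, size from music_staging;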
Bucketing within partitions
When the data volume is so large that it would require an enormous number of partitions, consider buckets instead: too many partitions can bring the file system down, and buckets also offer better query performance than partitions. Which bucket a row lands in is decided by hashing the value of the clustered by column and taking the remainder modulo the number of buckets. This spreads the data reasonably well, but it does not guarantee that every bucket holds the same amount of data.
- create table music2(
- id int,
- name string,
- size float)
- partitioned by (date string)
- clustered by (id) sorted by (size) into 4 buckets
- row format delimited
- fields terminated by "\t";
- load data local inpath 'demo/music.txt' into table music2 partition(date='2017-08-30');
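A practical payoff of bucketing is cheap sampling: tablesample can scan a single bucket instead of the whole table:
- -- read only the first of music2's 4 buckets
- select * from music2 tablesample(bucket 1 out of 4 on id);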
3 Hive end-to-end project:
- package com.z.YouTube.util;
- public class ETLUtils {
-     /**
-      * 1. filter out malformed records
-      * 2. strip the spaces in the category field (e.g. "People & Blogs" -> "People&Blogs")
-      * 3. keep "\t" between the regular fields, but join the trailing related-video IDs with "&"
-      * @param ori one raw input line
-      * @return the cleaned line, or null for an invalid record
-      */
-     public static String getETLString(String ori){
-         String[] splits = ori.split("\t");
-         // 1. filter out malformed records: a valid record has at least 9 fields
-         if(splits.length < 9) return null;
-         // 2. remove the spaces in the category field
-         splits[3] = splits[3].replaceAll(" ", "");
-         StringBuilder sb = new StringBuilder();
-         // 3. fields 0-8 stay "\t"-separated; the related-video IDs that follow are joined with "&"
-         for(int i = 0; i < splits.length; i++){
-             sb.append(splits[i]);
-             if(i != splits.length - 1){
-                 sb.append(i < 9 ? "\t" : "&");
-             }
-         }
-         return sb.toString();
-     }
- }
- -- youtube_ori: raw video table
- create table youtube_ori(
- videoId string,
- uploader string,
- age int,
- category array<string>,
- length int,
- views int,
- rate float,
- ratings int,
- comments int,
- relatedId array<string>)
- row format delimited
- fields terminated by "\t"
- collection items terminated by "&"
- stored as textfile;
- -- youtube_user_ori: raw uploader table
- create table youtube_user_ori(
- uploader string,
- videos int,
- friends int)
- clustered by (uploader) into 24 buckets
- row format delimited
- fields terminated by "\t"
- stored as textfile;
- -- youtube_orc: ORC copy of the video table
- create table youtube_orc(
- videoId string,
- uploader string,
- age int,
- category array<string>,
- length int,
- views int,
- rate float,
- ratings int,
- comments int,
- relatedId array<string>)
- clustered by (uploader) into 8 buckets
- row format delimited fields terminated by "\t"
- collection items terminated by "&"
- stored as orc;
- -- youtube_user_orc: ORC copy of the uploader table
- create table youtube_user_orc(
- uploader string,
- videos int,
- friends int)
- clustered by (uploader) into 24 buckets
- row format delimited
- fields terminated by "\t"
- stored as orc;
- load data inpath "/youtube/output/video/2008/0222" into table youtube_ori;
- load data inpath "/youtube/user/2008/0903" into table youtube_user_ori;
- insert into table youtube_orc select * from youtube_ori;
- insert into table youtube_user_orc select * from youtube_user_ori;
- select
- videoId,
- uploader,
- age,
- category,
- length,
- views,
- rate,
- ratings,
- comments
- from
- youtube_orc
- order by
- views
- desc limit
- 10;
- select
- category_name as category,
- count(t1.videoId) as hot
- from (
- select
- videoId,
- category_name
- from
- youtube_orc lateral view explode(category) t_category as category_name) t1
- group by
- t1.category_name
- order by
- hot
- desc limit
- 10;
- select
- category_name as category,
- count(t2.videoId) as hot_with_views
- from (
- select
- videoId,
- category_name
- from (
- select
- *
- from
- youtube_orc
- order by
- views
- desc limit
- 20) t1 lateral view explode(category) t_category as category_name) t2
- group by
- category_name
- order by
- hot_with_views
- desc;
This statistic (the categories of the videos related to the Top 50 most-viewed videos) is easiest to build in steps; t1, t2 and t4 below are the subquery aliases used in the combined query that follows, so the fragments are not standalone statements.
Step 1: take the Top 50 videos by views (t1):
- select
- *
- from
- youtube_orc
- order by
- views
- desc limit
- 50;
Step 2: explode the relatedId array of those 50 videos into a single column of video IDs (t2):
- select
- explode(relatedId) as videoId
- from
- t1;
Step 3: join the related IDs back to youtube_orc to recover each related video's categories, then explode them (t4):
- (select
- distinct(t2.videoId),
- t3.category
- from
- t2
- inner join
- youtube_orc t3 on t2.videoId = t3.videoId) t4 lateral view explode(category) t_category as
- category_name;
Combined into one query:
- select
- category_name as category,
- count(t5.videoId) as hot
- from (
- select
- videoId,
- category_name
- from (
- select
- distinct(t2.videoId),
- t3.category
- from (
- select
- explode(relatedId) as videoId
- from (
- select
- *
- from
- youtube_orc
- order by
- views
- desc limit
- 50) t1) t2
- inner join
- youtube_orc t3 on t2.videoId = t3.videoId) t4 lateral view explode(category)
- t_category as category_name) t5
- group by
- category_name
- order by
- hot
- desc;
- create table youtube_category(
- videoId string,
- uploader string,
- age int,
- categoryId string,
- length int,
- views int,
- rate float,
- ratings int,
- comments int,
- relatedId array<string>)
- row format delimited
- fields terminated by "\t"
- collection items terminated by "&"
- stored as orc;
- insert into table youtube_category
- select
- videoId,
- uploader,
- age,
- categoryId,
- length,
- views,
- rate,
- ratings,
- comments,
- relatedId
- from
- youtube_orc lateral view explode(category) t_category as categoryId;
- select
- videoId,
- views
- from
- youtube_category
- where
- categoryId = "Music"
- order by
- views
- desc limit
- 10;
- select
- *
- from
- youtube_user_orc
- order by
- videos
- desc limit
- 10;
- select
- t2.videoId,
- t2.views,
- t2.ratings,
- t1.videos,
- t1.friends
- from (
- select
- *
- from
- youtube_user_orc
- order by
- videos desc
- limit
- 10) t1
- join
- youtube_orc t2
- on
- t1.uploader = t2.uploader
- order by
- views desc
- limit
- 20;
MapReduce memory tuning: the first property shows the default child JVM heap of only 200 MB; the properties after it raise the YARN container size to 2048 MB and the child JVM heap to 1024 MB:
- <property>
- <name>mapred.child.java.opts</name>
- <value>-Xmx200m</value>
- </property>
- <property>
- <name>yarn.scheduler.maximum-allocation-mb</name>
- <value>2048</value>
- </property>
- <property>
- <name>yarn.scheduler.minimum-allocation-mb</name>
- <value>2048</value>
- </property>
- <property>
- <name>yarn.nodemanager.vmem-pmem-ratio</name>
- <value>2.1</value>
- </property>
- <property>
- <name>mapred.child.java.opts</name>
- <value>-Xmx1024m</value>
- </property>
Join optimization: the streamtable hint marks the table that should be streamed through the join rather than buffered in memory:
- select /*+streamtable(s)*/ s.ymd, d.dividend
- from stocks s inner join dividends d on s.ymd = d.ymd and s.symbol = d.symbol
- where s.symbol = 'aapl';
- set hive.auto.convert.join = true; -- automatically convert a reduce-side common join into a map join when the small input table is small enough, loading it into memory
- set hive.mapjoin.smalltable.filesize = 2500000; -- size limit (bytes) for the table loaded into memory
- set hive.mapjoin.maxsize = 1000000; -- maximum number of rows a map join may process; beyond this the map join task exits abnormally
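Before automatic conversion existed, a map join was forced with an explicit hint, in the same style as the streamtable hint above; as a sketch on the same tables:
- select /*+ mapjoin(d) */ s.ymd, d.dividend
- from stocks s join dividends d on s.ymd = d.ymd and s.symbol = d.symbol
- where s.symbol = 'aapl';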
- set hive.exec.parallel=true; -- enable concurrent execution of independent stages
- set hive.exec.parallel.thread.number=16; -- maximum degree of parallelism for a single sql statement; the default is 8
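Parallel execution only pays off when a statement contains independent stages; a union all of two unrelated aggregations (over the project tables above) is the typical case:
- set hive.exec.parallel=true;
- select * from (
- select count(*) c from youtube_orc
- union all
- select count(*) c from youtube_user_orc) t;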
- set hive.exec.compress.intermediate=true; -- compress intermediate data passed between stages
- set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
- set hive.intermediate.compression.type=BLOCK;
- set hive.exec.compress.output=true; -- compress the final job output
- set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
- set mapred.output.compression.type=BLOCK;
Commonly used codec classes:
- org.apache.hadoop.io.compress.GzipCodec
- org.apache.hadoop.io.compress.SnappyCodec
- com.hadoop.compression.lzo.LzopCodec
- org.apache.hadoop.io.compress.Lz4Codec
- set mapred.job.tracker=local; -- run the job with the local runner
- set hive.exec.mode.local.auto=true; -- let Hive automatically run small jobs in local mode
- set hive.map.aggr=true; -- whether to do partial aggregation on the map side; the default is true
- set hive.groupby.mapaggr.checkinterval=100000; -- number of rows after which the map-side aggregation is checked
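A minimal sketch of a group by that benefits: the map side pre-aggregates partial counts, so far less data is shuffled to the reducers (reusing movie_info from section 2.2):
- set hive.map.aggr=true;
- select
- category_name,
- count(*) cnt
- from
- movie_info lateral view explode(category) t_category as category_name
- group by
- category_name;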
- https://mp.weixin.qq.com/s?__biz=MzU5OTM5NjQ2NA==&mid=2247483659&idx=1&sn=28a7d3e2c0bd87fa4239719b1b360aed&chksm=feb4d814c9c35102ad9f018342307e3fe86f06cfeb14d522bf563b0c61bba061907065490eba&scene=21#wechat_redirect
- set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- map-side input: merge small files, then split by block size (the default)
- set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; -- map-side input: no merging; one map task per file
How the number of map tasks is derived:
- default_mapper_num = total_size / dfs_block_size
- set mapred.map.tasks=10;
- split_size = max(mapred.min.split.size, dfs_block_size)
- split_num = total_size / split_size
- compute_map_num = min(split_num, max(default_mapper_num, mapred.map.tasks))
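A worked example under assumed numbers: total_size = 10 GB, dfs_block_size = 128 MB, mapred.min.split.size not raised, mapred.map.tasks = 10:
- default_mapper_num = 10240 MB / 128 MB = 80
- split_size = max(mapred.min.split.size, 128 MB) = 128 MB
- split_num = 10240 MB / 128 MB = 80
- compute_map_num = min(80, max(80, 10)) = 80
Because mapred.map.tasks (10) is below default_mapper_num (80), it has no effect here; by these formulas, reducing the mapper count is done by raising mapred.min.split.size, which enlarges split_size and shrinks split_num.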
Source: https://juejin.im/post/5c1e38a4e51d45369208ac1e