任务相关
set mapred.job.name=my_job_20180405; // 设置任务名字
set mapred.job.queue.name = 队列名称; // 设置使用队列
set mapred.job.priority=VERY_HIGH; // 设置任务优先级
启用压缩
- set mapred.output.compress=true; // 启用压缩
- set hive.exec.compress.output=true; // 输出压缩
- set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec; // 压缩格式
- set mapred.output.compression.type=BLOCK; // 压缩类型
支持的压缩方式:
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.SnappyCodec
com.hadoop.compression.lzo.LzopCodec
org.apache.hadoop.io.compress.Lz4Codec
mapreduce 内存配置
- set mapreduce.map.memory.mb=2048; // map 可用的最大内存
- set mapreduce.reduce.memory.mb=2048; // reduce 可用最大内存
reduce 相关配置
- set mapred.reduce.tasks=-1 ; // 强制设置 Reduce 的个数,-1 为默认, 表示由 hive 自动分配管理
- set hive.exec.reducers.max=100; // 设置 reducer 个数的上限
- set hive.exec.reducers.bytes.per.reducer = 1000000000; // 也就是每个节点的 reduce 默认是处理 1G 大小的数据
计算 reducer 数的公式
N=min(hive.exec.reducers.max , 总输入数据量 / hive.exec.reducers.bytes.per.reducer)
输入小文件合并
- set mapred.max.split.size = 256000000; // map 最大的输入大小
- set mapred.min.split.size=1 ; // 最小分割
- set mapred.min.split.size.per.node=300; // 一个节点上 split 的最小大小, 这个值决定了多个 datanode 上文件是否需要合并
- set mapred.min.split.size.per.rack=100; // 一个交换机下 split 的最小大小, 这个值决定了过个交互及上的文件是否需要合并
- set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; //Map 端输入, 合并文件之后按照 block 的大小分割(默认)
输入小文件合并可以减小 map 的个数
hive.input.format 设置:
- org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; //Map 端输入, 合并文件之后按照 block 的大小分割(默认)
- org.apache.hadoop.hive.ql.io.HiveInputFormat; //Map 端输入, 不合并 一个文件起一个 Map
新版本配置
- set mapreduce.input.fileinputformat.split.minsize=134217728
- set mapreduce.input.fileinputformat.split.maxsize=134217728;
- set mapreduce.input.fileinputformat.split.minsize.per.node=134217728;
- set mapreduce.input.fileinputformat.split.minsize.per.rack=134217728;
输出小文件合并
- set hive.merge.mapfiles=true; // 是否开启合并 Map 端输出小文件
- set hive.merge.mapredfiles=true; // 是否开启合并 Map/Reduce 端输出小文件
- set hive.merge.size.per.task=268435456; // 合并文件大小
- set hive.merge.smallfiles.avgsize=16000000; // 当输出文件的平均大小小于该值时, 启动一个独立的 map-reduce 任务进行文件 merge
并行计算参数
- set hive.exec.parallel=true; // 无关的计算可以并行调起 JOB
- set hive.exec.parallel.thread.number=8 ; // 并发提交时的并发线程的个数
map 聚合
- set hive.map.aggr=true; // 是否在 map 端进行聚合, 默认为 true
- set hive.groupby.mapaggr.checkinterval=100000; // 在 map 端聚合操作记录的数目
Group By 相关
- set hive.optimize.groupby=true; // 是否优化 group by. 默认 true
- set hive.groupby.skewindata=false ; // 决定 group by 操作是否支持倾斜的数据. 只能对单个字段聚合
分区相关
- set hive.exec.dynamic.partition = false ; // 是否打开动态分区, 默认 false
- set hive.exec.dynamic.partition.mode = strict; // 打开动态分区后, 动态分区的模式, 有 strict 和 nonstrict 两个值可选, strict 要求至少包含一个静态分区列, nonstrict 则无此要求.
- set hive.exec.max.dynamic.partitions = 1000; // 所允许的最大的动态分区的个数.
- set hive.exec.max.dynamic.partitions.pernode = 100; // 单个 reduce 结点所允许的最大的动态分区的个数.
- set hive.exec.default.partition.name = '__HIVE_DEFAULT_PARTITION__'; // 默认的动态分区的名称, 当动态分区列为''或者 null 时, 使用此名称.
本地模式相关
- set hive.exec.mode.local.auto=true; // 决定 hive 是否应该自动的根据输入文件大小, 在本地运行
- set hive.exec.mode.local.auto.inputbytes.max=134217728; // 当输入文件大小小于词阈值的时可以自动在本地模式运行, 默认 128M.
- set hive.exec.mode.local.auto.tasks.max=4; // 当任务数小于此阈值时, 可以自动在本地模式运行
- set hive.mapred.local.mem=0; // 本地模式启动 JVM 内存大小
- Strict Mode
- set hive.mapred.mode = strict; // 严格模式
- set hive.mapred.mode = nostrict; // 非严格模式
严格模式不允许执行以下操作:
分区表没有指定分区
没有 limit 限制的 order by
笛卡尔积 join 时没有 on 语句
- Map-Side Join
- set hive.auto.convert.join = true ; // 是否根据输入小表的大小, 自动将 reduce 端的 common join 转化为 map join, 将小表刷入内存中.
- set hive.mapjoin.smalltable.filesize = 2500000 ; // 刷入内存表的大小(字节)
- set hive.mapjoin.maxsize=1000000; // Map Join 所处理的最大的行数. 超过此行数, Map Join 进程会异常退出
Map Join 的计算步骤分两步, 将小表的数据变成 hashtable 广播到所有的 map 端, 将大表的数据进行合理的切分, 然后在 map 阶段的时候用大表的数据一行一行的去探测(probe) 小表的 hashtable. 如果 join key 相等, 就写入 HDFS.
map join 之所以叫做 map join 是因为它所有的工作都在 map 端进行计算.
hive 在 map join 上做了几个优化: hive 0.6 的时候默认认为写在 select 后面的是大表, 前面的是小表, 或者使用 /*+mapjoin(map_table) */ 提示进行设定. hive 0.7 的时候这个计算是自动化的, 它首先会自动判断哪个是小表, 哪个是大表, 这个参数由 (hive.auto.convert.join=true) 来控制. 然后控制小表的大小由 (hive.smalltable.filesize=25000000L) 参数控制(默认是 25M), 当小表超过这个大小, hive 会默认转化成 common join. 你可以查看 HIVE-1642. 首先小表的 Map 阶段它会将自己转化成 MapReduce Local Task , 然后从 HDFS 取小表的所有数据, 将自己转化成 Hashtable file 并压缩打包放入 DistributedCache 里面. 目前 hive 的 map join 有几个限制, 一个是它打算用 BloomFilter 来实现 hashtable , BloomFilter 大概比 hashtable 省 8-10 倍的内存, 但是 BloomFilter 的大小比较难控制. 现在 DistributedCache 里面 hashtable 默认的复制是 3 份, 对于一个有 1000 个 map 的大表来说, 这个数字太小, 大多数 map 操作都等着 DistributedCache 复制.
- Skew Join
- set hive.exec.reducers.bytes.per.reducer = 1000000000; // 也就是每个节点的 reduce 默认是处理 1G 大小的数据
- set hive.optimize.skewjoin = true; // 是否优化数据倾斜的 join, 对于倾斜的 join 会开启新的 MR job 处理, 默认 false
- set hive.skewjoin.key=100000; // 倾斜数目阈值, 超过此值则判定为一个倾斜的 Join 查询, 默认 100000.
- set hive.skewjoin.mapjoin.map.tasks=10000; // 处理数据倾斜的 Map join 的 Map 数上限
- set hive.skewjoin.mapjoin.min.split=33554432; // 处理数据倾斜 Map Join 的最小数据切分大小, 以字节为单位, 默认 32M.
hive 在运行的时候没有办法判断哪个 key 会产生多大的倾斜, 所以使用这个参数控制倾斜的阈值, 如果超过这个值, 新的值会发送给那些还没有达到的 reduce, 一般可以设置成你 (处理的总记录数 / reduce 个数) 的 2-4 倍都可以接受.
倾斜是经常会存在的, 一般 select 的层数超过 2 层, 翻译成执行计划多于 3 个以上的 mapreduce job 都很容易产生倾斜, 建议每次运行比较复杂的 sql 之前都可以设一下这个参数. 如果你不知道设置多少, 可以就按官方默认的 1 个 reduce 只处理 1G 的算法, 那么 skewkeythreshold = 1G / 平均行长. 或者默认直接设成 250000000 (差不多算平均行长 4 个字节)
推测执行
- set mapred.map.tasks.speculative.execution=true;
- set mapred.reduce.tasks.speculative.execution=true;
- set hive.mapred.reduce.tasks.speculative.execution=true;
推测执行 (Speculative Execution) 是指在集群环境下运行 MapReduce, 可能是程序 Bug, 负载不均或者其他的一些问题, 导致在一个 JOB 下的多个 TASK 速度不一致, 比如有的任务已经完成, 但是有些任务可能只跑了 10%, 根据木桶原理, 这些任务将成为整个 JOB 的短板, 如果集群启动了推测执行, 这时为了最大限度的提高短板, Hadoop 会为该 task 启动备份任务, 让 speculative task 与原始 task 同时处理一份数据, 哪个先运行完, 则将谁的结果作为最终结果, 并且在运行完成后 Kill 掉另外一个任务.
推测执行的目的是减少作业执行时间, 但这是以集群效率为代价的, 在一个繁忙的集群中, 推测执行会减少整体的吞吐量, 因为冗余的任务执行时会减少作业的执行时间, 因此一些集群管理员倾向于在集群上关闭这个选项, 而让用户根据个别的作业需要而开启该功能.
不建议开启推测执行机制的场景:
Task 之间存在固有的不均衡特点, 比如有的 Reduce Task 处理的数据量远大于其他几个
不允许一个任务存在多个实例, 比如将 MapReduce 结果写到 Mysql 数据库中, 不允许一条数据写多分产生冗余
hive 使用 spark 引擎
- set spark.master=yarn;
- set hive.execution.engine=spark;
- set spark.serializer=org.apache.spark.serializer.KryoSerializer;
- set spark.executor.memory=5g;
- set spark.executor.cores=4;
- set spark.executor.instances=10;
其他
- set hive.optimize.cp=true; // 列裁剪
- set hive.optimize.prunner; // 分区裁剪
- set hive.limit.optimize.enable=true; // 优化 LIMIT n 语句
- set hive.limit.row.max.size=1000000;
- set hive.limit.optimize.limit.file=10; // 最大文件数
- -------- END --------
来源: http://mp.weixin.qq.com/s/XfZ9kkS_kWp4htxHO5StyQ