1. Mapper 与 Reducer 数量
对于一个默认的 MapReduce Job 来说, map 任务的数量等于输入文件被划分成的分块数, 这个取决于输入文件的大小以及文件块的大小(如果此文件在 HDFS 中). 但是对于 reduce 的任务, 并不会自动决定 reducer 数目的大小, 若未指定, 则默认为 1. 例如:
但单个 reducer 任务执行效率不尽人意, 在实际场景中会将它设置为一个较大的数值. 此时, 决定 Key 条目被送往哪个 reducer 由方法 setPartitionerClass() 指定: job.setPartitionerClass(HashPartitioner.class);
默认为 HashPartitioner, 它会将每条 Key 做 Hash, 然后与最大的整型值做一次按位与操作, 以得到一个非负整数. 然后对分区数做取模 (mod) 操作, 将 key 分配到其中一个分区. 这里的分区数即为 reducer 数目. HashPartitioner 源码如下:
- public class HashPartitioner<K, V> extends Partitioner<K, V> {
- public HashPartitioner() {
- }
- public int getPartition(K key, V value, int numReduceTasks) {
- return (key.hashCode() & 2147483647) % numReduceTasks;
- }
- }
若是为 reducer 数目设置为默认值 1, 则所有的中间数据都会被放入到一个 reducer 中, 作业处理效率会非常低效. 若是设置了过大的值, 则每个 reducer 都会输出一个文件, 会导致过多的小文件.
在为一个任务选择多少个 reducer 个数时, 应遵循的原则为: 目标 reducer 保持在每个运行 5 分钟左右, 且产生至少一个 HDFS 块的输出比较合适.
记录在发送给 reducer 之前, 会被 MapReduce 系统进行排序. 因此输入文件中的行会被交叉放入一个合并后的输出文件.
2. 输入格式
我们已经了解到 map 的输入是分片(split), 一个分片对应一个 mapper, 且仅被一个 mapper 处理. 分片里面是多条记录(item)."输入分片" 在 Hadoop 中以 InputSplit 接口的方式提供:
- public interface InputSplit extends Writable {
- long getLength() throws IOException;
- String[] getLocations() throws IOException;
- }
它包含两个方法, 分别为 getLength() 与 getLocations(). 其中 getLength() 用于获取数据的长度 (以字节为单位);getLocations() 用于获取一组存储位置(也就是一组主机名). 其中 getLocations() 的返回值由 mapreduce 系统获取后, 实现 data locality, 也就是尽量将 map 任务放在离数据节点近的地方. 而 getLength() 的返回值用于排序分片, 将最大的分片优先处理, 以最小化整个作业运行的时间.
InputSplit(mapreduce 中的分片)由 InputFormat 创建, 它负责创建输入分片, 并将它们分成一条条记录(item). 首先简单看一下 InputFormat 抽象类:
- public abstract class InputFormat<K, V> {
- public InputFormat() {
- }
- public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;
- public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
- }
这里 getSplits() 方法计算分片, 然后将计算得到的 List 结果发给 application master.Application master 根据其分片所在节点信息, 调度 map 任务到离分片数据最近的节点. 在 map 任务端, 会把输入分片传给 InputFormat 的 createRecordReader() 方法, 此方法会返回一个 RecordReader 对象, 用于迭代读取这个分片上的记录(item), 并生成记录的键值对, 之后传递给 map 函数. 通过查看 Mapper 类中的 run() 方法, 更好的了解此过程:
- public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
- this.setup(context);
- try {
- while(context.nextKeyValue()) {
- this.map(context.getCurrentKey(), context.getCurrentValue(), context);
- }
- } finally {
- this.cleanup(context);
- }
- }
这里, 先运行 setup() 操作, 然后从 context 不断迭代获取分片的内容, 并传给 map() 方法, 并由 map() 方法进一步对 key-value 对进行处理.
3. FileInputFormat 类
在 Hadoop 中, 数据源一般为文件, 而 FileInputFormat 类就是用于处理数据源为文件的一个(继承于)InputFormat 类:
public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
可以看到它是一个抽象类, 它的实现类有 CombineFileInputFormat,TextInputFormat,KeyValueTextInputFormat,NLineInputFormat 以及 SequenceFileInputFormat.
FileInputFormat 类提供两个功能: 1. 指出作业的输入文件位置; 2. 为输入文件生成分片的代码实现.
在 FileInputFormat 中, 作业的输入可以是一个文件, 一个目录, 也可以是目录与文件的集合. 它提供了四种方法用于设置 Job 的输入路径:
- public static void addInputPath(JobConf conf, Path path)
- public static void addInputPaths(JobConf conf, String commaSeparatedPaths)
- public static void setInputPaths(JobConf conf, Path... inputPaths)
- public static void setInputPaths(JobConf conf, String commaSeparatedPaths)
其中 addInputPath() 和 addInputPaths() 用于添加路径, 以构成路径列表. 而 setInputPath() 用于一次性设置完整的路径列表(会替换前面所有路径设置).
在设置路径后, 也可以指定需要排除的特定文件, 此功能由 setInputPathFilter() 实现:
- public static void setInputPathFilter(JobConf conf, Class<? extends PathFilter> filter) {
- conf.setClass("mapreduce.input.pathFilter.class", filter, PathFilter.class);
- }
它可以设置一个过滤器 PathFilter, 默认的实现是过滤掉隐藏文件(以 . 和 _ 开头的文件). 如果通过 setInputPathFilter() 设置过滤器, 它会在默认过滤器的基础上进行过滤, 也就是说, 仅会在非隐藏文件中再次进行过滤.
输入路径的设置可以通过属性与选项进行配置, 在属性配置中相关配置为:
- mapreduce.input.fileinputformat.inputdir (逗号分隔属性, 无默认值)
- mapreduce.input.pathFilter.class (PathFilter 类名, 无默认值)
4. FileInputFormat 类处理输入分片
在设置了一组文件后, FileInputFormat 会将文件转换为输入分片. 这里需要注意的是: 在 HDFS 中, 一个文件可以占用 (分布到) 多个 block, 但是不会存在一个 block 中存多个文件. 对于小文件 (小于一个 HDFS 块大小的文件) 来说, 一个文件就是占用一个 block, 但是不会占据整个 block 的空间. 例如, 当一个 1MB 的文件存储在一个 128MB 的块中时, 文件只使用 1MB 的磁盘空间, 而不是 128MB).
FileInputFormat 只分割大文件, 也就是文件超过 HDFS 块的大小. 在 FileInputFormat 中, 控制分片大小的属性有:
mapreduce.input.fileinputformat.split.minsize 一个文件分片最小的有效字节数(int 类型), 默认值为 1(字节)
mapreduce.input.fileinputformat.split.maxsize 一个文件分片中最大的有效字节数(long 类型), 默认值为 Long.MAX_VALUE, 即 9223372036854775807
dfs.blocksize HDFS 中的块大小(按字节), 默认为 128MB(即 134217728)
最小分片通常为 1 个字节, 用户可以设置最小分片的大小超过 HDFS 块大小, 这样会强制分片比 HDFS 块大. 但是如果数据存储在 HDFS 上, 则这样对 data locality 来说, 并不友好, 以至于延长任务执行时间.
最大分片默认是 Java Long 类型的最大值, 只有把它的值设置为小于 HDFS Block 大小才有效, 此时会强制分片比块小.
在 FileInputFormat 中, 分片的大小由以下公式计算:
- protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
- return Math.max(minSize, Math.min(maxSize, blockSize));
- }
其中参数部分为:
- long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
- long maxSize = getMaxSplitSize(job);
- protected long getFormatMinSplitSize() {
- return 1L;
- }
- public static long getMinSplitSize(JobContext job) {
- return job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.minsize", 1L);
- }
- public static long getMaxSplitSize(JobContext context) {
- return context.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 9223372036854775807L);
- }
minSize 若未指定, 则默认为 1.MaxSize 默认为 Java Long 类型最大值. 再计算时, 先取 maxSize 和 blockSize 的最小值, 然后再取结果与 minSize 的最大值.
在默认情况下: minSize <blockSize < maxSize
所以分片的大小就是 blockSize 大小.
5. 小文件与 CombineFileInputFormat
相对于大量的小文件, Hadoop 更适合处理少量的大文件. 其中一个原因是: 对于每个小文件(远小于 HDFS 块大小),FileInputFormat 都会生成一个分片(生成的分片要么是文件的整个内容, 要么是文件的部分内容), 这样会产生大量的 map 任务, 并且每个 map 任务仅处理一小部分数据, 这样会导致任务执行效率低下, 时间过长.
CombineFileInputFormat 可以缓解此问题, 它针对小文件而设计. FileInputFormat 为每个小文件产生一个分片, 而 CombineFileInpurtFormat 把多个文件打包到一个分片中, 以便于每个 mapper 可以处理更多的数据. 更重要的是: CombineFileInputFormat 在分配多个 block 到同一个 split 时, 会考虑到 node locality 以及 Rack locality. 所以它的速度在一个典型的 mr 任务中, 处理输入的速度并不会下降.
不过尽可能要避免小文件过多的情况, 原因有:
1. 处理小文件会增加运行作业而必须的寻址次数
2. 浪费 namenode 的内存
可以尝试使用顺序文件 (sequence file) 将这些小文件合并成一个或多个大文件: 例如将文件名作为 key, 文件内容作为 value. 但是如果集群里已经有了大量小文件, 可以尝试一下 CombineFileInputFormat 方法.
CombinedFileInputFormat 不仅处理小文件有好处, 处理大文件时也有益处. 例如, 如果 mapper 在处理一个 block 时仅花费很少的时间, 则可以考虑使用 CombineFileInputFormat, 并将 maximum split size 设置为 HDFS block 大小的几倍(参数为 mapred.max.split.size). 这样每个 mapper 会处理多个 block, 使得整个处理时间下降.
6. 避免分片
有时候可能需要计算整个文件里的顺序关系, 这种任务无法分布式处理, 所以只能让文件由一个 mapper 处理, 此时需要避免文件被分片.
有两种方式可以避免文件被分片, 而是当作一个单独分片处理:
1. 设置最小分片大小 split.minsize 为 Java Long 类型最大值(long.MAX_VALUE)
2. 使用 FileInputFormat 具体子类时, 重写 isSplitable() 方法, 把返回值设置为 false
使用第二种方法时, 以 TextInputFormat 类为例:
- public class TextInputFormat extends FileInputFormat<LongWritable, Text> implements JobConfigurable {
- private CompressionCodecFactory compressionCodecs = null;
- public TextInputFormat() {
- }
- public void configure(JobConf conf) {
- this.compressionCodecs = new CompressionCodecFactory(conf);
- }
- protected boolean isSplitable(FileSystem fs, Path file) {
- CompressionCodec codec = this.compressionCodecs.getCodec(file);
- return null == codec ? true : codec instanceof SplittableCompressionCodec;
- }
- ....
- }
默认会根据 CompressionCodec 类型判断是否切分, 也可以直接指定 return 为 false, 使得输入文件不可切分.
References:Hadoop 权威指南第 4 版
来源: https://www.cnblogs.com/zackstang/p/10776857.html