一. Mapreduce 中的 Combiner
- package com.gec.demo;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
- import java.io.IOException;
- public class WcCombiner extends Reducer<Text, IntWritable,Text,IntWritable> {
- private IntWritable sum=new IntWritable();
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
- int count=0;
- for (IntWritable value : values) {
- count+=value.get();
- }
- sum.set(count);
- context.write(key,sum);
- }
- }
在 job 类中声明如下:
二. MapTask 工作机制
主要的核心类:
读:
- FileInputFormat
- TextInputFormat
- createRecordReader
- LineRecordReader
- nextKeyValue
写:
- context.write
- RecordWriter.write(k,value)
- NewOutputCollector.write(key,value)
- MapOutputCollector.collect(key,value,partitions)
- MapOutputBuffer.collect(key,value,partitions)
核心 map 输出源代码分析类
NewOutputCollector 类
构造器:
实例化 MapOutputBuffer 对象
调用 MapOutputBuffer 对象 init 方法
将 MapOutputBuffer 对象赋值给 collector 对象
解决分区值问题
- // 如果没有自定义分区类, 则默认使用 HashPartitioner
- partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
- ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
环形缓冲区实现原理
MapOutputBuffer 实现缓冲区的核心实现
在这一头存储 key 和 value,key 和 value 依次排列, 而那一头存储索引, 向中间出发, 当储存的空间占比百分之八十的时候, 则溢出, 两者的方向改变, 分别开始从另外一头开始存储
如上图, 从赤道分别向不同方向出发
如上图, 到达溢出时, 产生新赤道, 又分别从新赤道往回走
init 方法
1, 分配溢出比
- final float spillper =
- job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
2, 分配环形缓存区的大小
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
3, 实例化快排对象
- sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
- QuickSort.class, IndexedSorter.class), job);
4, 定义环形缓存区数组
- kvmeta = ByteBuffer.wrap(kvbuffer)
- .order(ByteOrder.nativeOrder())
- .asIntBuffer();
5, 开始化赤道
setEquator(0);
6, 获取 key 的比较器对象
comparator = job.getOutputKeyComparator();
7, 是否定义 combineCollector 对象
8, 启动 spillThread 线程, 监听溢出比, 触发此 sortAndSpill()
对接 Reducer 类的方法:
reduce(Text key, Iterable<IntWritable> values, Context context)
ReduceTask 工作机制
三. shuffer
shuffer 缓存流程
来源: http://www.bubuko.com/infodetail-2983466.html