MapTask阶段shuffle源码分析
1.收集阶段
在Mapper中,调用context.write(key,value)实际是调用代理NewOutPutCollector的wirte方法
publicvoidwrite(KEYOUTkey,VALUEOUTvalue
)throwsIOException,InterruptedException{
output.write(key,value);
}
实际调用的是MapOutPutBuffer的collect(),在进行收集前,调用partitioner来计算每个key-value的分区号
@Override
publicvoidwrite(Kkey,Vvalue)throwsIOException,InterruptedException{
collector.collect(key,value,
partitioner.getPartition(key,value,partitions));
}
2.NewOutPutCollector对象的创建
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContextjobContext,
JobConfjob,
TaskUmbilicalProtocolumbilical,
TaskReporterreporter
)throwsIOException,ClassNotFoundException{
//创建实际用来收集key-value的缓存区对象
collector=createSortingCollector(job,reporter);
//获取总的分区个数
partitions=jobContext.getNumReduceTasks();
if(partitions>1){
partitioner=(org.apache.hadoop.mapreduce.Partitioner)
ReflectionUtils.newInstance(jobContext.getPartitionerClass(),job);
}else{
//默认情况,直接创建一个匿名内部类,所有的key-value都分配到0号分区
partitioner=neworg.apache.hadoop.mapreduce.Partitioner(){
@Override
publicintgetPartition(Kkey,Vvalue,intnumPartitions){
returnpartitions-1;
}
};
}
}
3.创建环形缓冲区对象
@SuppressWarnings("unchecked")
privateMapOutputCollector
createSortingCollector(JobConfjob,TaskReporterreporter)
throwsIOException,ClassNotFoundException{
MapOutputCollector.Contextcontext=
newMapOutputCollector.Context(this,job,reporter);
//从当前Job的配置中,获取mapreduce.job.map.output.collector.class,如果没有设置,使用MapOutputBuffer.class
Class>[]collectorClasses=job.getClasses(
JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,MapOutputBuffer.class);
intremainingCollectors=collectorClasses.length;
ExceptionlastException=null;
for(Classclazz:collectorClasses){
try{
if(!MapOutputCollector.class.isAssignableFrom(clazz)){
thrownewIOException("Invalidoutputcollectorclass:"+clazz.getName()+
"(doesnotimplementMapOutputCollector)");
}
Classsubclazz=
clazz.asSubclass(MapOutputCollector.class);
LOG.debug("Tryingmapoutputcollectorclass:"+subclazz.getName());
//创建缓冲区对象
MapOutputCollectorcollector=
ReflectionUtils.newInstance(subclazz,job);
//创建完缓冲区对象后,执行初始化
collector.init(context);
LOG.info("Mapoutputcollectorclass="+collector.getClass().getName());
returncollector;
}catch(Exceptione){
Stringmsg="UnabletoinitializeMapOutputCollector"+clazz.getName();
if(--remainingCollectors>0){
msg+="("+remainingCollectors+"morecollector(s)totry)";
}
lastException=e;
LOG.warn(msg,e);
}
}
thrownewIOException("Initializationofallthecollectorsfailed."+
"Errorinlastcollectorwas:"+lastException.getMessage(),lastException);
}
3.MapOutPutBuffer的初始化 环形缓冲区对象
@SuppressWarnings("unchecked")
publicvoidinit(MapOutputCollector.Contextcontext
)throwsIOException,ClassNotFoundException{
job=context.getJobConf();
reporter=context.getReporter();
mapTask=context.getMapTask();
mapOutputFile=mapTask.getMapOutputFile();
sortPhase=mapTask.getSortPhase();
spilledRecordsCounter=reporter.getCounter(TaskCounter.SPILLED_RECORDS);
//获取分区总个数,取决于ReduceTask的数量
partitions=job.getNumReduceTasks();
rfs=((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
//sanitychecks
//从当前配置中,获取mapreduce.map.sort.spill.percent,如果没有设置,就是0.8
finalfloatspillper=
job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT,(float)0.8);
//获取mapreduce.task.io.sort.mb,如果没设置,就是100MB
finalintsortmb=job.getInt(JobContext.IO_SORT_MB,100);
indexCacheMemoryLimit=job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
if(spillper>(float)1.0||spillper<=(float)0.0){
thrownewIOException("Invalid\""+JobContext.MAP_SORT_SPILL_PERCENT+
"\":"+spillper);
}
if((sortmb&0x7FF)!=sortmb){
thrownewIOException(
"Invalid\""+JobContext.IO_SORT_MB+"\":"+sortmb);
}
//在溢写前,对key-value排序,采用的排序器,使用快速排序,只排索引
sorter=ReflectionUtils.newInstance(job.getClass("map.sort.class",
QuickSort.class,IndexedSorter.class),job);
//buffersandaccounting
intmaxMemUsage=sortmb<<20;
maxMemUsage-=maxMemUsage%METASIZE;
//存放key-value
kvbuffer=newbyte[maxMemUsage];
bufvoid=kvbuffer.length;
//存储key-value的属性信息,分区号,索引等
kvmeta=ByteBuffer.wrap(kvbuffer)
.order(ByteOrder.nativeOrder())
.asIntBuffer();
setEquator(0);
bufstart=bufend=bufindex=equator;
kvstart=kvend=kvindex;
maxRec=kvmeta.capacity()/NMETA;
softLimit=(int)(kvbuffer.length*spillper);
bufferRemaining=softLimit;
LOG.info(JobContext.IO_SORT_MB+":"+sortmb);
LOG.info("softlimitat"+softLimit);
LOG.info("bufstart="+bufstart+";bufvoid="+bufvoid);
LOG.info("kvstart="+kvstart+";length="+maxRec);
//k/vserialization
//获取快速排序的Key的比较器,排序只按照key进行排序!
comparator=job.getOutputKeyComparator();
//获取key-value的序列化器
keyClass=(Class)job.getMapOutputKeyClass();
valClass=(Class)job.getMapOutputValueClass();
serializationFactory=newSerializationFactory(job);
keySerializer=serializationFactory.getSerializer(keyClass);
keySerializer.open(bb);
valSerializer=serializationFactory.getSerializer(valClass);
valSerializer.open(bb);
//outputcounters
mapOutputByteCounter=reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
mapOutputRecordCounter=
reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
fileOutputByteCounter=reporter
.getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
//溢写到磁盘,可以使用一个压缩格式!获取指定的压缩编解码器
//compression
if(job.getCompressMapOutput()){
ClasscodecClass=
job.getMapOutputCompressorClass(DefaultCodec.class);
codec=ReflectionUtils.newInstance(codecClass,job);
}else{
codec=null;
}
//获取Combiner组件
//combiner
finalCounters.CountercombineInputCounter=
reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
combinerRunner=CombinerRunner.create(job,getTaskID(),
combineInputCounter,
reporter,null);
if(combinerRunner!=null){
finalCounters.CountercombineOutputCounter=
reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
combineCollector=newCombineOutputCollector(combineOutputCounter,reporter,job);
}else{
combineCollector=null;
}
spillInProgress=false;
minSpillsForCombine=job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS,3);
//设置溢写线程在后台运行,溢写是在后台运行另外一个溢写线程!和收集是两个线程!
spillThread.setDaemon(true);
spillThread.setName("SpillThread");
spillLock.lock();
try{
//启动线程
spillThread.start();
while(!spillThreadRunning){
spillDone.await();
}
}catch(InterruptedExceptione){
thrownewIOException("Spillthreadfailedtoinitialize",e);
}finally{
spillLock.unlock();
}
if(sortSpillException!=null){
thrownewIOException("Spillthreadfailedtoinitialize",
sortSpillException);
}
}
4.Paritionner的获取
从配置中读取mapreduce.job.partitioner.class,如果没有指定,采用HashPartitioner.class
如果reduceTask>1,还没有设置分区组件,使用HashPartitioner
@SuppressWarnings("unchecked")
publicClass>getPartitionerClass()
throwsClassNotFoundException{
return(Class>)
conf.getClass(PARTITIONER_CLASS_ATTR,HashPartitioner.class);
}
publicclassHashPartitionerextendsPartitioner { /**Use{@linkObject#hashCode()}topartition.**/ publicintgetPartition(Kkey,Vvalue, intnumReduceTasks){ return(key.hashCode()&Integer.MAX_VALUE)%numReduceTasks; } }
分区号的限制:0<=分区号<总的分区数(reduceTask的个数)
if(partition<0||partition>=partitions){
thrownewIOException("Illegalpartitionfor"+key+"("+
partition+")");
}
5.MapTaskshuffle的流程
①在map()调用context.write()
②调用MapoutPutBuffer的collect()
- 调用分区组件Partitionner计算当前这组key-value的分区号
③将当前key-value收集到MapOutPutBuffer中
- 如果超过溢写的阀值,在后台启动溢写线程,来进行溢写!
④溢写前,先根据分区号,将相同分区号的key-value,采用快速排序算法,进行排序!
- 排序并不在内存中移动key-value,而是记录排序后key-value的有序索引!
⑤开始溢写,按照排序后有序的索引,将文件写入到一个临时的溢写文件中
- 如果没有定义Combiner,直接溢写!
- 如果定义了Combiner,使用CombinerRunner.conbine()对key-value处理后再次溢写!
⑥多次溢写后,每次溢写都会产生一个临时文件
⑦最后,执行一次flush(),将剩余的key-value进行溢写
⑧MergeParts:将多次溢写的结果,保存为一个总的文件!
- 在合并为一个总的文件前,会执行归并排序,保证合并后的文件,各个分区也是有序的!
- 如果定义了Conbiner,Conbiner会再次运行(前提是溢写的文件个数大于3)!
- 否则,就直接溢写!
⑨最终保证生成一个最终的文件,这个文件根据总区号,分为若干部分,每个部分的key-value都已经排好序,等待ReduceTask来拷贝相应分区的数据
6.Combiner
combiner其实就是Reducer类型:
Class>cls= (Class>)job.getCombinerClass();
Combiner的运行时机:
MapTask:
- ①每次溢写前,如果指定了Combiner,会运行
- ②将多个溢写片段,进行合并为一个最终的文件时,也会运行Combiner,前提是片段数>=3
ReduceTask:
③reduceTask在运行时,需要启动shuffle进程拷贝MapTask产生的数据!
- 数据在copy后,进入shuffle工作的内存,在内存中进行merge和sort!
- 数据过多,内部不够,将部分数据溢写在磁盘!
- 如果有溢写的过程,那么combiner会再次运行!
①一定会运行,②,③需要条件!
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对毛票票的支持。如果你想了解更多相关内容请查看下面相关链接