Source Code Analysis of the Shuffle in the MapTask Phase
1. The collection phase
In a Mapper, calling context.write(key, value) simply forwards the call to output.write(key, value), where output is a NewOutputCollector:
```java
public void write(KEYOUT key, VALUEOUT value
                  ) throws IOException, InterruptedException {
  output.write(key, value);
}
```
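NewOutputCollector.write() in turn calls MapOutputBuffer.collect(). Before the record is collected, the partitioner is called to compute the partition number of each key-value pair: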
```java
@Override
public void write(K key, V value) throws IOException, InterruptedException {
  collector.collect(key, value,
                    partitioner.getPartition(key, value, partitions));
}
```
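The delegation can be modeled in a few lines of plain Java. This is a simplified sketch, not Hadoop code: `ToyPartitioner`, `ToyBuffer` and `ToyOutputCollector` are hypothetical stand-ins for Partitioner, MapOutputBuffer and NewOutputCollector. What it shows is that the partition number is computed once, inside write(), and handed to collect() together with the record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.hadoop.mapreduce.Partitioner.
interface ToyPartitioner<K, V> {
  int getPartition(K key, V value, int numPartitions);
}

// Hypothetical stand-in for MapOutputBuffer: it only records what it receives.
final class ToyBuffer<K, V> {
  final List<String> collected = new ArrayList<>();

  void collect(K key, V value, int partition) {
    collected.add(partition + ":" + key + "=" + value);
  }
}

// Hypothetical stand-in for NewOutputCollector: compute the partition, then delegate.
final class ToyOutputCollector<K, V> {
  private final ToyBuffer<K, V> collector;
  private final ToyPartitioner<K, V> partitioner;
  private final int partitions;

  ToyOutputCollector(ToyBuffer<K, V> collector, ToyPartitioner<K, V> partitioner, int partitions) {
    this.collector = collector;
    this.partitioner = partitioner;
    this.partitions = partitions;
  }

  void write(K key, V value) {
    collector.collect(key, value, partitioner.getPartition(key, value, partitions));
  }

  public static void main(String[] args) {
    ToyBuffer<String, Integer> buffer = new ToyBuffer<>();
    // Partition by key length, purely for illustration.
    ToyOutputCollector<String, Integer> out =
        new ToyOutputCollector<String, Integer>(buffer, (k, v, n) -> k.length() % n, 2);
    out.write("hadoop", 1); // length 6 -> partition 0
    out.write("map", 1);    // length 3 -> partition 1
    System.out.println(buffer.collected); // [0:hadoop=1, 1:map=1]
  }
}
```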
2. Creating the NewOutputCollector object
```java
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                   ) throws IOException, ClassNotFoundException {
  // Create the buffer object that actually collects the key-value pairs
  collector = createSortingCollector(job, reporter);
  // Total number of partitions = number of ReduceTasks
  partitions = jobContext.getNumReduceTasks();
  if (partitions > 1) {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
      ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
  } else {
    // Default case: an anonymous inner class that returns partitions - 1,
    // so with a single ReduceTask every key-value goes to partition 0
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1;
      }
    };
  }
}
```
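The branch in the constructor can be isolated as a small pure-Java function. In this sketch `PartitionFn` and `PartitionerChoice` are hypothetical names, and a `Supplier` stands in for `ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job)`; the logic mirrors the `partitions > 1` test above.

```java
import java.util.function.Supplier;

// Hypothetical functional interface mirroring Partitioner.getPartition(key, value, numPartitions).
interface PartitionFn<K, V> {
  int getPartition(K key, V value, int numPartitions);
}

final class PartitionerChoice {
  // Mirrors the branch in the NewOutputCollector constructor.
  static <K, V> PartitionFn<K, V> choose(int partitions, Supplier<PartitionFn<K, V>> configured) {
    if (partitions > 1) {
      // More than one ReduceTask: instantiate the configured partitioner.
      return configured.get();
    }
    // Otherwise: an anonymous partitioner that ignores the key and returns partitions - 1.
    return new PartitionFn<K, V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1;
      }
    };
  }

  public static void main(String[] args) {
    // Configured partitioner: the HashPartitioner formula.
    Supplier<PartitionFn<String, Integer>> hash =
        () -> (key, value, n) -> (key.hashCode() & Integer.MAX_VALUE) % n;
    PartitionFn<String, Integer> single = choose(1, hash);
    PartitionFn<String, Integer> multi = choose(4, hash);
    System.out.println(single.getPartition("hadoop", 1, 1)); // 0
    System.out.println(multi.getPartition("hadoop", 1, 4));  // some value in [0, 4)
  }
}
```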
3. Creating the circular buffer object
```java
@SuppressWarnings("unchecked")
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
    createSortingCollector(JobConf job, TaskReporter reporter)
    throws IOException, ClassNotFoundException {
  MapOutputCollector.Context context =
    new MapOutputCollector.Context(this, job, reporter);
  // Read mapreduce.job.map.output.collector.class from the job configuration;
  // if it is not set, use MapOutputBuffer.class
  Class<?>[] collectorClasses = job.getClasses(
    JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
  int remainingCollectors = collectorClasses.length;
  Exception lastException = null;
  for (Class clazz : collectorClasses) {
    try {
      if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
        throw new IOException("Invalid output collector class: " + clazz.getName() +
          " (does not implement MapOutputCollector)");
      }
      Class<? extends MapOutputCollector> subclazz =
        clazz.asSubclass(MapOutputCollector.class);
      LOG.debug("Trying map output collector class: " + subclazz.getName());
      // Create the buffer object
      MapOutputCollector<KEY, VALUE> collector =
        ReflectionUtils.newInstance(subclazz, job);
      // Initialize it right after creation
      collector.init(context);
      LOG.info("Map output collector class = " + collector.getClass().getName());
      return collector;
    } catch (Exception e) {
      String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
      if (--remainingCollectors > 0) {
        msg += " (" + remainingCollectors + " more collector(s) to try)";
      }
      lastException = e;
      LOG.warn(msg, e);
    }
  }
  throw new IOException("Initialization of all the collectors failed. " +
    "Error in last collector was: " + lastException.getMessage(), lastException);
}
```
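The fallback pattern in createSortingCollector() (try each configured class in order, keep the first one whose init() succeeds, otherwise rethrow with the last error attached) can be sketched without Hadoop. `ToyCollector` and `CollectorLoader` are hypothetical; a `Supplier` plays the role of `ReflectionUtils.newInstance`, and `System.err` replaces `LOG.warn`.

```java
import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for MapOutputCollector: only the init step matters here.
interface ToyCollector {
  void init() throws Exception;
  String name();
}

final class CollectorLoader {
  // Mirrors createSortingCollector(): return the first candidate whose init() succeeds.
  static ToyCollector createFirstWorking(List<Supplier<ToyCollector>> candidates)
      throws IOException {
    int remaining = candidates.size();
    Exception lastException = null;
    for (Supplier<ToyCollector> candidate : candidates) {
      try {
        ToyCollector collector = candidate.get(); // like ReflectionUtils.newInstance(...)
        collector.init();                        // like collector.init(context)
        return collector;
      } catch (Exception e) {
        String msg = "Unable to initialize collector";
        if (--remaining > 0) {
          msg += " (" + remaining + " more collector(s) to try)";
        }
        System.err.println(msg + ": " + e.getMessage());
        lastException = e;
      }
    }
    throw new IOException("Initialization of all the collectors failed.", lastException);
  }

  // Builds a hypothetical collector that either initializes cleanly or fails.
  static ToyCollector named(String label, boolean failsOnInit) {
    return new ToyCollector() {
      @Override
      public void init() {
        if (failsOnInit) {
          throw new IllegalStateException(label + " failed to initialize");
        }
      }

      @Override
      public String name() {
        return label;
      }
    };
  }

  public static void main(String[] args) throws IOException {
    ToyCollector chosen = createFirstWorking(List.<Supplier<ToyCollector>>of(
        () -> named("BrokenCollector", true),
        () -> named("MapOutputBuffer", false)));
    System.out.println(chosen.name()); // MapOutputBuffer
  }
}
```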
3.1 Initializing MapOutputBuffer (the circular buffer)
```java
@SuppressWarnings("unchecked")
public void init(MapOutputCollector.Context context
                ) throws IOException, ClassNotFoundException {
  job = context.getJobConf();
  reporter = context.getReporter();
  mapTask = context.getMapTask();
  mapOutputFile = mapTask.getMapOutputFile();
  sortPhase = mapTask.getSortPhase();
  spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
  // Total number of partitions, determined by the number of ReduceTasks
  partitions = job.getNumReduceTasks();
  rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

  // sanity checks
  // mapreduce.map.sort.spill.percent from the configuration; defaults to 0.8
  final float spillper =
    job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
  // mapreduce.task.io.sort.mb; defaults to 100 (MB)
  final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
  indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                     INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
  if (spillper > (float)1.0 || spillper <= (float)0.0) {
    throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
        "\": " + spillper);
  }
  if ((sortmb & 0x7FF) != sortmb) {
    throw new IOException(
        "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
  }
  // Sorter used before each spill: QuickSort by default; it reorders only the
  // per-record metadata (the index), not the serialized key-value bytes
  sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
        QuickSort.class, IndexedSorter.class), job);
  // buffers and accounting
  int maxMemUsage = sortmb << 20;
  maxMemUsage -= maxMemUsage % METASIZE;
  // Byte array that holds the serialized key-value pairs
  kvbuffer = new byte[maxMemUsage];
  bufvoid = kvbuffer.length;
  // IntBuffer view over the same array that holds per-record metadata:
  // partition number, key and value offsets, value length
  kvmeta = ByteBuffer.wrap(kvbuffer)
     .order(ByteOrder.nativeOrder())
     .asIntBuffer();
  setEquator(0);
  bufstart = bufend = bufindex = equator;
  kvstart = kvend = kvindex;

  maxRec = kvmeta.capacity() / NMETA;
  softLimit = (int)(kvbuffer.length * spillper);
  bufferRemaining = softLimit;
  LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
  LOG.info("soft limit at " + softLimit);
  LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
  LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

  // k/v serialization
  // Key comparator used by the sort: records are ordered by partition,
  // then by key only (values are never compared)
  comparator = job.getOutputKeyComparator();
  // Serializers for the map output key and value
  keyClass = (Class<K>)job.getMapOutputKeyClass();
  valClass = (Class<V>)job.getMapOutputValueClass();
  serializationFactory = new SerializationFactory(job);
  keySerializer = serializationFactory.getSerializer(keyClass);
  keySerializer.open(bb);
  valSerializer = serializationFactory.getSerializer(valClass);
  valSerializer.open(bb);

  // output counters
  mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
  mapOutputRecordCounter =
    reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
  fileOutputByteCounter = reporter
      .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);

  // compression: spill output can be compressed; get the configured codec
  if (job.getCompressMapOutput()) {
    Class<? extends CompressionCodec> codecClass =
      job.getMapOutputCompressorClass(DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
  } else {
    codec = null;
  }

  // combiner: get the Combiner component
  final Counters.Counter combineInputCounter =
    reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
  combinerRunner = CombinerRunner.create(job, getTaskID(),
                                         combineInputCounter,
                                         reporter, null);
  if (combinerRunner != null) {
    final Counters.Counter combineOutputCounter =
      reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
    combineCollector = new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
  } else {
    combineCollector = null;
  }
  spillInProgress = false;
  minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
  // The spill thread is a daemon: spilling runs in its own background thread,
  // separate from the map thread that collects records
  spillThread.setDaemon(true);
  spillThread.setName("SpillThread");
  spillLock.lock();
  try {
    // Start the spill thread and wait until it reports that it is running
    spillThread.start();
    while (!spillThreadRunning) {
      spillDone.await();
    }
  } catch (InterruptedException e) {
    throw new IOException("Spill thread failed to initialize", e);
  } finally {
    spillLock.unlock();
  }
  if (sortSpillException != null) {
    throw new IOException("Spill thread failed to initialize",
        sortSpillException);
  }
}
```
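The sizing arithmetic in init() is easy to check by hand. The sketch below repeats it outside Hadoop; `SortBufferSizing` is a hypothetical name, and the constants assume MapOutputBuffer's layout of NMETA = 4 ints of metadata per record (METASIZE = 16 bytes). With the defaults (100 MB, 0.8) the buffer is 104857600 bytes and the soft limit is 83886080 bytes (80 MB). The `0x7FF` mask caps io.sort.mb at 2047, because `2048 << 20` would overflow an int. Note that maxRec is only an upper bound, since metadata and data share one array.

```java
final class SortBufferSizing {
  // Assumed to match MapOutputBuffer: 4 metadata ints per record, 16 bytes in total.
  static final int NMETA = 4;
  static final int METASIZE = NMETA * 4;

  // Buffer size in bytes, as computed from mapreduce.task.io.sort.mb.
  static int maxMemUsage(int sortmb) {
    if ((sortmb & 0x7FF) != sortmb) { // only 0..2047 pass; larger values would overflow
      throw new IllegalArgumentException("Invalid io.sort.mb: " + sortmb);
    }
    int maxMemUsage = sortmb << 20;          // MB -> bytes
    maxMemUsage -= maxMemUsage % METASIZE;   // round down to whole metadata slots
    return maxMemUsage;
  }

  // Byte count at which a spill is triggered (mapreduce.map.sort.spill.percent).
  static int softLimit(int bufferLength, float spillper) {
    if (spillper > 1.0f || spillper <= 0.0f) {
      throw new IllegalArgumentException("Invalid spill percent: " + spillper);
    }
    return (int) (bufferLength * spillper);
  }

  // kvmeta is an IntBuffer over the same bytes, i.e. length / 4 ints, NMETA per record.
  static int maxRecords(int bufferLength) {
    return (bufferLength / 4) / NMETA;
  }

  public static void main(String[] args) {
    int buf = maxMemUsage(100);
    System.out.println(buf);                  // 104857600
    System.out.println(softLimit(buf, 0.8f)); // 83886080
    System.out.println(maxRecords(buf));      // 6553600
  }
}
```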
4. Obtaining the Partitioner
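The partitioner class is read from mapreduce.job.partitioner.class; if it is not set, HashPartitioner.class is used.
So when there is more than one ReduceTask and no partitioner has been configured, HashPartitioner is used.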
```java
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass()
   throws ClassNotFoundException {
  return (Class<? extends Partitioner<?,?>>)
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
```
```java
public class HashPartitioner<K, V> extends Partitioner<K, V> {
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
```
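The `& Integer.MAX_VALUE` step matters because hashCode() can be negative and Java's `%` keeps the sign of the dividend. The sketch below (the class name `HashPartitionDemo` is hypothetical) reuses the HashPartitioner formula with an Integer key, whose hashCode() is the value itself. Masking the sign bit is also safer than Math.abs(), which still returns a negative number for Integer.MIN_VALUE.

```java
final class HashPartitionDemo {
  // Same formula as HashPartitioner.getPartition().
  static int hashPartition(Object key, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

  public static void main(String[] args) {
    Integer key = -7;                          // Integer.hashCode() returns the value itself
    System.out.println(key.hashCode() % 3);    // -1: an illegal partition number
    System.out.println(hashPartition(key, 3)); // 1: the mask clears the sign bit first
    System.out.println(Math.abs(Integer.MIN_VALUE) < 0); // true: why abs() is not used
  }
}
```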
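Constraint on the partition number: 0 <= partition < total number of partitions (the number of ReduceTasks). MapOutputBuffer.collect() enforces it: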
```java
if (partition < 0 || partition >= partitions) {
  throw new IOException("Illegal partition for " + key + " (" +
      partition + ")");
}
```
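A custom partitioner has to respect this range itself. In the sketch below, `firstLetterPartition` is a hypothetical partitioner (keys starting with a to m go to partition 0, the rest to 1), folded with `%` so a single-reducer job still gets partition 0; `checkPartition` repeats the check from collect(). Returning `numPartitions` itself, a common off-by-one, is rejected.

```java
import java.io.IOException;

final class PartitionRangeCheck {
  // Hypothetical custom partitioner: a-m -> 0, everything else -> 1, folded into range.
  static int firstLetterPartition(String key, int numPartitions) {
    char c = Character.toLowerCase(key.charAt(0));
    int p = (c >= 'a' && c <= 'm') ? 0 : 1;
    return p % numPartitions;
  }

  // The same rule MapOutputBuffer.collect() applies before buffering a record.
  static void checkPartition(Object key, int partition, int partitions) throws IOException {
    if (partition < 0 || partition >= partitions) {
      throw new IOException("Illegal partition for " + key + " (" + partition + ")");
    }
  }

  public static void main(String[] args) {
    int reducers = 2;
    for (String key : new String[] {"apple", "zebra"}) {
      try {
        checkPartition(key, firstLetterPartition(key, reducers), reducers); // passes
        checkPartition(key, reducers, reducers); // off-by-one partitioner: rejected
      } catch (IOException e) {
        System.out.println(e.getMessage());
      }
    }
  }
}
```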
5. The MapTask shuffle flow
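① map() calls context.write().
② This ends up in MapOutputBuffer.collect().
- Before that, the Partitioner computes the partition number of the current key-value pair.
③ The key-value pair is collected into the MapOutputBuffer.
- When buffer usage crosses the spill threshold (the soft limit, 80% of the buffer by default), the background spill thread is signaled to start spilling.
④ Before spilling, the buffered records are sorted with quicksort, first by partition number and then by key, so that records with the same partition number end up together.
- Sorting does not move the serialized key-value pairs in memory; it only reorders their metadata, producing a sorted index over the records.
⑤ Spilling starts: following the sorted index, the records are written to a temporary spill file.
- If no Combiner is defined, the records are spilled directly.
- If a Combiner is defined, the records are first processed by CombinerRunner.combine() and the result is spilled.
⑥ After several spills, each spill has produced its own temporary file.
⑦ Finally, flush() is called once to spill the remaining key-value pairs.
⑧ mergeParts(): the results of all spills are merged into one final file.
- While merging into one file, a merge sort is performed, so every partition in the merged file is still sorted.
- If a Combiner is defined, it runs again, provided there are at least 3 spill files (mapreduce.map.combine.minspills).
- Otherwise, the merged records are written out directly.
⑨ The end result is a single final file, divided into sections by partition number. The key-value pairs in each section are sorted and wait for the corresponding ReduceTask to copy that partition's data.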
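The steps above can be imitated with an in-memory toy. This is a simplified sketch, not Hadoop code: `ToyMapShuffle` and `Rec` are hypothetical, the soft limit counts records instead of bytes, "spill files" are plain lists, spilling is synchronous instead of on a background thread, and there is no Combiner. It sorts an index array rather than the records (using Arrays.sort, a merge sort for objects, in place of Hadoop's QuickSort), then performs a k-way merge of the sorted spills with a priority queue, as mergeParts() conceptually does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Toy, in-memory model of the map-side shuffle (not Hadoop code).
final class ToyMapShuffle {
  static final class Rec {
    final int partition;
    final String key;
    final int value;

    Rec(int partition, String key, int value) {
      this.partition = partition; this.key = key; this.value = value;
    }

    @Override
    public String toString() { return partition + ":" + key + "=" + value; }
  }

  // Order used for each spill and for the final merge: partition first, then key.
  static final Comparator<Rec> ORDER =
      Comparator.comparingInt((Rec r) -> r.partition).thenComparing(r -> r.key);

  private final int numPartitions;
  private final int softLimit;                        // spill threshold, in records
  private final List<Rec> buffer = new ArrayList<>(); // stands in for the circular buffer
  final List<List<Rec>> spills = new ArrayList<>();   // stand-ins for spill files

  ToyMapShuffle(int numPartitions, int softLimit) {
    this.numPartitions = numPartitions;
    this.softLimit = softLimit;
  }

  // Steps 1-3: partition with the HashPartitioner formula, buffer, spill at the threshold.
  void collect(String key, int value) {
    int partition = (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    buffer.add(new Rec(partition, key, value));
    if (buffer.size() >= softLimit) {
      sortAndSpill();
    }
  }

  // Steps 4-5: sort an index over the buffered records, then write them in index order.
  private void sortAndSpill() {
    if (buffer.isEmpty()) {
      return;
    }
    Integer[] index = new Integer[buffer.size()];
    for (int i = 0; i < index.length; i++) {
      index[i] = i;
    }
    Arrays.sort(index, (a, b) -> ORDER.compare(buffer.get(a), buffer.get(b)));
    List<Rec> spill = new ArrayList<>();
    for (int i : index) {
      spill.add(buffer.get(i));
    }
    spills.add(spill);
    buffer.clear();
  }

  // Steps 7-9: flush the remainder, then k-way merge all sorted spills into one output.
  List<Rec> flushAndMerge() {
    sortAndSpill();
    // Heap entries are {spill number, position within that spill}.
    PriorityQueue<int[]> heap = new PriorityQueue<>(
        (x, y) -> ORDER.compare(spills.get(x[0]).get(x[1]), spills.get(y[0]).get(y[1])));
    for (int s = 0; s < spills.size(); s++) {
      heap.add(new int[] {s, 0}); // sortAndSpill() never stores an empty spill
    }
    List<Rec> merged = new ArrayList<>();
    while (!heap.isEmpty()) {
      int[] top = heap.poll();
      List<Rec> spill = spills.get(top[0]);
      merged.add(spill.get(top[1]));
      if (top[1] + 1 < spill.size()) {
        heap.add(new int[] {top[0], top[1] + 1});
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    ToyMapShuffle shuffle = new ToyMapShuffle(2, 3);
    for (String word : "to be or not to be".split(" ")) {
      shuffle.collect(word, 1);
    }
    List<Rec> output = shuffle.flushAndMerge();
    System.out.println(shuffle.spills.size() + " spill(s)"); // 2 spill(s)
    System.out.println(output);
  }
}
```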
6. Combiner
A combiner is in fact just a Reducer type:
```java
Class<? extends Reducer<K,V,K,V>> cls =
    (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
```
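Because the combiner is a `Reducer<K,V,K,V>`, its input and output types are both the map output types, and it sees each key together with its group of values from an already sorted run. The sketch below (the class `ToyCombiner` is hypothetical, not Hadoop's CombinerRunner) folds consecutive equal keys with a reduce function, using a word-count sum. This is only safe when the reduce operation can be applied repeatedly to partial groups, as summing can.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

final class ToyCombiner {
  // Input must already be sorted by key, as a spill is. Consecutive equal keys form
  // one group, and each group is folded into a single output record of the same types.
  static List<Map.Entry<String, Integer>> combine(List<Map.Entry<String, Integer>> sortedRun,
                                                  BinaryOperator<Integer> reduce) {
    List<Map.Entry<String, Integer>> out = new ArrayList<>();
    int i = 0;
    while (i < sortedRun.size()) {
      String key = sortedRun.get(i).getKey();
      Integer acc = sortedRun.get(i).getValue();
      i++;
      while (i < sortedRun.size() && sortedRun.get(i).getKey().equals(key)) {
        acc = reduce.apply(acc, sortedRun.get(i).getValue());
        i++;
      }
      out.add(Map.entry(key, acc));
    }
    return out;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> run = List.of(
        Map.entry("be", 1), Map.entry("be", 1), Map.entry("not", 1),
        Map.entry("to", 1), Map.entry("to", 1));
    System.out.println(combine(run, Integer::sum)); // [be=2, not=1, to=2]
  }
}
```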
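When the Combiner runs:
MapTask:
- ① Before each spill, if a Combiner is specified, it runs.
- ② When multiple spill segments are merged into one final file, the Combiner also runs, provided there are at least 3 segments.
ReduceTask:
- ③ While a ReduceTask runs, its shuffle phase copies the data produced by the MapTasks.
  - After being copied, the data goes into the shuffle's memory, where it is merged and sorted.
  - If there is too much data for the available memory, part of it is spilled to disk.
  - If such a spill to disk happens, the Combiner runs again.
① always runs (as long as a Combiner is set); ② and ③ run only when their conditions are met.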
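The three conditions can be written as plain predicates. This sketch is a simplified summary, and `CombinerTiming` is a hypothetical name: case ② mirrors the mergeParts() test `combinerRunner == null || numSpills < minSpillsForCombine`, which writes the merged data directly instead of combining, with `minSpillsForCombine` read from mapreduce.map.combine.minspills (default 3); case ③ reduces the reduce-side behaviour described above to "a combiner is set and in-memory shuffle data is spilled to disk".

```java
final class CombinerTiming {
  // Default of mapreduce.map.combine.minspills.
  static final int MIN_SPILLS_FOR_COMBINE = 3;

  // ① Every spill runs the combiner when one is configured.
  static boolean runsOnSpill(boolean hasCombiner) {
    return hasCombiner;
  }

  // ② The final map-side merge: the negation of
  //    (combinerRunner == null || numSpills < minSpillsForCombine).
  static boolean runsOnMapMerge(boolean hasCombiner, int numSpills, int minSpillsForCombine) {
    return hasCombiner && numSpills >= minSpillsForCombine;
  }

  // ③ Reduce side (simplified): only when shuffled data in memory is spilled to disk.
  static boolean runsOnReduceSpill(boolean hasCombiner, boolean spilledToDisk) {
    return hasCombiner && spilledToDisk;
  }

  public static void main(String[] args) {
    for (int spills = 1; spills <= 4; spills++) {
      System.out.println(spills + " spill(s): combiner at merge = "
          + runsOnMapMerge(true, spills, MIN_SPILLS_FOR_COMBINE));
    }
  }
}
```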