Hadoop对文本文件的快速全局排序实现方法及分析
一、背景
Hadoop中实现了用于全局排序的InputSampler类和TotalOrderPartitioner类,调用示例是org.apache.hadoop.examples.Sort。
但是当我们以Text文件作为输入时,结果并非按Text中的string列排序,而且输出结果是SequenceFile。
原因:
1)hadoop在处理Text文件时,key是行号LongWritable类型,InputSampler抽样的是key,TotalOrderPartitioner也是用key去查找分区。这样,抽样得到的partition文件是对行号的抽样,结果自然是根据行号来排序。
2)大数据量时,InputSampler抽样速度会非常慢。比如,RandomSampler需要遍历所有数据,IntervalSampler需要遍历文件数与splits数一样。SplitSampler效率比较高,但它只抽取每个文件前面的记录,不适合应用于文件内有序的情况。
二、功能
1.实现了一种局部抽样方法PartialSampler,适用于输入数据各文件是独立同分布的情况
2.使RandomSampler、IntervalSampler、SplitSampler支持对文本的抽样
3.实现了针对Text文件string列的TotalOrderPartitioner
三、实现
1.PartialSampler
PartialSampler从第一份输入数据中随机抽取第一列文本数据。PartialSampler有两个属性:freq(采样频率),numSamples(采样总数)。
publicK[]getSample(InputFormatinf,JobConfjob)throwsIOException{ InputSplit[]splits=inf.getSplits(job,job.getNumMapTasks()); ArrayList samples=newArrayList (numSamples); Randomr=newRandom(); longseed=r.nextLong(); r.setSeed(seed); LOG.debug("seed:"+seed); //对splits【0】抽样 for(inti=0;i<1;i++){ System.out.println("PartialSamplerwillgetSamplesplits["+i+"]"); RecordReader reader=inf.getRecordReader(splits[i],job, Reporter.NULL); Kkey=reader.createKey(); Vvalue=reader.createValue(); while(reader.next(key,value)){ if(r.nextDouble()<=freq){ if(samples.size() 首先通过InputFormat的getSplits方法得到所有的输入分区;
然后扫描第一个分区中的记录进行采样。
记录采样的具体过程如下:
从指定分区中取出一条记录,判断得到的随机浮点数是否小于等于采样频率freq
如果大于则放弃这条记录;
如果小于,则判断当前的采样数是否小于最大采样数,
如果小于则这条记录被选中,被放进采样集合中;
否则从【0,numSamples】中选择一个随机数,如果这个随机数不等于最大采样数numSamples,则用这条记录替换掉采样集合随机数对应位置的记录,同时采样频率freq减小变为freq*(numSamples-1)/numSamples。
然后依次遍历分区中的其它记录。
note:
1)PartialSampler只适用于输入数据各文件是独立同分布的情况。
2)自带的三种Sampler通过修改samples.add(key)为samples.add((K)value0);也可以实现对第一列的抽样。
2.TotalOrderPartitioner
TotalOrderPartitioner主要改进了两点:
1)读partition时指定keyClass为Text.class
因为partition文件中的key类型为Text
在configure函数中,修改:
//ClasskeyClass=(Class )job.getMapOutputKeyClass(); Class keyClass=(Class )Text.class; 2)查找分区时,改用value查
publicintgetPartition(Kkey,Vvalue,intnumPartitions){ Textvalue0=newText(value.toString().split("\t")[0]); returnpartitions.findPartition((K)value0); }3.Sort
1)设置InputFormat、OutputFormat、OutputKeyClass、OutputValueClass、MapOutputKeyClass
2)初始化InputSampler对象,抽样
3)partitionFile通过CacheFile传给TotalOrderPartitioner,执行MapReduce任务
ClassinputFormatClass=TextInputFormat.class; ClassoutputFormatClass=TextOutputFormat.class; ClassoutputKeyClass=Text.class; ClassoutputValueClass=Text.class; jobConf.setMapOutputKeyClass(LongWritable.class); //Setuser-supplied(possiblydefault)jobconfigs jobConf.setNumReduceTasks(num_reduces); jobConf.setInputFormat(inputFormatClass); jobConf.setOutputFormat(outputFormatClass); jobConf.setOutputKeyClass(outputKeyClass); jobConf.setOutputValueClass(outputValueClass); if(sampler!=null){ System.out.println("Samplinginputtoeffecttotal-ordersort..."); jobConf.setPartitionerClass(TotalOrderPartitioner.class); PathinputDir=FileInputFormat.getInputPaths(jobConf)[0]; inputDir=inputDir.makeQualified(inputDir.getFileSystem(jobConf)); //PathpartitionFile=newPath(inputDir,"_sortPartitioning"); TotalOrderPartitioner.setPartitionFile(jobConf,partitionFile); InputSampler.writePartitionFile(jobConf,sampler); URIpartitionUri=newURI(partitionFile.toString()+"#"+"_sortPartitioning"); DistributedCache.addCacheFile(partitionUri,jobConf); DistributedCache.createSymlink(jobConf); } FileSystemhdfs=FileSystem.get(jobConf); hdfs.delete(outputpath); hdfs.close(); System.out.println("Runningon"+ cluster.getTaskTrackers()+ "nodestosortfrom"+ FileInputFormat.getInputPaths(jobConf)[0]+"into"+ FileOutputFormat.getOutputPath(jobConf)+ "with"+num_reduces+"reduces."); DatestartTime=newDate(); System.out.println("Jobstarted:"+startTime); jobResult=JobClient.runJob(jobConf); 四、执行
usage:
hadoopjaryitengfei.jarcom.yitengfei.Sort[-m
][-r ]
[-splitRandom|//Samplefromrandomsplitsatrandom(general)
-splitSample|//Samplefromfirstrecordsinsplits(randomdata)
-splitInterval]//Samplefromsplitsatintervals(sorteddata)
-splitPartial|//Samplefrompartialsplitsatrandom(general)]
Example:
hadoopjaryitengfei.jarcom.yitengfei.Sort-r10-splitPartial0.11000010/user/rp-rd/yitengfei/sample/input/user/rp-rd/yitengfei/sample/output/user/rp-rd/yitengfei/sample/partition
五、性能
200G输入数据,15亿条url,1000个分区,排序时间只用了6分钟
总结 以上就是本文关于Hadoop对文本文件的快速全局排序实现方法及分析的全部内容,希望对大家有所帮助,感兴趣的朋友可以继续参阅本站:hadoop重新格式化HDFS步骤解析、浅谈七种常见的Hadoop和Spark项目案例。如有不足之处,欢迎留言指出,感谢朋友们对本站的支持!