hadoop二次排序的原理和实现方法
默认情况下,Map输出的结果会对Key进行默认的排序,但是有时候需要对Key排序的同时还需要对Value进行排序,这时候就要用到二次排序了。下面我们来说说二次排序
1、二次排序原理
我们把二次排序分为以下几个阶段
Map起始阶段
在Map阶段,使用job.setInputFormatClass()定义的InputFormat,将输入的数据集分割成小数据块split,同时InputFormat提供一个RecordReader的实现。在这里我们使用的是TextInputFormat,它提供的RecordReader会将文本的行号作为Key,这一行的文本作为Value。这就是自定Mapper的输入是
Map最后阶段
在Map阶段的最后,会先调用job.setPartitionerClass()对这个Mapper的输出结果进行分区,每个分区映射到一个Reducer。每个分区内又调用job.setSortComparatorClass()设置的Key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass()设置Key比较函数类,则使用Key实现的compareTo()方法
Reduce阶段
在Reduce阶段,reduce()方法接受所有映射到这个Reduce的map输出后,也会调用job.setSortComparatorClass()方法设置的Key比较函数类,对所有数据进行排序。然后开始构造一个Key对应的Value迭代器。这时就要用到分组,使用job.setGroupingComparatorClass()方法设置分组函数类。只要这个比较器比较的两个Key相同,它们就属于同一组,它们的Value放在一个Value迭代器,而这个迭代器的Key使用属于同一个组的所有Key的第一个Key。最后就是进入Reducer的reduce()方法,reduce()方法的输入是所有的Key和它的Value迭代器,同样注意输入与输出的类型必须与自定义的Reducer中声明的一致
接下来我们通过示例,可以很直观的了解二次排序的原理
输入文件sort.txt内容为
40204010403040530303020301030405020505050105060
输出文件的内容(从小到大排序)如下
3010302030303040--------405401040204030--------5010502050505060
从输出的结果可以看出Key实现了从小到大的排序,同时相同Key的Value也实现了从小到大的排序,这就是二次排序的结果
2、二次排序的具体流程
在本例中要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类IntPair,它有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。二次排序的流程分为以下几步。
在本例中要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类IntPair,它有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。二次排序的流程分为以下几步。
1、自定义key
所有自定义的key应该实现接口WritableComparable,因为它是可序列化的并且可比较的。WritableComparable的内部方法如下所示
//反序列化,从流中的二进制转换成IntPair publicvoidreadFields(DataInputin)throwsIOException //序列化,将IntPair转化成使用流传送的二进制 publicvoidwrite(DataOutputout) //key的比较 publicintcompareTo(IntPairo) //默认的分区类HashPartitioner,使用此方法 publicinthashCode() //默认实现 publicbooleanequals(Objectright)
2、自定义分区
自定义分区函数类FirstPartitioner,是key的第一次比较,完成对所有key的排序。
publicstaticclassFirstPartitionerextendsPartitioner
在job中使用setPartitionerClasss()方法设置Partitioner
job.setPartitionerClasss(FirstPartitioner.Class);
3、Key的比较类
这是Key的第二次比较,对所有的Key进行排序,即同时完成IntPair中的first和second排序。该类是一个比较器,可以通过两种方式实现。
1)继承WritableComparator。
publicstaticclassKeyComparatorextendsWritableComparator
必须有一个构造函数,并且重载以下方法。
publicintcompare(WritableComparablew1,WritableComparablew2)
2)实现接口RawComparator。
上面两种实现方式,在Job中,可以通过setSortComparatorClass()方法来设置Key的比较类。
job.setSortComparatorClass(KeyComparator.Class);
注意:如果没有使用自定义的SortComparator类,则默认使用Key中compareTo()方法对Key排序。
4、定义分组类函数
在Reduce阶段,构造一个与Key相对应的Value迭代器的时候,只要first相同就属于同一个组,放在一个Value迭代器。定义这个比较器,可以有两种方式。
1)继承WritableComparator。
publicstaticclassGroupingComparatorextendsWritableComparator
必须有一个构造函数,并且重载以下方法。
publicintcompare(WritableComparablew1,WritableComparablew2)
2)实现接口RawComparator。
上面两种实现方式,在Job中,可以通过setGroupingComparatorClass()方法来设置分组类。
job.setGroupingComparatorClass(GroupingComparator.Class);
另外注意的是,如果reduce的输入与输出不是同一种类型,则Combiner和Reducer不能共用Reducer类,因为
Combiner的输出是reduce的输入。除非重新定义一个Combiner。
3、代码实现
Hadoop的example包中自带了一个MapReduce的二次排序算法,下面对example包中的二次排序进行改进
packagecom.buaa; importjava.io.DataInput; importjava.io.DataOutput; importjava.io.IOException; importorg.apache.hadoop.io.WritableComparable; /** *@ProjectNameSecondarySort *@PackageNamecom.buaa *@ClassNameIntPair *@Description将示例数据中的key/value封装成一个整体作为Key,同时实现WritableComparable接口并重写其方法 *@Author刘吉超 *@Date2016-06-0722:31:53 */ publicclassIntPairimplementsWritableComparable{ privateintfirst; privateintsecond; publicIntPair(){ } publicIntPair(intleft,intright){ set(left,right); } publicvoidset(intleft,intright){ first=left; second=right; } @Override publicvoidreadFields(DataInputin)throwsIOException{ first=in.readInt(); second=in.readInt(); } @Override publicvoidwrite(DataOutputout)throwsIOException{ out.writeInt(first); out.writeInt(second); } @Override publicintcompareTo(IntPairo) { if(first!=o.first){ returnfirst { publicvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{ Stringline=value.toString(); StringTokenizertokenizer=newStringTokenizer(line); intleft=0; intright=0; if(tokenizer.hasMoreTokens()){ left=Integer.parseInt(tokenizer.nextToken()); if(tokenizer.hasMoreTokens()) right=Integer.parseInt(tokenizer.nextToken()); context.write(newIntPair(left,right),newIntWritable(right)); } } } /* *自定义分区函数类FirstPartitioner,根据IntPair中的first实现分区 */ publicstaticclassFirstPartitionerextendsPartitioner { @Override publicintgetPartition(IntPairkey,IntWritablevalue,intnumPartitions){ returnMath.abs(key.getFirst()*127)%numPartitions; } } /* *自定义GroupingComparator类,实现分区内的数据分组 */ @SuppressWarnings("rawtypes") publicstaticclassGroupingComparatorextendsWritableComparator{ protectedGroupingComparator(){ super(IntPair.class,true); } @Override publicintcompare(WritableComparablew1,WritableComparablew2){ IntPairip1=(IntPair)w1; IntPairip2=(IntPair)w2; intl=ip1.getFirst(); intr=ip2.getFirst(); returnl==r?0:(l { publicvoidreduce(IntPairkey,Iterable values,Contextcontext)throwsIOException,InterruptedException{ for(IntWritableval:values){ context.write(newText(Integer.toString(key.getFirst())),val); } } } publicstaticvoidmain(String[]args)throwsIOException,InterruptedException,ClassNotFoundException{ //读取配置文件 Configurationconf=newConfiguration(); //判断路径是否存在,如果存在,则删除 Pathmypath=newPath(args[1]); FileSystemhdfs=mypath.getFileSystem(conf); if(hdfs.isDirectory(mypath)){ hdfs.delete(mypath,true); } Jobjob=newJob(conf,"secondarysort"); //设置主类 job.setJarByClass(SecondarySort.class); //输入路径 FileInputFormat.setInputPaths(job,newPath(args[0])); //输出路径 FileOutputFormat.setOutputPath(job,newPath(args[1])); //Mapper job.setMapperClass(Map.class); //Reducer job.setReducerClass(Reduce.class); //分区函数 job.setPartitionerClass(FirstPartitioner.class); //本示例并没有自定义SortComparator,而是使用IntPair中compareTo方法进行排序job.setSortComparatorClass(); //分组函数 job.setGroupingComparatorClass(GroupingComparator.class); //map输出key类型 job.setMapOutputKeyClass(IntPair.class); //map输出value类型 job.setMapOutputValueClass(IntWritable.class); //reduce输出key类型 job.setOutputKeyClass(Text.class); //reduce输出value类型 job.setOutputValueClass(IntWritable.class); //输入格式 job.setInputFormatClass(TextInputFormat.class); //输出格式 job.setOutputFormatClass(TextOutputFormat.class); System.exit(job.waitForCompletion(true)?0:1); } }
总结
以上所述是小编给大家介绍的hadoop二次排序的原理和实现方法,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的!