Hadoop编程基于MR程序实现倒排索引示例
相信接触过搜索引擎开发的同学对倒排索引并不陌生,谷歌、百度等搜索引擎都是用的倒排索引,关于倒排索引的有关知识,这里就不再深入讲解,有兴趣的同学到网上了解一下。这篇博文就带着大家一起学习下如何利用Hadoop的MR程序来实现倒排索引的功能。
一、数据准备
1、输入文件数据
这里我们准备三个输入文件,分别如下所示
a.txt
hellotom hellojerry hellotom
b.txt
hellojerry hellojerry tomjerry
c.txt
hellojerry hellotom
2、最终输出文件数据
最终输出文件的结果为:
[plain]viewplaincopy helloc.txt-->2b.txt-->2a.txt-->3 jerryc.txt-->1b.txt-->3a.txt-->1 tomc.txt-->1b.txt-->1a.txt-->2
二、倒排索引过程分析
根据输入文件数据和最终的输出文件结果可知,此程序需要利用两个MR实现,具体流程可总结归纳如下:
-------------第一步Mapper的输出结果格式如下:--------------------
context.wirte("hello->a.txt","1")
context.wirte("hello->a.txt","1")
context.wirte("hello->a.txt","1")
context.wirte("hello->b.txt","1")
context.wirte("hello->b.txt","1")
context.wirte("hello->c.txt","1")
context.wirte("hello->c.txt","1")
-------------第一步Reducer的得到的输入数据格式如下:-------------
<"hello->a.txt",{1,1,1}>
<"hello->b.txt",{1,1}>
<"hello->c.txt",{1,1}>
-------------第一步Reducer的输出数据格式如下---------------------
context.write("hello->a.txt","3")
context.write("hello->b.txt","2")
context.write("hello->c.txt","2")
-------------第二步Mapper得到的输入数据格式如下:-----------------
context.write("hello->a.txt","3")
context.write("hello->b.txt","2")
context.write("hello->c.txt","2")
-------------第二步Mapper输出的数据格式如下:--------------------
context.write("hello","a.txt->3")
context.write("hello","b.txt->2")
context.write("hello","c.txt->2")
-------------第二步Reducer得到的输入数据格式如下:-----------------
<"hello",{"a.txt->3","b.txt->2","c.txt->2"}>
-------------第二步Reducer输出的数据格式如下:-----------------
context.write("hello","a.txt->3b.txt->2c.txt->2")
最终结果为:
helloa.txt->3b.txt->2c.txt->2
三、程序开发
3.1、第一步MR程序与输入输出
packagecom.lyz.hdfs.mr.ii;
importjava.io.IOException;
importorg.apache.commons.lang.StringUtils;
importorg.apache.hadoop.conf.Configuration;
importorg.apache.hadoop.fs.Path;
importorg.apache.hadoop.io.LongWritable;
importorg.apache.hadoop.io.Text;
importorg.apache.hadoop.mapreduce.Job;
importorg.apache.hadoop.mapreduce.Mapper;
importorg.apache.hadoop.mapreduce.Reducer;
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
importorg.apache.hadoop.mapreduce.lib.input.FileSplit;
importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
*倒排索引第一步MapReduce程序,此处程序将所有的Map/Reduce/Runner程序放在一个类中
*@authorliuyazhuang
*
*/
publicclassInverseIndexStepOne{
/**
*完成倒排索引第一步的mapper程序
*@authorliuyazhuang
*
*/
publicstaticclassStepOneMapperextendsMapper{
@Override
protectedvoidmap(LongWritablekey,Textvalue,Mapper.Contextcontext)
throwsIOException,InterruptedException{
//获取一行数据
Stringline=value.toString();
//切分出每个单词
String[]fields=StringUtils.split(line,"");
//获取数据的切片信息
FileSplitfileSplit=(FileSplit)context.getInputSplit();
//根据切片信息获取文件名称
StringfileName=fileSplit.getPath().getName();
for(Stringfield:fields){
context.write(newText(field+"-->"+fileName),newLongWritable(1));
}
}
}
/**
*完成倒排索引第一步的Reducer程序
*最终输出结果为:
*hello-->a.txt3
hello-->b.txt2
hello-->c.txt2
jerry-->a.txt1
jerry-->b.txt3
jerry-->c.txt1
tom-->a.txt2
tom-->b.txt1
tom-->c.txt1
*@authorliuyazhuang
*
*/
publicstaticclassStepOneReducerextendsReducer{
@Override
protectedvoidreduce(Textkey,Iterablevalues,
Reducer.Contextcontext)throwsIOException,InterruptedException{
longcounter=0;
for(LongWritablevalue:values){
counter+=value.get();
}
context.write(key,newLongWritable(counter));
}
}
//运行第一步的MR程序
publicstaticvoidmain(String[]args)throwsException{
Configurationconf=newConfiguration();
Jobjob=Job.getInstance(conf);
job.setJarByClass(InverseIndexStepOne.class);
job.setMapperClass(StepOneMapper.class);
job.setReducerClass(StepOneReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(job,newPath("D:/hadoop_data/ii"));
FileOutputFormat.setOutputPath(job,newPath("D:/hadoop_data/ii/result"));
job.waitForCompletion(true);
}
}
3.1.1输入数据
a.txt
hellotom hellojerry hellotom
b.txt
hellojerry hellojerry tomjerry
c.txt
hellojerry hellotom
3.1.2
输出结果:
hello-->a.txt3 hello-->b.txt2 hello-->c.txt2 jerry-->a.txt1 jerry-->b.txt3 jerry-->c.txt1 tom-->a.txt2 tom-->b.txt1 tom-->c.txt1
3.2第二步MR程序与输入输出
packagecom.lyz.hdfs.mr.ii;
importjava.io.IOException;
importorg.apache.commons.lang.StringUtils;
importorg.apache.hadoop.conf.Configuration;
importorg.apache.hadoop.fs.Path;
importorg.apache.hadoop.io.LongWritable;
importorg.apache.hadoop.io.Text;
importorg.apache.hadoop.mapreduce.Job;
importorg.apache.hadoop.mapreduce.Mapper;
importorg.apache.hadoop.mapreduce.Reducer;
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
*倒排索引第二步MapReduce程序,此处程序将所有的Map/Reduce/Runner程序放在一个类中
*@authorliuyazhuang
*
*/
publicclassInverseIndexStepTwo{
/**
*完成倒排索引第二步的mapper程序
*
*从第一步MR程序中得到的输入信息为:
*hello-->a.txt3
hello-->b.txt2
hello-->c.txt2
jerry-->a.txt1
jerry-->b.txt3
jerry-->c.txt1
tom-->a.txt2
tom-->b.txt1
tom-->c.txt1
*@authorliuyazhuang
*
*/
publicstaticclassStepTwoMapperextendsMapper{
@Override
protectedvoidmap(LongWritablekey,Textvalue,Mapper.Contextcontext)
throwsIOException,InterruptedException{
Stringline=value.toString();
String[]fields=StringUtils.split(line,"\t");
String[]wordAndFileName=StringUtils.split(fields[0],"-->");
Stringword=wordAndFileName[0];
StringfileName=wordAndFileName[1];
longcounter=Long.parseLong(fields[1]);
context.write(newText(word),newText(fileName+"-->"+counter));
}
}
/**
*完成倒排索引第二步的Reducer程序
*得到的输入信息格式为:
*<"hello",{"a.txt->3","b.txt->2","c.txt->2"}>,
*最终输出结果如下:
*helloc.txt-->2b.txt-->2a.txt-->3
jerryc.txt-->1b.txt-->3a.txt-->1
tomc.txt-->1b.txt-->1a.txt-->2
*@authorliuyazhuang
*
*/
publicstaticclassStepTwoReducerextendsReducer{
@Override
protectedvoidreduce(Textkey,Iterablevalues,Reducer.Contextcontext)
throwsIOException,InterruptedException{
Stringresult="";
for(Textvalue:values){
result+=value+"";
}
context.write(key,newText(result));
}
}
//运行第一步的MR程序
publicstaticvoidmain(String[]args)throwsException{
Configurationconf=newConfiguration();
Jobjob=Job.getInstance(conf);
job.setJarByClass(InverseIndexStepTwo.class);
job.setMapperClass(StepTwoMapper.class);
job.setReducerClass(StepTwoReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job,newPath("D:/hadoop_data/ii/result/part-r-00000"));
FileOutputFormat.setOutputPath(job,newPath("D:/hadoop_data/ii/result/final"));
job.waitForCompletion(true);
}
}
3.2.1输入数据
hello-->a.txt3 hello-->b.txt2 hello-->c.txt2 jerry-->a.txt1 jerry-->b.txt3 jerry-->c.txt1 tom-->a.txt2 tom-->b.txt1 tom-->c.txt1
3.2.2输出结果
helloc.txt-->2b.txt-->2a.txt-->3 jerryc.txt-->1b.txt-->3a.txt-->1 tomc.txt-->1b.txt-->1a.txt-->2
以上就是本文关于Hadoop编程基于MR程序实现倒排索引示例的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:Hadoop对文本文件的快速全局排序实现方法及分析、hadoop重新格式化HDFS步骤解析、浅谈七种常见的Hadoop和Spark项目案例等,有什么问题可以直接留言,小编会及时回复大家的。感谢朋友们对本站的支持!