Java 中自定义 OutputFormat 的实例详解
Java 中自定义 OutputFormat 的实例详解
实例代码:
package com.ccse.hadoop.outputformat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
publicclassMySelfOutputFormatApp{
publicfinalstaticStringINPUT_PATH="hdfs://chaoren1:9000/mapinput";
publicfinalstaticStringOUTPUT_PATH="hdfs://chaoren1:9000/mapoutput";
publicfinalstaticStringOUTPUT_FILENAME="/abc";
publicstaticvoidmain(String[]args)throwsIOException,URISyntaxException,
ClassNotFoundException,InterruptedException{
Configurationconf=newConfiguration();
FileSystemfileSystem=FileSystem.get(newURI(OUTPUT_PATH),conf);
fileSystem.delete(newPath(OUTPUT_PATH),true);
Jobjob=newJob(conf,MySelfOutputFormatApp.class.getSimpleName());
job.setJarByClass(MySelfOutputFormatApp.class);
FileInputFormat.setInputPaths(job,newPath(INPUT_PATH));
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(MyselfOutputFormat.class);
job.waitForCompletion(true);
}
publicstaticclassMyMapperextendsMapper{
privateTextword=newText();
privateLongWritablewritable=newLongWritable(1);
@Override
protectedvoidmap(LongWritablekey,Textvalue,
Mapper.Contextcontext)
throwsIOException,InterruptedException{
if(value!=null){
Stringline=value.toString();
StringTokenizertokenizer=newStringTokenizer(line);
while(tokenizer.hasMoreTokens()){
word.set(tokenizer.nextToken());
context.write(word,writable);
}
}
}
}
publicstaticclassMyReducerextendsReducer{
@Override
protectedvoidreduce(Textkey,Iterablevalues,
Reducer.Contextcontext)
throwsIOException,InterruptedException{
longsum=0;
for(LongWritablevalue:values){
sum+=value.get();
}
context.write(key,newLongWritable(sum));
}
}
publicstaticclassMyselfOutputFormatextendsOutputFormat{
privateFSDataOutputStreamoutputStream=null;
@Override
publicRecordWritergetRecordWriter(
TaskAttemptContextcontext)throwsIOException,
InterruptedException{
try{
FileSystemfileSystem=FileSystem.get(newURI(MySelfOutputFormatApp.OUTPUT_PATH),context.getConfiguration());
//指定文件的输出路径
finalPathpath=newPath(MySelfOutputFormatApp.OUTPUT_PATH
+MySelfOutputFormatApp.OUTPUT_FILENAME);
this.outputStream=fileSystem.create(path,false);
}catch(URISyntaxExceptione){
e.printStackTrace();
}
returnnewMySelfRecordWriter(outputStream);
}
@Override
publicvoidcheckOutputSpecs(JobContextcontext)throwsIOException,
InterruptedException{
}
@Override
publicOutputCommittergetOutputCommitter(TaskAttemptContextcontext)
throwsIOException,InterruptedException{
returnnewFileOutputCommitter(newPath(MySelfOutputFormatApp.OUTPUT_PATH),context);
}
}
publicstaticclassMySelfRecordWriterextendsRecordWriter{
privateFSDataOutputStreamoutputStream=null;
publicMySelfRecordWriter(FSDataOutputStreamoutputStream){
this.outputStream=outputStream;
}
@Override
publicvoidwrite(Textkey,LongWritablevalue)throwsIOException,
InterruptedException{
this.outputStream.writeBytes(key.toString());
this.outputStream.writeBytes("\t");
this.outputStream.writeLong(value.get());
}
@Override
publicvoidclose(TaskAttemptContextcontext)throwsIOException,
InterruptedException{
this.outputStream.close();
}
}
}
2.OutputFormat是用于处理各种输出目的地的。
2.1 OutputFormat需要写出去的键值对来自于Reducer类,并且是通过RecordWriter写出去的。
2.2 RecordWriter中的write(...)方法只有k和v两个参数,写到哪里去呢?这要通过单独传入的OutputStream来处理,write(...)就是把k和v写入到这个OutputStream中。
2.3 RecordWriter对象是由OutputFormat的getRecordWriter(...)方法创建的。因此,我们自定义的OutputFormat必须继承OutputFormat类,而传给RecordWriter的流对象也必须在getRecordWriter(...)方法中获得。
以上就是java中自定义OutputFormat的实例,如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!