Implementing a Decision Tree Algorithm on MapReduce
This article shares a concrete implementation of a decision tree algorithm on MapReduce, for your reference. The details are as follows.
First, implement the Mapper operator for the C4.5 decision tree algorithm. The code is as follows:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text attValue = new Text();
    private int i;
    private String token;
    public static int no_Attr;
    public Split split = null;
    public int size_split_1 = 0;

    public void configure(JobConf conf) {
        try {
            // deserialize the split (tree node) currently being expanded,
            // which the driver stores in the job configuration
            split = (Split) ObjectSerializable.unSerialize(conf.get("currentsplit"));
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        size_split_1 = Integer.parseInt(conf.get("current_index"));
    }

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
        String line = value.toString(); // convert the input instance to a string
        StringTokenizer itr = new StringTokenizer(line);
        int index = 0;
        String attr_value = null;
        no_Attr = itr.countTokens() - 1; // the last token is the class label
        String attr[] = new String[no_Attr];
        boolean match = true;
        // (the listing breaks off here in the original article; a sketch of the
        // remainder of map() follows)
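The Mapper listing is cut off at this point in the original article. Below is a minimal sketch of how map() plausibly continues, assuming the standard C4.5 counting pattern: read the attribute values and the class label, keep only instances that fall into the current split, and emit one count per remaining (attribute, value, class) combination. The variables attr, index, attr_value, and match come from the code above; the Split.attr_value field and the exact key layout are assumptions, not the author's code.

        for (i = 0; i < no_Attr; i++) {
            attr[i] = itr.nextToken(); // the first no_Attr tokens are attribute values
        }
        String classLabel = itr.nextToken(); // the final token is the class label

        // keep only instances that satisfy every (attribute, value) pair already
        // fixed on the path to the current node (attr_value is an assumed field of Split)
        for (i = 0; i < split.attr_index.size(); i++) {
            index = (Integer) split.attr_index.get(i);
            attr_value = (String) split.attr_value.get(i);
            if (!attr[index].equals(attr_value)) {
                match = false;
                break;
            }
        }

        if (match) {
            // emit one count for each attribute not yet used on this path,
            // keyed by "attributeIndex attributeValue classLabel"
            for (i = 0; i < no_Attr; i++) {
                if (!split.attr_index.contains(i)) {
                    attValue.set(i + " " + attr[i] + " " + classLabel);
                    output.collect(attValue, one);
                }
            }
        }
    }
}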
Next, implement the corresponding Reducer operator for the C4.5 algorithm. The code is as follows:
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    static int cnt = 0;
    ArrayList<String> ar = new ArrayList<String>();
    String data = null;
    private static int currentIndex;

    public void configure(JobConf conf) {
        currentIndex = Integer.valueOf(conf.get("currentIndex"));
    }

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        // sum is the number of occurrences of one class within the sub-dataset
        // obtained by splitting on one attribute value
        while (values.hasNext()) {
            sum += values.next().get();
        }
        // emit the count for this attribute value
        output.collect(key, new IntWritable(sum));
        String data = key + " " + sum;
        ar.add(data);
        // write the accumulated results out to a file
        writeToFile(ar);
        ar.add("\n");
    }

    public static void writeToFile(ArrayList<String> text) {
        try {
            cnt++;
            Path input = new Path("C45/intermediate" + currentIndex + ".txt");
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(input, true)));
            for (String str : text) {
                bw.write(str);
            }
            bw.newLine();
            bw.close();
        } catch (Exception e) {
            System.out.println("Failed to create file in reduce");
        }
    }
}
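The reducer accumulates one "key sum" line per key in ar and rewrites C45/intermediateN.txt on every call, so after a pass the file holds the class counts for every (attribute, value) pair. The GainRatio helper used by the driver below reads these counts back to pick the best split, but the article never shows its code. Here is a minimal sketch of the arithmetic it has to perform; only the formulas are standard C4.5, while the class name, method names, and counts layout are assumptions.

// A minimal sketch of the arithmetic the (unshown) GainRatio class must
// perform; the names and the counts layout here are assumed.
public class GainRatioSketch {

    // entropy of a node: H = -sum_k p_k * log2(p_k), where p_k is the
    // fraction of the node's instances that carry class label k
    public static double entropy(int[] classCounts, int total) {
        double h = 0.0;
        for (int count : classCounts) {
            if (count == 0) {
                continue; // 0 * log(0) is taken as 0
            }
            double p = (double) count / total;
            h -= p * Math.log(p) / Math.log(2);
        }
        return h;
    }

    // gain ratio of splitting on one attribute:
    //   infoGain  = H(node) - sum_v (n_v / n) * H(child_v)
    //   splitInfo = -sum_v (n_v / n) * log2(n_v / n)
    //   gainRatio = infoGain / splitInfo
    public static double gainRatio(double nodeEntropy, int[][] childClassCounts,
            int[] childSizes, int total) {
        double infoGain = nodeEntropy;
        double splitInfo = 0.0;
        for (int v = 0; v < childSizes.length; v++) {
            if (childSizes[v] == 0) {
                continue;
            }
            double weight = (double) childSizes[v] / total;
            infoGain -= weight * entropy(childClassCounts[v], childSizes[v]);
            splitInfo -= weight * Math.log(weight) / Math.log(2);
        }
        // guard against zero split info (all instances share one attribute value)
        return splitInfo == 0.0 ? 0.0 : infoGain / splitInfo;
    }
}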
Finally, write the main entry point that launches the MapReduce job. The job needs to be launched for multiple passes, one per tree node being expanded. The code is as follows:
package com.hackecho.hadoop;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.PropertyConfigurator;
import org.dmg.pmml.MiningFunctionType;
import org.dmg.pmml.Node;
import org.dmg.pmml.PMML;
import org.dmg.pmml.TreeModel;
// Here the role of MapReduce is to partition the dataset into sub-datasets
// according to the values of each attribute
public class Main extends Configured implements Tool {

    // the split (tree node) currently being expanded
    public static Split currentsplit = new Split();
    // the collection of splits already generated
    public static List<Split> splitted = new ArrayList<Split>();
    // current_index is the position of the split currently being processed
    public static int current_index = 0;
    public static ArrayList<String> ar = new ArrayList<String>();
    public static List<Split> leafSplits = new ArrayList<Split>();
    public static final String PROJECT_HOME = System.getProperty("user.dir");
    public static void main(String[] args) throws Exception {
        PropertyConfigurator.configure(PROJECT_HOME + "/conf/log/log4j.properties");
        // currentsplit is put into splitted here, so splitted starts with size 1
        splitted.add(currentsplit);
        Path c45 = new Path("C45");
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(c45)) {
            fs.delete(c45, true);
        }
        fs.mkdirs(c45);
        int res = 0;
        int split_index = 0;
        // gain ratio
        double gainratio = 0;
        // best gain ratio found so far
        double best_gainratio = 0;
        // entropy
        double entropy = 0;
        // class label
        String classLabel = null;
        // number of attributes; hard-coded to 4 below, overriding the value
        // read from MapClass.no_Attr
        int total_attributes = MapClass.no_Attr;
        total_attributes = 4;
        // number of splits generated so far
        int split_size = splitted.size();
        // gain-ratio helper
        GainRatio gainObj;
        // new node produced by a split
        Split newnode;
        while (split_size > current_index) {
            currentsplit = splitted.get(current_index);
            gainObj = new GainRatio();
            // run one MapReduce pass to count the classes under the current split
            res = ToolRunner.run(new Configuration(), new Main(), args);
            System.out.println("Current NODE INDEX :: " + current_index);
            int j = 0;
            int temp_size;
            // gather the counts produced by the MapReduce pass
            gainObj.getcount();
            // compute the entropy of the current node
            entropy = gainObj.currNodeEntophy();
            // get the majority class label at the current node
            classLabel = gainObj.majorityLabel();
            currentsplit.classLabel = classLabel;
            if (entropy != 0.0 && currentsplit.attr_index.size() != total_attributes) {
                System.out.println("");
                System.out.println("Entropy NOT zero SPLIT INDEX :: " + entropy);
                best_gainratio = 0;
                // compute the information gain ratio of each candidate attribute;
                // the loop header and the gain computation were garbled in the
                // original article, so the lines marked (reconstructed) below are
                // rebuilt from context, and the exact gain-ratio call is assumed
                for (j = 0; j < total_attributes; j++) { // (reconstructed)
                    if (!currentsplit.attr_index.contains(j)) { // (reconstructed)
                        gainratio = gainObj.gainratio(j, entropy); // (reconstructed; method name assumed)
                        if (gainratio >= best_gainratio) {
                            split_index = j;
                            best_gainratio = gainratio;
                        }
                    }
                }
                // split_index is the index of the attribute on which the node splits;
                // attr_values_split is the string of the values taken by that attribute
                String attr_values_split = gainObj.getvalues(split_index);
                StringTokenizer attrs = new StringTokenizer(attr_values_split);
                // number of child splits possible with the selected attribute
                int number_splits = attrs.countTokens();
                String red = "";
                System.out.println("INDEX :: " + split_index);
                System.out.println("SPLITTING VALUES " + attr_values_split);
                // partition the data at this node into one subset per value
                // of the chosen attribute
                for (int splitnumber = 1; splitnumber <= number_splits; splitnumber++) {
                    temp_size = currentsplit.attr_index.size();
                    newnode = new Split();
                    // (the listing breaks off here in the original article; a sketch
                    // of the remainder follows)
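The driver listing is cut off inside the loop that builds the child nodes, and the run() method that ToolRunner invokes on each pass is never shown. A minimal sketch of both follows. It assumes that Split exposes attr_index/attr_value lists, that ObjectSerializable.serialize mirrors the unSerialize call in MapClass.configure, and plain old-API (mapred) job wiring; none of this is the author's exact code. The PMML imports above suggest the truncated portion also exports the finished tree as a PMML TreeModel, which is not sketched here.

                    // copy the (attribute, value) constraints of the parent node
                    for (int y = 0; y < temp_size; y++) {
                        newnode.attr_index.add(currentsplit.attr_index.get(y));
                        newnode.attr_value.add(currentsplit.attr_value.get(y));
                    }
                    // then fix the chosen attribute at the current split value
                    newnode.attr_index.add(split_index);
                    newnode.attr_value.add(attrs.nextToken());
                    splitted.add(newnode);
                }
            } else {
                // entropy is zero or every attribute is used: the node is a leaf
                leafSplits.add(currentsplit);
            }
            current_index++;
            split_size = splitted.size();
        }
    }

    // per-pass job configuration invoked through ToolRunner.run(); standard
    // old-API wiring, reconstructed rather than copied from the article
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), Main.class);
        conf.setJobName("C45 pass " + current_index);
        // hand the node being expanded to the Mapper (see MapClass.configure);
        // serialize() is assumed to mirror ObjectSerializable.unSerialize
        conf.set("currentsplit", ObjectSerializable.serialize(currentsplit));
        conf.set("current_index", String.valueOf(current_index));
        conf.set("currentIndex", String.valueOf(current_index));
        conf.setMapperClass(MapClass.class);
        conf.setReducerClass(Reduce.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path("C45/output" + current_index));
        JobClient.runJob(conf);
        return 0;
    }
}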
That concludes this article. I hope it helps with your studies, and I hope you will continue to support 毛票票.