Writing and Running a Spark Application in Java: A Detailed Guide
Let's start with a simple requirement:
We want to analyze a website's access logs and count the number of visits coming from each distinct IP address, so that GeoIP data can then tell us the distribution of visitors by country and region. Here are a few sample lines from my own site's access log:
121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.198.92 - - [21/Feb/2014:00:00:11 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html/ HTTP/1.1" 301 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.241.229 - - [21/Feb/2014:00:00:13 +0800] "GET /archives/526.html HTTP/1.1" 200 12080 "http://shiyanjun.cn/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.241.229 - - [21/Feb/2014:00:00:15 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
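Each record follows the common combined log format, with the client IP as the first whitespace-delimited field. A minimal, Spark-independent sketch of that field extraction (the class name here is illustrative):

```java
import java.util.regex.Pattern;

public class IpFieldExtractor {

    // The client IP is the first whitespace-delimited field of each log line.
    private static final Pattern SPACE = Pattern.compile(" ");

    static String extractIp(String logLine) {
        return SPACE.split(logLine)[0];
    }

    public static void main(String[] args) {
        String line = "121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "
                + "\"GET /archives/417.html HTTP/1.1\" 200 11465";
        System.out.println(extractIp(line)); // prints 121.205.198.92
    }
}
```

This is exactly the per-line logic the Spark job applies in parallel across the whole file.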
Implementing the Spark Application in Java
The statistics application we are going to implement covers the following functional points:
Read the log data file from HDFS
Extract the first field of each line (the IP address)
Count how many times each IP address occurs
Sort the IP addresses by occurrence count in descending order
Look up the country each IP address belongs to through the GeoIP library
Print the results, one line per record, in the format: [country code] IP address count
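Before turning to the Spark version, the count-and-sort logic at the heart of these steps can be sketched in plain Java (names here are illustrative, not part of the application):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IpCountSketch {

    // Count occurrences of each IP, then sort the entries by count, descending.
    static List<Map.Entry<String, Integer>> countAndSort(List<String> ips) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String ip : ips) {
            Integer c = counts.get(ip);
            counts.put(ip, c == null ? 1 : c + 1);
        }
        List<Map.Entry<String, Integer>> entries =
                new ArrayList<Map.Entry<String, Integer>>(counts.entrySet());
        Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
            @Override
            public int compare(Map.Entry<String, Integer> e1, Map.Entry<String, Integer> e2) {
                return e2.getValue() - e1.getValue(); // larger counts first
            }
        });
        return entries;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Integer> e :
                countAndSort(Arrays.asList("1.2.3.4", "5.6.7.8", "1.2.3.4"))) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

The Spark job distributes exactly this computation: the HashMap counting becomes reduceByKey over (IP, 1) pairs, and the final sort runs on the driver after collect().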
Now let's look at the statistics application implemented in Java, shown below:
package org.shirdrn.spark.job;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.shirdrn.spark.job.maxmind.Country;
import org.shirdrn.spark.job.maxmind.LookupService;

import scala.Serializable;
import scala.Tuple2;

public class IPAddressStats implements Serializable {

    private static final long serialVersionUID = 8533489548835413763L;
    private static final Log LOG = LogFactory.getLog(IPAddressStats.class);
    private static final Pattern SPACE = Pattern.compile(" ");
    private transient LookupService lookupService;
    private transient final String geoIPFile;

    public IPAddressStats(String geoIPFile) {
        this.geoIPFile = geoIPFile;
        try {
            // lookupService: get country code from an IP address
            File file = new File(this.geoIPFile);
            LOG.info("GeoIP file: " + file.getAbsolutePath());
            lookupService = new AdvancedLookupService(file, LookupService.GEOIP_MEMORY_CACHE);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @SuppressWarnings("serial")
    public void stat(String[] args) {
        JavaSparkContext ctx = new JavaSparkContext(args[0], "IPAddressStats",
                System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(IPAddressStats.class));
        JavaRDD<String> lines = ctx.textFile(args[1], 1);

        // split each line and extract the IP address field
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) {
                // e.g. 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 ...
                // the IP address is the first field
                return Arrays.asList(SPACE.split(s)[0]);
            }
        });

        // map
        JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // reduce
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        List<Tuple2<String, Integer>> output = counts.collect();

        // sort statistics result by value, descending
        Collections.sort(output, new Comparator<Tuple2<String, Integer>>() {
            @Override
            public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
                if (t1._2 < t2._2) {
                    return 1;
                } else if (t1._2 > t2._2) {
                    return -1;
                }
                return 0;
            }
        });

        writeTo(args, output);
    }

    private void writeTo(String[] args, List<Tuple2<String, Integer>> output) {
        for (Tuple2<?, ?> tuple : output) {
            Country country = lookupService.getCountry((String) tuple._1);
            LOG.info("[" + country.getCode() + "] " + tuple._1 + "\t" + tuple._2);
        }
    }

    public static void main(String[] args) {
        // ./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
        if (args.length < 3) {
            System.err.println("Usage: IPAddressStats <master> <inFile> <GeoIPFile>");
            System.err.println("Example: org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat");
            System.exit(1);
        }
        String geoIPFile = args[2];
        IPAddressStats stats = new IPAddressStats(geoIPFile);
        stats.stat(args);
        System.exit(0);
    }

}
The implementation logic is explained in the comments in the code. We use Maven to manage and build the Java program; first, here are the dependencies declared in my pom configuration:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>0.9.0-incubating</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.16</version>
</dependency>
<dependency>
    <groupId>dnsjava</groupId>
    <artifactId>dnsjava</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>commons-net</groupId>
    <artifactId>commons-net</artifactId>
    <version>3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>1.2.1</version>
</dependency>
Note that when the program runs on a Spark cluster, the Job we write must be serializable. Fields that do not need to be serialized, or cannot be serialized, can simply be marked transient. In the code above, the lookupService field does not implement a serialization interface, so it is marked transient to exclude it from serialization; otherwise, an error like the following may occur:
14/03/10 22:34:06 INFO scheduler.DAGScheduler: Failed to run collect at IPAddressStats.java:76
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.shirdrn.spark.job.IPAddressStats
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:741)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:740)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:740)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
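The effect of transient can be demonstrated in isolation. In this sketch (class and field names are hypothetical, modeled on the job above), the transient field is simply skipped during serialization and comes back as null after deserialization:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo implements Serializable {

    private static final long serialVersionUID = 1L;

    String geoIPFile = "GeoIP_DATABASE.dat";       // serialized normally
    transient Object lookupService = new Object(); // skipped by serialization

    // Serialize the object to bytes and read it back, as Spark does when shipping tasks.
    static TransientDemo roundTrip(TransientDemo in) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bytes);
        oos.writeObject(in);
        oos.close();
        ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        return (TransientDemo) ois.readObject();
    }

    public static void main(String[] args) throws Exception {
        TransientDemo copy = roundTrip(new TransientDemo());
        System.out.println(copy.geoIPFile);     // prints GeoIP_DATABASE.dat
        System.out.println(copy.lookupService); // prints null
    }
}
```

This is why the job rebuilds lookupService in the constructor rather than expecting it to survive the trip to the executors; a non-transient, non-serializable field would instead trigger the NotSerializableException shown above.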
Running the Java Program on a Spark Cluster
As mentioned, I use Maven to manage and build the Java program. After implementing the code above, I configure Maven's maven-assembly-plugin as follows:
<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <archive>
            <manifest>
                <mainClass>org.shirdrn.spark.job.UserAgentStats</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <excludes>
            <exclude>*.properties</exclude>
            <exclude>*.xml</exclude>
        </excludes>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
This packages all dependent libraries into the program JAR. Finally, copy the JAR file to a Linux host (it does not have to be the Master node of the Spark cluster), and make sure the Spark environment variables are configured correctly on that node. After unpacking the Spark distribution you can find the script bin/run-example; we can modify this script directly so that the relevant paths point to our own Java program package (change the EXAMPLES_DIR variable and the parts related to where our JAR file is stored), and then run the program with the modified script. The script content is shown below:
cygwin=false
case "`uname`" in
    CYGWIN*) cygwin=true;;
esac

SCALA_VERSION=2.10

# Figure out where the Scala framework is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

# Export this as SPARK_HOME
export SPARK_HOME="$FWDIR"

# Load environment variables from conf/spark-env.sh, if it exists
if [ -e "$FWDIR/conf/spark-env.sh" ] ; then
  . $FWDIR/conf/spark-env.sh
fi

if [ -z "$1" ]; then
  echo "Usage: run-example <example-class> [<args>]" >&2
  exit 1
fi

# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
# to avoid the -sources and -doc packages that are built by publish-local.
EXAMPLES_DIR="$FWDIR"/java-examples
SPARK_EXAMPLES_JAR=""
if [ -e "$EXAMPLES_DIR"/*.jar ]; then
  export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/*.jar`
fi
if [[ -z $SPARK_EXAMPLES_JAR ]]; then
  echo "Failed to find Spark examples assembly in $FWDIR/examples/target" >&2
  echo "You need to build Spark with sbt/sbt assembly before running this program" >&2
  exit 1
fi

# Since the examples JAR ideally shouldn't include spark-core (that dependency should be
# "provided"), also add our standard Spark classpath, built using compute-classpath.sh.
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
CLASSPATH="$SPARK_EXAMPLES_JAR:$CLASSPATH"

if $cygwin; then
  CLASSPATH=`cygpath -wp $CLASSPATH`
  export SPARK_EXAMPLES_JAR=`cygpath -w $SPARK_EXAMPLES_JAR`
fi

# Find java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ `command -v java` ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$SPARK_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e "$FWDIR/conf/java-opts" ] ; then
  JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
fi
export JAVA_OPTS

if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
  echo -n "Spark Command: "
  echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
  echo "========================================"
  echo
fi

exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
To run the Java program we developed on Spark, execute the following commands:
cd /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1
./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
My program class org.shirdrn.spark.job.IPAddressStats takes 3 arguments:
Spark master URL: in my case, spark://m1:7077
Input file path: application-specific; here the file is read from HDFS: hdfs://m1:9000/user/shirdrn/wwwlog20140222.log
GeoIP database file: application-specific; an external file used to determine which country an IP address belongs to
If the program runs without errors, the console prints the run log, for example:
14/03/10 22:17:24 INFO job.IPAddressStats: GeoIP file: /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/03/10 22:17:25 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/03/10 22:17:25 INFO Remoting: Starting remoting
14/03/10 22:17:25 INFO Remoting: Remoting started; listening on addresses: [akka.tcp://spark@m1:57379]
14/03/10 22:17:25 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@m1:57379]
14/03/10 22:17:25 INFO spark.SparkEnv: Registering BlockManagerMaster
14/03/10 22:17:25 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140310221725-c1cb
14/03/10 22:17:25 INFO storage.MemoryStore: MemoryStore started with capacity 143.8 MB.
14/03/10 22:17:25 INFO network.ConnectionManager: Bound socket to port 45189 with id = ConnectionManagerId(m1,45189)
14/03/10 22:17:25 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/03/10 22:17:25 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager m1:45189 with 143.8 MB RAM
14/03/10 22:17:25 INFO storage.BlockManagerMaster: Registered BlockManager
14/03/10 22:17:25 INFO spark.HttpServer: Starting HTTP Server
14/03/10 22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:49186
14/03/10 22:17:25 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.95.3.56:49186
14/03/10 22:17:25 INFO spark.SparkEnv: Registering MapOutputTracker
14/03/10 22:17:25 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-56c3e30d-a01b-4752-83d1-af1609ab2370
14/03/10 22:17:25 INFO spark.HttpServer: Starting HTTP Server
14/03/10 22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:52073
14/03/10 22:17:26 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/03/10 22:17:26 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/03/10 22:17:26 INFO ui.SparkUI: Started Spark Web UI at http://m1:4040
14/03/10 22:17:26 INFO spark.SparkContext: Added JAR /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar at http://10.95.3.56:52073/jars/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1394515046396
14/03/10 22:17:26 INFO client.AppClient$ClientActor: Connecting to master spark://m1:7077...
14/03/10 22:17:26 INFO storage.MemoryStore: ensureFreeSpace(60341) called with curMem=0, maxMem=150837657
14/03/10 22:17:26 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 58.9 KB, free 143.8 MB)
14/03/10 22:17:26 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140310221726-0000
14/03/10 22:17:27 INFO client.AppClient$ClientActor: Executor added: app-20140310221726-0000/0 on worker-20140310221648-s1-52544 (s1:52544) with 1 cores
14/03/10 22:17:27 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140310221726-0000/0 on hostPort s1:52544 with 1 cores, 512.0 MB RAM
14/03/10 22:17:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/03/10 22:17:27 WARN snappy.LoadSnappy: Snappy native library not loaded
14/03/10 22:17:27 INFO client.AppClient$ClientActor: Executor updated: app-20140310221726-0000/0 is now RUNNING
14/03/10 22:17:27 INFO mapred.FileInputFormat: Total input paths to process: 1
14/03/10 22:17:27 INFO spark.SparkContext: Starting job: collect at IPAddressStats.java:77
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Got job 0 (collect at IPAddressStats.java:77) with 1 output partitions (allowLocal=false)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Final stage: Stage 0 (collect at IPAddressStats.java:77)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70), which has no missing parents
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:27 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
14/03/10 22:17:28 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@s1:59233/user/Executor#-671170811] with ID 0
14/03/10 22:17:28 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: s1 (PROCESS_LOCAL)
14/03/10 22:17:28 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 2396 bytes in 5 ms
14/03/10 22:17:29 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager s1:47282 with 297.0 MB RAM
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Finished TID 0 in 3376 ms on s1 (progress: 0/1)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Stage 1 (reduceByKey at IPAddressStats.java:70) finished in 4.420 s
14/03/10 22:17:32 INFO scheduler.DAGScheduler: looking for newly runnable stages
14/03/10 22:17:32 INFO scheduler.DAGScheduler: running: Set()
14/03/10 22:17:32 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: failed: Set()
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 1.0 from pool
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List()
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70), which is now runnable
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 1 on executor 0: s1 (PROCESS_LOCAL)
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 2255 bytes in 1 ms
14/03/10 22:17:32 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@s1:33534
14/03/10 22:17:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 120 bytes
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Finished TID 1 in 282 ms on s1 (progress: 0/1)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Stage 0 (collect at IPAddressStats.java:77) finished in 0.314 s
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
14/03/10 22:17:32 INFO spark.SparkContext: Job finished: collect at IPAddressStats.java:77, took 4.870958309 s
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 58.246.49.218	312
14/03/10 22:17:32 INFO job.IPAddressStats: [KR] 1.234.83.77	300
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.16	212
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 110.85.72.254	207
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 27.150.229.134	185
14/03/10 22:17:32 INFO job.IPAddressStats: [HK] 180.178.52.181	181
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.37.210.212	180
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 222.77.226.83	176
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.205	169
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.9.19	165
...
We can also check the status of the running Application through the web console: the cluster's Application status information is available on port 8080 of the Master node (e.g. http://m1:8080/).
Also note that if you develop the Spark application with Eclipse on a Unix-like system, you can connect to the Spark cluster directly from Eclipse and submit the application to the cluster for processing.
Summary
That concludes this detailed guide to writing and running a Spark application in Java. I hope it is helpful; if you have any questions, feel free to leave a comment and we will reply as soon as possible.