Serializing Java objects to HDFS from Spark
Abstract: A common requirement in Spark applications is to serialize a Java object and store it on HDFS, especially models computed with MLlib, so that they can be reused later. The example below reads data from HBase in a Spark job, trains a word2vec model, and stores it on HDFS.
Without further ado, here is the code. Environment: Spark 1.4 + HBase 0.98.
import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.net.URI
import java.util.Date
import org.ansj.library.UserDefineLibrary
import org.ansj.splitWord.analysis.NlpAnalysis
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.PageFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import com.feheadline.fespark.db.Neo4jManager
import com.feheadline.fespark.util.Env
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import scala.math.log
import scala.io.Source
object Word2VecDemo {

  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Word2VecDemo")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.kryoserializer.buffer", "256m")
    sparkConf.set("spark.kryoserializer.buffer.max", "2046m")
    sparkConf.set("spark.akka.frameSize", "500")
    sparkConf.set("spark.rpc.askTimeout", "30")
    val sc = new SparkContext(sparkConf)
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")

    val scan = new Scan()
    val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
    val comp: RegexStringComparator = new RegexStringComparator(""".{1500,}""")
    val articleFilter: SingleColumnValueFilter = new SingleColumnValueFilter(
      "data".getBytes,
      "article".getBytes,
      CompareOp.EQUAL,
      comp
    )
    filterList.addFilter(articleFilter)
    filterList.addFilter(new PageFilter(100))
    scan.setFilter(filterList)
    scan.setCaching(50)
    scan.setCacheBlocks(false)
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))
    val crawledRDD = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )

    val articlesRDD = crawledRDD.filter {
      case (_, result) =>
        val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
        content != null
    }

    val wordsInDoc = articlesRDD.map {
      case (_, result) =>
        val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
        if (content != null) ToAnalysis.parse(content).asScala.map(_.getName).toSeq
        else Seq("")
    }

    val filteredWordsInDoc = wordsInDoc.filter(_.nonEmpty)

    val word2vec = new Word2Vec()
    val model = word2vec.fit(filteredWordsInDoc)
    // ------------------------------ The key part starts here ------------------------------
    // Serialize the model trained above and store it on HDFS.
    val hadoopConf = sc.hadoopConfiguration
    hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
    val fileSystem = FileSystem.get(hadoopConf)
    val path = new Path("/user/hadoop/data/mllib/word2vec-object")
    val oos = new ObjectOutputStream(fileSystem.create(path))
    oos.writeObject(model)
    oos.close()

    // Example: another program can read the serialized object straight back from HDFS and use the model.
    val ois = new ObjectInputStream(fileSystem.open(path))
    val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
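    // A quick, hypothetical sanity check on the deserialized model: query a term that
    // actually occurs in your corpus (the word below is just a placeholder).
    // sample_model.findSynonyms("spark", 10).foreach { case (word, cosine) => println(s"$word -> $cosine") }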
    /*
     * You can also copy the serialized file from HDFS to local disk and use the model
     * from a plain Scala program:
     *
     * import java.io._
     * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
     *
     * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
     * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
     * ois.close()
     */
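    // To get the serialized file onto local disk for the snippet above, one option
    // (assuming the paths used in this example) is the HDFS shell:
    //   hdfs dfs -get /user/hadoop/data/mllib/word2vec-object /home/cherokee/tmp/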
    // ---------------------------------------------------------------------------------------
  }
}
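As an aside: when the object to be persisted is the Word2Vec model itself (rather than an arbitrary Java object), MLlib's Word2VecModel also provides built-in save/load methods (Spark 1.4 and later), which avoid Java serialization altogether. A minimal sketch, assuming the same sc and model as above and a hypothetical output directory:

import org.apache.spark.mllib.feature.Word2VecModel

// Persist the model in MLlib's own on-disk format under the given HDFS directory.
model.save(sc, "hdfs://myhadoop:9000/user/hadoop/data/mllib/word2vec-native")

// Later, possibly from a different application, load it back through the same SparkContext API.
val reloaded = Word2VecModel.load(sc, "hdfs://myhadoop:9000/user/hadoop/data/mllib/word2vec-native")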
Thanks for reading. I hope this helps, and thank you for your support of this site!