DataFrame：通过 Spark SQL 将 Scala 类（case class）转为 DataFrame 的方法
如下所示:
import java.text.{DecimalFormat, DecimalFormatSymbols}
import java.util.Locale

import com.alibaba.fastjson.JSON
import com.donews.data.AppConfig
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

/**
 * Created by silentwolf on 2016/6/3.
 */

/**
 * One row of the USER_TAGS output: a user id (SUUID) followed by 27 tag scores.
 *
 * `toDF()` derives column names from these field names, so they are the column names
 * that end up in the written table.
 */
case class UserTag(SUUID: String,
                   MAN: Float,
                   WOMAN: Float,
                   AGE10_19: Float,
                   AGE20_29: Float,
                   AGE30_39: Float,
                   AGE40_49: Float,
                   AGE50_59: Float,
                   GAME: Float,
                   MOVIE: Float,
                   MUSIC: Float,
                   ART: Float,
                   POLITICS_NEWS: Float,
                   FINANCIAL: Float,
                   EDUCATION_TRAINING: Float,
                   HEALTH_CARE: Float,
                   TRAVEL: Float,
                   AUTOMOBILE: Float,
                   HOUSE_PROPERTY: Float,
                   CLOTHING_ACCESSORIES: Float,
                   BEAUTY: Float,
                   IT: Float,
                   BABY_PRODUCT: Float,
                   FOOD_SERVICE: Float,
                   HOME_FURNISHING: Float,
                   SPORTS: Float,
                   OUTDOOR_ACTIVITIES: Float,
                   MEDICINE: Float)

/**
 * Spark job that reads the `appstatistic` parquet data for one appkey, attaches a tag
 * profile to every distinct suuid, and writes the result to the Phoenix table USER_TAGS.
 */
object UserTagTable {

  val LOG = LoggerFactory.getLogger(getClass)

  val REP_HOME = s"${AppConfig.HDFS_MASTER}/${AppConfig.HDFS_REP}"

  /** First day of the range read by `main`; the last day comes from the command line. */
  private val DefaultStartDay = "2016-04-10"

  /** JSON keys of the first four age buckets produced by [[userTagInfo]], in output order. */
  private val AgeKeys = Seq("age10_19", "age20_29", "age30_39", "age40_49")

  /** JSON keys of the interest scores produced by [[userTagInfo]], in output order. */
  private val InterestKeys = Seq("game", "movie", "music", "art", "politics_news", "financial",
    "education_training", "health_care", "travel", "automobile", "house_property",
    "clothing_accessories", "beauty", "IT", "baby_Product", "food_service", "home_furnishing",
    "sports", "outdoor_activities", "medicine")

  /**
   * Entry point.
   *
   * @param args `args(0)` is the appkey and `args(1)` the last day (inclusive) to read.
   *             With fewer than two arguments a usage hint is printed and nothing runs.
   */
  def main(args: Array[String]): Unit = {
    // The else branch needs args(1). With a single argument, the old `length == 0` check
    // let execution reach an ArrayIndexOutOfBoundsException.
    if (args.length < 2) {
      println("请输入:appkey,StartTime:2016-04-10,StartEnd:2016-04-11")
    } else {
      run(args(0), args(1))
    }
  }

  /** Builds the tag table for `appkey` over [DefaultStartDay, lastdate] and saves it to Phoenix. */
  private def run(appkey: String, lastdate: String): Unit = {
    val conf: com.typesafe.config.Config = ConfigFactory.load()
    val sc = new SparkContext()
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      loadDataFrame(sqlContext, appkey, DefaultStartDay, lastdate).registerTempTable("suuidTable")
      sqlContext.udf.register("taginfo", (a: String) => userTagInfo(a))
      sqlContext.udf.register("intToString", (b: Long) => intToString(b))

      // Key step: pair every distinct suuid in the temp table with the JSON returned by the
      // taginfo UDF, and turn each pair into a UserTag. Null suuids are dropped here because
      // the typed pattern below cannot match null and would fail with a MatchError.
      val tagDF = sqlContext
        .sql("select suuid, taginfo(suuid) from suuidTable where suuid is not null group by suuid")
        .map { case Row(suuid: String, taginfo: String) => toUserTag(suuid, taginfo) }
        .toDF()
      tagDF.registerTempTable("resultTable")

      // appkey and lastdate come from the command line. They are bound as literal columns
      // rather than spliced into SQL text. The output is APPKEY, DATE, then the UserTag columns.
      val outputCols = lit(appkey).as("APPKEY") +: lit(lastdate).as("DATE") +:
        tagDF.columns.map(name => col(name))
      val resultDF = tagDF
        .where(col("SUUID").isNotNull)
        .select(outputCols: _*)

      resultDF.write.mode(SaveMode.Overwrite).options(
        Map("table" -> "USER_TAGS", "zkUrl" -> conf.getString("Hbase.url"))
      ).format("org.apache.phoenix.spark").save()
    } finally {
      // Release the SparkContext even when the load or the write fails.
      sc.stop()
    }
  }

  /** Parses the JSON produced by [[userTagInfo]] into a [[UserTag]] for `suuid`. */
  private def toUserTag(suuid: String, taginfo: String): UserTag = {
    val obj = JSON.parseObject(taginfo)
    // getFloat returns a boxed Float that is unboxed here; a missing key would fail on unboxing.
    def score(key: String): Float = obj.getFloat(key)
    UserTag(
      SUUID = suuid,
      MAN = score("man"),
      WOMAN = score("woman"),
      AGE10_19 = score("age10_19"),
      AGE20_29 = score("age20_29"),
      AGE30_39 = score("age30_39"),
      AGE40_49 = score("age40_49"),
      AGE50_59 = score("age50_59"),
      GAME = score("game"),
      MOVIE = score("movie"),
      MUSIC = score("music"),
      ART = score("art"),
      POLITICS_NEWS = score("politics_news"),
      FINANCIAL = score("financial"),
      EDUCATION_TRAINING = score("education_training"),
      HEALTH_CARE = score("health_care"),
      TRAVEL = score("travel"),
      AUTOMOBILE = score("automobile"),
      HOUSE_PROPERTY = score("house_property"),
      CLOTHING_ACCESSORIES = score("clothing_accessories"),
      BEAUTY = score("beauty"),
      IT = score("IT"),
      BABY_PRODUCT = score("baby_Product"),
      FOOD_SERVICE = score("food_service"),
      HOME_FURNISHING = score("home_furnishing"),
      SPORTS = score("sports"),
      OUTDOOR_ACTIVITIES = score("outdoor_activities"),
      MEDICINE = score("medicine")
    )
  }

  /** Renders a numeric suuid as a decimal string; registered as the `intToString` UDF. */
  def intToString(suuid: Long): String = suuid.toString

  /**
   * Returns a randomly generated tag profile as a JSON object string.
   *
   * `num1` is not used: every call draws fresh values from `math.random`, so the result is
   * mock data, not something derived from the argument. All scores are rounded to two decimals.
   * "woman" is 1 - "man". The first four age buckets are each at most 0.2, and "age50_59" is
   * 1 minus those four. The 20 interest scores are independent draws in [0, 1].
   */
  def userTagInfo(num1: String): String = {
    // Locale.ROOT keeps '.' as the decimal separator. Under a default locale such as de_DE,
    // format would produce "0,53" and toFloat would throw NumberFormatException.
    val twoDecimals = new DecimalFormat("0.00", DecimalFormatSymbols.getInstance(Locale.ROOT))
    def round2(x: Double): Float = twoDecimals.format(x).toFloat

    val man = round2(math.random)
    val woman = round2(1 - man)
    val ages = Seq.fill(AgeKeys.size)(round2(math.random * 0.2))
    // Same left-to-right Float subtraction as ((((1 - a) - b) - c) - d).
    val age50_59 = round2(ages.foldLeft(1f)(_ - _))
    val interests = InterestKeys.map(key => key -> round2(math.random))

    val fields = Seq("man" -> man, "woman" -> woman) ++ AgeKeys.zip(ages) ++
      Seq("age50_59" -> age50_59) ++ interests
    fields.map { case (key, value) => "\"" + key + "\":" + value }.mkString("{", ",", "}")
  }

  /**
   * Reads `$REP_HOME/appstatistic` and keeps rows of `appkey` whose `day` lies in
   * [startDay, endDay] (string comparison) and whose `timestamp` is not null.
   *
   * The values are bound as Column literals rather than spliced into a SQL filter string,
   * so quotes in `appkey` cannot alter the predicate.
   */
  def loadDataFrame(ctx: SQLContext, appkey: String, startDay: String, endDay: String): DataFrame = {
    val path = s"$REP_HOME/appstatistic"
    ctx.read.parquet(path)
      .filter(
        col("timestamp").isNotNull &&
          col("appkey") === appkey &&
          col("day") >= startDay &&
          col("day") <= endDay)
  }
}
以上这篇《DataFrame：通过 Spark SQL 将 Scala 类转为 DataFrame 的方法》就是小编分享给大家的全部内容了，希望能给大家一个参考，也希望大家多多支持毛票票。