DataFrame：通过 Spark SQL 将 Scala 类转为 DataFrame 的方法
如下所示:
import java.text.DecimalFormat
import java.text.DecimalFormatSymbols
import java.util.Locale

import com.alibaba.fastjson.JSON
import com.donews.data.AppConfig
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory
/**
 * Created by silentwolf on 2016/6/3.
 */
/**
 * One tagged user: the user id `SUUID` followed by one `Float` score per tag.
 *
 * The field names become the DataFrame column names when a collection of these is converted
 * with `toDF()`, and those names are what the job projects and writes to `USER_TAGS`.
 *
 * NOTE(review): 28 fields exceed the 22-field case-class limit of Scala 2.10, so this type
 * needs Scala 2.11 or newer.
 */
final case class UserTag(
    SUUID: String,
    // gender
    MAN: Float,
    WOMAN: Float,
    // age buckets
    AGE10_19: Float,
    AGE20_29: Float,
    AGE30_39: Float,
    AGE40_49: Float,
    AGE50_59: Float,
    // interests
    GAME: Float,
    MOVIE: Float,
    MUSIC: Float,
    ART: Float,
    POLITICS_NEWS: Float,
    FINANCIAL: Float,
    EDUCATION_TRAINING: Float,
    HEALTH_CARE: Float,
    TRAVEL: Float,
    AUTOMOBILE: Float,
    HOUSE_PROPERTY: Float,
    CLOTHING_ACCESSORIES: Float,
    BEAUTY: Float,
    IT: Float,
    BABY_PRODUCT: Float,
    FOOD_SERVICE: Float,
    HOME_FURNISHING: Float,
    SPORTS: Float,
    OUTDOOR_ACTIVITIES: Float,
    MEDICINE: Float)
/**
 * Spark job that attaches tag scores to every distinct SUUID of one app and writes the result
 * to the Phoenix table `USER_TAGS`.
 *
 * NOTE(review): the scores come from [[userTagInfo]], which generates random values and ignores
 * its argument. This looks like placeholder data; confirm before relying on the output.
 */
object UserTagTable {

  /** Logger for this job (previously created from `UserOverviewFirst.getClass`, i.e. under another class's name). */
  val LOG: org.slf4j.Logger = LoggerFactory.getLogger(getClass)

  /** Base path built from `AppConfig.HDFS_MASTER` and `AppConfig.HDFS_REP`; data is read from `$REP_HOME/appstatistic`. */
  val REP_HOME: String = s"${AppConfig.HDFS_MASTER}/${AppConfig.HDFS_REP}"

  /** First day (inclusive) of input data; only the last day is taken from the command line. */
  private val StartDay = "2016-04-10"

  /**
   * Entry point.
   *
   * @param args `args(0)` = app key, `args(1)` = last day (inclusive, compared against the
   *             `day` column). With fewer than two arguments a usage hint is printed and no
   *             SparkContext is created.
   */
  def main(args: Array[String]): Unit = {
    // Both args(0) and args(1) are read, so fewer than two arguments is a usage error. The former
    // `args.length == 0` check let a single argument through to args(1) and crashed there.
    // NOTE(review): the hint mentions StartTime and StartEnd, but args(1) is used as the end day
    // and the start day is fixed to StartDay; confirm the intended argument list.
    if (args.length < 2) {
      println("请输入:appkey,StartTime:2016-04-10,StartEnd:2016-04-11")
    } else {
      run(appkey = args(0), lastdate = args(1))
    }
  }

  /** Tags every SUUID of `appkey` seen between [[StartDay]] and `lastdate` and saves the rows to Phoenix. */
  private def run(appkey: String, lastdate: String): Unit = {
    val conf: com.typesafe.config.Config = ConfigFactory.load()
    val sc = new SparkContext()
    // Release the SparkContext even when loading, tagging or saving throws.
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      loadDataFrame(sqlContext, appkey, StartDay, lastdate).registerTempTable("suuidTable")
      sqlContext.udf.register("taginfo", (a: String) => userTagInfo(a))
      // Not referenced by the query below; kept so the UDF stays registered as before.
      sqlContext.udf.register("intToString", (b: Long) => intToString(b))

      // Key step: pair each SUUID with the JSON from the `taginfo` UDF and turn it into a UserTag.
      // Null SUUIDs are removed in SQL because the typed pattern `suuid: String` does not match
      // null and would otherwise abort the job with a MatchError. `group by` already makes the
      // SUUIDs distinct.
      val tagsDF = sqlContext
        .sql("select suuid, taginfo(suuid) from suuidTable where suuid is not null group by suuid")
        .map { case Row(suuid: String, taginfo: String) => toUserTag(suuid, taginfo) }
        .toDF()

      // Output layout: APPKEY, DATE, then SUUID and the tag columns in UserTag declaration order.
      // The arguments are bound as literals instead of being spliced into SQL text.
      val outputColumns =
        Seq(lit(appkey).as("APPKEY"), lit(lastdate).as("DATE")) ++ tagsDF.columns.map(col)
      val resultDF = tagsDF.select(outputColumns: _*)

      resultDF.write.mode(SaveMode.Overwrite).options(
        Map("table" -> "USER_TAGS", "zkUrl" -> conf.getString("Hbase.url"))
      ).format("org.apache.phoenix.spark").save()
    } finally {
      sc.stop()
    }
  }

  /** Builds a [[UserTag]] from a SUUID and its tag JSON as produced by [[userTagInfo]]. */
  private def toUserTag(suuid: String, taginfo: String): UserTag = {
    val json = JSON.parseObject(taginfo)
    // getFloat returns a boxed java.lang.Float; Predef unboxes it, so a missing key throws an NPE.
    def tag(key: String): Float = json.getFloat(key)
    UserTag(suuid,
      tag("man"), tag("woman"),
      tag("age10_19"), tag("age20_29"), tag("age30_39"), tag("age40_49"), tag("age50_59"),
      tag("game"), tag("movie"), tag("music"), tag("art"), tag("politics_news"),
      tag("financial"), tag("education_training"), tag("health_care"), tag("travel"),
      tag("automobile"), tag("house_property"), tag("clothing_accessories"), tag("beauty"),
      tag("IT"), tag("baby_Product"), tag("food_service"), tag("home_furnishing"),
      tag("sports"), tag("outdoor_activities"), tag("medicine"))
  }

  /** Converts a numeric SUUID to its decimal string (registered as the `intToString` SQL UDF). */
  def intToString(suuid: Long): String = suuid.toString

  /**
   * Returns a JSON object string holding a score, rounded to two decimals, for every tag key
   * (`man`, `woman`, `age10_19` … `age50_59`, then the interest keys up to `medicine`).
   *
   * The scores are random and `num1` is ignored:
   *  - `man` is in [0, 1] and `woman` is `1 - man`;
   *  - the first four age buckets are each in [0, 0.2], and `age50_59` is 1 minus their sum;
   *  - every interest score is independently in [0, 1].
   */
  def userTagInfo(num1: String): String = {
    // Pin '.' as the decimal separator. A DecimalFormat built with the JVM default locale prints
    // e.g. "0,57" under de_DE, and "0,57".toFloat throws NumberFormatException.
    val twoDecimals = new DecimalFormat("0.00", DecimalFormatSymbols.getInstance(Locale.ROOT))
    def score(x: Double): Float = twoDecimals.format(x).toFloat

    val man = score(math.random)
    val woman = score(1 - man)
    val youngerAges = Seq("age10_19", "age20_29", "age30_39", "age40_49")
      .map(key => key -> score(math.random * 0.2))
    // Subtract in the same left-to-right Float order as the original expression.
    val age50_59 = score(youngerAges.foldLeft(1f) { case (rest, (_, share)) => rest - share })
    val interests = Seq(
      "game", "movie", "music", "art", "politics_news", "financial", "education_training",
      "health_care", "travel", "automobile", "house_property", "clothing_accessories", "beauty",
      "IT", "baby_Product", "food_service", "home_furnishing", "sports", "outdoor_activities",
      "medicine"
    ).map(key => key -> score(math.random))

    val fields = Seq("man" -> man, "woman" -> woman) ++ youngerAges ++
      Seq("age50_59" -> age50_59) ++ interests
    fields.map { case (key, value) => "\"" + key + "\":" + value }.mkString("{", ",", "}")
  }

  /**
   * Reads the parquet data under `$REP_HOME/appstatistic`. It keeps only rows of `appkey` with a
   * non-null `timestamp` and `startDay <= day <= endDay`.
   */
  def loadDataFrame(ctx: SQLContext, appkey: String, startDay: String, endDay: String): DataFrame = {
    val path = s"$REP_HOME/appstatistic"
    // Column expressions instead of an interpolated SQL string, so a quote inside an argument
    // cannot alter the predicate.
    ctx.read.parquet(path).filter(
      col("timestamp").isNotNull &&
        col("appkey") === appkey &&
        col("day") >= startDay &&
        col("day") <= endDay)
  }
}
以上这篇《DataFrame：通过 Spark SQL 将 Scala 类转为 DataFrame 的方法》就是小编分享给大家的全部内容了，希望能给大家一个参考，也希望大家多多支持毛票票。