A Summary of Two Ways to Convert a Spark RDD into a DataFrame, in Java and Scala
I. Prepare the data source
Create a student.txt file in the project with the following contents:
1,zhangsan,20
2,lisi,21
3,wanger,19
4,fangliu,18
II. Implementation
Java version:
1. First create a Student bean class that implements Serializable and overrides toString(); the code is as follows:
package com.cxd.sql;

import java.io.Serializable;

@SuppressWarnings("serial")
public class Student implements Serializable {
    String sid;
    String sname;
    int sage;

    public String getSid() {
        return sid;
    }
    public void setSid(String sid) {
        this.sid = sid;
    }
    public String getSname() {
        return sname;
    }
    public void setSname(String sname) {
        this.sname = sname;
    }
    public int getSage() {
        return sage;
    }
    public void setSage(int sage) {
        this.sage = sage;
    }
    @Override
    public String toString() {
        return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
    }
}
2. The conversion itself; the code is as follows:
package com.cxd.sql;

import java.util.ArrayList;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class TxtToParquetDemo {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        reflectTransform(spark);   // conversion via Java reflection
        dynamicTransform(spark);   // conversion via a dynamically built schema
    }

    /**
     * Convert via Java reflection (schema inferred from the bean class).
     * @param spark
     */
    private static void reflectTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
        JavaRDD<Student> rowRDD = source.map(line -> {
            String parts[] = line.split(",");
            Student stu = new Student();
            stu.setSid(parts[0]);
            stu.setSname(parts[1]);
            stu.setSage(Integer.valueOf(parts[2]));
            return stu;
        });
        Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
        df.select("sid", "sname", "sage")
          .coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
    }

    /**
     * Convert with a programmatically specified schema.
     * @param spark
     */
    private static void dynamicTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
        JavaRDD<Row> rowRDD = source.map(line -> {
            String[] parts = line.split(",");
            String sid = parts[0];
            String sname = parts[1];
            int sage = Integer.parseInt(parts[2]);
            return RowFactory.create(sid, sname, sage);
        });

        ArrayList<StructField> fields = new ArrayList<StructField>();
        StructField field = null;
        field = DataTypes.createStructField("sid", DataTypes.StringType, true);
        fields.add(field);
        field = DataTypes.createStructField("sname", DataTypes.StringType, true);
        fields.add(field);
        field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);
        fields.add(field);

        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
        df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
    }
}
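Both Java methods write their result as a Parquet directory (parquet.res and parquet.res1). A quick way to sanity-check the output is to read it back, for example from spark-shell; a minimal sketch, assuming the program has been run from the same working directory (`spark` is the session the shell provides):

// Run in spark-shell or any Spark 2.x Scala application
val df = spark.read.parquet("parquet.res")
df.printSchema()   // column names and types are recovered from the Parquet metadata
df.show()          // prints the student rows written by reflectTransform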
Scala version:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType

object RDD2Dataset {

  case class Student(id: Int, name: String, age: Int)

  def main(args: Array[String]) {
    val spark = SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
    import spark.implicits._
    reflectCreate(spark)
    dynamicCreate(spark)
  }

  /**
   * Conversion via reflection (case class).
   * @param spark
   */
  private def reflectCreate(spark: SparkSession): Unit = {
    import spark.implicits._
    val stuRDD = spark.sparkContext.textFile("student2.txt")
    // toDF() is provided by an implicit conversion
    val stuDf = stuRDD.map(_.split(","))
      .map(parts => Student(parts(0).trim.toInt, parts(1), parts(2).trim.toInt))
      .toDF()
    // stuDf.select("id", "name", "age").write.text("result")  // write the named columns to a file
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf = spark.sql("select name from student where age < 20")
    // nameDf.write.text("result")  // write the query result to a file
    nameDf.show()
  }

  /**
   * Dynamic conversion with an explicit schema.
   * @param spark
   */
  private def dynamicCreate(spark: SparkSession): Unit = {
    val stuRDD = spark.sparkContext.textFile("student.txt")
    import spark.implicits._
    val schemaString = "id,name,age"
    val fields = schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    val rowRDD = stuRDD.map(_.split(",")).map(parts => Row(parts(0), parts(1), parts(2)))
    val stuDf = spark.createDataFrame(rowRDD, schema)
    stuDf.printSchema()
    val tmpView = stuDf.createOrReplaceTempView("student")
    val nameDf = spark.sql("select name from student where age < 20")
    // nameDf.write.text("result")  // write the query result to a file
    nameDf.show()
  }
}
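Note that dynamicCreate above declares every column as StringType, so the predicate age < 20 relies on Spark's implicit cast from string to number. A variant that declares age as a real IntegerType is sketched below; it is not part of the original article, and the object name RDD2DatasetTyped is chosen only for illustration:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object RDD2DatasetTyped {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("RDD2DatasetTyped").getOrCreate()
    val stuRDD = spark.sparkContext.textFile("student.txt")
    // age is declared as IntegerType, so the SQL comparison below is numeric
    val schema = StructType(Seq(
      StructField("id", StringType, nullable = true),
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true)
    ))
    // the value passed to Row for an IntegerType field must already be an Int
    val rowRDD = stuRDD.map(_.split(",")).map(parts => Row(parts(0), parts(1), parts(2).trim.toInt))
    val stuDf = spark.createDataFrame(rowRDD, schema)
    stuDf.printSchema()  // age is now reported as integer
    stuDf.createOrReplaceTempView("student")
    spark.sql("select name from student where age < 20").show()
    spark.stop()
  }
}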
Notes:
1. All of the code above has been tested; the test environment was Spark 2.1.0 with JDK 1.8.
2. This code does not work with Spark versions earlier than 2.0 (see the sketch below for the Spark 1.x-style entry point).
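For readers still on Spark 1.x, the entry point is different: there is no SparkSession, and a SQLContext built on top of a SparkContext plays its role. A rough, untested sketch of the 1.x-style setup, for orientation only (the object name RDD2DataFrameLegacy is made up for this example):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object RDD2DataFrameLegacy {
  case class Student(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDD2DataFrameLegacy").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)   // Spark 1.x entry point for DataFrames
    import sqlContext.implicits._         // toDF() comes from here in 1.x

    val stuDf = sc.textFile("student.txt").map(_.split(","))
      .map(p => Student(p(0).trim.toInt, p(1), p(2).trim.toInt)).toDF()
    stuDf.registerTempTable("student")    // 1.x equivalent of createOrReplaceTempView
    sqlContext.sql("select name from student where age < 20").show()
    sc.stop()
  }
}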
That concludes this summary of the two ways to convert a Spark RDD into a DataFrame in Java and Scala. I hope it serves as a useful reference, and thank you for supporting 毛票票.