Spark学习笔记之Spark SQL的具体使用
1.SparkSQL是什么?
- 处理结构化数据的一个spark的模块
- 它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用
2.SparkSQL的特点
- 多语言的接口支持(javapythonscala)
- 统一的数据访问
- 完全兼容hive
- 支持标准的连接
3.为什么学习SparkSQL?
我们已经学习了Hive,它是将HiveSQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有SparkSQL的应运而生,它是将SparkSQL转换成RDD,然后提交到集群执行,执行效率非常快!
4.DataFrame(数据框)
- 与RDD类似,DataFrame也是一个分布式数据容器
- 然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema
- DataFrame其实就是带有schema信息的RDD
5.SparkSQL1.x的API编程
org.apache.spark spark-sql_2.11 ${spark.version}
5.1使用sqlContext创建DataFrame(测试用)
objectOps3{
defmain(args:Array[String]):Unit={
valconf=newSparkConf().setAppName("Ops3").setMaster("local[3]")
valsc=newSparkContext(conf)
valsqlContext=newSQLContext(sc)
valrdd1=sc.parallelize(List(Person("admin1",14,"man"),Person("admin2",16,"man"),Person("admin3",18,"man")))
valdf1:DataFrame=sqlContext.createDataFrame(rdd1)
df1.show(1)
}
}
caseclassPerson(name:String,age:Int,sex:String);
5.2使用sqlContxet中提供的隐式转换函数(测试用)
importorg.apache.spark
valconf=newSparkConf().setAppName("Ops3").setMaster("local[3]")
valsc=newSparkContext(conf)
valsqlContext=newSQLContext(sc)
valrdd1=sc.parallelize(List(Person("admin1",14,"man"),Person("admin2",16,"man"),Person("admin3",18,"man")))
importsqlContext.implicits._
valdf1:DataFrame=rdd1.toDF
df1.show()
5.3使用SqlContext创建DataFrame(常用)
valconf=newSparkConf().setAppName("Ops3").setMaster("local[3]")
valsc=newSparkContext(conf)
valsqlContext=newSQLContext(sc)
vallinesRDD:RDD[String]=sc.textFile("hdfs://uplooking02:8020/sparktest/")
valschema=StructType(List(StructField("name",StringType),StructField("age",IntegerType),StructField("sex",StringType)))
valrowRDD:RDD[Row]=linesRDD.map(line=>{
vallineSplit:Array[String]=line.split(",")
Row(lineSplit(0),lineSplit(1).toInt,lineSplit(2))
})
valrowDF:DataFrame=sqlContext.createDataFrame(rowRDD,schema)
rowDF.show()
6.使用新版本的2.x的API
valconf=newSparkConf().setAppName("Ops5")setMaster("local[3]")
valsparkSession:SparkSession=SparkSession.builder().config(conf).getOrCreate()
valsc=sparkSession.sparkContext
vallinesRDD:RDD[String]=sc.textFile("hdfs://uplooking02:8020/sparktest/")
//数据清洗
valrowRDD:RDD[Row]=linesRDD.map(line=>{
valsplits:Array[String]=line.split(",")
Row(splits(0),splits(1).toInt,splits(2))
})
valschema=StructType(List(StructField("name",StringType),StructField("age",IntegerType),StructField("sex",StringType)))
valdf:DataFrame=sparkSession.createDataFrame(rowRDD,schema)
df.createOrReplaceTempView("p1")
valdf2=sparkSession.sql("select*fromp1")
df2.show()
7.操作SparkSQL的方式
7.1使用SQL语句的方式对DataFrame进行操作
valconf=newSparkConf().setAppName("Ops5")setMaster("local[3]")
valsparkSession:SparkSession=SparkSession.builder().config(conf).getOrCreate()//Spark2.x新的API相当于Spark1.x的SQLContext
valsc=sparkSession.sparkContext
vallinesRDD:RDD[String]=sc.textFile("hdfs://uplooking02:8020/sparktest/")
//数据清洗
valrowRDD:RDD[Row]=linesRDD.map(line=>{
valsplits:Array[String]=line.split(",")
Row(splits(0),splits(1).toInt,splits(2))
})
valschema=StructType(List(StructField("name",StringType),StructField("age",IntegerType),StructField("sex",StringType)))
valdf:DataFrame=sparkSession.createDataFrame(rowRDD,schema)
df.createOrReplaceTempView("p1")//这是Sprk2.x新的API相当于Spark1.x的registTempTable()
valdf2=sparkSession.sql("select*fromp1")
df2.show()
7.2使用DSL语句的方式对DataFrame进行操作
DSL(domainspecificlanguage)特定领域语言
valconf=newSparkConf().setAppName("Ops5")setMaster("local[3]")
valsparkSession:SparkSession=SparkSession.builder().config(conf).getOrCreate()
valsc=sparkSession.sparkContext
vallinesRDD:RDD[String]=sc.textFile("hdfs://uplooking02:8020/sparktest/")
//数据清洗
valrowRDD:RDD[Row]=linesRDD.map(line=>{
valsplits:Array[String]=line.split(",")
Row(splits(0),splits(1).toInt,splits(2))
})
valschema=StructType(List(StructField("name",StringType),StructField("age",IntegerType),StructField("sex",StringType)))
valrowDF:DataFrame=sparkSession.createDataFrame(rowRDD,schema)
importsparkSession.implicits._
valdf:DataFrame=rowDF.select("name","age").where("age>10").orderBy($"age".desc)
df.show()
8.SparkSQL的输出
8.1写出到JSON文件
valconf=newSparkConf().setAppName("Ops5")setMaster("local[3]")
valsparkSession:SparkSession=SparkSession.builder().config(conf).getOrCreate()
valsc=sparkSession.sparkContext
vallinesRDD:RDD[String]=sc.textFile("hdfs://uplooking02:8020/sparktest")
//数据清洗
valrowRDD:RDD[Row]=linesRDD.map(line=>{
valsplits:Array[String]=line.split(",")
Row(splits(0),splits(1).toInt,splits(2))
})
valschema=StructType(List(StructField("name",StringType),StructField("age",IntegerType),StructField("sex",StringType)))
valrowDF:DataFrame=sparkSession.createDataFrame(rowRDD,schema)
importsparkSession.implicits._
valdf:DataFrame=rowDF.select("name","age").where("age>10").orderBy($"age".desc)
df.write.json("hdfs://uplooking02:8020/sparktest1")
8.2写出到关系型数据库(mysql)
valconf=newSparkConf().setAppName("Ops5")setMaster("local[3]")
valsparkSession:SparkSession=SparkSession.builder().config(conf).getOrCreate()
valsc=sparkSession.sparkContext
vallinesRDD:RDD[String]=sc.textFile("hdfs://uplooking02:8020/sparktest")
//数据清洗
valrowRDD:RDD[Row]=linesRDD.map(line=>{
valsplits:Array[String]=line.split(",")
Row(splits(0),splits(1).toInt,splits(2))
})
valschema=StructType(List(StructField("name",StringType),StructField("age",IntegerType),StructField("sex",StringType)))
valrowDF:DataFrame=sparkSession.createDataFrame(rowRDD,schema)
importsparkSession.implicits._
valdf:DataFrame=rowDF.select("name","age").where("age>10").orderBy($"age".desc)
valurl="jdbc:mysql://localhost:3306/test"
//表会自动创建
valtbName="person1";
valprop=newProperties()
prop.put("user","root")
prop.put("password","root")
//SaveMode默认为ErrorIfExists
df.write.mode(SaveMode.Append).jdbc(url,tbName,prop)
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。