Spark学习笔记之Spark SQL的具体使用
1.SparkSQL是什么?
- 处理结构化数据的一个spark的模块
- 它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用
2.SparkSQL的特点
- 多语言的接口支持(javapythonscala)
- 统一的数据访问
- 完全兼容hive
- 支持标准的连接
3.为什么学习SparkSQL?
我们已经学习了Hive,它是将HiveSQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有SparkSQL的应运而生,它是将SparkSQL转换成RDD,然后提交到集群执行,执行效率非常快!
4.DataFrame(数据框)
- 与RDD类似,DataFrame也是一个分布式数据容器
- 然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema
- DataFrame其实就是带有schema信息的RDD
5.SparkSQL1.x的API编程
org.apache.spark spark-sql_2.11 ${spark.version}
5.1使用sqlContext创建DataFrame(测试用)
objectOps3{ defmain(args:Array[String]):Unit={ valconf=newSparkConf().setAppName("Ops3").setMaster("local[3]") valsc=newSparkContext(conf) valsqlContext=newSQLContext(sc) valrdd1=sc.parallelize(List(Person("admin1",14,"man"),Person("admin2",16,"man"),Person("admin3",18,"man"))) valdf1:DataFrame=sqlContext.createDataFrame(rdd1) df1.show(1) } } caseclassPerson(name:String,age:Int,sex:String);
5.2使用sqlContxet中提供的隐式转换函数(测试用)
importorg.apache.spark valconf=newSparkConf().setAppName("Ops3").setMaster("local[3]") valsc=newSparkContext(conf) valsqlContext=newSQLContext(sc) valrdd1=sc.parallelize(List(Person("admin1",14,"man"),Person("admin2",16,"man"),Person("admin3",18,"man"))) importsqlContext.implicits._ valdf1:DataFrame=rdd1.toDF df1.show() 5.3使用SqlContext创建DataFrame(常用) valconf=newSparkConf().setAppName("Ops3").setMaster("local[3]") valsc=newSparkContext(conf) valsqlContext=newSQLContext(sc) vallinesRDD:RDD[String]=sc.textFile("hdfs://uplooking02:8020/sparktest/") valschema=StructType(List(StructField("name",StringType),StructField("age",IntegerType),StructField("sex",StringType))) valrowRDD:RDD[Row]=linesRDD.map(line=>{ vallineSplit:Array[String]=line.split(",") Row(lineSplit(0),lineSplit(1).toInt,lineSplit(2)) }) valrowDF:DataFrame=sqlContext.createDataFrame(rowRDD,schema) rowDF.show()
6.使用新版本的2.x的API
valconf=newSparkConf().setAppName("Ops5")setMaster("local[3]") valsparkSession:SparkSession=SparkSession.builder().config(conf).getOrCreate() valsc=sparkSession.sparkContext vallinesRDD:RDD[String]=sc.textFile("hdfs://uplooking02:8020/sparktest/") //数据清洗 valrowRDD:RDD[Row]=linesRDD.map(line=>{ valsplits:Array[String]=line.split(",") Row(splits(0),splits(1).toInt,splits(2)) }) valschema=StructType(List(StructField("name",StringType),StructField("age",IntegerType),StructField("sex",StringType))) valdf:DataFrame=sparkSession.createDataFrame(rowRDD,schema) df.createOrReplaceTempView("p1") valdf2=sparkSession.sql("select*fromp1") df2.show()
7.操作SparkSQL的方式
7.1使用SQL语句的方式对DataFrame进行操作
valconf=newSparkConf().setAppName("Ops5")setMaster("local[3]") valsparkSession:SparkSession=SparkSession.builder().config(conf).getOrCreate()//Spark2.x新的API相当于Spark1.x的SQLContext valsc=sparkSession.sparkContext vallinesRDD:RDD[String]=sc.textFile("hdfs://uplooking02:8020/sparktest/") //数据清洗 valrowRDD:RDD[Row]=linesRDD.map(line=>{ valsplits:Array[String]=line.split(",") Row(splits(0),splits(1).toInt,splits(2)) }) valschema=StructType(List(StructField("name",StringType),StructField("age",IntegerType),StructField("sex",StringType))) valdf:DataFrame=sparkSession.createDataFrame(rowRDD,schema) df.createOrReplaceTempView("p1")//这是Sprk2.x新的API相当于Spark1.x的registTempTable() valdf2=sparkSession.sql("select*fromp1") df2.show()
7.2使用DSL语句的方式对DataFrame进行操作
DSL(domainspecificlanguage)特定领域语言 valconf=newSparkConf().setAppName("Ops5")setMaster("local[3]") valsparkSession:SparkSession=SparkSession.builder().config(conf).getOrCreate() valsc=sparkSession.sparkContext vallinesRDD:RDD[String]=sc.textFile("hdfs://uplooking02:8020/sparktest/") //数据清洗 valrowRDD:RDD[Row]=linesRDD.map(line=>{ valsplits:Array[String]=line.split(",") Row(splits(0),splits(1).toInt,splits(2)) }) valschema=StructType(List(StructField("name",StringType),StructField("age",IntegerType),StructField("sex",StringType))) valrowDF:DataFrame=sparkSession.createDataFrame(rowRDD,schema) importsparkSession.implicits._ valdf:DataFrame=rowDF.select("name","age").where("age>10").orderBy($"age".desc) df.show()
8.SparkSQL的输出
8.1写出到JSON文件
valconf=newSparkConf().setAppName("Ops5")setMaster("local[3]") valsparkSession:SparkSession=SparkSession.builder().config(conf).getOrCreate() valsc=sparkSession.sparkContext vallinesRDD:RDD[String]=sc.textFile("hdfs://uplooking02:8020/sparktest") //数据清洗 valrowRDD:RDD[Row]=linesRDD.map(line=>{ valsplits:Array[String]=line.split(",") Row(splits(0),splits(1).toInt,splits(2)) }) valschema=StructType(List(StructField("name",StringType),StructField("age",IntegerType),StructField("sex",StringType))) valrowDF:DataFrame=sparkSession.createDataFrame(rowRDD,schema) importsparkSession.implicits._ valdf:DataFrame=rowDF.select("name","age").where("age>10").orderBy($"age".desc) df.write.json("hdfs://uplooking02:8020/sparktest1")
8.2写出到关系型数据库(mysql)
valconf=newSparkConf().setAppName("Ops5")setMaster("local[3]") valsparkSession:SparkSession=SparkSession.builder().config(conf).getOrCreate() valsc=sparkSession.sparkContext vallinesRDD:RDD[String]=sc.textFile("hdfs://uplooking02:8020/sparktest") //数据清洗 valrowRDD:RDD[Row]=linesRDD.map(line=>{ valsplits:Array[String]=line.split(",") Row(splits(0),splits(1).toInt,splits(2)) }) valschema=StructType(List(StructField("name",StringType),StructField("age",IntegerType),StructField("sex",StringType))) valrowDF:DataFrame=sparkSession.createDataFrame(rowRDD,schema) importsparkSession.implicits._ valdf:DataFrame=rowDF.select("name","age").where("age>10").orderBy($"age".desc) valurl="jdbc:mysql://localhost:3306/test" //表会自动创建 valtbName="person1"; valprop=newProperties() prop.put("user","root") prop.put("password","root") //SaveMode默认为ErrorIfExists df.write.mode(SaveMode.Append).jdbc(url,tbName,prop)
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。