文章目录
- 一、SQL on Hadoop
-
-
-
- 1.1 SQL是一种传统的用来进行数据分析的标准
- 1.2 Spark SQL前身
- 1.3 Spark SQL架构(了解)
-
- Spark SQL和其它组件的关系图:
- 1.4 Spark SQL 运行原理
-
- Catalyst优化器-1
- Catalyst优化器-2
- Catalyst优化器-3
-
-
- 二、Spark SQL API
-
-
-
- 1.1 SparkSession
- 1.2 Dataset
- 1.3 样例类创建Dataset
-
- 练习1:使用Dataset完成零售商店指标统计
- 1.4 DataFrame
- 1.5 **RDD与DataFrame对比**
-
- 1.5.1 **创建DataFrame** (读json文件转换成DF)
- 1.6 Spark SQL 常用API
- 1.7 **RDD->DataFrame**
- 1.8 DataFrame ->RDD
-
-
一、SQL on Hadoop
1.1 SQL是一种传统的用来进行数据分析的标准
Hive是原始的SQL-on-Hadoop解决方案
Impala:和Hive一样,提供了一种可以针对已有Hadoop数据编写SQL查询的方法
Presto:类似于Impala,未被主要供应商支持
Shark:Spark SQL的前身,设计目标是作为Hive的一个补充
Phoenix:基于HBase的开源SQL查询引擎
1.2 Spark SQL前身
Shark的初衷:让Hive运行在Spark之上(hive on spark不再使用MapReduce引擎)
是对Hive的改造,继承了大量Hive代码,给优化和维护带来了大量的麻烦
1.3 Spark SQL架构(了解)
Spark SQL是Spark的核心组件之一(2014.4 Spark1.0)
能够直接访问现存的Hive数据
提供JDBC/ODBC接口供第三方工具借助Spark进行数据处理
提供了更高层级的接口方便地处理数据(DSL风格API 、SQL)
支持多种操作方式:SQL、API编程
支持多种外部数据源:Parquet、JSON、RDBMS等
Spark SQL和其它组件的关系图:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2cs0TPR10MFRkT6FERNFDOsJGcohVYsR2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLzQjM2UTOxMjM3ATMwEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
1.4 Spark SQL 运行原理
Catalyst优化器是Spark SQL的核心
Fronted(SQL代码):首先,前面是我们写的SQL语句
Catalyst(优化器中间转换,将代码转换为逻辑计划):之后需要经过Catalyst优化器去做处理,把我们的的SQL语句转换为Logical Plan(逻辑计划),通过Catalog底层的分析转换,转换成逻辑计划,之后经过逻辑优化器(Logical Optimization优化器的一部分)把逻辑计划转换为优化过的逻辑计划
Backend(逻辑计划转换为物理计划阶段):通过转换转换成Physical Plans(物理计划),物理计划会对这个物理计划做一些处理,最后可以通过select等等查询结果,最后转换成RDD
最后经过底层,SQL语句编程Spark代码
Catalyst优化器-1
逻辑计划
select name from // 第三步:拿出id=1 的name
(
select id,name from people //第一步:先查询子查询,从people表中查出id,name,新表命名p
)p
where p.id = 1 // 第二步:过滤id = 1的
Catalyst优化器-2
优化:
1、在投影上面查询过滤器
2、检查过滤是否可下压
Catalyst优化器-3
物理计划
二、Spark SQL API
1.1 SparkSession
SparkContext
SQLContext
Spark SQL的编程入口
HiveContext(SQLContext的子集,可以去处理Hive相关的内容)
SparkSession(Spark 2.x推荐,把SparkContext、SQLCOntext、HiveContext都整做了整合)
SparkSession:合并了SQLContext与HiveContext
提供与Spark功能交互单一入口点,并允许使用DataFrame和Dataset API对Spark进行编程
注意:默认情况下,spark是SparkSession的对象
1.2 Dataset
Dataset (Spark 1.6+)
特定域对象中的强类型集合
1、createDataset()的参数可以是:Seq、Array、RDD
2、上面三行代码生成的Dataset分别是:
Dataset[Int]、Dataset[(String,Int)]、Dataset[(String,Int,Int)]
3、Dataset=RDD+Schema,所以Dataset与RDD有大部共同的函数,如map、filter等
spark-shell演示:
scala> spark.createDataset(1 to 3).show // 传入序列
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
+-----+
scala> spark.createDataset(List("a","b","c")).show
+-----+
|value|
+-----+
| a|
| b|
| c|
+-----+
^
scala> spark.createDataset(List(("a",1),("b",2),("c",3))).show
+---+---+
| _1| _2| // 传入元组 _1表示第一列数据,_2表示第二列数据
+---+---+
| a| 1|
| b| 2|
| c| 3|
+---+---+
scala> spark.createDataset(sc.makeRDD(List((1,2,3,4),(2,3,4,5)))).show
+---+---+---+---+
| _1| _2| _3| _4| // 传入RDD的方式
+---+---+---+---+
| 1| 2| 3| 4|
| 2| 3| 4| 5|
+---+---+---+---+
scala> spark.createDataset(sc.makeRDD(List((1,2,3,4),(2,3,4,5)))) // 查看Dataset是什么
res7: org.apache.spark.sql.Dataset[(Int, Int, Int, Int)] = [_1: int, _2: int ... 2 more fields]
IDE演示:
package createDataset
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
object CreateDatasetDemo2 extends App{
// 创建SparkSession
val spark: SparkSession = SparkSession.builder()
.master("local[*]")
.appName(this.getClass.getName)
.getOrCreate()
// 需要导入一个隐式转换
import spark.implicits._
// 通过seq创建Dataset
val seqDS: Dataset[Int] = spark.createDataset(1 to 10)
seqDS.show()
// 通过List集合创建Dataset
val ListDS: Dataset[(String, Int)] = spark.createDataset(List(("a",1),("b",2),("c",3)))
ListDS.show()
// 通过SparkSession创建SparkContext
val sc: SparkContext = spark.sparkContext
// 通过RDD创建Dataset
val ListRDD: RDD[(String, Int, Int)] = sc.parallelize(List(("a",1,1),("b",2,3),("c",3,3)))
val rddDS: Dataset[(String, Int, Int)] = spark.createDataset(ListRDD)
rddDS.show()
}
1.3 样例类创建Dataset
使用Case Class创建Dataset
Scala中在class关键字前加上case关键字 这个类就成为了样例类,样例类和普通类区别:
(1)不需要new可以直接生成对象
(2)默认实现序列化接口
(3)默认自动覆盖 toString()、equals()、hashCode()
spark-shell 演示:
scala> case class Student(name:String,age:Int)
defined class Student
scala> Seq(Student("zhangsan",15),Student("lisi",20))
res10: Seq[Student] = List(Student(zhangsan,15), Student(lisi,20))
scala> Seq(Student("zhangsan",15),Student("lisi",20)).toDS
res11: org.apache.spark.sql.Dataset[Student] = [name: string, age: int]
scala> Seq(Student("zhangsan",15),Student("lisi",20)).toDS.show
+--------+---+
| name|age|
+--------+---+
|zhangsan| 15|
| lisi| 20|
+--------+---+
IDEA演示:
package test
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object CreateDataSetDemo extends App {
// 定义两个样例类
case class Point(label:String,x:Double,y:Double) // 样例类1
case class Category(id:Int,name:String) // 样例类2
// 1、创建一个SparkSession对象
private val spark: SparkSession = SparkSession.builder().master("local[*]")
.appName("test01").getOrCreate()
// 2、创建一个RDD
import spark.implicits._
private val sc: SparkContext = spark.sparkContext
// 里面包含三元组的RDD
// RDD1
private val pointRDD: RDD[(String, Double, Double)] = sc.makeRDD(List(("bar",3.0,4.0),("foo",2.0,2.5)))
// RDD2
private val categoriesRDD: RDD[(Int, String)] = sc.makeRDD(List((1,"foo"),(2,"bar")))
// 3、使用spark对象来创建Dataset
// IDE和spark-shell下不一样,没有toDS,所以需要先导包
// import spark.implicits._
private val ds1: Dataset[(String, Double, Double)] = pointRDD.toDS()
ds1.show()
private val poiontDS: Dataset[Point] = pointRDD.map(x=>Point(x._1,x._2,x._3)).toDS()
poiontDS.show()
private val categoryDS: Dataset[Category] = categoriesRDD.map(x=>Category(x._1,x._2)).toDS()
categoryDS.show()
// 对这两个Dataset进行join操作
// ===比较的是值
private val df: DataFrame = poiontDS.join(categoryDS,poiontDS("label")===categoryDS("name"))
df.show()
}
练习1:使用Dataset完成零售商店指标统计
需求说明
完成数据装载
使用RDD装载零售商店业务数据
customers.csv、orders.csv、order_items.csv、products.csv
定义样例类
Customer、Order、OrderItem、Product
将RDD转换为Dataset
请找出
谁的消费额最高?
哪个产品销量最高?
package practice
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import practice.Demo01.Order_item
object Demo01 extends App {
// 创建SparkSession对象
private val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test01")
.getOrCreate()
// 导包
import spark.implicits._
// 创建SparkContext对象
private val sc: SparkContext = spark.sparkContext
// 生成RDD
private val orderFile: RDD[String] = sc.textFile("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\src\\CustomerTest\\orders.csv")
private val orderDs: Dataset[Order] = orderFile.map(x => {
// 把引号替换掉
val fields = x.split(",").map(y => y.replace("\"", ""))
Order(fields(0), fields(1), fields(2), fields(3))
}).toDS()
orderDs.show()
// 创建样例类,指定字段
case class Order(id:String,date:String,customerId:String,status:String)
case class Customer(
id:String,
fname:String,
Iname:String,
email:String,
password:String,
street:String,
city:String,
state:String,
zipcode:String
)
private val CustomerFile: RDD[String] = sc.textFile("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\src\\CustomerTest\\customers.csv")
private val CusDS: Dataset[Customer] = CustomerFile.map(x => {
val fileds01 = x.split(",").map(y => y.replace("\"", ""))
Customer(fileds01(0), fileds01(1), fileds01(2), fileds01(3), fileds01(4), fileds01(5), fileds01(6), fileds01(7), fileds01(8))
}).toDS()
CusDS.show()
case class Order_item(
id:String,
order_id:String,
product_id:String,
qunatity:String,
subtotal:String,
product_price:String
)
private val Order_itme_File: RDD[String] = sc.textFile("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\src\\CustomerTest\\order_items.csv")
private val Order_Item_RDD: Dataset[Order_item] = Order_itme_File.map(x => {
val fileds03 = x.split(",").map(y => y.replace("\"", ""))
Order_item(fileds03(0), fileds03(1), fileds03(2), fileds03(3), fileds03(4), fileds03(5))
}).toDS()
Order_Item_RDD.show()
case class Product(
id:String,
category_id:String,
name:String,
description:String,
price:Double,
image:String
)
private val ProductFile: RDD[String] = sc.textFile("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\src\\CustomerTest\\products.csv")
private val ProductRDD: Dataset[Product] = ProductFile.map(x => {
val filelds04:Array[String] = x.split(",").map(y => y.replace("\"", ""))
Product(filelds04(0), filelds04(1), filelds04(2), filelds04(3).toDouble, filelds04(4), filelds04(5))
}).toDS()
ProductRDD.show()
}
1.4 DataFrame
DataFrame (Spark 1.4+)
DataFrame=Dataset[Row] + schema ----有点像Dateset的子集
类似传统数据的二维表格
在RDD基础上加入了Schema(数据结构信息)
DataFrame Schema支持嵌套数据类型
struct
map
array
提供更多类似SQL操作的API
1.5 RDD与DataFrame对比
DataFrame只关心每个字段的名字和每个字段的类型,直接把这些属性展示出来,而RDD需要先去获取对象,再获取属性。RDD 关心的只是数据,DataFrame关心的是里面的字段和字段类型
1.5.1 创建DataFrame (读json文件转换成DF)
将JSON文件转成DataFrame
people.json内容如下
{“name”:“Michael”}
{“name”:“Andy”, “age”:30}
{“name”:“Justin”, “age”:19}
IDEA演示:
package DataFrameDemo
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
object CreateDataFrame extends App {
// 创建一个SparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test")
.getOrCreate()
// 导包,创建SparkContext
import spark.implicits._
val sc: SparkContext = spark.sparkContext
// DataFrame是个type,就是个别名
// type DataFrame = org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
// 创建DataFrame
// 通过spark.read读取文件,生成DataFrame
val jsonDF: DataFrame = spark.read.json("D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\DataFrameData\\people.json")
// 和上面直接read写法效果一样
// private val JsonFrame: DataFrame = spark.read.format("json").load("D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\DataFrameData\\people.json")
jsonDF.show()
spark.stop()
}
1.6 Spark SQL 常用API
常用API:大类分为算子和采用SQL语句执行查询操作
package DataFrameDemo
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
object CreateDataFrame02 extends App {
// 创建一个SparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test")
.getOrCreate()
// 导包,创建SparkContext
import spark.implicits._
val sc: SparkContext = spark.sparkContext
// DataFrame是个type,就是个别名
// type DataFrame = org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
// 创建DataFrame
// 通过spark.read读取文件,生成DataFrame
val jsonDF: DataFrame = spark.read.json("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\DataFrameData\\people.json")
// 和上面直接read写法效果一样
// private val JsonFrame: DataFrame = spark.read.format("json").load("D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\DataFrameData\\people.json")
println("----show:查读取文件结果-----")
jsonDF.show()
println("-------perintSchema-----")
// 打印schema信息
jsonDF.printSchema()
println("------select:单个字段---------")
// 使用select
// 查询name字段生成的结果转换成了DataFrame
private val df1: DataFrame = jsonDF.select("name")
df1.show()
// jsonDF.select("name").show
println("---select 查多字段-------")
// 查多个字段 用()来指定
jsonDF.select(jsonDF("name"),jsonDF("age")).show()
//对字段操作
println("------对字段操作--------")
jsonDF.select(jsonDF("name"),jsonDF("age")+1).show()
//也可以写成下面那样,每次写jsontoDF太麻烦,可以用$指代
println("-----$指代的写法-------")
jsonDF.select($"name",$"age"+2).show()
// filter
println("----------filter----------")
jsonDF.filter($"age">20).show()
println("------groupBy:分组---------")
private val df2: DataFrame = jsonDF.groupBy("age").count()
df2.show()
println("----以下方式采用sql语句执行查询操作----")
println("-----createOrReplaceTempView:临时表------")
/**
* createOrReplaceTempView:创建临时图,
* 此图生命周期和用于创建此数据集的SparkSession相关联
*
*/
// 把DataFrame注册成一张临时表
jsonDF.createOrReplaceTempView("people")
// 结果也是DataFrame
// 写sql语句
private val df3: DataFrame = spark.sql("select * from people where age>20")
df3.show()
// spark.newSession().sql("select * from people where age>20") 临时表再创建session写SQL会报错,这种写法不太可取
println("----createOrReplaceGlobalTempView:全局临时表------")
// 创建全局临时表
/**
* 注意点:需要在临时表前加上global_temp. 不然会报错,亲身经历
*/
jsonDF.createOrReplaceGlobalTempView("p1")
private val df4: DataFrame = spark.newSession().sql("select * from global_temp.p1 where age>20")
df4.show()
spark.stop()
}
1.7 RDD->DataFrame
//方式一:通过反射获取RDD内的Schema
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
object RDDToDataFrame01 extends App {
// 创建一个SparkSession对象
private val spark: SparkSession = SparkSession.builder().master("local[*]").appName("rddToDataFrame")
.getOrCreate()
import spark.implicits._
private val sc: SparkContext = spark.sparkContext
case class People(name:String,age:Int)
private val peopleFile: RDD[String] = sc.textFile("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\people\\people.txt")
private val textRDD: RDD[Array[String]] = peopleFile.map(x => {
x.split(",")
})
// 通过样例类将RDD转换成DataFrame
private val peopleDF: DataFrame = textRDD.map(x=>People(x(0),x(1).trim.toInt)).toDF()
peopleDF.printSchema()
peopleDF.show()
}
//方式二:通过编程接口指定Schema
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession, types}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
object RDDToDataFrame02 extends App{
private val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
private val sc: SparkContext = spark.sparkContext
private val peopleFile: RDD[String] = sc.textFile("D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\people\\people.txt")
private val peopleRDD: RDD[Array[String]] = peopleFile.map(x => {
x.split(",")
})
// wangwu,19
// 1、自定义一个schema信息
// 可能有多个字段,转换成一个数组
private val schema = StructType(Array(
StructField("name", StringType, true),
StructField("age", IntegerType, true)
))
// 2、把RDD转换成Row
private val mapRDD: RDD[Row] = peopleRDD.map(x=>Row(x(0),x(1).trim.toInt))
// 3、把RDD转换成DataFrame
// dataframe = dataset + schema
private val df1: DataFrame = spark.createDataFrame(mapRDD,schema)
df1.printSchema()
df1.show()
}
1.8 DataFrame ->RDD
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession, types}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
object RDDToDataFrame02 extends App{
private val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
private val sc: SparkContext = spark.sparkContext
private val peopleFile: RDD[String] = sc.textFile("D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\people\\people.txt")
private val peopleRDD: RDD[Array[String]] = peopleFile.map(x => {
x.split(",")
})
// wangwu,19
// 1、自定义一个schema信息
// 可能有多个字段,转换成一个数组
private val schema = StructType(Array(
StructField("name", StringType, true),
StructField("age", IntegerType, true)
))
// 2、把RDD转换成Row
private val mapRDD: RDD[Row] = peopleRDD.map(x=>Row(x(0),x(1).trim.toInt))
// 3、把RDD转换成DataFrame
// dataframe = dataset + schema
private val df1: DataFrame = spark.createDataFrame(mapRDD,schema)
df1.printSchema()
df1.show()
// DataFrame ---> RDD
val rddres: RDD[Row] = df1.rdd
println(rddres.collect().mkString(" "))
}