文章目錄
- 一、SQL on Hadoop
-
-
-
- 1.1 SQL是一種傳統的用來進行資料分析的标準
- 1.2 Spark SQL前身
- 1.3 Spark SQL架構(了解)
-
- Spark SQL和其它元件的關系圖:
- 1.4 Spark SQL 運作原理
-
- Catalyst優化器-1
- Catalyst優化器-2
- Catalyst優化器-3
-
-
- 二、Spark SQL API
-
-
-
- 1.1 SparkSession
- 1.2 Dataset
- 1.3 樣例類建立Dataset
-
- 練習1:使用Dataset完成零售商店名額統計
- 1.4 DataFrame
- 1.5 **RDD與DataFrame對比**
-
- 1.5.1 **建立DataFrame** (讀json檔案轉換成DF)
- 1.6 Spark SQL 常用API
- 1.7 **RDD->DataFrame**
- 1.8 DataFrame ->RDD
-
-
一、SQL on Hadoop
1.1 SQL是一種傳統的用來進行資料分析的标準
Hive是原始的SQL-on-Hadoop解決方案
Impala:和Hive一樣,提供了一種可以針對已有Hadoop資料編寫SQL查詢的方法
Presto:類似于Impala,未被主要供應商支援
Shark:Spark SQL的前身,設計目标是作為Hive的一個補充
Phoenix:基于HBase的開源SQL查詢引擎
1.2 Spark SQL前身
Shark的初衷:讓Hive運作在Spark之上(hive on spark不再使用MapReduce引擎)
是對Hive的改造,繼承了大量Hive代碼,給優化和維護帶來了大量的麻煩
1.3 Spark SQL架構(了解)
Spark SQL是Spark的核心元件之一(2014.4 Spark1.0)
能夠直接通路現存的Hive資料
提供JDBC/ODBC接口供第三方工具借助Spark進行資料處理
提供了更高層級的接口友善地處理資料(DSL風格API 、SQL)
支援多種操作方式:SQL、API程式設計
支援多種外部資料源:Parquet、JSON、RDBMS等
Spark SQL和其它元件的關系圖:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2cs0TPR10MFRkT6FERNFDOsJGcohVYsR2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLzQjM2UTOxMjM3ATMwEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
1.4 Spark SQL 運作原理
Catalyst優化器是Spark SQL的核心
Fronted(SQL代碼):首先,前面是我們寫的SQL語句
Catalyst(優化器中間轉換,将代碼轉換為邏輯計劃):之後需要經過Catalyst優化器去做處理,把我們的的SQL語句轉換為Logical Plan(邏輯計劃),通過Catalog底層的分析轉換,轉換成邏輯計劃,之後經過邏輯優化器(Logical Optimization優化器的一部分)把邏輯計劃轉換為優化過的邏輯計劃
Backend(邏輯計劃轉換為實體計劃階段):通過轉換轉換成Physical Plans(實體計劃),實體計劃會對這個實體計劃做一些處理,最後可以通過select等等查詢結果,最後轉換成RDD
最後經過底層,SQL語句程式設計Spark代碼
Catalyst優化器-1
邏輯計劃
select name from // 第三步:拿出id=1 的name
(
select id,name from people //第一步:先查詢子查詢,從people表中查出id,name,新表命名p
)p
where p.id = 1 // 第二步:過濾id = 1的
Catalyst優化器-2
優化:
1、在投影上面查詢過濾器
2、檢查過濾是否可下壓
Catalyst優化器-3
實體計劃
二、Spark SQL API
1.1 SparkSession
SparkContext
SQLContext
Spark SQL的程式設計入口
HiveContext(SQLContext的子集,可以去處理Hive相關的内容)
SparkSession(Spark 2.x推薦,把SparkContext、SQLCOntext、HiveContext都整做了整合)
SparkSession:合并了SQLContext與HiveContext
提供與Spark功能互動單一入口點,并允許使用DataFrame和Dataset API對Spark進行程式設計
注意:預設情況下,spark是SparkSession的對象
1.2 Dataset
Dataset (Spark 1.6+)
特定域對象中的強類型集合
1、createDataset()的參數可以是:Seq、Array、RDD
2、上面三行代碼生成的Dataset分别是:
Dataset[Int]、Dataset[(String,Int)]、Dataset[(String,Int,Int)]
3、Dataset=RDD+Schema,是以Dataset與RDD有大部共同的函數,如map、filter等
spark-shell示範:
scala> spark.createDataset(1 to 3).show // 傳入序列
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
+-----+
scala> spark.createDataset(List("a","b","c")).show
+-----+
|value|
+-----+
| a|
| b|
| c|
+-----+
^
scala> spark.createDataset(List(("a",1),("b",2),("c",3))).show
+---+---+
| _1| _2| // 傳入元組 _1表示第一列資料,_2表示第二列資料
+---+---+
| a| 1|
| b| 2|
| c| 3|
+---+---+
scala> spark.createDataset(sc.makeRDD(List((1,2,3,4),(2,3,4,5)))).show
+---+---+---+---+
| _1| _2| _3| _4| // 傳入RDD的方式
+---+---+---+---+
| 1| 2| 3| 4|
| 2| 3| 4| 5|
+---+---+---+---+
scala> spark.createDataset(sc.makeRDD(List((1,2,3,4),(2,3,4,5)))) // 檢視Dataset是什麼
res7: org.apache.spark.sql.Dataset[(Int, Int, Int, Int)] = [_1: int, _2: int ... 2 more fields]
IDE示範:
package createDataset
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
object CreateDatasetDemo2 extends App{
// 建立SparkSession
val spark: SparkSession = SparkSession.builder()
.master("local[*]")
.appName(this.getClass.getName)
.getOrCreate()
// 需要導入一個隐式轉換
import spark.implicits._
// 通過seq建立Dataset
val seqDS: Dataset[Int] = spark.createDataset(1 to 10)
seqDS.show()
// 通過List集合建立Dataset
val ListDS: Dataset[(String, Int)] = spark.createDataset(List(("a",1),("b",2),("c",3)))
ListDS.show()
// 通過SparkSession建立SparkContext
val sc: SparkContext = spark.sparkContext
// 通過RDD建立Dataset
val ListRDD: RDD[(String, Int, Int)] = sc.parallelize(List(("a",1,1),("b",2,3),("c",3,3)))
val rddDS: Dataset[(String, Int, Int)] = spark.createDataset(ListRDD)
rddDS.show()
}
1.3 樣例類建立Dataset
使用Case Class建立Dataset
Scala中在class關鍵字前加上case關鍵字 這個類就成為了樣例類,樣例類和普通類差別:
(1)不需要new可以直接生成對象
(2)預設實作序列化接口
(3)預設自動覆寫 toString()、equals()、hashCode()
spark-shell 示範:
scala> case class Student(name:String,age:Int)
defined class Student
scala> Seq(Student("zhangsan",15),Student("lisi",20))
res10: Seq[Student] = List(Student(zhangsan,15), Student(lisi,20))
scala> Seq(Student("zhangsan",15),Student("lisi",20)).toDS
res11: org.apache.spark.sql.Dataset[Student] = [name: string, age: int]
scala> Seq(Student("zhangsan",15),Student("lisi",20)).toDS.show
+--------+---+
| name|age|
+--------+---+
|zhangsan| 15|
| lisi| 20|
+--------+---+
IDEA示範:
package test
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object CreateDataSetDemo extends App {
// 定義兩個樣例類
case class Point(label:String,x:Double,y:Double) // 樣例類1
case class Category(id:Int,name:String) // 樣例類2
// 1、建立一個SparkSession對象
private val spark: SparkSession = SparkSession.builder().master("local[*]")
.appName("test01").getOrCreate()
// 2、建立一個RDD
import spark.implicits._
private val sc: SparkContext = spark.sparkContext
// 裡面包含三元組的RDD
// RDD1
private val pointRDD: RDD[(String, Double, Double)] = sc.makeRDD(List(("bar",3.0,4.0),("foo",2.0,2.5)))
// RDD2
private val categoriesRDD: RDD[(Int, String)] = sc.makeRDD(List((1,"foo"),(2,"bar")))
// 3、使用spark對象來建立Dataset
// IDE和spark-shell下不一樣,沒有toDS,是以需要先導包
// import spark.implicits._
private val ds1: Dataset[(String, Double, Double)] = pointRDD.toDS()
ds1.show()
private val poiontDS: Dataset[Point] = pointRDD.map(x=>Point(x._1,x._2,x._3)).toDS()
poiontDS.show()
private val categoryDS: Dataset[Category] = categoriesRDD.map(x=>Category(x._1,x._2)).toDS()
categoryDS.show()
// 對這兩個Dataset進行join操作
// ===比較的是值
private val df: DataFrame = poiontDS.join(categoryDS,poiontDS("label")===categoryDS("name"))
df.show()
}
練習1:使用Dataset完成零售商店名額統計
需求說明
完成資料裝載
使用RDD裝載零售商店業務資料
customers.csv、orders.csv、order_items.csv、products.csv
定義樣例類
Customer、Order、OrderItem、Product
将RDD轉換為Dataset
請找出
誰的消費額最高?
哪個産品銷量最高?
package practice
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import practice.Demo01.Order_item
object Demo01 extends App {
// 建立SparkSession對象
private val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test01")
.getOrCreate()
// 導包
import spark.implicits._
// 建立SparkContext對象
private val sc: SparkContext = spark.sparkContext
// 生成RDD
private val orderFile: RDD[String] = sc.textFile("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\src\\CustomerTest\\orders.csv")
private val orderDs: Dataset[Order] = orderFile.map(x => {
// 把引号替換掉
val fields = x.split(",").map(y => y.replace("\"", ""))
Order(fields(0), fields(1), fields(2), fields(3))
}).toDS()
orderDs.show()
// 建立樣例類,指定字段
case class Order(id:String,date:String,customerId:String,status:String)
case class Customer(
id:String,
fname:String,
Iname:String,
email:String,
password:String,
street:String,
city:String,
state:String,
zipcode:String
)
private val CustomerFile: RDD[String] = sc.textFile("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\src\\CustomerTest\\customers.csv")
private val CusDS: Dataset[Customer] = CustomerFile.map(x => {
val fileds01 = x.split(",").map(y => y.replace("\"", ""))
Customer(fileds01(0), fileds01(1), fileds01(2), fileds01(3), fileds01(4), fileds01(5), fileds01(6), fileds01(7), fileds01(8))
}).toDS()
CusDS.show()
case class Order_item(
id:String,
order_id:String,
product_id:String,
qunatity:String,
subtotal:String,
product_price:String
)
private val Order_itme_File: RDD[String] = sc.textFile("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\src\\CustomerTest\\order_items.csv")
private val Order_Item_RDD: Dataset[Order_item] = Order_itme_File.map(x => {
val fileds03 = x.split(",").map(y => y.replace("\"", ""))
Order_item(fileds03(0), fileds03(1), fileds03(2), fileds03(3), fileds03(4), fileds03(5))
}).toDS()
Order_Item_RDD.show()
case class Product(
id:String,
category_id:String,
name:String,
description:String,
price:Double,
image:String
)
private val ProductFile: RDD[String] = sc.textFile("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\src\\CustomerTest\\products.csv")
private val ProductRDD: Dataset[Product] = ProductFile.map(x => {
val filelds04:Array[String] = x.split(",").map(y => y.replace("\"", ""))
Product(filelds04(0), filelds04(1), filelds04(2), filelds04(3).toDouble, filelds04(4), filelds04(5))
}).toDS()
ProductRDD.show()
}
1.4 DataFrame
DataFrame (Spark 1.4+)
DataFrame=Dataset[Row] + schema ----有點像Dateset的子集
類似傳統資料的二維表格
在RDD基礎上加入了Schema(資料結構資訊)
DataFrame Schema支援嵌套資料類型
struct
map
array
提供更多類似SQL操作的API
1.5 RDD與DataFrame對比
DataFrame隻關心每個字段的名字和每個字段的類型,直接把這些屬性展示出來,而RDD需要先去擷取對象,再擷取屬性。RDD 關心的隻是資料,DataFrame關心的是裡面的字段和字段類型
1.5.1 建立DataFrame (讀json檔案轉換成DF)
将JSON檔案轉成DataFrame
people.json内容如下
{“name”:“Michael”}
{“name”:“Andy”, “age”:30}
{“name”:“Justin”, “age”:19}
IDEA示範:
package DataFrameDemo
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
object CreateDataFrame extends App {
// 建立一個SparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test")
.getOrCreate()
// 導包,建立SparkContext
import spark.implicits._
val sc: SparkContext = spark.sparkContext
// DataFrame是個type,就是個别名
// type DataFrame = org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
// 建立DataFrame
// 通過spark.read讀取檔案,生成DataFrame
val jsonDF: DataFrame = spark.read.json("D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\DataFrameData\\people.json")
// 和上面直接read寫法效果一樣
// private val JsonFrame: DataFrame = spark.read.format("json").load("D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\DataFrameData\\people.json")
jsonDF.show()
spark.stop()
}
1.6 Spark SQL 常用API
常用API:大類分為算子和采用SQL語句執行查詢操作
package DataFrameDemo
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
object CreateDataFrame02 extends App {
// 建立一個SparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test")
.getOrCreate()
// 導包,建立SparkContext
import spark.implicits._
val sc: SparkContext = spark.sparkContext
// DataFrame是個type,就是個别名
// type DataFrame = org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
// 建立DataFrame
// 通過spark.read讀取檔案,生成DataFrame
val jsonDF: DataFrame = spark.read.json("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\DataFrameData\\people.json")
// 和上面直接read寫法效果一樣
// private val JsonFrame: DataFrame = spark.read.format("json").load("D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\DataFrameData\\people.json")
println("----show:查讀取檔案結果-----")
jsonDF.show()
println("-------perintSchema-----")
// 列印schema資訊
jsonDF.printSchema()
println("------select:單個字段---------")
// 使用select
// 查詢name字段生成的結果轉換成了DataFrame
private val df1: DataFrame = jsonDF.select("name")
df1.show()
// jsonDF.select("name").show
println("---select 查多字段-------")
// 查多個字段 用()來指定
jsonDF.select(jsonDF("name"),jsonDF("age")).show()
//對字段操作
println("------對字段操作--------")
jsonDF.select(jsonDF("name"),jsonDF("age")+1).show()
//也可以寫成下面那樣,每次寫jsontoDF太麻煩,可以用$指代
println("-----$指代的寫法-------")
jsonDF.select($"name",$"age"+2).show()
// filter
println("----------filter----------")
jsonDF.filter($"age">20).show()
println("------groupBy:分組---------")
private val df2: DataFrame = jsonDF.groupBy("age").count()
df2.show()
println("----以下方式采用sql語句執行查詢操作----")
println("-----createOrReplaceTempView:臨時表------")
/**
* createOrReplaceTempView:建立臨時圖,
* 此圖生命周期和用于建立此資料集的SparkSession相關聯
*
*/
// 把DataFrame注冊成一張臨時表
jsonDF.createOrReplaceTempView("people")
// 結果也是DataFrame
// 寫sql語句
private val df3: DataFrame = spark.sql("select * from people where age>20")
df3.show()
// spark.newSession().sql("select * from people where age>20") 臨時表再建立session寫SQL會報錯,這種寫法不太可取
println("----createOrReplaceGlobalTempView:全局臨時表------")
// 建立全局臨時表
/**
* 注意點:需要在臨時表前加上global_temp. 不然會報錯,親身經曆
*/
jsonDF.createOrReplaceGlobalTempView("p1")
private val df4: DataFrame = spark.newSession().sql("select * from global_temp.p1 where age>20")
df4.show()
spark.stop()
}
1.7 RDD->DataFrame
//方式一:通過反射擷取RDD内的Schema
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
object RDDToDataFrame01 extends App {
// 建立一個SparkSession對象
private val spark: SparkSession = SparkSession.builder().master("local[*]").appName("rddToDataFrame")
.getOrCreate()
import spark.implicits._
private val sc: SparkContext = spark.sparkContext
case class People(name:String,age:Int)
private val peopleFile: RDD[String] = sc.textFile("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\people\\people.txt")
private val textRDD: RDD[Array[String]] = peopleFile.map(x => {
x.split(",")
})
// 通過樣例類将RDD轉換成DataFrame
private val peopleDF: DataFrame = textRDD.map(x=>People(x(0),x(1).trim.toInt)).toDF()
peopleDF.printSchema()
peopleDF.show()
}
//方式二:通過程式設計接口指定Schema
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession, types}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
object RDDToDataFrame02 extends App{
private val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
private val sc: SparkContext = spark.sparkContext
private val peopleFile: RDD[String] = sc.textFile("D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\people\\people.txt")
private val peopleRDD: RDD[Array[String]] = peopleFile.map(x => {
x.split(",")
})
// wangwu,19
// 1、自定義一個schema資訊
// 可能有多個字段,轉換成一個數組
private val schema = StructType(Array(
StructField("name", StringType, true),
StructField("age", IntegerType, true)
))
// 2、把RDD轉換成Row
private val mapRDD: RDD[Row] = peopleRDD.map(x=>Row(x(0),x(1).trim.toInt))
// 3、把RDD轉換成DataFrame
// dataframe = dataset + schema
private val df1: DataFrame = spark.createDataFrame(mapRDD,schema)
df1.printSchema()
df1.show()
}
1.8 DataFrame ->RDD
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession, types}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
object RDDToDataFrame02 extends App{
private val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
private val sc: SparkContext = spark.sparkContext
private val peopleFile: RDD[String] = sc.textFile("D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\people\\people.txt")
private val peopleRDD: RDD[Array[String]] = peopleFile.map(x => {
x.split(",")
})
// wangwu,19
// 1、自定義一個schema資訊
// 可能有多個字段,轉換成一個數組
private val schema = StructType(Array(
StructField("name", StringType, true),
StructField("age", IntegerType, true)
))
// 2、把RDD轉換成Row
private val mapRDD: RDD[Row] = peopleRDD.map(x=>Row(x(0),x(1).trim.toInt))
// 3、把RDD轉換成DataFrame
// dataframe = dataset + schema
private val df1: DataFrame = spark.createDataFrame(mapRDD,schema)
df1.printSchema()
df1.show()
// DataFrame ---> RDD
val rddres: RDD[Row] = df1.rdd
println(rddres.collect().mkString(" "))
}