天天看點

SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

文章目錄

  • 一、SQL on Hadoop
        • 1.1 SQL是一種傳統的用來進行資料分析的标準
        • 1.2 Spark SQL前身
        • 1.3 Spark SQL架構(了解)
          • Spark SQL和其它元件的關系圖:
        • 1.4 Spark SQL 運作原理
          • Catalyst優化器-1
          • Catalyst優化器-2
          • Catalyst優化器-3
  • 二、Spark SQL API
        • 1.1 SparkSession
        • 1.2 Dataset
        • 1.3 樣例類建立Dataset
          • 練習1:使用Dataset完成零售商店名額統計
        • 1.4 DataFrame
        • 1.5 **RDD與DataFrame對比**
          • 1.5.1 **建立DataFrame** (讀json檔案轉換成DF)
        • 1.6 Spark SQL 常用API
        • 1.7 **RDD->DataFrame**
        • 1.8 DataFrame ->RDD

一、SQL on Hadoop

1.1 SQL是一種傳統的用來進行資料分析的标準

​ Hive是原始的SQL-on-Hadoop解決方案

​ Impala:和Hive一樣,提供了一種可以針對已有Hadoop資料編寫SQL查詢的方法

​ Presto:類似于Impala,未被主要供應商支援

​ Shark:Spark SQL的前身,設計目标是作為Hive的一個補充

​ Phoenix:基于HBase的開源SQL查詢引擎

1.2 Spark SQL前身

Shark的初衷:讓Hive運作在Spark之上(hive on spark不再使用MapReduce引擎)

是對Hive的改造,繼承了大量Hive代碼,給優化和維護帶來了大量的麻煩

1.3 Spark SQL架構(了解)

Spark SQL是Spark的核心元件之一(2014.4 Spark1.0)

能夠直接通路現存的Hive資料

提供JDBC/ODBC接口供第三方工具借助Spark進行資料處理

提供了更高層級的接口友善地處理資料(DSL風格API 、SQL)

支援多種操作方式:SQL、API程式設計

支援多種外部資料源:Parquet、JSON、RDBMS等

Spark SQL和其它元件的關系圖:
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

1.4 Spark SQL 運作原理

Catalyst優化器是Spark SQL的核心

Fronted(SQL代碼):首先,前面是我們寫的SQL語句

Catalyst(優化器中間轉換,将代碼轉換為邏輯計劃):之後需要經過Catalyst優化器去做處理,把我們的的SQL語句轉換為Logical Plan(邏輯計劃),通過Catalog底層的分析轉換,轉換成邏輯計劃,之後經過邏輯優化器(Logical Optimization優化器的一部分)把邏輯計劃轉換為優化過的邏輯計劃

Backend(邏輯計劃轉換為實體計劃階段):通過轉換轉換成Physical Plans(實體計劃),實體計劃會對這個實體計劃做一些處理,最後可以通過select等等查詢結果,最後轉換成RDD

最後經過底層,SQL語句程式設計Spark代碼

SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API
Catalyst優化器-1

邏輯計劃

select name from // 第三步:拿出id=1 的name

(

select id,name from people //第一步:先查詢子查詢,從people表中查出id,name,新表命名p

)p

where p.id = 1 // 第二步:過濾id = 1的

SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API
Catalyst優化器-2

優化:

1、在投影上面查詢過濾器

2、檢查過濾是否可下壓

SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API
Catalyst優化器-3

實體計劃

SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

二、Spark SQL API

1.1 SparkSession

SparkContext

SQLContext

Spark SQL的程式設計入口

HiveContext(SQLContext的子集,可以去處理Hive相關的内容)

SparkSession(Spark 2.x推薦,把SparkContext、SQLCOntext、HiveContext都整做了整合)

SparkSession:合并了SQLContext與HiveContext

提供與Spark功能互動單一入口點,并允許使用DataFrame和Dataset API對Spark進行程式設計

注意:預設情況下,spark是SparkSession的對象

SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

1.2 Dataset

Dataset (Spark 1.6+)

特定域對象中的強類型集合

1、createDataset()的參數可以是:Seq、Array、RDD

2、上面三行代碼生成的Dataset分别是:

Dataset[Int]、Dataset[(String,Int)]、Dataset[(String,Int,Int)]

3、Dataset=RDD+Schema,是以Dataset與RDD有大部共同的函數,如map、filter等

spark-shell示範:

scala> spark.createDataset(1 to 3).show        // 傳入序列
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
+-----+


scala> spark.createDataset(List("a","b","c")).show
+-----+
|value|
+-----+
|    a|
|    b|
|    c|
+-----+

               ^

scala> spark.createDataset(List(("a",1),("b",2),("c",3))).show
+---+---+
| _1| _2|           // 傳入元組    _1表示第一列資料,_2表示第二列資料
+---+---+
|  a|  1|
|  b|  2|
|  c|  3|
+---+---+



scala> spark.createDataset(sc.makeRDD(List((1,2,3,4),(2,3,4,5)))).show
+---+---+---+---+                                                               
| _1| _2| _3| _4|          // 傳入RDD的方式
+---+---+---+---+
|  1|  2|  3|  4|
|  2|  3|  4|  5|
+---+---+---+---+


scala> spark.createDataset(sc.makeRDD(List((1,2,3,4),(2,3,4,5))))      // 檢視Dataset是什麼
res7: org.apache.spark.sql.Dataset[(Int, Int, Int, Int)] = [_1: int, _2: int ... 2 more fields]    



           

IDE示範:

package createDataset

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}

object CreateDatasetDemo2 extends App{

  // 建立SparkSession
  val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName(this.getClass.getName)
    .getOrCreate()


  // 需要導入一個隐式轉換
  import spark.implicits._


  // 通過seq建立Dataset
  val seqDS: Dataset[Int] = spark.createDataset(1 to 10)
  seqDS.show()

  // 通過List集合建立Dataset
  val ListDS: Dataset[(String, Int)] = spark.createDataset(List(("a",1),("b",2),("c",3)))
  ListDS.show()


  // 通過SparkSession建立SparkContext

  val sc: SparkContext = spark.sparkContext

  // 通過RDD建立Dataset
  val ListRDD: RDD[(String, Int, Int)] = sc.parallelize(List(("a",1,1),("b",2,3),("c",3,3)))
  val rddDS: Dataset[(String, Int, Int)] = spark.createDataset(ListRDD)
  rddDS.show()

}

           
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

1.3 樣例類建立Dataset

使用Case Class建立Dataset

Scala中在class關鍵字前加上case關鍵字 這個類就成為了樣例類,樣例類和普通類差別:

(1)不需要new可以直接生成對象

(2)預設實作序列化接口

(3)預設自動覆寫 toString()、equals()、hashCode()

spark-shell 示範:

scala> case class Student(name:String,age:Int)
defined class Student

scala> Seq(Student("zhangsan",15),Student("lisi",20))
res10: Seq[Student] = List(Student(zhangsan,15), Student(lisi,20))

scala> Seq(Student("zhangsan",15),Student("lisi",20)).toDS
res11: org.apache.spark.sql.Dataset[Student] = [name: string, age: int]

scala> Seq(Student("zhangsan",15),Student("lisi",20)).toDS.show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 15|
|    lisi| 20|
+--------+---+

           

IDEA示範:

package test

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object CreateDataSetDemo extends App {

  // 定義兩個樣例類
  case class Point(label:String,x:Double,y:Double)    // 樣例類1

  case class Category(id:Int,name:String)     // 樣例類2

  // 1、建立一個SparkSession對象
  private val spark: SparkSession = SparkSession.builder().master("local[*]")
    .appName("test01").getOrCreate()


  // 2、建立一個RDD
  import spark.implicits._
  private val sc: SparkContext = spark.sparkContext
  //  裡面包含三元組的RDD
  // RDD1
  private val pointRDD: RDD[(String, Double, Double)] = sc.makeRDD(List(("bar",3.0,4.0),("foo",2.0,2.5)))

  // RDD2
  private val categoriesRDD: RDD[(Int, String)] = sc.makeRDD(List((1,"foo"),(2,"bar")))

  // 3、使用spark對象來建立Dataset
  // IDE和spark-shell下不一樣,沒有toDS,是以需要先導包
  // import spark.implicits._
  private val ds1: Dataset[(String, Double, Double)] = pointRDD.toDS()

  ds1.show()

  private val poiontDS: Dataset[Point] = pointRDD.map(x=>Point(x._1,x._2,x._3)).toDS()

  poiontDS.show()


  private val categoryDS: Dataset[Category] = categoriesRDD.map(x=>Category(x._1,x._2)).toDS()
  
  categoryDS.show()


  // 對這兩個Dataset進行join操作
  //  ===比較的是值
  private val df: DataFrame = poiontDS.join(categoryDS,poiontDS("label")===categoryDS("name"))

  df.show()
}

           
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API
練習1:使用Dataset完成零售商店名額統計

需求說明

完成資料裝載

使用RDD裝載零售商店業務資料

customers.csv、orders.csv、order_items.csv、products.csv

定義樣例類

Customer、Order、OrderItem、Product

将RDD轉換為Dataset

請找出

誰的消費額最高?

哪個産品銷量最高?

package practice

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import practice.Demo01.Order_item

object Demo01 extends App {

  // 建立SparkSession對象
  private val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test01")
    .getOrCreate()

  // 導包
  import spark.implicits._
  // 建立SparkContext對象
  private val sc: SparkContext = spark.sparkContext

  // 生成RDD
  private val orderFile: RDD[String] = sc.textFile("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\src\\CustomerTest\\orders.csv")

  private val orderDs: Dataset[Order] = orderFile.map(x => {
    // 把引号替換掉
    val fields = x.split(",").map(y => y.replace("\"", ""))
    Order(fields(0), fields(1), fields(2), fields(3))
  }).toDS()
  orderDs.show()



  // 建立樣例類,指定字段
  case class Order(id:String,date:String,customerId:String,status:String)


  case class Customer(
                     id:String,
                     fname:String,
                     Iname:String,
                     email:String,
                     password:String,
                     street:String,
                     city:String,
                     state:String,
                     zipcode:String
                     )
  private val CustomerFile: RDD[String] = sc.textFile("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\src\\CustomerTest\\customers.csv")

  private val CusDS: Dataset[Customer] = CustomerFile.map(x => {
    val fileds01 = x.split(",").map(y => y.replace("\"", ""))
    Customer(fileds01(0), fileds01(1), fileds01(2), fileds01(3), fileds01(4), fileds01(5), fileds01(6), fileds01(7), fileds01(8))
  }).toDS()
  CusDS.show()



  case class Order_item(
                       id:String,
                       order_id:String,
                       product_id:String,
                       qunatity:String,
                       subtotal:String,
                       product_price:String
                       )
  private val Order_itme_File: RDD[String] = sc.textFile("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\src\\CustomerTest\\order_items.csv")
  private val Order_Item_RDD: Dataset[Order_item] = Order_itme_File.map(x => {
    val fileds03 = x.split(",").map(y => y.replace("\"", ""))
    Order_item(fileds03(0), fileds03(1), fileds03(2), fileds03(3), fileds03(4), fileds03(5))
  }).toDS()
  Order_Item_RDD.show()


  case class Product(
                    id:String,
                    category_id:String,
                    name:String,
                    description:String,
                    price:Double,
                    image:String
                    )
  private val ProductFile: RDD[String] = sc.textFile("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\src\\CustomerTest\\products.csv")

  private val ProductRDD: Dataset[Product] = ProductFile.map(x => {
    val filelds04:Array[String] = x.split(",").map(y => y.replace("\"", ""))
    Product(filelds04(0), filelds04(1), filelds04(2), filelds04(3).toDouble, filelds04(4), filelds04(5))
  }).toDS()
  ProductRDD.show()

}

           
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

1.4 DataFrame

DataFrame (Spark 1.4+)

DataFrame=Dataset[Row] + schema ----有點像Dateset的子集

類似傳統資料的二維表格

在RDD基礎上加入了Schema(資料結構資訊)

DataFrame Schema支援嵌套資料類型

struct

map

array

提供更多類似SQL操作的API

1.5 RDD與DataFrame對比

DataFrame隻關心每個字段的名字和每個字段的類型,直接把這些屬性展示出來,而RDD需要先去擷取對象,再擷取屬性。RDD 關心的隻是資料,DataFrame關心的是裡面的字段和字段類型

SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API
1.5.1 建立DataFrame (讀json檔案轉換成DF)

将JSON檔案轉成DataFrame

people.json内容如下

{“name”:“Michael”}

{“name”:“Andy”, “age”:30}

{“name”:“Justin”, “age”:19}

IDEA示範:

package DataFrameDemo

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}

object CreateDataFrame extends App {
  
  // 建立一個SparkSession
  val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test")
    .getOrCreate()

  // 導包,建立SparkContext
  import spark.implicits._
  val sc: SparkContext = spark.sparkContext

  // DataFrame是個type,就是個别名
  //  type DataFrame = org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

  // 建立DataFrame
  // 通過spark.read讀取檔案,生成DataFrame
  val jsonDF: DataFrame = spark.read.json("D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\DataFrameData\\people.json")

  // 和上面直接read寫法效果一樣
  // private val JsonFrame: DataFrame = spark.read.format("json").load("D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\DataFrameData\\people.json")

  jsonDF.show()


  spark.stop()
}

           
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

1.6 Spark SQL 常用API

常用API:大類分為算子和采用SQL語句執行查詢操作

package DataFrameDemo

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}

object CreateDataFrame02 extends App {

  // 建立一個SparkSession
  val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test")
    .getOrCreate()

  // 導包,建立SparkContext
  import spark.implicits._
  val sc: SparkContext = spark.sparkContext

  // DataFrame是個type,就是個别名
  //  type DataFrame = org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

  // 建立DataFrame
  // 通過spark.read讀取檔案,生成DataFrame
  val jsonDF: DataFrame = spark.read.json("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\DataFrameData\\people.json")

  // 和上面直接read寫法效果一樣
  // private val JsonFrame: DataFrame = spark.read.format("json").load("D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\DataFrameData\\people.json")

  println("----show:查讀取檔案結果-----")
  jsonDF.show()

  println("-------perintSchema-----")
  // 列印schema資訊
  jsonDF.printSchema()

  println("------select:單個字段---------")

  // 使用select
  // 查詢name字段生成的結果轉換成了DataFrame
  private val df1: DataFrame = jsonDF.select("name")
  df1.show()
 // jsonDF.select("name").show

  println("---select 查多字段-------")
  // 查多個字段     用()來指定
  jsonDF.select(jsonDF("name"),jsonDF("age")).show()

  //對字段操作
  println("------對字段操作--------")
  jsonDF.select(jsonDF("name"),jsonDF("age")+1).show()
  //也可以寫成下面那樣,每次寫jsontoDF太麻煩,可以用$指代
  println("-----$指代的寫法-------")
  jsonDF.select($"name",$"age"+2).show()


  // filter
  println("----------filter----------")
  jsonDF.filter($"age">20).show()

  println("------groupBy:分組---------")
  private val df2: DataFrame = jsonDF.groupBy("age").count()
  df2.show()


  println("----以下方式采用sql語句執行查詢操作----")
  println("-----createOrReplaceTempView:臨時表------")
  /**
    * createOrReplaceTempView:建立臨時圖,
    * 此圖生命周期和用于建立此資料集的SparkSession相關聯
    *
    */

  // 把DataFrame注冊成一張臨時表
  jsonDF.createOrReplaceTempView("people")
  // 結果也是DataFrame
  // 寫sql語句
  private val df3: DataFrame = spark.sql("select * from people where age>20")
  df3.show()

  // spark.newSession().sql("select * from people where age>20")  臨時表再建立session寫SQL會報錯,這種寫法不太可取

  println("----createOrReplaceGlobalTempView:全局臨時表------")
  // 建立全局臨時表
  /**
    * 注意點:需要在臨時表前加上global_temp.   不然會報錯,親身經曆
    */
 jsonDF.createOrReplaceGlobalTempView("p1")
  private val df4: DataFrame = spark.newSession().sql("select * from global_temp.p1 where age>20")
  df4.show()



  spark.stop()
}

           
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

1.7 RDD->DataFrame

//方式一:通過反射擷取RDD内的Schema

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object RDDToDataFrame01 extends App {

  // 建立一個SparkSession對象
  private val spark: SparkSession = SparkSession.builder().master("local[*]").appName("rddToDataFrame")
    .getOrCreate()

  import spark.implicits._
  private val sc: SparkContext = spark.sparkContext

  case class People(name:String,age:Int)

  private val peopleFile: RDD[String] = sc.textFile("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\people\\people.txt")

  private val textRDD: RDD[Array[String]] = peopleFile.map(x => {
    x.split(",")
  })

  // 通過樣例類将RDD轉換成DataFrame
  private val peopleDF: DataFrame = textRDD.map(x=>People(x(0),x(1).trim.toInt)).toDF()

  peopleDF.printSchema()

  peopleDF.show()

}

           
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

//方式二:通過程式設計接口指定Schema

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession, types}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object RDDToDataFrame02 extends App{

 private val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()


  private val sc: SparkContext = spark.sparkContext

  private val peopleFile: RDD[String] = sc.textFile("D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\people\\people.txt")

  private val peopleRDD: RDD[Array[String]] = peopleFile.map(x => {
    x.split(",")
  })

  // wangwu,19
  // 1、自定義一個schema資訊
  // 可能有多個字段,轉換成一個數組
  private val schema = StructType(Array(
    StructField("name", StringType, true),
    StructField("age", IntegerType, true)
  ))

  // 2、把RDD轉換成Row
  private val mapRDD: RDD[Row] = peopleRDD.map(x=>Row(x(0),x(1).trim.toInt))

  // 3、把RDD轉換成DataFrame
  // dataframe = dataset + schema
 private val df1: DataFrame = spark.createDataFrame(mapRDD,schema)

  df1.printSchema()
  df1.show()

}

           
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

1.8 DataFrame ->RDD

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession, types}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object RDDToDataFrame02 extends App{

  private val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()


  private val sc: SparkContext = spark.sparkContext

  private val peopleFile: RDD[String] = sc.textFile("D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\people\\people.txt")

  private val peopleRDD: RDD[Array[String]] = peopleFile.map(x => {
    x.split(",")
  })

  // wangwu,19
  // 1、自定義一個schema資訊
  // 可能有多個字段,轉換成一個數組
  private val schema = StructType(Array(
    StructField("name", StringType, true),
    StructField("age", IntegerType, true)
  ))

  // 2、把RDD轉換成Row
  private val mapRDD: RDD[Row] = peopleRDD.map(x=>Row(x(0),x(1).trim.toInt))

  // 3、把RDD轉換成DataFrame
  // dataframe = dataset + schema
  private val df1: DataFrame = spark.createDataFrame(mapRDD,schema)

  df1.printSchema()
  df1.show()

// DataFrame --->  RDD
  val rddres: RDD[Row] = df1.rdd
  println(rddres.collect().mkString(" "))

}

           
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

繼續閱讀