天天看点

SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

文章目录

  • 一、SQL on Hadoop
        • 1.1 SQL是一种传统的用来进行数据分析的标准
        • 1.2 Spark SQL前身
        • 1.3 Spark SQL架构(了解)
          • Spark SQL和其它组件的关系图:
        • 1.4 Spark SQL 运行原理
          • Catalyst优化器-1
          • Catalyst优化器-2
          • Catalyst优化器-3
  • 二、Spark SQL API
        • 1.1 SparkSession
        • 1.2 Dataset
        • 1.3 样例类创建Dataset
          • 练习1:使用Dataset完成零售商店指标统计
        • 1.4 DataFrame
        • 1.5 **RDD与DataFrame对比**
          • 1.5.1 **创建DataFrame** (读json文件转换成DF)
        • 1.6 Spark SQL 常用API
        • 1.7 **RDD->DataFrame**
        • 1.8 DataFrame ->RDD

一、SQL on Hadoop

1.1 SQL是一种传统的用来进行数据分析的标准

​ Hive是原始的SQL-on-Hadoop解决方案

​ Impala:和Hive一样,提供了一种可以针对已有Hadoop数据编写SQL查询的方法

​ Presto:类似于Impala,未被主要供应商支持

​ Shark:Spark SQL的前身,设计目标是作为Hive的一个补充

​ Phoenix:基于HBase的开源SQL查询引擎

1.2 Spark SQL前身

Shark的初衷:让Hive运行在Spark之上(hive on spark不再使用MapReduce引擎)

是对Hive的改造,继承了大量Hive代码,给优化和维护带来了大量的麻烦

1.3 Spark SQL架构(了解)

Spark SQL是Spark的核心组件之一(2014.4 Spark1.0)

能够直接访问现存的Hive数据

提供JDBC/ODBC接口供第三方工具借助Spark进行数据处理

提供了更高层级的接口方便地处理数据(DSL风格API 、SQL)

支持多种操作方式:SQL、API编程

支持多种外部数据源:Parquet、JSON、RDBMS等

Spark SQL和其它组件的关系图:
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

1.4 Spark SQL 运行原理

Catalyst优化器是Spark SQL的核心

Fronted(SQL代码):首先,前面是我们写的SQL语句

Catalyst(优化器中间转换,将代码转换为逻辑计划):之后需要经过Catalyst优化器去做处理,把我们的的SQL语句转换为Logical Plan(逻辑计划),通过Catalog底层的分析转换,转换成逻辑计划,之后经过逻辑优化器(Logical Optimization优化器的一部分)把逻辑计划转换为优化过的逻辑计划

Backend(逻辑计划转换为物理计划阶段):通过转换转换成Physical Plans(物理计划),物理计划会对这个物理计划做一些处理,最后可以通过select等等查询结果,最后转换成RDD

最后经过底层,SQL语句编程Spark代码

SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API
Catalyst优化器-1

逻辑计划

select name from // 第三步:拿出id=1 的name

(

select id,name from people //第一步:先查询子查询,从people表中查出id,name,新表命名p

)p

where p.id = 1 // 第二步:过滤id = 1的

SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API
Catalyst优化器-2

优化:

1、在投影上面查询过滤器

2、检查过滤是否可下压

SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API
Catalyst优化器-3

物理计划

SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

二、Spark SQL API

1.1 SparkSession

SparkContext

SQLContext

Spark SQL的编程入口

HiveContext(SQLContext的子集,可以去处理Hive相关的内容)

SparkSession(Spark 2.x推荐,把SparkContext、SQLCOntext、HiveContext都整做了整合)

SparkSession:合并了SQLContext与HiveContext

提供与Spark功能交互单一入口点,并允许使用DataFrame和Dataset API对Spark进行编程

注意:默认情况下,spark是SparkSession的对象

SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

1.2 Dataset

Dataset (Spark 1.6+)

特定域对象中的强类型集合

1、createDataset()的参数可以是:Seq、Array、RDD

2、上面三行代码生成的Dataset分别是:

Dataset[Int]、Dataset[(String,Int)]、Dataset[(String,Int,Int)]

3、Dataset=RDD+Schema,所以Dataset与RDD有大部共同的函数,如map、filter等

spark-shell演示:

scala> spark.createDataset(1 to 3).show        // 传入序列
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
+-----+


scala> spark.createDataset(List("a","b","c")).show
+-----+
|value|
+-----+
|    a|
|    b|
|    c|
+-----+

               ^

scala> spark.createDataset(List(("a",1),("b",2),("c",3))).show
+---+---+
| _1| _2|           // 传入元组    _1表示第一列数据,_2表示第二列数据
+---+---+
|  a|  1|
|  b|  2|
|  c|  3|
+---+---+



scala> spark.createDataset(sc.makeRDD(List((1,2,3,4),(2,3,4,5)))).show
+---+---+---+---+                                                               
| _1| _2| _3| _4|          // 传入RDD的方式
+---+---+---+---+
|  1|  2|  3|  4|
|  2|  3|  4|  5|
+---+---+---+---+


scala> spark.createDataset(sc.makeRDD(List((1,2,3,4),(2,3,4,5))))      // 查看Dataset是什么
res7: org.apache.spark.sql.Dataset[(Int, Int, Int, Int)] = [_1: int, _2: int ... 2 more fields]    



           

IDE演示:

package createDataset

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}

object CreateDatasetDemo2 extends App{

  // 创建SparkSession
  val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName(this.getClass.getName)
    .getOrCreate()


  // 需要导入一个隐式转换
  import spark.implicits._


  // 通过seq创建Dataset
  val seqDS: Dataset[Int] = spark.createDataset(1 to 10)
  seqDS.show()

  // 通过List集合创建Dataset
  val ListDS: Dataset[(String, Int)] = spark.createDataset(List(("a",1),("b",2),("c",3)))
  ListDS.show()


  // 通过SparkSession创建SparkContext

  val sc: SparkContext = spark.sparkContext

  // 通过RDD创建Dataset
  val ListRDD: RDD[(String, Int, Int)] = sc.parallelize(List(("a",1,1),("b",2,3),("c",3,3)))
  val rddDS: Dataset[(String, Int, Int)] = spark.createDataset(ListRDD)
  rddDS.show()

}

           
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

1.3 样例类创建Dataset

使用Case Class创建Dataset

Scala中在class关键字前加上case关键字 这个类就成为了样例类,样例类和普通类区别:

(1)不需要new可以直接生成对象

(2)默认实现序列化接口

(3)默认自动覆盖 toString()、equals()、hashCode()

spark-shell 演示:

scala> case class Student(name:String,age:Int)
defined class Student

scala> Seq(Student("zhangsan",15),Student("lisi",20))
res10: Seq[Student] = List(Student(zhangsan,15), Student(lisi,20))

scala> Seq(Student("zhangsan",15),Student("lisi",20)).toDS
res11: org.apache.spark.sql.Dataset[Student] = [name: string, age: int]

scala> Seq(Student("zhangsan",15),Student("lisi",20)).toDS.show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 15|
|    lisi| 20|
+--------+---+

           

IDEA演示:

package test

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object CreateDataSetDemo extends App {

  // 定义两个样例类
  case class Point(label:String,x:Double,y:Double)    // 样例类1

  case class Category(id:Int,name:String)     // 样例类2

  // 1、创建一个SparkSession对象
  private val spark: SparkSession = SparkSession.builder().master("local[*]")
    .appName("test01").getOrCreate()


  // 2、创建一个RDD
  import spark.implicits._
  private val sc: SparkContext = spark.sparkContext
  //  里面包含三元组的RDD
  // RDD1
  private val pointRDD: RDD[(String, Double, Double)] = sc.makeRDD(List(("bar",3.0,4.0),("foo",2.0,2.5)))

  // RDD2
  private val categoriesRDD: RDD[(Int, String)] = sc.makeRDD(List((1,"foo"),(2,"bar")))

  // 3、使用spark对象来创建Dataset
  // IDE和spark-shell下不一样,没有toDS,所以需要先导包
  // import spark.implicits._
  private val ds1: Dataset[(String, Double, Double)] = pointRDD.toDS()

  ds1.show()

  private val poiontDS: Dataset[Point] = pointRDD.map(x=>Point(x._1,x._2,x._3)).toDS()

  poiontDS.show()


  private val categoryDS: Dataset[Category] = categoriesRDD.map(x=>Category(x._1,x._2)).toDS()
  
  categoryDS.show()


  // 对这两个Dataset进行join操作
  //  ===比较的是值
  private val df: DataFrame = poiontDS.join(categoryDS,poiontDS("label")===categoryDS("name"))

  df.show()
}

           
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API
练习1:使用Dataset完成零售商店指标统计

需求说明

完成数据装载

使用RDD装载零售商店业务数据

customers.csv、orders.csv、order_items.csv、products.csv

定义样例类

Customer、Order、OrderItem、Product

将RDD转换为Dataset

请找出

谁的消费额最高?

哪个产品销量最高?

package practice

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import practice.Demo01.Order_item

object Demo01 extends App {

  // 创建SparkSession对象
  private val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test01")
    .getOrCreate()

  // 导包
  import spark.implicits._
  // 创建SparkContext对象
  private val sc: SparkContext = spark.sparkContext

  // 生成RDD
  private val orderFile: RDD[String] = sc.textFile("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\src\\CustomerTest\\orders.csv")

  private val orderDs: Dataset[Order] = orderFile.map(x => {
    // 把引号替换掉
    val fields = x.split(",").map(y => y.replace("\"", ""))
    Order(fields(0), fields(1), fields(2), fields(3))
  }).toDS()
  orderDs.show()



  // 创建样例类,指定字段
  case class Order(id:String,date:String,customerId:String,status:String)


  case class Customer(
                     id:String,
                     fname:String,
                     Iname:String,
                     email:String,
                     password:String,
                     street:String,
                     city:String,
                     state:String,
                     zipcode:String
                     )
  private val CustomerFile: RDD[String] = sc.textFile("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\src\\CustomerTest\\customers.csv")

  private val CusDS: Dataset[Customer] = CustomerFile.map(x => {
    val fileds01 = x.split(",").map(y => y.replace("\"", ""))
    Customer(fileds01(0), fileds01(1), fileds01(2), fileds01(3), fileds01(4), fileds01(5), fileds01(6), fileds01(7), fileds01(8))
  }).toDS()
  CusDS.show()



  case class Order_item(
                       id:String,
                       order_id:String,
                       product_id:String,
                       qunatity:String,
                       subtotal:String,
                       product_price:String
                       )
  private val Order_itme_File: RDD[String] = sc.textFile("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\src\\CustomerTest\\order_items.csv")
  private val Order_Item_RDD: Dataset[Order_item] = Order_itme_File.map(x => {
    val fileds03 = x.split(",").map(y => y.replace("\"", ""))
    Order_item(fileds03(0), fileds03(1), fileds03(2), fileds03(3), fileds03(4), fileds03(5))
  }).toDS()
  Order_Item_RDD.show()


  case class Product(
                    id:String,
                    category_id:String,
                    name:String,
                    description:String,
                    price:Double,
                    image:String
                    )
  private val ProductFile: RDD[String] = sc.textFile("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\src\\CustomerTest\\products.csv")

  private val ProductRDD: Dataset[Product] = ProductFile.map(x => {
    val filelds04:Array[String] = x.split(",").map(y => y.replace("\"", ""))
    Product(filelds04(0), filelds04(1), filelds04(2), filelds04(3).toDouble, filelds04(4), filelds04(5))
  }).toDS()
  ProductRDD.show()

}

           
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

1.4 DataFrame

DataFrame (Spark 1.4+)

DataFrame=Dataset[Row] + schema ----有点像Dateset的子集

类似传统数据的二维表格

在RDD基础上加入了Schema(数据结构信息)

DataFrame Schema支持嵌套数据类型

struct

map

array

提供更多类似SQL操作的API

1.5 RDD与DataFrame对比

DataFrame只关心每个字段的名字和每个字段的类型,直接把这些属性展示出来,而RDD需要先去获取对象,再获取属性。RDD 关心的只是数据,DataFrame关心的是里面的字段和字段类型

SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API
1.5.1 创建DataFrame (读json文件转换成DF)

将JSON文件转成DataFrame

people.json内容如下

{“name”:“Michael”}

{“name”:“Andy”, “age”:30}

{“name”:“Justin”, “age”:19}

IDEA演示:

package DataFrameDemo

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}

object CreateDataFrame extends App {
  
  // 创建一个SparkSession
  val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test")
    .getOrCreate()

  // 导包,创建SparkContext
  import spark.implicits._
  val sc: SparkContext = spark.sparkContext

  // DataFrame是个type,就是个别名
  //  type DataFrame = org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

  // 创建DataFrame
  // 通过spark.read读取文件,生成DataFrame
  val jsonDF: DataFrame = spark.read.json("D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\DataFrameData\\people.json")

  // 和上面直接read写法效果一样
  // private val JsonFrame: DataFrame = spark.read.format("json").load("D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\DataFrameData\\people.json")

  jsonDF.show()


  spark.stop()
}

           
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

1.6 Spark SQL 常用API

常用API:大类分为算子和采用SQL语句执行查询操作

package DataFrameDemo

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}

object CreateDataFrame02 extends App {

  // 创建一个SparkSession
  val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test")
    .getOrCreate()

  // 导包,创建SparkContext
  import spark.implicits._
  val sc: SparkContext = spark.sparkContext

  // DataFrame是个type,就是个别名
  //  type DataFrame = org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

  // 创建DataFrame
  // 通过spark.read读取文件,生成DataFrame
  val jsonDF: DataFrame = spark.read.json("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\DataFrameData\\people.json")

  // 和上面直接read写法效果一样
  // private val JsonFrame: DataFrame = spark.read.format("json").load("D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\DataFrameData\\people.json")

  println("----show:查读取文件结果-----")
  jsonDF.show()

  println("-------perintSchema-----")
  // 打印schema信息
  jsonDF.printSchema()

  println("------select:单个字段---------")

  // 使用select
  // 查询name字段生成的结果转换成了DataFrame
  private val df1: DataFrame = jsonDF.select("name")
  df1.show()
 // jsonDF.select("name").show

  println("---select 查多字段-------")
  // 查多个字段     用()来指定
  jsonDF.select(jsonDF("name"),jsonDF("age")).show()

  //对字段操作
  println("------对字段操作--------")
  jsonDF.select(jsonDF("name"),jsonDF("age")+1).show()
  //也可以写成下面那样,每次写jsontoDF太麻烦,可以用$指代
  println("-----$指代的写法-------")
  jsonDF.select($"name",$"age"+2).show()


  // filter
  println("----------filter----------")
  jsonDF.filter($"age">20).show()

  println("------groupBy:分组---------")
  private val df2: DataFrame = jsonDF.groupBy("age").count()
  df2.show()


  println("----以下方式采用sql语句执行查询操作----")
  println("-----createOrReplaceTempView:临时表------")
  /**
    * createOrReplaceTempView:创建临时图,
    * 此图生命周期和用于创建此数据集的SparkSession相关联
    *
    */

  // 把DataFrame注册成一张临时表
  jsonDF.createOrReplaceTempView("people")
  // 结果也是DataFrame
  // 写sql语句
  private val df3: DataFrame = spark.sql("select * from people where age>20")
  df3.show()

  // spark.newSession().sql("select * from people where age>20")  临时表再创建session写SQL会报错,这种写法不太可取

  println("----createOrReplaceGlobalTempView:全局临时表------")
  // 创建全局临时表
  /**
    * 注意点:需要在临时表前加上global_temp.   不然会报错,亲身经历
    */
 jsonDF.createOrReplaceGlobalTempView("p1")
  private val df4: DataFrame = spark.newSession().sql("select * from global_temp.p1 where age>20")
  df4.show()



  spark.stop()
}

           
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

1.7 RDD->DataFrame

//方式一:通过反射获取RDD内的Schema

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object RDDToDataFrame01 extends App {

  // 创建一个SparkSession对象
  private val spark: SparkSession = SparkSession.builder().master("local[*]").appName("rddToDataFrame")
    .getOrCreate()

  import spark.implicits._
  private val sc: SparkContext = spark.sparkContext

  case class People(name:String,age:Int)

  private val peopleFile: RDD[String] = sc.textFile("file:///D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\people\\people.txt")

  private val textRDD: RDD[Array[String]] = peopleFile.map(x => {
    x.split(",")
  })

  // 通过样例类将RDD转换成DataFrame
  private val peopleDF: DataFrame = textRDD.map(x=>People(x(0),x(1).trim.toInt)).toDF()

  peopleDF.printSchema()

  peopleDF.show()

}

           
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

//方式二:通过编程接口指定Schema

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession, types}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object RDDToDataFrame02 extends App{

 private val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()


  private val sc: SparkContext = spark.sparkContext

  private val peopleFile: RDD[String] = sc.textFile("D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\people\\people.txt")

  private val peopleRDD: RDD[Array[String]] = peopleFile.map(x => {
    x.split(",")
  })

  // wangwu,19
  // 1、自定义一个schema信息
  // 可能有多个字段,转换成一个数组
  private val schema = StructType(Array(
    StructField("name", StringType, true),
    StructField("age", IntegerType, true)
  ))

  // 2、把RDD转换成Row
  private val mapRDD: RDD[Row] = peopleRDD.map(x=>Row(x(0),x(1).trim.toInt))

  // 3、把RDD转换成DataFrame
  // dataframe = dataset + schema
 private val df1: DataFrame = spark.createDataFrame(mapRDD,schema)

  df1.printSchema()
  df1.show()

}

           
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

1.8 DataFrame ->RDD

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession, types}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object RDDToDataFrame02 extends App{

  private val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test").getOrCreate()


  private val sc: SparkContext = spark.sparkContext

  private val peopleFile: RDD[String] = sc.textFile("D:\\All_Studying_Resources\\spark\\codes\\spark_day0104\\people\\people.txt")

  private val peopleRDD: RDD[Array[String]] = peopleFile.map(x => {
    x.split(",")
  })

  // wangwu,19
  // 1、自定义一个schema信息
  // 可能有多个字段,转换成一个数组
  private val schema = StructType(Array(
    StructField("name", StringType, true),
    StructField("age", IntegerType, true)
  ))

  // 2、把RDD转换成Row
  private val mapRDD: RDD[Row] = peopleRDD.map(x=>Row(x(0),x(1).trim.toInt))

  // 3、把RDD转换成DataFrame
  // dataframe = dataset + schema
  private val df1: DataFrame = spark.createDataFrame(mapRDD,schema)

  df1.printSchema()
  df1.show()

// DataFrame --->  RDD
  val rddres: RDD[Row] = df1.rdd
  println(rddres.collect().mkString(" "))

}

           
SparkSQL(一)RDD && Dataset && DataFrame一、SQL on Hadoop二、Spark SQL API

继续阅读