天天看點

Spark SQL,DataFrames和Datasets Guide (spark 2.4.3)概述開始資料源性能調優分布式 SQL 引擎參考:備注

文章目錄

  • 概述
    • SQL
    • Datasets and DataFrames
      • Datasets
      • DataFrames
  • 開始
    • SparkSession
    • 建立DataFrame
    • DataFrame操作
    • 運作SQL查詢程式
    • 全局臨時視圖
    • 建立Datasets
    • 與RDD互動的操作
      • 使用反射方式推導Schema
      • 程式設計接口方式指定表結構
    • 聚合
      • 使用者定義的隐式聚合函數
      • 類型安全的自定義聚合函數
  • 資料源
    • 通用 加載/儲存 功能
      • 手動指定選項
      • 直接在檔案上運作SQL
      • 儲存模式
      • 儲存資料并持久化表
      • Bucketing(分段), Sorting(排序) and Partitioning(分區)
    • Parquet Files
      • 加載資料
      • 分區發現Partition Discovery
      • Schema Merging(模式合并)
      • Hive metastore Parquet table conversion(Hive metastore Parquet table 轉換)
        • Hive/Parquet Schema Reconciliation
        • Metadata Refreshing(中繼資料重新整理)
      • Configuration(配置)
    • ORC Files
    • JSON Files
    • Hive Tables
      • 指定 Hive 表的存儲格式
      • 與不同版本的 Hive Metastore 進行互動
    • JDBC To Other Databases(JDBC 連接配接其它資料庫)
    • Apache Avro Data Source Guide
      • 部署
      • 加載和儲存功能
    • 更多操作
    • Troubleshooting(故障排除)
  • 性能調優
    • 在記憶體中緩存資料
    • 其他配置選項
    • Broadcast Hint for SQL Queries
  • 分布式 SQL 引擎
    • 運作 Thrift JDBC/ODBC 伺服器
    • 運作 Spark SQL CLI
  • 參考:
  • 備注

概述

Spark SQL是Spark的一個元件,用于結構化資料的計算。與基本Spark RDD API不同,Spark SQL提供的接口為Spark提供了關于資料和正在執行的計算的結構的更多資訊。在内部,Spark使用這些額外資訊來執行額外的優化。與Sql SQL互動的方法有多種,包括SQL和DataSet API。當計算一個結果時,使用相同的執行引擎,這與您用來表達計算的API/語言無關。這種統一意味着,基于此API提供的表達一個給定轉化的方式,開發人員可以輕松地在不同API之間來回切換。

SQL

Spark SQL的一個使用是執行SQL查詢。Spark SQL也可以用于從現有的Hive安裝中讀取資料。有關如何配置此功能的更多資訊,請參閱Hive Table部分。當在另一種程式設計語言中運作SQL時,結果将傳回為DataSet/DataFrame。您還可以使用指令行或通過JDBC/ODBC與SQL界面互動。

Datasets and DataFrames

Datasets

Datasets 是分布式資料集合,是Spark 1.6中添加的一個新接口,它提供了RDD的優勢(強類型,使用強大的lambda函數的能力)和Spark SQL優化執行引擎的優點。Dataset可以由JVM對象構造,然後使用功能轉換(map, flatMap, filter等)進行操作。Datasets API在Scala和 Java中可用。Python沒有對Dataset API的支援。但由于Python的動态特性,資料集API的許多好處已經可用(即,您可以通過row.columnName按名稱通路行的字段)。R的情況類似。

DataFrames

DataFrame是由分布式資料集合(Datasets)組成的一系列命名列,它與關系資料庫的表類似,但有很多優化的地方。DataFrame支援多種資料源,包括結構化資料、Hive的表、外部資料庫、RDDs等。DataFrame API支援scala 、java、Python和R語言。在Scala和Java中,DataFrame由Dataset的Rows 表示。在Scala API中,DataFrame它隻是一個類型别名Dataset[Row]。而在Java API中,使用者需要使用Dataset來表示DataFrame。

開始

SparkSession

Spark中所有功能的入口點都是SparkSession類。要建立基本的SparkSession,隻需使用SparkSession.builder():

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
           

完整的例子可以在spark 倉庫中查找examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala

SparkSession 在Spark 2.0中提供了對Hive功能的内置支援,包括使用HiveQL編寫查詢,通路Hive UDF以及從Hive表讀取資料的功能。要使用這些功能,不需要對現有的hive進行設定。

建立DataFrame

使用SparkSession,可以從現有的RDD,Hive表或Spark資料源建立DataFrame 。

示例,以下内容基于JSON檔案的内容建立DataFrame:

val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
           

在spark 倉庫examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala中可以找到完整示例。

DataFrame操作

DataFrame支援結構化資料領域常用的資料操作,支援Scala、Java、Python和R語言,基本操作示例:

// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

           

完整的例子參考spark 倉庫 examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala

除了簡單的列引用和表達式之外,資料集還具有豐富的函數庫,包括字元串操作,日期算術,常用數學運算等。完整清單可在DataFrame函數參考中找到。

運作SQL查詢程式

SparkSession使應用程式以程式設計方式運作SQL查詢并傳回DataFrame結果

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
           

完整的例子位于spark 倉庫 examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala

全局臨時視圖

Spark SQL中的臨時視圖是會話範圍的,如果建立它的會話終止,則會消失。如果希望有一個臨時視圖在所有會話之間共享,并在Spark應用程式終止之前保持活動狀态,則可以建立一個全局臨時視圖。全局臨時視圖綁定到系統保留的資料庫global_temp中,我們必須使用限定名稱來引用它,例如:SELECT * FROM global_temp.view1.

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
           

完整代碼在spark 倉庫的位置examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala

建立Datasets

Datasets與RDDS相似,但是,它們不使用Java序列化或Kryo,而是使用專門的編碼器來序列化對象以在網絡上進行處理或傳輸。雖然編碼器和标準序列化都負責将對象轉換成位元組,但編碼器是動态生成的代碼,并使用一種允許Spark執行許多操作(如過濾、排序和哈希)的格式,而不必将位元組反序列化為對象。

case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
           

完整例子位于:examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala

與RDD互動的操作

Spark SQL支援兩種不同方法将現有RDDS轉換成Datasets。第一種方法基于反射方式把RDD轉化為Datasets。這種基于反射的方法可以使代碼更簡潔。

建立Datasets的第二種方法是通過程式設計接口,它允許您構造一個schema,然後将其應用到現有的RDD。雖然這個方法比較繁瑣,但是它允許您在運作之前當列和列的類型都不确定時建構Datasets。

使用反射方式推導Schema

Scala SQL的Scala接口支援将包case classes(樣例類)的RDD自動轉換為DataFrame。case classes(樣例類)定義表的結構。通過反射來讀取case class(樣例類)的參數的名稱,并成為列的名稱。case classes(樣例類)也可以嵌套或包含複雜的類型,如Seqs(隊列)或數組。這些RDD可以被隐式轉換為DataFrame,然後将其注冊為表,用于後續SQL語句。

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
           

完整代碼位于spark 倉庫 examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala

程式設計接口方式指定表結構

當無法提前定義 case classes(樣例類)(例如,記錄結構被編碼在字元串中,或傳遞的是文本資料集,并且字段将針對不同的使用者進行不同的投影)時,可以通過三個步驟以程式設計方式建立DataFrame。

  • Create an RDD of Rows from the original RDD;(從最初的RDD上建立Row)
  • Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.(建立一個和步驟一中建立的Row結構相同的StructType來代表表的結構)
  • Apply the schema to the RDD of Rows via createDataFrame method provided by SparkSession. (通過SparkSession的createDataFrame方法将步驟二中建立的表結構應用到Rows上)
import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
           

完整代碼位于spark倉庫 examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala

聚合

DataFrames内置了常見的聚合的函數,如count(), countDistinct(), avg(), max(), min()(等等),而這些函數是為DataFrames設計的, Spark SQL 在Scala和Java中也提供一些類型安全的版本,可以與強類型資料集一起工作。此外,不限制預定義的聚合函數,使用者可以建立自己的聚合函數。

使用者定義的隐式聚合函數

使用者需要繼承UserDefinedAggregateFunction這個抽象類,來實作自定義的隐式轉化函數。例如,使用者定義的平均值可能如下所示:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._

object MyAverage extends UserDefinedAggregateFunction {
  // Data types of input arguments of this aggregate function
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  // Data types of values in the aggregation buffer
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  // The data type of the returned value
  def dataType: DataType = DoubleType
  // Whether this function always returns the same output on the identical input
  def deterministic: Boolean = true
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // Calculates the final result
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

// Register the function to access it
spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
           

完整例子 examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala

類型安全的自定義聚合函數

繼承Aggregator這個抽象類,來實作自定義的類型安全的自定義聚合函數,例如,類型安全的使用者定義平均值可能如下所示:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
           

完整的例子examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala。

資料源

Spark SQL支援通過DataFrame接口對各種資料源進行操作。DataFrame可以使用關系轉換進行操作,也可以用于建立臨時視圖。将DataFrame注冊為臨時視圖允許您對其資料運作SQL查詢。本節介紹使用Spark資料源加載和儲存資料的一般方法,然後介紹可用于内置資料源的特定選項。

通用 加載/儲存 功能

手動指定選項

您還可以手動指定将要使用的資料源以及要傳遞給資料源的任何其他選項。資料源通過其全名指定(即org.apache.spark.sql.parquet),但内置的來源,你也可以使用自己的短名稱(json,parquet,jdbc,orc,libsvm,csv,text)。從任何資料源類型加載的DataFrame都可以使用此文法轉換為其他類型。

加載一個json檔案:

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
           

在spark 倉庫中完整的例子位置:examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala

加載一個csv檔案:

val peopleDFCsv = spark.read.format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("examples/src/main/resources/people.csv")
           

spark 倉庫中完整例子位置examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala

加載orc檔案

usersDF.write.format("orc")
  .option("orc.bloom.filter.columns", "favorite_color")
  .option("orc.dictionary.key.threshold", "1.0")
  .save("users_with_options.orc")
           

例子:examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala

直接在檔案上運作SQL

您可以直接使用SQL查詢該檔案,而不是使用讀取API将檔案加載到DataFrame并進行查詢。

val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
           

例子:examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala

儲存模式

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists (default) “error” or “errorifexists” (default) 将DataFrame儲存到資料源時,如果資料已存在,則會引發異常。
SaveMode.Append “append” 将DataFrame儲存到資料源時,如果資料/表已存在,則DataFrame的内容應附加到現有資料。
SaveMode.Overwrite “overwrite” 覆寫模式意味着在将DataFrame儲存到資料源時,如果資料/表已經存在,則預期現有資料将被DataFrame的内容覆寫。
SaveMode.Ignore “ignore” 忽略模式意味着在将DataFrame儲存到資料源時,如果資料已存在,則預期儲存操作不會儲存DataFrame的内容而不會更改現有資料。這與CREATE TABLE IF NOT EXISTSSQL類似。

儲存資料并持久化表

DataFrames也可以使用該saveAsTable 指令将持久表儲存到Hive Metastore中。請注意,使用此功能不需要現有的Hive部署。Spark将為您建立預設的本地Hive Metastore(使用Derby)。與createOrReplaceTempView指令不同, saveAsTable将實作DataFrame的内容并建立指向Hive Metastore中資料的指針。隻要您保持與同一Metastore的連接配接,即使您的Spark程式重新啟動後,持久表仍然存在。可以通過使用 SparkSession上的方法來建立持久表 。

對于基于檔案的資料源,例如text,parquet,json等,您可以通過path選項指定自定義表路徑 ,例如df.write.option(“path”, “/some/path”).saveAsTable(“t”)。删除表時,将不會删除自定義表路徑,并且表資料仍然存在。如果未指定自定義表路徑,則Spark會将資料寫入倉庫目錄下的預設表路徑。删除表時,也将删除預設表路徑。

從Spark 2.1開始,持久資料源表将每個分區中繼資料存儲在Hive Metastore中。這帶來了幾個好處:

  • 由于Metastore隻能傳回查詢所需的分區,是以不再需要在表的第一個查詢中發現所有分區。
  • Hive DDL ALTER TABLE PARTITION … SET LOCATION現在可用于使用Datasource API建立的表。

    請注意,在建立外部資料源表(帶有path選項的表)時,預設情況下不會收集分區資訊。要同步Metastore中的分區資訊,可以調用MSCK REPAIR TABLE。

Bucketing(分段), Sorting(排序) and Partitioning(分區)

對于基于檔案的資料源,還可以對輸出進行存儲和排序或分區。分段和排序僅适用于持久表:

  • 把資料儲存為Hive表,bucketBy 分桶
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
           

完整例子examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala

  • 分區

    而分區則可以同時使用save和saveAsTable使用資料集API。

    usersDF.write.partitionBy(“favorite_color”).format(“parquet”).save(“namesPartByColor.parquet”)

    也可以對單個表同時使用分桶和分區:

    peopleDF

    .write

    .partitionBy(“favorite_color”)

    .bucketBy(42, “name”)

    .saveAsTable(“people_partitioned_bucketed”)

    完整例子:examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala

Parquet Files

Parquet是列式存儲格式的一種檔案類型,Parquet是列式存儲格式的一種檔案類型,Spark SQL支援讀取和寫入Parquet檔案,這些檔案自動保留原始資料的模式。

加載資料

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
           

完整的例子:examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala

分區發現Partition Discovery

Table partitioning(表分區)是在像 Hive 這樣的系統中使用的常見的優化方法。在 partitioned table(分區表)中,資料通常存儲在不同的目錄中,partitioning column values encoded(分區列值編碼)在每個 partition directory(分區目錄)的路徑中。Parquet data source(Parquet 資料源)現在可以自動 discover(發現)和 infer(推斷)分區資訊。例如,我們可以使用以下 directory structure(目錄結構)将所有以前使用的 population data(人口資料)存儲到 partitioned table(分區表)中,其中有兩個額外的列 gender 和 country 作為 partitioning columns(分區列):

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...
           

通過将 path/to/table 傳遞給 SparkSession.read.parquet 或 SparkSession.read.load,Spark SQL 将自動從路徑中提取 partitioning information(分區資訊)。現在傳回的 DataFrame 的 schema(模式)變成:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
           

請注意,會自動 inferred(推斷)partitioning columns(分區列)的 data types(資料類型)。目前,支援 numeric data types(數字資料類型)和 string type(字元串類型)。有些使用者可能不想自動推斷 partitioning columns(分區列)的資料類型。對于這些用例,automatic type inference(自動類型推斷)可以由 spark.sql.sources.partitionColumnTypeInference.enabled 配置,預設為 true。當禁用 type inference(類型推斷)時,string type(字元串類型)将用于 partitioning columns(分區列)。

從 Spark 1.6.0 開始,預設情況下,partition discovery(分區發現)隻能找到給定路徑下的 partitions(分區)。對于上述示例,如果使用者将 path/to/table/gender=male 傳遞給 SparkSession.read.parquet 或 SparkSession.read.load,則 gender 将不被視為 partitioning column(分區列)。如果使用者需要指定 partition discovery(分區發現)應該開始的基本路徑,則可以在資料源選項中設定 basePath。例如,當 path/to/table/gender=male 是資料的路徑并且使用者将 basePath 設定為 path/to/table/,gender 将是一個 partitioning column(分區列)。

Schema Merging(模式合并)

像 ProtocolBuffer,Avro 和 Thrift 一樣,Parquet 也支援 schema evolution(模式演進)。使用者可以從一個 simple schema(簡單的架構)開始,并根據需要逐漸向 schema 添加更多的 columns(列)。以這種方式,使用者可能會使用不同但互相相容的 schemas 的 multiple Parquet files(多個 Parquet 檔案)。Parquet data source(Parquet 資料源)現在能夠自動檢測這種情況并 merge(合并)所有這些檔案的 schemas。

由于 schema merging(模式合并)是一個 expensive operation(相對昂貴的操作),并且在大多數情況下不是必需的,是以預設情況下從 1.5.0 開始。你可以按照如下的方式啟用它:

讀取 Parquet 檔案時,将 data source option(資料源選項)mergeSchema 設定為 true(如下面的例子所示),或

将 global SQL option(全局 SQL 選項)spark.sql.parquet.mergeSchema 設定為 true。

// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._

// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)
           

完整例子:examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala

Hive metastore Parquet table conversion(Hive metastore Parquet table 轉換)

當讀取和寫入 Hive metastore Parquet 表時,Spark SQL 将嘗試使用自己的 Parquet support(Parquet 支援),而不是 Hive SerDe 來獲得更好的性能。此 behavior(行為)由 spark.sql.hive.convertMetastoreParquet 配置控制,預設情況下 turned on(打開)。

Hive/Parquet Schema Reconciliation

從 table schema processing(表格模式處理)的角度來說,Hive 和 Parquet 之間有兩個關鍵的差別。

  1. Hive 不區分大小寫,而 Parquet 不是
  2. Hive 認為所有 columns(列)都可以為空,而 Parquet 中的可空性是 significant(重要)的。

    由于這個原因,當将 Hive metastore Parquet 表轉換為 Spark SQL Parquet 表時,我們必須調整 metastore schema 與 Parquet schema。reconciliation 規則是:

  • 在兩個 schema 中具有 same name(相同名稱)的 Fields(字段)必須具有 same data type(相同的資料類型),而不管 nullability(可空性)。reconciled field 應具有 Parquet 的資料類型,以便 nullability(可空性)得到尊重。
  • reconciled schema(調和模式)正好包含 Hive metastore schema 中定義的那些字段。

    1) 隻出現在 Parquet schema 中的任何字段将被 dropped(删除)在 reconciled schema 中。

    2)僅在 Hive metastore schema 中出現的任何字段在 reconciled schema 中作為 nullable field(可空字段)添加。

Metadata Refreshing(中繼資料重新整理)

Spark SQL 緩存 Parquet metadata 以獲得更好的性能。當啟用 Hive metastore Parquet table conversion(轉換)時,這些 converted tables(轉換表)的 metadata(中繼資料)也被 cached(緩存)。如果這些表由 Hive 或其他外部工具更新,則需要手動重新整理以確定 consistent metadata(一緻的中繼資料)。

// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
           

Configuration(配置)

可以使用 SparkSession 上的 setConf 方法或使用 SQL 運作 SET key = value 指令來完成 Parquet 的配置.

Property Name(參數名稱) Default(預設) Meaning(含義)
spark.sql.parquet.binaryAsString false 一些其他 Parquet-producing systems(Parquet 生産系統),特别是 Impala,Hive 和舊版本的 Spark SQL,在 writing out(寫出)Parquet schema 時,不區分 binary data(二進制資料)和 strings(字元串)。該 flag 告訴 Spark SQL 将 binary data(二進制資料)解釋為 string(字元串)以提供與這些系統的相容性。
spark.sql.parquet.int96AsTimestamp true 一些 Parquet-producing systems,特别是 Impala 和 Hive,将 Timestamp 存入INT96。該 flag 告訴 Spark SQL 将 INT96 資料解析為 timestamp 以提供與這些系統的相容性。
spark.sql.parquet.compression.codec snappy 在編寫 Parquet 檔案時設定 compression codec(壓縮編解碼器)的使用。可接受的值包括:uncompressed,snappy,gzip,lzo。
spark.sql.parquet.filterPushdown true 設定為 true 時啟用 Parquet filter push-down optimization。
spark.sql.hive.convertMetastoreParquet true 當設定為 false 時,Spark SQL 将使用 Hive SerDe 作為 parquet tables,而不是内置的支援。
spark.sql.parquet.mergeSchema false 當為 true 時,Parquet data source(Parquet 資料源)merges(合并)從所有 data files(資料檔案)收集的 schemas,否則如果沒有可用的 summary file,則從 summary file 或 random data file 中挑選 schema。
spark.sql.parquet.writeLegacyFormat false 如果為true,則資料将以Spark 1.4及更早版本的方式寫入。例如,十進制值将以Apache Parquet的固定長度位元組數組格式寫入,其他系統(如Apache Hive和Apache Impala)也使用該格式。如果為false,将使用Parquet中的較新格式。例如,小數将以基于int的格式寫入。如果Parquet輸出旨在用于不支援此較新格式的系統,請設定為true。

ORC Files

從Spark 2.3開始,Spark支援帶有ORC檔案的新ORC檔案格式的矢量化ORC閱讀器。為此,新添加了以下配置。

屬性名 預設值 含義
spark.sql.orc.impl native 可以選擇native 和 hive。native表示在Apache ORC 1.4上建構ORC支援。

hive

表示Hive 1.2.1中的ORC庫。
spark.sql.orc.enableVectorizedReader true 這個是針對spark.sql.orc.impl屬性為native 的。如果false,在native實作中使用新的非向量化ORC閱讀器。

JSON Files

Spark SQL 可以 automatically infer(自動推斷)JSON dataset 的 schema,并将其作為 Dataset[Row] 加載。這個 conversion(轉換)可以在 Dataset[String] 上使用 SparkSession.read.json() 來完成,或 JSON 檔案。

請注意,以 a json file 提供的檔案不是典型的 JSON 檔案。每行必須包含一個 separate(單獨的),self-contained valid(獨立的有效的)JSON 對象。有關更多資訊,請參閱 JSON Lines text format, also called newline-delimited JSON。

對于 regular multi-line JSON file(正常的多行 JSON 檔案),将 multiLine 選項設定為 true。

// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
           

完整例子:examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala

Hive Tables

Spark SQL 還支援讀取和寫入存儲在 Apache Hive 中的資料。但是,由于 Hive 具有大量依賴關系,是以這些依賴關系不包含在預設 Spark 分發中。如果在類路徑中找到 Hive 依賴項,Spark 将自動加載它們。請注意,這些 Hive 依賴關系也必須存在于所有工作節點上,因為它們将需要通路 Hive 序列化和反序列化庫(SerDes),以通路存儲在 Hive 中的資料。

通過将 hive-site.xml,core-site.xml(用于安全配置)和 hdfs-site.xml(用于 HDFS 配置)檔案放在 conf/ 中來完成配置。

當使用 Hive 時,必須用 Hive 支援執行個體化 SparkSession,包括連接配接到持續的 Hive 轉移,支援 Hive serdes 和 Hive 使用者定義的功能。沒有現有 Hive 部署的使用者仍然可以啟用 Hive 支援。當 hive-site.xml 未配置時,上下文會自動在目前目錄中建立 metastore_db,并建立由 spark.sql.warehouse.dir 配置的目錄,該目錄預設為Spark應用程式目前目錄中的 spark-warehouse 目錄 開始了 請注意,自從2.0.0以來,hive-site.xml 中的 hive.metastore.warehouse.dir 屬性已被棄用。而是使用 spark.sql.warehouse.dir 來指定倉庫中資料庫的預設位置。您可能需要向啟動 Spark 應用程式的使用者授予寫權限。

import java.io.File

import org.apache.spark.sql.{Row, SaveMode, SparkSession}

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|
// ...

// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_ints").show()
// +---+
// |key|
// +---+
// |  0|
// |  1|
// |  2|
// ...

// Turn on flag for Hive Dynamic Partitioning
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// Create a Hive partitioned table using DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// Partitioned column `key` will be moved to the end of the schema.
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// |  value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...

spark.stop()
           

完整的例子:examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala

指定 Hive 表的存儲格式

建立 Hive 表時,需要定義如何 從/向 檔案系統 read/write 資料,即 “輸入格式” 和 “輸出格式”。您還需要定義該表如何将資料反序列化為行,或将行序列化為資料,即 “serde”。以下選項可用于指定存儲格式(“serde”, “input format”, “output format”),例如,CREATE TABLE src(id int) USING hive OPTIONS(fileFormat ‘parquet’)。預設情況下,我們将以純文字形式讀取表格檔案。請注意,Hive 存儲處理程式在建立表時不受支援,您可以使用 Hive 端的存儲處理程式建立一個表,并使用 Spark SQL 來讀取它。

Property Name Meaning
fileFormat fileFormat是一種存儲格式規範的包,包括 “serde”,“input format” 和 “output format”。目前我們支援6個檔案格式:‘sequencefile’,‘rcfile’,‘orc’,‘parquet’,‘textfile’和’avro’
inputFormat, outputFormat 這兩個選項将相應的 “InputFormat” 和 “OutputFormat” 類的名稱指定為字元串文字,例如:org.apache.hadoop.hive.ql.io.orc.OrcInputFormat。這兩個選項必須成對出現,如果您已經指定了 “fileFormat” 選項,則無法指定它們。
serde 此選項指定 serde 類的名稱。當指定 fileFormat 選項時,如果給定的 fileFormat 已經包含 serde 的資訊,那麼不要指定這個選項。目前的 “sequencefile”,“textfile” 和 “rcfile” 不包含 serde 資訊,你可以使用這3個檔案格式的這個選項。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 這些選項隻能與 “textfile” 檔案格式一起使用。它們定義如何将分隔的檔案讀入行。

使用 OPTIONS 定義的所有其他屬性将被視為 Hive serde 屬性。

與不同版本的 Hive Metastore 進行互動

Spark SQL 的 Hive 支援的最重要的部分之一是與 Hive metastore 進行互動,這使得 Spark SQL 能夠通路 Hive 表的中繼資料。從 Spark 1.4.0 開始,使用 Spark SQL 的單一二進制建構可以使用下面所述的配置來查詢不同版本的 Hive 轉移。請注意,獨立于用于與轉移點通信的 Hive 版本,内部 Spark SQL 将針對 Hive 1.2.1 進行編譯,并使用這些類進行内部執行(serdes,UDF,UDAF等)。

以下選項可用于配置用于檢索中繼資料的 Hive 版本:

Property Name Default Meaning
spark.sql.hive.metastore.version 1.2.1 Hive metastore 版本。可用選項為 0.12.0 至 2.3.3。
spark.sql.hive.metastore.jars builtin 應該用于執行個體化HiveMetastoreClient的jar的位置。此屬性可以是以下三個選項之一:1. builtin使用Hive 1.2.1,它在-Phive啟用時與Spark程式集捆綁在一起。選擇此選項時,spark.sql.hive.metastore.version必須1.2.1定義或不定義。2. mave使用從Maven存儲庫下載下傳的指定版本的Hive jar。通常不建議将此配置用于生産部署。3. JVM 的标準格式的 classpath。該類路徑必須包含所有 Hive 及其依賴項,包括正确版本的 Hadoop。這些jars隻需要存在于 driver 程式中,但如果您正在運作在 yarn 叢集模式,那麼您必須確定它們與應用程式一起打包。
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc 使用逗号分隔的類字首清單,應使用在 Spark SQL 和特定版本的 Hive 之間共享的類加載器來加載。一個共享類的示例就是用來通路 Hive metastore 的 JDBC driver。其它需要共享的類,是需要與已經共享的類進行互動的。例如,log4j 使用的自定義 appender。
spark.sql.hive.metastore.barrierPrefixes (empty) 一個逗号分隔的類字首清單,應該明确地為 Spark SQL 正在通信的 Hive 的每個版本重新加載。例如,在通常将被共享的字首中聲明的 Hive UDF(即: org.apache.spark.*)。

JDBC To Other Databases(JDBC 連接配接其它資料庫)

Spark SQL 還包括可以使用 JDBC 從其他資料庫讀取資料的資料源。此功能應優于使用 JdbcRDD。這是因為結果作為 DataFrame 傳回,并且可以輕松地在 Spark SQL 中處理或與其他資料源連接配接。JDBC 資料源也更容易從 Java 或 Python 使用,因為它不需要使用者提供 ClassTag。(請注意,這不同于 Spark SQL JDBC 伺服器,允許其他應用程式使用 Spark SQL 運作查詢)。

要開始使用,您需要在 Spark 類路徑中包含特定資料庫的 JDBC driver 程式。例如,要從 Spark Shell 連接配接到 postgres,您将運作以下指令:

bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
           

可以使用 Data Sources API 将來自遠端資料庫的表作為 DataFrame 或 Spark SQL 臨時視圖進行加載。使用者可以在資料源選項中指定 JDBC 連接配接屬性。使用者 和 密碼通常作為登入資料源的連接配接屬性提供。除了連接配接屬性外,Spark 還支援以下不區分大小寫的選項:

屬性名稱 含義
url 要連接配接的JDBC URL。源特定的連接配接屬性可以在URL中指定。例如jdbc:jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable 應該讀取的 JDBC 表。請注意,可以使用在SQL查詢的 FROM 子句中有效的任何内容。例如,您可以使用括号中的子查詢代替完整表。
query 用于将資料讀入Spark的查詢。指定的查詢将被括起來并在FROM子句中用作子查詢。Spark還會為子查詢子句配置設定别名。例如,spark将向JDBC Source發出以下形式的查詢。SELECT FROM (<user_specified_query>) spark_gen_alias 使用此選項時,以下是一些限制。不允許同時指定

dbtable

query

選項。不允許同時指定

query

partitionColumn

選項。當需要指定

partitionColumn

選項時,可以使用

dbtable

選項指定子查詢,并且可以使用作為

dbtable

的一部分提供的子查詢别名來限定分區列。例spark.read.format(“jdbc”).option(“url”, jdbcUrl).option(“query”, “select c1, c2 from t1”).load()
driver 用于連接配接到此 URL 的 JDBC driver 程式的類名
partitionColumn, lowerBound, upperBound 如果指定了這些選項中的任何一個,則必須全部指定這些選項。另外,必須指定 numPartitions。他們描述如何從多個 worker 并行讀取資料時将表給分區。partitionColumn 必須是有問題的表中的數字列。請注意,lowerBound 和 upperBound 僅用于決定分區的大小,而不是用于過濾表中的行。是以,表中的所有行将被分區并傳回。此選項僅适用于讀操作。
numPartitions 在表讀寫中可以用于并行度的最大分區數。這也确定并發JDBC連接配接的最大數量。如果要寫入的分區數超過此限制,則在寫入之前通過調用 coalesce(numPartitions) 将其減少到此限制。
queryTimeout 驅動程式等待Statement對象執行到指定秒數的秒數。零意味着沒有限制。在寫入路徑中,此選項取決于JDBC驅動程式如何實作API setQueryTimeout,例如,h2 JDBC驅動程式檢查每個查詢的逾時而不是整個JDBC批處理。它預設為0。
fetchsize JDBC 抓取的大小,用于确定每次資料往返傳遞的行數。這有利于提升 JDBC driver 的性能,它們的預設值較小(例如:Oracle 是 10 行)。該選項僅适用于讀取操作。
batchsize JDBC 批處理的大小,用于确定每次資料往返傳遞的行數。這有利于提升 JDBC driver 的性能。該選項僅适用于寫操作。預設值為 1000。
isolationLevel 事務隔離級别,适用于目前連接配接。它可以是 NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ,或 SERIALIZABLE 之一,對應于 JDBC 連接配接對象定義的标準事務隔離級别,預設為 READ_UNCOMMITTED。此選項僅适用于寫操作。請參考 java.sql.Connection 中的文檔。
sessionInitStatement 在向遠端資料庫打開每個資料庫會話之後,在開始讀取資料之前,此選項将執行自定義SQL語句(或PL / SQL塊)。使用它來實作會話初始化代碼。例:option(“sessionInitStatement”, “”“BEGIN execute immediate ‘alter session set “_serial_direct_read”=true’; END;”"")
truncate 這是一個與 JDBC 相關的選項。啟用 SaveMode.Overwrite 時,此選項會導緻 Spark 截斷現有表,而不是删除并重新建立。這可以更有效,并且防止表中繼資料(例如,索引)被移除。但是,在某些情況下,例如當新資料具有不同的模式時,它将無法工作。它預設為 false。此選項僅适用于寫操作。
cascadeTruncate 謹慎使用 級聯截斷 适用于寫操作
createTableOptions 這是一個與JDBC相關的選項。如果指定,此選項允許在建立表時設定特定于資料庫的表和分區選項(例如:CREATE TABLE t (name string) ENGINE=InnoDB.)。此選項僅适用于寫操作。
createTableColumnTypes 建立表時要使用的資料庫列資料類型,而不是預設值。資料類型資訊的格式應與建立表列文法相同(例如:“name char(64),comments varchar(1024)”)。指定的類型應為有效的Spark SQL資料類型。此選項僅适用于寫入。
customSchema 用于從JDBC連接配接器讀取資料的自定義架構。例如,“id DECIMAL(38, 0), name STRING”。您還可以指定部分字段,其他字段使用預設類型映射。例如,“id DECIMAL(38, 0)”。列名應與JDBC表的相應列名相同。使用者可以指定Spark SQL的相應資料類型,而不是使用預設值。此選項僅适用于閱讀。
pushDownPredicate 用于啟用或禁用謂詞下推到JDBC資料源的選項。預設值為true,在這種情況下,Spark會盡可能地将過濾器下推到JDBC資料源。否則,如果設定為false,則不會将過濾器下推到JDBC資料源,是以所有過濾器都将由Spark處理。當Spark通過比JDBC資料源更快地執行謂詞過濾時,謂詞下推通常會被關閉。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
           

完整例子examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala

Apache Avro Data Source Guide

部署

該spark-avro子產品是外部的,預設情況下不包括在spark-submit或spark-shell。

與任何Spark應用程式一樣,spark-submit用于啟動您的應用程式。spark-avro_2.12 通過–packages添加依賴關系可以直接添加到spark-submit使用,例如,

./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:2.4.3 ...
           

對于調試 也可通過park-shell --packages進行添加

./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:2.4.3 ...
           

詳細資訊參照應用送出指南

加載和儲存功能

要以Avro格式加載/儲存資料,您需要将資料源選項指定format為avro(或org.apache.spark.sql.avro)。

val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
           

更多操作

其他操作

Troubleshooting(故障排除)

  • JDBC driver 程式類必須對用戶端會話和所有執行程式上的原始類加載器可見。這是因為 Java 的 DriverManager 類執行安全檢查,導緻它忽略原始類加載器不可見的所有 driver 程式,當打開連接配接時。一個友善的方法是修改所有工作節點上的compute_classpath.sh 以包含您的 driver 程式 JAR。
  • 一些資料庫,例如 H2,将所有名稱轉換為大寫。您需要使用大寫字母來引用 Spark SQL 中的這些名稱。
  • 使用者可以在資料源選項中指定特定于供應商的JDBC連接配接屬性以進行特殊處理。例如,spark.read.format(“jdbc”).option(“url”, oracleJdbcUrl).option(“oracle.jdbc.mapDateToTimestamp”, “false”)。oracle.jdbc.mapDateToTimestamp預設為true,使用者通常需要禁用此标志以避免Oracle日期被解析為時間戳。

性能調優

對于某些工作負載,可以通過緩存記憶體中的資料或打開一些實驗選項來提高性能。

在記憶體中緩存資料

Spark SQL 可以通過調用 spark.catalog.cacheTable(“tableName”) 或 dataFrame.cache() 來使用記憶體中的列格式來緩存表。然後,Spark SQL 将隻掃描所需的列,并将自動調整壓縮以最小化記憶體使用量和 GC 壓力。您可以調用 spark.catalog.uncacheTable(“tableName”) 從記憶體中删除該表。

記憶體緩存的配置可以使用 SparkSession 上的 setConf 方法或使用 SQL 運作 SET key=value 指令來完成。

屬性名稱 預設 含義
spark.sql.inMemoryColumnarStorage.compressed true 當設定為 true 時,Spark SQL 将根據資料的統計資訊為每個列自動選擇一個壓縮編解碼器。
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制批量的緩存的大小。更大的批量大小可以提高記憶體使用率和壓縮率,但是在緩存資料時會有 OOM 風險。

其他配置選項

以下選項也可用于調整查詢執行的性能。這些選項可能會在将來的版本中被廢棄,因為更多的優化是自動執行的。

屬性名稱 預設值 含義
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 在讀取檔案時,将單個分區的最大位元組數。
spark.sql.files.openCostInBytes 4194304 (4 MB) 按照位元組數來衡量的打開檔案的估計費用可以在同一時間進行掃描。将多個檔案放入分區時使用。最好過度估計,那麼具有小檔案的分區将比具有較大檔案的分區(首先計劃的)更快。
spark.sql.broadcastTimeout 300 廣播連接配接中的廣播等待時間逾時(秒)
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置執行連接配接時将廣播給所有工作節點的表的最大大小(以位元組為機關)。通過将此值設定為-1可以禁用廣播。請注意,目前的統計資訊僅支援 Hive Metastore 表,其中已運作指令 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan。
spark.sql.shuffle.partitions 200 配置分區數 在使用joins 或者aggregations 産生shuffler的時候。

Broadcast Hint for SQL Queries

The BROADCAST hint guides Spark to broadcast each specified table when joining them with another table or view. When Spark deciding the join methods, the broadcast hash join (i.e., BHJ) is preferred, even if the statistics is above the configuration spark.sql.autoBroadcastJoinThreshold. When both sides of a join are specified, Spark broadcasts the one having the lower statistics. Note Spark does not guarantee BHJ is always chosen, since not all cases (e.g. full outer join) support BHJ. When the broadcast nested loop join is selected, we still respect the hint.

import org.apache.spark.sql.functions.broadcast
broadcast(spark.table("src")).join(spark.table("records"), "key").show()
           

分布式 SQL 引擎

Spark SQL 也可以充當使用其 JDBC/ODBC 或指令行界面的分布式查詢引擎。在這種模式下,最終使用者或應用程式可以直接與 Spark SQL 互動運作 SQL 查詢,而不需要編寫任何代碼。

運作 Thrift JDBC/ODBC 伺服器

這裡實作的 Thrift JDBC/ODBC 伺服器對應于 Hive 1.2.1 中的 HiveServer2。您可以使用 Spark 或 Hive 1.2.1 附帶的直線腳本測試 JDBC 伺服器。

要啟動 JDBC/ODBC 伺服器,請在 Spark 目錄中運作以下指令:

./sbin/start-thriftserver.sh
           

此腳本接受所有 bin/spark-submit 指令行選項,以及 --hiveconf 選項來指定 Hive 屬性。您可以運作 ./sbin/start-thriftserver.sh --help 檢視所有可用選項的完整清單。預設情況下,伺服器監聽 localhost:10000\。您可以通過環境變量覆寫此行為,即:

export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
  --master <master-uri> \
  ...
           

or system properties:

./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
  ...
           

現在,您可以使用 beeline 來測試 Thrift JDBC/ODBC 伺服器:

./bin/beeline 
           

使用 beeline 方式連接配接到 JDBC/ODBC 伺服器:

beeline> !connect jdbc:hive2://localhost:10000 
           

Beeline 将要求您輸入使用者名和密碼。在非安全模式下,隻需輸入機器上的使用者名和空白密碼即可。對于安全模式,請按照 beeline 文檔 中的說明進行操作。

配置Hive是通過将 hive-site.xml, core-site.xml 和 hdfs-site.xml 檔案放在 conf/ 中完成的。

您也可以使用 Hive 附帶的 beeline 腳本。

Thrift JDBC 伺服器還支援通過 HTTP 傳輸發送 thrift RPC 消息。使用以下設定啟用 HTTP 模式作為系統屬性或在 conf/ 中的 hive-site.xml 檔案中啟用:

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice 
           

要測試,請使用 beeline 以 http 模式連接配接到 JDBC/ODBC 伺服器:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint> 
           

運作 Spark SQL CLI

Spark SQL CLI 是在本地模式下運作 Hive 轉移服務并執行從指令行輸入的查詢的友善工具。請注意,Spark SQL CLI 不能與 Thrift JDBC 伺服器通信。

要啟動 Spark SQL CLI,請在 Spark 目錄中運作以下指令:

./bin/spark-sql 
           

配置 Hive 是通過将 hive-site.xml,core-site.xml 和 hdfs-site.xml 檔案放在 conf/ 中完成的。您可以運作 ./bin/spark-sql --help 擷取所有可用選項的完整清單。

參考:

  1. https://spark.apache.org/docs/latest/sql-programming-guide.html
  2. https://www.cnblogs.com/BYRans/p/5057110.html
  3. https://blog.csdn.net/s127838498/article/details/84107849
  4. https://www.cnblogs.com/bigbigtree/p/5691206.html
  5. spark2.4.0 視訊參考
  6. Parquet 使用
  7. spark 2.2.0 中文文檔
  8. ORC FILE 檔案結構

備注

  1. spark 倉庫

繼續閱讀