天天看點

Spark SQL的Parquet那些事兒

Parquet是一種列式存儲格式,很多種處理引擎都支援這種存儲格式,也是sparksql的預設存儲格式。Spark SQL支援靈活的讀和寫Parquet檔案,并且對parquet檔案的schema可以自動解析。當Spark SQL需要寫成Parquet檔案時,處于相容的原因所有的列都被自動轉化為了nullable。

1讀寫Parquet檔案

// Encoders for most common types are automatically provided by importing spark.implicits._

import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information

peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above

// Parquet files are self-describing so the schema is preserved

// The result of loading a Parquet file is also a DataFrame

val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements

parquetFileDF.createOrReplaceTempView("parquetFile")

val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")

namesDF.map(attributes => "Name: " + attributes(0)).show()

// +------------+

// | value|

// |Name: Justin|

2分區發現

分區表時很多系統支援的,比如hive,對于一個分區表,往往是采用表中的某一或多個列去作為分區的依據,分區是以檔案目錄的形式展現。所有内置的檔案源(Text/CSV/JSON/ORC/Parquet)都支援自動的發現和推測分區資訊。例如,我們想取兩個分區列,gender和country,先按照性别分區,再按照國家分區:

path

└── to

└── table

├── gender=male
   │   ├── ...
   │   │
   │   ├── country=US
   │   │   └── data.parquet
   │   ├── country=CN
   │   │   └── data.parquet
   │   └── ...
   └── gender=female
       ├── ...
       │
       ├── country=US
       │   └── data.parquet
       ├── country=CN
       │   └── data.parquet
       └── ...SparkSession.read.parquet 或者 SparkSession.read.load讀取的目錄為path/to/table的時候,會自動從路徑下抽取分區資訊。傳回DataFrame的表結構為:
           

root

|-- name: string (nullable = true)

|-- age: long (nullable = true)

|-- gender: string (nullable = true)

|-- country: string (nullable = true)細細分析一下你也會發現分區列的資料類型也是自動推斷的。目前支援的資料類型有,數字類型,date,timestamp和string類型。有時候使用者可能不希望自動推斷分區列的類型,這時候隻需要将spark.sql.sources.partitionColumnTypeInference.enabled配置為false即可。如果分區列的類型推斷這個

遊戲拍賣

參數設定為了false,那麼分區列的類型會被認為是string。

從spark 1.6開始,分區發現預設情況隻會發現給定路徑下的分區。比如,上面的分區表,假如你講路徑path/to/table/gender=male傳遞給SparkSession.read.parquet 或者 SparkSession.read.load 那麼gender不會被認為是分區列。如果想檢測到該分區,傳給spark的路徑應該是其父路徑也即是path/to/table/,這樣gender就會被認為是分區列。

3schema合并

跟protocol buffer,avro,thrift一樣,parquet也支援schema演變更新。使用者可以在剛開始的時候建立簡單的schema,然後根據需要随時擴充新的列。

spark sql 用Parquet 資料源支援自動檢測新增列并且會合并schema。

由于合并schema是一個相當耗費性能的操作,而且很多情況下都是不必要的,是以從spark 1.5開始就預設關閉掉該功能。有兩種配置開啟方式:

1.通過資料源option設定mergeSchema為true。

2.在全局sql配置中設定spark.sql.parquet.mergeSchema 為true.

// This is used to implicitly convert an RDD to a DataFrame.

// Create a simple DataFrame, store into a partition directory

val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")

squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,

// adding a new column and dropping an existing column

val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i i i)).toDF("value", "cube")

cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table

val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")

mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together

// with the partitioning column appeared in the partition directory paths

// root

// |-- value: int (nullable = true)

// |-- square: int (nullable = true)

// |-- cube: int (nullable = true)

// |-- key: int (nullable = true)

4hive metastore Parquet表轉換

當讀寫hive metastore parquet格式表的時候,Spark SQL為了較好的性能會使用自己預設的parquet格式而不是采用hive SerDe。該行為是通過參數spark.sql.hive.convertMetastoreParquet空值,預設是true。

5Hive和parquet相容性

從表schema處理角度講hive和parquet有兩個主要的差別

hive是大小寫敏感的,但是parquet不是。

hive會講所有列視為nullable,但是nullability在parquet裡有獨特的意義。

由于上面的原因,在将hive metastore parquet轉化為spark parquet表的時候,需要處理相容一下hive的schema和parquet的schema。相容處理的原則是:

有相同名字的字段必須要有相同的資料類型,忽略nullability。相容處理的字段應該保持parquet側的資料類型,這樣就可以處理到nullability類型了。

相容處理的schema應直接包含在hive中繼資料裡的schema資訊:

任何僅僅出現在parquet schema的字段将會被删除

任何僅僅出現在hive 中繼資料裡的字段将會被視為nullable。

6中繼資料重新整理

Spark SQL為了更好的性能會緩存parquet的中繼資料。當spark 讀取hive表的時候,schema一旦從hive轉化為spark sql的,就會被spark sql緩存,如果此時表的schema被hive或者其他外部工具更新,必須要手動的去重新整理中繼資料,才能保證中繼資料的一緻性。

spark.catalog.refreshTable("my_table")

7配置

parquet的相關的參數可以通過setconf或者set key=value的形式配置。

spark.sql.parquet.binaryAsString 預設值是false。一些parquet生産系統,尤其是impala,hive和老版本的spark sql,不區分binary和string類型。該參數告訴spark 講binary資料當作字元串處理。

spark.sql.parquet.int96AsTimestamp 預設是true。有些parquet生産系統,尤其是parquet和hive,将timestamp翻譯成INT96.該參數會提示Spark SQL講INT96翻譯成timestamp。

spark.sql.parquet.compression.codec 預設是snappy。當寫parquet檔案的時候設定壓縮格式。如果在option或者properties裡配置了compression或者parquet.compression優先級依次是:compression,parquet.compression,spark.sql.parquet.compression.codec。支援的配置類型有:none,uncompressed,snappy,gzip,lzo,brotli,lz4,zstd。在hadoop2.9.0之前,zstd需要安裝ZstandardCodec,brotli需要安裝BrotliCodec。

spark.sql.parquet.filterPushdown 預設是true。設定為true代表開啟parquet下推執行優化。

spark.sql.hive.convertMetastoreParquet 預設是true。假如設定為false,spark sql會讀取hive parquet表的時候使用Hive SerDe,替代内置的。

spark.sql.parquet.mergeSchema 預設是false。當設定為true的時候,parquet資料源會合并讀取所有的parquet檔案的schema,否則會從summary檔案或者假如沒有summary檔案的話随機的選一些資料檔案來合并schema。

spark.sql.parquet.writeLegacyFormat 預設是false。如果設定為true 資料會以spark 1.4和更早的版本的格式寫入。比如,decimal類型的值會被以apache parquet的fixed-length byte array格式寫出,該格式是其他系統例如hive,impala等使用的。如果是false,會使用parquet的新版格式。例如,decimals會以int-based格式寫出。如果spark sql要以parquet輸出并且結果會被不支援新格式的其他系統使用的話,需要設定為true。

繼續閱讀