Parquet是一種列式存儲格式,很多種處理引擎都支援這種存儲格式,也是sparksql的預設存儲格式。Spark SQL支援靈活的讀和寫Parquet檔案,并且對parquet檔案的schema可以自動解析。當Spark SQL需要寫成Parquet檔案時,處于相容的原因所有的列都被自動轉化為了nullable。
1讀寫Parquet檔案
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")
// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// |Name: Justin|
2分區發現
分區表時很多系統支援的,比如hive,對于一個分區表,往往是采用表中的某一或多個列去作為分區的依據,分區是以檔案目錄的形式展現。所有内置的檔案源(Text/CSV/JSON/ORC/Parquet)都支援自動的發現和推測分區資訊。例如,我們想取兩個分區列,gender和country,先按照性别分區,再按照國家分區:
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...SparkSession.read.parquet 或者 SparkSession.read.load讀取的目錄為path/to/table的時候,會自動從路徑下抽取分區資訊。傳回DataFrame的表結構為:
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)細細分析一下你也會發現分區列的資料類型也是自動推斷的。目前支援的資料類型有,數字類型,date,timestamp和string類型。有時候使用者可能不希望自動推斷分區列的類型,這時候隻需要将spark.sql.sources.partitionColumnTypeInference.enabled配置為false即可。如果分區列的類型推斷這個
遊戲拍賣參數設定為了false,那麼分區列的類型會被認為是string。
從spark 1.6開始,分區發現預設情況隻會發現給定路徑下的分區。比如,上面的分區表,假如你講路徑path/to/table/gender=male傳遞給SparkSession.read.parquet 或者 SparkSession.read.load 那麼gender不會被認為是分區列。如果想檢測到該分區,傳給spark的路徑應該是其父路徑也即是path/to/table/,這樣gender就會被認為是分區列。
3schema合并
跟protocol buffer,avro,thrift一樣,parquet也支援schema演變更新。使用者可以在剛開始的時候建立簡單的schema,然後根據需要随時擴充新的列。
spark sql 用Parquet 資料源支援自動檢測新增列并且會合并schema。
由于合并schema是一個相當耗費性能的操作,而且很多情況下都是不必要的,是以從spark 1.5開始就預設關閉掉該功能。有兩種配置開啟方式:
1.通過資料源option設定mergeSchema為true。
2.在全局sql配置中設定spark.sql.parquet.mergeSchema 為true.
// This is used to implicitly convert an RDD to a DataFrame.
// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i i i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")
// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)
4hive metastore Parquet表轉換
當讀寫hive metastore parquet格式表的時候,Spark SQL為了較好的性能會使用自己預設的parquet格式而不是采用hive SerDe。該行為是通過參數spark.sql.hive.convertMetastoreParquet空值,預設是true。
5Hive和parquet相容性
從表schema處理角度講hive和parquet有兩個主要的差別
hive是大小寫敏感的,但是parquet不是。
hive會講所有列視為nullable,但是nullability在parquet裡有獨特的意義。
由于上面的原因,在将hive metastore parquet轉化為spark parquet表的時候,需要處理相容一下hive的schema和parquet的schema。相容處理的原則是:
有相同名字的字段必須要有相同的資料類型,忽略nullability。相容處理的字段應該保持parquet側的資料類型,這樣就可以處理到nullability類型了。
相容處理的schema應直接包含在hive中繼資料裡的schema資訊:
任何僅僅出現在parquet schema的字段将會被删除
任何僅僅出現在hive 中繼資料裡的字段将會被視為nullable。
6中繼資料重新整理
Spark SQL為了更好的性能會緩存parquet的中繼資料。當spark 讀取hive表的時候,schema一旦從hive轉化為spark sql的,就會被spark sql緩存,如果此時表的schema被hive或者其他外部工具更新,必須要手動的去重新整理中繼資料,才能保證中繼資料的一緻性。
spark.catalog.refreshTable("my_table")
7配置
parquet的相關的參數可以通過setconf或者set key=value的形式配置。
spark.sql.parquet.binaryAsString 預設值是false。一些parquet生産系統,尤其是impala,hive和老版本的spark sql,不區分binary和string類型。該參數告訴spark 講binary資料當作字元串處理。
spark.sql.parquet.int96AsTimestamp 預設是true。有些parquet生産系統,尤其是parquet和hive,将timestamp翻譯成INT96.該參數會提示Spark SQL講INT96翻譯成timestamp。
spark.sql.parquet.compression.codec 預設是snappy。當寫parquet檔案的時候設定壓縮格式。如果在option或者properties裡配置了compression或者parquet.compression優先級依次是:compression,parquet.compression,spark.sql.parquet.compression.codec。支援的配置類型有:none,uncompressed,snappy,gzip,lzo,brotli,lz4,zstd。在hadoop2.9.0之前,zstd需要安裝ZstandardCodec,brotli需要安裝BrotliCodec。
spark.sql.parquet.filterPushdown 預設是true。設定為true代表開啟parquet下推執行優化。
spark.sql.hive.convertMetastoreParquet 預設是true。假如設定為false,spark sql會讀取hive parquet表的時候使用Hive SerDe,替代内置的。
spark.sql.parquet.mergeSchema 預設是false。當設定為true的時候,parquet資料源會合并讀取所有的parquet檔案的schema,否則會從summary檔案或者假如沒有summary檔案的話随機的選一些資料檔案來合并schema。
spark.sql.parquet.writeLegacyFormat 預設是false。如果設定為true 資料會以spark 1.4和更早的版本的格式寫入。比如,decimal類型的值會被以apache parquet的fixed-length byte array格式寫出,該格式是其他系統例如hive,impala等使用的。如果是false,會使用parquet的新版格式。例如,decimals會以int-based格式寫出。如果spark sql要以parquet輸出并且結果會被不支援新格式的其他系統使用的話,需要設定為true。