天天看點

《Spark 官方文檔》Spark SQL, DataFrames 以及 Datasets 程式設計指南(二)資料源

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_scala_6"><b>scala</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_java_6"><b>java</b></a>

如果不能事先通過case class定義schema(例如,記錄的字段結構是儲存在一個字元串,或者其他文本資料集中,需要先解析,又或者字段對不同使用者有所不同),那麼你可能需要按以下三個步驟,以程式設計方式的建立一個dataframe:

從已有的rdd建立一個包含row對象的rdd

用structtype建立一個schema,和步驟1中建立的rdd的結構相比對

把得到的schema應用于包含row對象的rdd,調用這個方法來實作這一步:sqlcontext.createdataframe

for example:

例如:

spark sql支援基于dataframe操作一系列不同的資料源。dataframe既可以當成一個普通rdd來操作,也可以将其注冊成一個臨時表來查詢。把dataframe注冊為table之後,你就可以基于這個table執行sql語句了。本節将描述加載和儲存資料的一些通用方法,包含了不同的spark資料源,然後深入介紹一下内建資料源可用選項。

在最簡單的情況下,所有操作都會以預設類型資料源來加載資料(預設是parquet,除非修改了spark.sql.sources.default 配置)。

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_scala_7"><b>scala</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_java_7"><b>java</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_python_7"><b>python</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_r_7"><b>r</b></a>

你也可以手動指定資料源,并設定一些額外的選項參數。資料源可由其全名指定(如,org.apache.spark.sql.parquet),而對于内建支援的資料源,可以使用簡寫名(json, parquet, jdbc)。任意類型資料源建立的dataframe都可以用下面這種文法轉成其他類型資料格式。

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_scala_8"><b>scala</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_java_8"><b>java</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_python_8"><b>python</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_r_8"><b>r</b></a>

spark sql還支援直接對檔案使用sql查詢,不需要用read方法把檔案加載進來。

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_scala_9"><b>scala</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_java_9"><b>java</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_python_9"><b>python</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_r_9"><b>r</b></a>

save操作有一個可選參數savemode,用這個參數可以指定如何處理資料已經存在的情況。很重要的一點是,這些儲存模式都沒有加鎖,是以其操作也不是原子性的。另外,如果使用overwrite模式,實際操作是,先删除資料,再寫新資料。

僅scala/java

所有支援的語言

含義

<code>savemode.errorifexists </code>(default)

<code>"error" </code>(default)

(預設模式)從dataframe向資料源儲存資料時,如果資料已經存在,則抛異常。

<code>savemode.append</code>

<code>"append"</code>

如果資料或表已經存在,則将dataframe的資料追加到已有資料的尾部。

<code>savemode.overwrite</code>

<code>"overwrite"</code>

如果資料或表已經存在,則用dataframe資料覆寫之。

<code>savemode.ignore</code>

<code>"ignore"</code>

如果資料已經存在,那就放棄儲存dataframe資料。這和sql裡create table if not exists有點類似。

在使用hivecontext的時候,dataframe可以用saveastable方法,将資料儲存成持久化的表。與registertemptable不同,saveastable會将dataframe的實際資料内容儲存下來,并且在hivemetastore中建立一個遊标指針。持久化的表會一直保留,即使spark程式重新開機也沒有影響,隻要你連接配接到同一個metastore就可以讀取其資料。讀取持久化表時,隻需要用用表名作為參數,調用sqlcontext.table方法即可得到對應dataframe。

預設情況下,saveastable會建立一個”managed table“,也就是說這個表資料的位置是由metastore控制的。同樣,如果删除表,其資料也會同步删除。

仍然使用上面例子中的資料:

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_scala_10"><b>scala</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_java_10"><b>java</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_python_10"><b>python</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_r_10"><b>r</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_sql_10"><b>sql</b></a>

像hive這樣的系統,一個很常用的優化手段就是表分區。在一個支援分區的表中,資料是儲存在不同的目錄中的,并且将分區鍵以編碼方式儲存在各個分區目錄路徑中。parquet資料源現在也支援自動發現和推導分區資訊。例如,我們可以把之前用的人口資料存到一個分區表中,其目錄結構如下所示,其中有2個額外的字段,gender和country,作為分區鍵:

在這個例子中,如果需要讀取parquet檔案資料,我們隻需要把 path/to/table 作為參數傳給 sqlcontext.read.parquet 或者 sqlcontext.read.load。spark sql能夠自動的從路徑中提取出分區資訊,随後傳回的dataframe的schema如下:

注意,分區鍵的資料類型将是自動推導出來的。目前,隻支援數值類型和字元串類型資料作為分區鍵。

有的使用者可能不想要自動推導出來的分區鍵資料類型。這種情況下,你可以通過 spark.sql.sources.partitioncolumntypeinference.enabled (預設是true)來禁用分區鍵類型推導。禁用之後,分區鍵總是被當成字元串類型。

從spark-1.6.0開始,分區發現預設隻在指定目錄的子目錄中進行。以上面的例子來說,如果使用者把 path/to/table/gender=male 作為參數傳給 sqlcontext.read.parquet 或者 sqlcontext.read.load,那麼gender就不會被作為分區鍵。如果使用者想要指定分區發現的基礎目錄,可以通過basepath選項指定。例如,如果把 path/to/table/gender=male作為資料目錄,并且将basepath設為 path/to/table,那麼gender仍然會最為分區鍵。

像protobuffer、avro和thrift一樣,parquet也支援schema演變。使用者從一個簡單的schema開始,逐漸增加所需的新字段。這樣的話,使用者最終會得到多個schema不同但互相相容的parquet檔案。目前,parquet資料源已經支援自動檢測這種情況,并合并所有檔案的schema。

因為schema合并相對代價比較大,并且在多數情況下不是必要的,是以從spark-1.5.0之後,預設是被禁用的。你可以這樣啟用這一功能:

讀取parquet檔案時,将選項mergeschema設為true(見下面的示例代碼)

或者,将全局選項spark.sql.parquet.mergeschema設為true

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_scala_11"><b>scala</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_python_11"><b>python</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_r_11"><b>r</b></a>

在讀寫hive metastore parquet 表時,spark sql用的是内部的parquet支援庫,而不是hive serde,因為這樣性能更好。這一行為是由spark.sql.hive.convertmetastoreparquet 配置項來控制的,而且預設是啟用的。

hive和parquet在表結構處理上主要有2個不同點:

hive大小寫敏感,而parquet不是

hive所有字段都是nullable的,而parquet需要顯示設定

由于以上原因,我們必須在hive metastore parquet table轉spark sql parquet table的時候,對hive metastore schema做調整,調整規則如下:

兩種schema中字段名和字段類型必須一緻(不考慮nullable)。調和後的字段類型必須在parquet格式中有相對應的資料類型,是以nullable是也是需要考慮的。

調和後spark sql parquet table schema将包含以下字段:

隻出現在parquet schema中的字段将被丢棄

隻出現在hive metastore schema中的字段将被添加進來,并顯式地設為nullable。

spark sql會緩存parquet中繼資料以提高性能。如果hive metastore parquet table轉換被啟用的話,那麼轉換過來的schema也會被緩存。這時候,如果這些表由hive或其他外部工具更新了,你必須手動重新整理中繼資料。

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_scala_12"><b>scala</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_java_12"><b>java</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_python_12"><b>python</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_sql_12"><b>sql</b></a>

parquet配置可以通過 sqlcontext.setconf 或者 sql語句中 set key=value來指定。

屬性名

預設值

<code>spark.sql.parquet.binaryasstring</code>

false

有些老系統,如:特定版本的impala,hive,或者老版本的spark sql,不區分二進制資料和字元串類型資料。這個标志的意思是,讓spark sql把二進制資料當字元串處理,以相容老系統。

<code>spark.sql.parquet.int96astimestamp</code>

true

有些老系統,如:特定版本的impala,hive,把時間戳存成int96。這個配置的作用是,讓spark sql把這些int96解釋為timestamp,以相容老系統。

<code>spark.sql.parquet.cachemetadata</code>

緩存parquet schema中繼資料。可以提升查詢靜态資料的速度。

<code>spark.sql.parquet.compression.codec</code>

gzip

設定parquet檔案的壓縮編碼格式。可接受的值有:uncompressed, snappy, gzip(預設), lzo

<code>spark.sql.parquet.filterpushdown</code>

啟用過濾器下推優化,可以講過濾條件盡量推導最下層,已取得性能提升

<code>spark.sql.hive.convertmetastoreparquet</code>

如果禁用,spark sql将使用hive serde,而不是内建的對parquet tables的支援

<code>spark.sql.parquet.output.committer.class</code>

<code>org.apache.parquet.hadoop. parquetoutputcommitter</code>

parquet使用的資料輸出類。這個類必須是 org.apache.hadoop.mapreduce.outputcommitter的子類。一般來說,它也應該是 org.apache.parquet.hadoop.parquetoutputcommitter的子類。注意:1. 如果啟用spark.speculation, 這個選項将被自動忽略

2. 這個選項必須用hadoop configuration設定,而不是spark sqlconf

3. 這個選項會覆寫 spark.sql.sources.outputcommitterclass

spark sql有一個内建的org.apache.spark.sql.parquet.directparquetoutputcommitter, 這個類的在輸出到s3的時候比預設的parquetoutputcommitter類效率高。

<code>spark.sql.parquet.mergeschema</code>

<code>false</code>

如果設為true,那麼parquet資料源将會merge 所有資料檔案的schema,否則,schema是從summary file擷取的(如果summary file沒有設定,則随機選一個)