天天看點

《Spark 官方文檔》Spark SQL, DataFrames 以及 Datasets 程式設計指南(四)更新指南參考

spark sql cli是一個很友善的工具,它可以用local mode運作hive metastore service,并且在指令行中執行輸入的查詢。注意spark sql cli目前還不支援和thrift jdbc server通信。

用如下指令,在spark目錄下啟動一個spark sql cli

hive配置在conf目錄下hive-site.xml,core-site.xml,hdfs-site.xml中設定。你可以用這個指令檢視完整的選項清單:./bin/spark-sql –help

從spark-1.6.0起,預設thrift server 将運作于多會話并存模式下(multi-session)。這意味着,每個jdbc/odbc連接配接有其獨立的sql配置和臨時函數系統資料庫。table的緩存仍然是公用的。如果你更喜歡老的單會話模式,隻需設定spark.sql.hive.thriftserver.singlesession為true即可。當然,你也可在spark-defaults.conf中設定,或者将其值傳給start-thriftserver.sh –conf(如下):

tungsten引擎現在預設是啟用的,tungsten是通過手動管理記憶體優化執行計劃,同時也優化了表達式求值的代碼生成。這兩個特性都可以通過把spark.sql.tungsten.enabled設為false來禁用。

parquet schema merging預設不啟用。需要啟用的話,設定spark.sql.parquet.mergeschema為true即可

python接口支援用點(.)來通路字段内嵌值,例如df[‘table.column.nestedfield’]。但這也意味着,如果你的字段名包含點号(.)的話,你就必須用重音符來轉義,如:table.`column.with.dots`.nested。

列式存儲記憶體分區剪枝預設是啟用的。要禁用,設定spark.sql.inmemorycolumarstorage.partitionpruning為false即可

不再支援無精度限制的decimal。spark sql現在強制最大精度為38位。對于bigdecimal對象,類型推導将會使用(38,18)精度的decimal類型。如果ddl中沒有指明精度,預設使用的精度是(10,0)

時間戳精确到1us(微秒),而不是1ns(納秒)

在“sql”這個sql變種設定中,浮點數将被解析為decimal。hiveql解析保持不變。

标準sql/dataframe函數均為小寫,例如:sum vs sum。

當推測任務被啟用是,使用directoutputcommitter是不安全的,是以,directoutputcommitter在推測任務啟用時,将被自動禁用,且忽略相關配置。

json資料源不再自動加載其他程式産生的新檔案(例如,不是spark sql插入到dataset中的檔案)。對于一個json的持久化表(如:hive metastore中儲存的表),使用者可以使用refresh table這個sql指令或者hivecontext.refreshtable來把新檔案包括進來。

根據使用者的回報,我們提供了一個新的,更加流暢的api,用于資料讀(sqlcontext.read)寫(dataframe.write),同時老的api(如:sqlcontext.parquetfile, sqlcontext.jsonfile)将被廢棄。

有關sqlcontext.read和dataframe.write的更詳細資訊,請參考api文檔。

根據使用者的回報,我們改變了dataframe.groupby().agg()的預設行為,在傳回的dataframe結果中保留了分組字段。如果你想保持1.3中的行為,設定spark.sql.retaingroupcolumns為false即可。

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_scala_16"><b>scala</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_java_16"><b>java</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_python_16"><b>python</b></a>

在spark 1.3中,我們去掉了spark sql的”alpha“标簽,并清理了可用的api。從spark 1.3起,spark sql将對1.x系列二進制相容。這個相容性保證不包括顯式的标注為”unstable(如:developerapi或experimental)“的api。

對于使用者來說,spark sql 1.3最大的改動就是schemardd改名為dataframe。主要原因是,dataframe不再直接由rdd派生,而是通過自己的實作提供rdd的功能。dataframe隻需要調用其rdd方法就能轉成rdd。

在scala中仍然有schemardd,隻不過這是dataframe的一個别名,以便相容一些現有代碼。但仍然建議使用者改用dataframe。java和python使用者就沒這個福利了,他們必須改代碼。

在spark 1.3之前,有單獨的java相容類(javasqlcontext和javaschemardd)及其在scala api中的鏡像。spark 1.3中将java api和scala api統一。兩種語言的使用者都應該使用sqlcontext和dataframe。一般這些類中都會使用兩種語言中都有的類型(如:array取代各語言獨有的集合)。有些情況下,沒有通用的類型(例如:閉包或者maps),将會使用函數重載來解決這個問題。

另外,java特有的類型api被删除了。scala和java使用者都應該用org.apache.spark.sql.types來程式設計描述一個schema。

spark 1.3之前的很多示例代碼,都在開頭用 import sqlcontext._,這行将會導緻所有的sqlcontext的函數都被引入進來。是以,在spark 1.3我們把rdds到dataframes的隐式轉換隔離出來,單獨放到sqlcontext.implicits對象中。使用者現在應該這樣寫:import sqlcontext.implicits._

另外,隐式轉換也支援由product(如:case classes或tuples)組成的rdd,但需要調用一個todf方法,而不是自動轉換。

如果需要使用dsl(被dataframe取代的api)中的方法,使用者之前需要導入dsl(import org.apache.spark.sql.catalyst.dsl), 而現在應該要導入 dataframe api(import org.apache.spark.sql.functions._)

spark 1.3删除了sql包中的datatype類型别名。現在,使用者應該使用 org.apache.spark.sql.types中的類。

注冊udf的函數,不管是dataframe,dsl或者sql中用到的,都被挪到sqlcontext.udf中。

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_scala_17"><b>scala</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_java_17"><b>java</b></a>

python udf注冊保持不變。

在python中使用datatypes,你需要先構造一個對象(如:stringtype()),而不是引用一個單例。

在shark中,預設的reducer個數是1,并且由mapred.reduce.tasks設定。spark sql廢棄了這個屬性,改為 spark.sql.shuffle.partitions, 并且預設200,使用者可通過如下set指令來自定義:

你也可以把這個屬性放到hive-site.xml中來覆寫預設值。

目前,mapred.reduce.tasks屬性仍然能被識别,并且自動轉成spark.sql.shuffle.partitions

shark.cache表屬性已經不存在了,并且以”_cached”結尾命名的表也不再會自動緩存。取而代之的是,cache table和uncache table語句,用以顯式的控制表的緩存:

注意:cache table tbl 現在預設是饑餓模式,而非懶惰模式。再也不需要手動調用其他action來觸發cache了!

從spark-1.2.0開始,spark sql新提供了一個語句,讓使用者自己控制表緩存是否是懶惰模式

以下幾個緩存相關的特性不再支援:

使用者定義分區級别的緩存逐出政策

rdd 重加載

記憶體緩存直接寫入政策

spark sql thrift jdbc server采用了”out of the box”(開箱即用)的設計,使用很友善,并相容已有的hive安裝版本。你不需要修改已有的hive metastore或者改變資料的位置,或者表分區。

spark sql 支援絕大部分hive功能,如:

hive查詢語句:

<code>select</code>

<code>group by</code>

<code>order by</code>

<code>cluster by</code>

<code>sort by</code>

所有的hive操作符:

relational operators (<code>=</code>, <code>⇔</code>, <code>==</code>, <code>&lt;&gt;</code>, <code>&lt;</code>, <code>&gt;</code>, <code>&gt;=</code>, <code>&lt;=</code>, etc)

arithmetic operators (<code>+</code>, <code>-</code>, <code>*</code>, <code>/</code>, <code>%</code>, etc)

logical operators (<code>and</code>, <code>&amp;&amp;</code>, <code>or</code>, <code>||</code>, etc)

complex type constructors

mathematical functions (<code>sign</code>, <code>ln</code>, <code>cos</code>, etc)

string functions (<code>instr</code>, <code>length</code>, <code>printf</code>, etc)

使用者定義函數(udf)

使用者定義聚合函數(udaf)

使用者定義序列化、反序列化(serdes)

視窗函數(window functions)

joins

<code>join</code>

<code>{left|right|full} outer join</code>

<code>left semi join</code>

<code>cross join</code>

unions

查詢子句

<code>select col from ( select a + b as col from t1) t2</code>

采樣

執行計劃詳細(explain)

分區表,包括動态分區插入

視圖

所有hive ddl(data definition language):

<code>create table</code>

<code>create table as select</code>

<code>alter table</code>

絕大部分hive資料類型:

<code>tinyint</code>

<code>smallint</code>

<code>int</code>

<code>bigint</code>

<code>boolean</code>

<code>float</code>

<code>double</code>

<code>string</code>

<code>binary</code>

<code>timestamp</code>

<code>date</code>

<code>array&lt;&gt;</code>

<code>map&lt;&gt;</code>

<code>struct&lt;&gt;</code>

以下是目前不支援的hive特性的清單。多數是不常用的。

不支援的hive常見功能

bucket表:butcket是hive表的一個哈希分區

不支援的hive進階功能

union類操作

去重join

字段統計資訊收集:spark sql不支援同步的字段統計收集

hive輸入、輸出格式

cli檔案格式:對于需要回顯到cli中的結果,spark sql僅支援textoutputformat。

hadoop archive — hadoop歸檔

hive優化

一些比較棘手的hive優化目前還沒有在spark中提供。有一些(如索引)對應spark sql這種記憶體計算模型來說并不重要。另外一些,在spark sql未來的版本中會支援。

塊級别位圖索引和虛拟字段(用來建索引)

自動計算reducer個數(join和groupby算子):目前在spark sql中你需要這樣控制混洗後(post-shuffle)并發程度:”set spark.sql.shuffle.partitions=[num_tasks];”

中繼資料查詢:隻查詢中繼資料的請求,spark sql仍需要啟動任務來計算結果

資料傾斜标志:spark sql不會理會hive中的資料傾斜标志

<code>streamtable</code> join提示:spark sql裡沒有這玩藝兒

傳回結果時合并小檔案:如果傳回的結果有很多小檔案,hive有個選項設定,來合并小檔案,以避免超過hdfs的檔案數額度限制。spark sql不支援這個。

spark sql和dataframes支援如下資料類型:

numeric types(數值類型)

<code>bytetype</code>: 1位元組長的有符号整型,範圍:<code>-128</code> 到 <code>127</code>.

<code>shorttype</code>: 2位元組長有符号整型,範圍:<code>-32768</code> 到 <code>32767</code>.

<code>integertype</code>: 4位元組有符号整型,範圍:<code>-2147483648</code> 到 <code>2147483647</code>.

<code>longtype</code>: 8位元組有符号整型,範圍: <code>-9223372036854775808</code> to <code>9223372036854775807</code>.

<code>floattype</code>: 4位元組單精度浮點數。

<code>doubletype</code>: 8位元組雙精度浮點數

<code>decimaltype</code>: 任意精度有符号帶小數的數值。内部使用java.math.bigdecimal, bigdecimal包含任意精度的不縮放整型,和一個32位的縮放整型

string type(字元串類型)

<code>stringtype</code>: 字元串

binary type(二進制類型)

<code>binarytype</code>: 位元組序列

boolean type(布爾類型)

<code>booleantype</code>: 布爾類型

datetime type(日期類型)

<code>timestamptype</code>: 表示包含年月日、時分秒等字段的日期

<code>datetype</code>: 表示包含年月日字段的日期

complex types(複雜類型)

<code>arraytype(elementtype, containsnull)</code>:數組類型,表達一系列的elementtype類型的元素組成的序列,containsnull表示數組能否包含null值

<code>maptype(keytype, valuetype, valuecontainsnull)</code>:映射集合類型,表示一個鍵值對的集合。鍵的類型是keytype,值的類型則由valuetype指定。對應maptype來說,鍵是不能為null的,而值能否為null則取決于valuecontainsnull。

<code>structtype(fields):</code>表示包含structfield序列的結構體。

structfield(name, datatype, nullable): 表示structtype中的一個字段,name是字段名,datatype是資料類型,nullable表示該字段是否可以為空

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_scala_18"><b>scala</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_java_18"><b>java</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_python_18"><b>python</b></a>

所有spark sql支援的資料類型都在這個包裡:org.apache.spark.sql.types,你可以這樣導入之:

data type

value type in scala

api to access or create a data type

<b>bytetype</b>

byte

bytetype

<b>shorttype</b>

short

shorttype

<b>integertype</b>

int

integertype

<b>longtype</b>

long

longtype

<b>floattype</b>

float

floattype

<b>doubletype</b>

double

doubletype

<b>decimaltype</b>

java.math.bigdecimal

decimaltype

<b>stringtype</b>

string

stringtype

<b>binarytype</b>

array[byte]

binarytype

<b>booleantype</b>

boolean

booleantype

<b>timestamptype</b>

java.sql.timestamp

timestamptype

<b>datetype</b>

java.sql.date

datetype

<b>arraytype</b>

scala.collection.seq

arraytype(elementtype, [containsnull])注意:預設containsnull為true

<b>maptype</b>

scala.collection.map

maptype(keytype, valuetype, [valuecontainsnull])注意:預設valuecontainsnull為true

<b>structtype</b>

org.apache.spark.sql.row

structtype(fields)注意:fields是一個structfields的序列,并且同名的字段是不允許的。

<b>structfield</b>

定義字段的資料對應的scala類型(例如,如果structfield的datatype為integertype,則其資料對應的scala類型為int)

structfield(name, datatype, nullable)

這是not-a-number的縮寫,某些float或double類型不符合标準浮點數語義,需要對其特殊處理:

nan == nan,即:nan和nan總是相等

在聚合函數中,所有nan分到同一組

nan在join操作中可以當做一個普通的join key

nan在升序排序中排到最後,比任何其他數值都大