天天看点

《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南(四)升级指南参考

spark sql cli是一个很方便的工具,它可以用local mode运行hive metastore service,并且在命令行中执行输入的查询。注意spark sql cli目前还不支持和thrift jdbc server通信。

用如下命令,在spark目录下启动一个spark sql cli

hive配置在conf目录下hive-site.xml,core-site.xml,hdfs-site.xml中设置。你可以用这个命令查看完整的选项列表:./bin/spark-sql –help

从spark-1.6.0起,默认thrift server 将运行于多会话并存模式下(multi-session)。这意味着,每个jdbc/odbc连接有其独立的sql配置和临时函数注册表。table的缓存仍然是公用的。如果你更喜欢老的单会话模式,只需设置spark.sql.hive.thriftserver.singlesession为true即可。当然,你也可在spark-defaults.conf中设置,或者将其值传给start-thriftserver.sh –conf(如下):

tungsten引擎现在默认是启用的,tungsten是通过手动管理内存优化执行计划,同时也优化了表达式求值的代码生成。这两个特性都可以通过把spark.sql.tungsten.enabled设为false来禁用。

parquet schema merging默认不启用。需要启用的话,设置spark.sql.parquet.mergeschema为true即可

python接口支持用点(.)来访问字段内嵌值,例如df[‘table.column.nestedfield’]。但这也意味着,如果你的字段名包含点号(.)的话,你就必须用重音符来转义,如:table.`column.with.dots`.nested。

列式存储内存分区剪枝默认是启用的。要禁用,设置spark.sql.inmemorycolumarstorage.partitionpruning为false即可

不再支持无精度限制的decimal。spark sql现在强制最大精度为38位。对于bigdecimal对象,类型推导将会使用(38,18)精度的decimal类型。如果ddl中没有指明精度,默认使用的精度是(10,0)

时间戳精确到1us(微秒),而不是1ns(纳秒)

在“sql”这个sql变种设置中,浮点数将被解析为decimal。hiveql解析保持不变。

标准sql/dataframe函数均为小写,例如:sum vs sum。

当推测任务被启用是,使用directoutputcommitter是不安全的,因此,directoutputcommitter在推测任务启用时,将被自动禁用,且忽略相关配置。

json数据源不再自动加载其他程序产生的新文件(例如,不是spark sql插入到dataset中的文件)。对于一个json的持久化表(如:hive metastore中保存的表),用户可以使用refresh table这个sql命令或者hivecontext.refreshtable来把新文件包括进来。

根据用户的反馈,我们提供了一个新的,更加流畅的api,用于数据读(sqlcontext.read)写(dataframe.write),同时老的api(如:sqlcontext.parquetfile, sqlcontext.jsonfile)将被废弃。

有关sqlcontext.read和dataframe.write的更详细信息,请参考api文档。

根据用户的反馈,我们改变了dataframe.groupby().agg()的默认行为,在返回的dataframe结果中保留了分组字段。如果你想保持1.3中的行为,设置spark.sql.retaingroupcolumns为false即可。

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_scala_16"><b>scala</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_java_16"><b>java</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_python_16"><b>python</b></a>

在spark 1.3中,我们去掉了spark sql的”alpha“标签,并清理了可用的api。从spark 1.3起,spark sql将对1.x系列二进制兼容。这个兼容性保证不包括显式的标注为”unstable(如:developerapi或experimental)“的api。

对于用户来说,spark sql 1.3最大的改动就是schemardd改名为dataframe。主要原因是,dataframe不再直接由rdd派生,而是通过自己的实现提供rdd的功能。dataframe只需要调用其rdd方法就能转成rdd。

在scala中仍然有schemardd,只不过这是dataframe的一个别名,以便兼容一些现有代码。但仍然建议用户改用dataframe。java和python用户就没这个福利了,他们必须改代码。

在spark 1.3之前,有单独的java兼容类(javasqlcontext和javaschemardd)及其在scala api中的镜像。spark 1.3中将java api和scala api统一。两种语言的用户都应该使用sqlcontext和dataframe。一般这些类中都会使用两种语言中都有的类型(如:array取代各语言独有的集合)。有些情况下,没有通用的类型(例如:闭包或者maps),将会使用函数重载来解决这个问题。

另外,java特有的类型api被删除了。scala和java用户都应该用org.apache.spark.sql.types来编程描述一个schema。

spark 1.3之前的很多示例代码,都在开头用 import sqlcontext._,这行将会导致所有的sqlcontext的函数都被引入进来。因此,在spark 1.3我们把rdds到dataframes的隐式转换隔离出来,单独放到sqlcontext.implicits对象中。用户现在应该这样写:import sqlcontext.implicits._

另外,隐式转换也支持由product(如:case classes或tuples)组成的rdd,但需要调用一个todf方法,而不是自动转换。

如果需要使用dsl(被dataframe取代的api)中的方法,用户之前需要导入dsl(import org.apache.spark.sql.catalyst.dsl), 而现在应该要导入 dataframe api(import org.apache.spark.sql.functions._)

spark 1.3删除了sql包中的datatype类型别名。现在,用户应该使用 org.apache.spark.sql.types中的类。

注册udf的函数,不管是dataframe,dsl或者sql中用到的,都被挪到sqlcontext.udf中。

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_scala_17"><b>scala</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_java_17"><b>java</b></a>

python udf注册保持不变。

在python中使用datatypes,你需要先构造一个对象(如:stringtype()),而不是引用一个单例。

在shark中,默认的reducer个数是1,并且由mapred.reduce.tasks设定。spark sql废弃了这个属性,改为 spark.sql.shuffle.partitions, 并且默认200,用户可通过如下set命令来自定义:

你也可以把这个属性放到hive-site.xml中来覆盖默认值。

目前,mapred.reduce.tasks属性仍然能被识别,并且自动转成spark.sql.shuffle.partitions

shark.cache表属性已经不存在了,并且以”_cached”结尾命名的表也不再会自动缓存。取而代之的是,cache table和uncache table语句,用以显式的控制表的缓存:

注意:cache table tbl 现在默认是饥饿模式,而非懒惰模式。再也不需要手动调用其他action来触发cache了!

从spark-1.2.0开始,spark sql新提供了一个语句,让用户自己控制表缓存是否是懒惰模式

以下几个缓存相关的特性不再支持:

用户定义分区级别的缓存逐出策略

rdd 重加载

内存缓存直接写入策略

spark sql thrift jdbc server采用了”out of the box”(开箱即用)的设计,使用很方便,并兼容已有的hive安装版本。你不需要修改已有的hive metastore或者改变数据的位置,或者表分区。

spark sql 支持绝大部分hive功能,如:

hive查询语句:

<code>select</code>

<code>group by</code>

<code>order by</code>

<code>cluster by</code>

<code>sort by</code>

所有的hive操作符:

relational operators (<code>=</code>, <code>⇔</code>, <code>==</code>, <code>&lt;&gt;</code>, <code>&lt;</code>, <code>&gt;</code>, <code>&gt;=</code>, <code>&lt;=</code>, etc)

arithmetic operators (<code>+</code>, <code>-</code>, <code>*</code>, <code>/</code>, <code>%</code>, etc)

logical operators (<code>and</code>, <code>&amp;&amp;</code>, <code>or</code>, <code>||</code>, etc)

complex type constructors

mathematical functions (<code>sign</code>, <code>ln</code>, <code>cos</code>, etc)

string functions (<code>instr</code>, <code>length</code>, <code>printf</code>, etc)

用户定义函数(udf)

用户定义聚合函数(udaf)

用户定义序列化、反序列化(serdes)

窗口函数(window functions)

joins

<code>join</code>

<code>{left|right|full} outer join</code>

<code>left semi join</code>

<code>cross join</code>

unions

查询子句

<code>select col from ( select a + b as col from t1) t2</code>

采样

执行计划详细(explain)

分区表,包括动态分区插入

视图

所有hive ddl(data definition language):

<code>create table</code>

<code>create table as select</code>

<code>alter table</code>

绝大部分hive数据类型:

<code>tinyint</code>

<code>smallint</code>

<code>int</code>

<code>bigint</code>

<code>boolean</code>

<code>float</code>

<code>double</code>

<code>string</code>

<code>binary</code>

<code>timestamp</code>

<code>date</code>

<code>array&lt;&gt;</code>

<code>map&lt;&gt;</code>

<code>struct&lt;&gt;</code>

以下是目前不支持的hive特性的列表。多数是不常用的。

不支持的hive常见功能

bucket表:butcket是hive表的一个哈希分区

不支持的hive高级功能

union类操作

去重join

字段统计信息收集:spark sql不支持同步的字段统计收集

hive输入、输出格式

cli文件格式:对于需要回显到cli中的结果,spark sql仅支持textoutputformat。

hadoop archive — hadoop归档

hive优化

一些比较棘手的hive优化目前还没有在spark中提供。有一些(如索引)对应spark sql这种内存计算模型来说并不重要。另外一些,在spark sql未来的版本中会支持。

块级别位图索引和虚拟字段(用来建索引)

自动计算reducer个数(join和groupby算子):目前在spark sql中你需要这样控制混洗后(post-shuffle)并发程度:”set spark.sql.shuffle.partitions=[num_tasks];”

元数据查询:只查询元数据的请求,spark sql仍需要启动任务来计算结果

数据倾斜标志:spark sql不会理会hive中的数据倾斜标志

<code>streamtable</code> join提示:spark sql里没有这玩艺儿

返回结果时合并小文件:如果返回的结果有很多小文件,hive有个选项设置,来合并小文件,以避免超过hdfs的文件数额度限制。spark sql不支持这个。

spark sql和dataframes支持如下数据类型:

numeric types(数值类型)

<code>bytetype</code>: 1字节长的有符号整型,范围:<code>-128</code> 到 <code>127</code>.

<code>shorttype</code>: 2字节长有符号整型,范围:<code>-32768</code> 到 <code>32767</code>.

<code>integertype</code>: 4字节有符号整型,范围:<code>-2147483648</code> 到 <code>2147483647</code>.

<code>longtype</code>: 8字节有符号整型,范围: <code>-9223372036854775808</code> to <code>9223372036854775807</code>.

<code>floattype</code>: 4字节单精度浮点数。

<code>doubletype</code>: 8字节双精度浮点数

<code>decimaltype</code>: 任意精度有符号带小数的数值。内部使用java.math.bigdecimal, bigdecimal包含任意精度的不缩放整型,和一个32位的缩放整型

string type(字符串类型)

<code>stringtype</code>: 字符串

binary type(二进制类型)

<code>binarytype</code>: 字节序列

boolean type(布尔类型)

<code>booleantype</code>: 布尔类型

datetime type(日期类型)

<code>timestamptype</code>: 表示包含年月日、时分秒等字段的日期

<code>datetype</code>: 表示包含年月日字段的日期

complex types(复杂类型)

<code>arraytype(elementtype, containsnull)</code>:数组类型,表达一系列的elementtype类型的元素组成的序列,containsnull表示数组能否包含null值

<code>maptype(keytype, valuetype, valuecontainsnull)</code>:映射集合类型,表示一个键值对的集合。键的类型是keytype,值的类型则由valuetype指定。对应maptype来说,键是不能为null的,而值能否为null则取决于valuecontainsnull。

<code>structtype(fields):</code>表示包含structfield序列的结构体。

structfield(name, datatype, nullable): 表示structtype中的一个字段,name是字段名,datatype是数据类型,nullable表示该字段是否可以为空

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_scala_18"><b>scala</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_java_18"><b>java</b></a>

<a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#tab_python_18"><b>python</b></a>

所有spark sql支持的数据类型都在这个包里:org.apache.spark.sql.types,你可以这样导入之:

data type

value type in scala

api to access or create a data type

<b>bytetype</b>

byte

bytetype

<b>shorttype</b>

short

shorttype

<b>integertype</b>

int

integertype

<b>longtype</b>

long

longtype

<b>floattype</b>

float

floattype

<b>doubletype</b>

double

doubletype

<b>decimaltype</b>

java.math.bigdecimal

decimaltype

<b>stringtype</b>

string

stringtype

<b>binarytype</b>

array[byte]

binarytype

<b>booleantype</b>

boolean

booleantype

<b>timestamptype</b>

java.sql.timestamp

timestamptype

<b>datetype</b>

java.sql.date

datetype

<b>arraytype</b>

scala.collection.seq

arraytype(elementtype, [containsnull])注意:默认containsnull为true

<b>maptype</b>

scala.collection.map

maptype(keytype, valuetype, [valuecontainsnull])注意:默认valuecontainsnull为true

<b>structtype</b>

org.apache.spark.sql.row

structtype(fields)注意:fields是一个structfields的序列,并且同名的字段是不允许的。

<b>structfield</b>

定义字段的数据对应的scala类型(例如,如果structfield的datatype为integertype,则其数据对应的scala类型为int)

structfield(name, datatype, nullable)

这是not-a-number的缩写,某些float或double类型不符合标准浮点数语义,需要对其特殊处理:

nan == nan,即:nan和nan总是相等

在聚合函数中,所有nan分到同一组

nan在join操作中可以当做一个普通的join key

nan在升序排序中排到最后,比任何其他数值都大