2020年6月18日,开发了近两年(自2018年10月份至今)的Apache Spark 3.0.0 正式发布!
Apache Spark 3.0.0版本包含3400多个补丁,是开源社区做出巨大贡献的结晶,在Python和SQL功能方面带来了重大进展并且将重点聚焦在了开发和生产的易用性上。同时,今年也是Spark开源10周年,这些举措反映了Spark自开源以来,是如何不断的满足更广泛的受众需求以及更多的应用场景。
首先来看一下Apache Spark 3.0.0主要的新特性:
- 在TPC-DS基准测试中,通过启用自适应查询执行、动态分区裁剪等其他优化措施,相比于Spark 2.4,性能提升了2倍
- 兼容ANSI SQL
- 对pandas API的重大改进,包括python类型hints及其他的pandas UDFs
- 简化了Pyspark异常,更好的处理Python error
- structured streaming的新UI
- 在调用R语言的UDF方面,速度提升了40倍
- 超过3400个Jira问题被解决,这些问题在Spark各个核心组件中分布情况如下图:
此外,采用Spark3.0版本,主要代码并没有发生改变。
改进的Spark SQL引擎
Spark SQL是支持大多数Spark应用的引擎。例如,在Databricks,超过 90%的Spark API调用使用了DataFrame、Dataset和SQL API及通过SQL优化器优化的其他lib包。这意味着即使是Python和Scala开发人员也通过Spark SQL引擎处理他们的大部分工作。
如下图所示,Spark3.0在整个runtime,性能表现大概是Spark2.4的2倍:
接下来,我们将介绍Spark SQL引擎的新特性。
即使由于缺乏或者不准确的数据统计信息和对成本的错误估算导致生成的初始计划不理想,但是自适应查询执行(Adaptive Query Execution)通过在运行时对查询执行计划进行优化,允许Spark Planner在运行时执行可选的执行计划,这些计划将基于运行时统计数据进行优化,从而提升性能。
由于Spark数据存储和计算是分离的,因此无法预测数据的到达。基于这些原因,对于Spark来说,在运行时自适应显得尤为重要。AQE目前提供了三个主要的自适应优化:
- 动态合并shuffle partitions
可以简化甚至避免调整shuffle分区的数量。用户可以在开始时设置相对较多的shuffle分区数,AQE会在运行时将相邻的小分区合并为较大的分区。
- 动态调整join策略
在一定程度上避免由于缺少统计信息或着错误估计大小(当然也可能两种情况同时存在),而导致执行次优计划的情况。这种自适应优化可以在运行时sort merge join转换成broadcast hash join,从而进一步提升性能
- 动态优化倾斜的join(skew joins)
skew joins可能导致负载的极端不平衡,并严重降低性能。在AQE从shuffle文件统计信息中检测到任何倾斜后,它可以将倾斜的分区分割成更小的分区,并将它们与另一侧的相应分区连接起来。这种优化可以并行化倾斜处理,获得更好的整体性能。
基于3TB的TPC-DS基准测试中,与不使用AQE相比,使用AQE的Spark将两个查询的性能提升了1.5倍以上,对于另外37个查询的性能提升超过了1.1倍。
动态分区裁剪
当优化器在编译时无法识别可跳过的分区时,可以使用"动态分区裁剪",即基于运行时推断的信息来进一步进行分区裁剪。这在星型模型中很常见,星型模型是由一个或多个并且引用了任意数量的维度表的事实表组成。在这种连接操作中,我们可以通过识别维度表过滤之后的分区来裁剪从事实表中读取的分区。在一个TPC-DS基准测试中,102个查询中有60个查询获得2到18倍的速度提升。
更多动态分区裁剪介绍可参考:https://databricks.com/session_eu19/dynamic-partition-pruning-in-apache-spark#:~:text=Dynamic%20partition%20pruning%20occurs%20when,any%20number%20of%20dimension%20tables
ANSI SQL兼容性
对于将工作负载从其他SQL引擎迁移到Spark SQL来说至关重要。为了提升兼容性,该版本采用Proleptic Gregorian日历,用户可以禁止使用ANSI SQL的保留关键字作为标识符。此外,在数字类型的操作中,引入运行时溢出检查,并在将数据插入具有预定义schema的表时引入了编译时类型强制检查,这些新的校验机制提高了数据的质量。更多ASNI兼容性介绍,可参考:https://spark.apache.org/docs/3.0.0/sql-ref-ansi-compliance.html
Join hints
尽管社区一直在改进编译器,但仍然不能保证编译器可以在任何场景下做出最优决策——join算法的选择是基于统计和启发式算法。当编译器无法做出最佳选择时,用户可以使用join hints来影响优化器以便让它选择更好的计划。
Apache Spark 3.0对已存在的join hints进行扩展,主要是通过添加新的hints方式来进行的,包括:
SHUFFLE_MERGE、SHUFFLE_HASH和SHUFFLE_REPLICATE_NL。
增强的Python API:PySpark和Koalas
Python现在是Spark中使用较为广泛的编程语言,因此也是Spark 3.0的重点关注领域。Databricks有68%的notebook命令是用Python写的。PySpark在 Python Package Index上的月下载量超过 500 万。
很多Python开发人员在数据结构和数据分析方面使用pandas API,但仅限于单节点处理。Databricks会持续开发Koalas——基于Apache Spark的pandas API实现,让数据科学家能够在分布式环境中更高效地处理大数据。
通过使用Koalas,在PySpark中,数据科学家们就不需要构建很多函数(例如,绘图支持),从而在整个集群中获得更高性能。
经过一年多的开发,Koalas实现对pandas API将近80%的覆盖率。Koalas每月PyPI下载量已迅速增长到85万,并以每两周一次的发布节奏快速演进。虽然Koalas可能是从单节点pandas代码迁移的最简单方法,但很多人仍在使用PySpark API,也意味着PySpark API也越来越受欢迎。
Spark 3.0为PySpark API做了多个增强功能:
-
带有类型提示的新pandas API
pandas UDF最初是在Spark 2.3中引入的,用于扩展PySpark中的用户定义函数,并将pandas API集成到PySpark应用中。但是,随着UDF类型的增多,现有接口就变得难以理解。该版本引入了一个新的pandas UDF接口,利用Python的类型提示来解决pandas UDF类型激增的问题。新接口变得更具Python风格化和自我描述性。
-
新的pandas UDF类型和pandas函数API
该版本增加了两种新的pandas UDF类型,即系列迭代器到系列迭代器和多个系列迭代器到系列迭代器。这对于数据预取和昂贵的初始化操作来说非常有用。此外,该版本还添加了两个新的pandas函数API,map和co-grouped map。更多详细信息请参考:https://databricks.com/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html。
-
更好的错误处理
对于Python用户来说,PySpark的错误处理并不友好。该版本简化了PySpark异常,隐藏了不必要的JVM堆栈跟踪信息,并更具Python风格化。
改进Spark中的Python支持和可用性仍然是我们最优先考虑的问题之一。
Hydrogen、流和可扩展性
Spark 3.0完成了Hydrogen项目的关键组件,并引入了新功能来改善流和可扩展性。
-
加速器感知调度
Hydrogen项目旨在更好地统一基于Spark的深度学习和数据处理。GPU和其他加速器已经被广泛用于加速深度学习工作负载。为了使Spark能够利用目标平台上的硬件加速器,该版本增强了已有的调度程序,使集群管理器可以感知到加速器。用户可以通过配置来指定加速器(详细配置介绍可参考:https://spark.apache.org/docs/3.0.0/configuration.html#custom-resource-scheduling-and-configuration-overview)。然后,用户可以调用新的RDD API来利用这些加速器。
-
结构化流的新UI
结构化流最初是在Spark 2.0中引入的。在Databricks,使用量同比增长4倍后,每天使用结构化流处理的记录超过了5万亿条。
Apache Spark添加了一个专门的新Spark UI用于查看流jobs。新UI提供了两组统计信息:
- 流查询作业已完成的聚合信息
- 流查询的详细统计信息,包括Input Rate, Process Rate, Input Rows, Batch Duration, Operation Duration等
- 可观察的指标
持续监控数据质量变化是管理数据管道的一种重要功能。Spark 3.0引入了对批处理和流应用程序的功能监控。可观察的指标是可以在查询上定义的聚合函数(DataFrame)。一旦DataFrame执行达到一个完成点(如,完成批查询)后会发出一个事件,该事件包含了自上一个完成点以来处理的数据的指标信息。
- 新的目录插件API
现有的数据源API缺乏访问和操作外部数据源元数据的能力。新版本增强了数据源V2 API,并引入了新的目录插件API。对于同时实现了目录插件API和数据源V2 API的外部数据源,用户可以通过标识符直接操作外部表的数据和元数据(在相应的外部目录注册了之后)。
Spark 3.0的其他更新
Spark 3.0是社区的一个重要版本,解决了超过3400个Jira问题,这是440多个contributors共同努力的结果,这些contributors包括个人以及来自Databricks、谷歌、微软、英特尔、IBM、阿里巴巴、Facebook、英伟达、Netflix、Adobe等公司的员工。
在这篇博文中,我们重点介绍了Spark在SQL、Python和流技术方面的关键改进。
除此之外,作为里程碑的Spark 3.0版本还有很多其他改进功能在这里没有介绍。详情可以查阅版本发行说明:https://spark.apache.org/releases/spark-release-3-0-0.html。发行文档中提供了更多详尽的本次版本的改进信息,包括数据源、生态系统、监控等。
最后,热烈祝贺Spark开源发展10周年!
Spark诞生于UC Berkeley’s AMPlab,该实验室致力于数据密集型计算的研究。AMPLab研究人员与大型互联网公司合作,致力于解决数据和AI问题。但是他们发现,对于那些那些拥有海量数据并且数据不断增长的公司同样面临类似的问题需要解决。于是,该团队研发了一个新引擎来处理这些新兴的工作负载,同时使处理数据的APIs,对于开发人员更方便使用。
社区很快将Spark扩展到不同领域,在流、Python和SQL方面提供了新功能,并且这些模式现在已经构成了Spark的一些主要用例。作为数据处理、数据科学、机器学习和数据分析工作负载事实上的引擎,持续不断的投入成就了Spark的今天。Apache Spark 3.0通过对SQL和Python(如今使用Spark的两种最广泛的语言)支持的显著改进,以及对性能、可操作性等方面的优化,延续了这种趋势。
本文主要参考自Databricks博客和Apache Spark官网,包括不局限于以下文章:
1.https://databricks.com/blog/2020/06/18/introducing-apache-spark-3-0-now-available-in-databricks-runtime-7-0.html
2.https://spark.apache.org/releases/spark-release-3-0-0.html
关于Apache SparkTM 3.0.0重要特性更详尽的介绍,除了文中内容,也可参考来自Databricks的其他技术博客:
- Adaptive Query Execution blog
https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html
- Pandas UDFs and Python Type Hints blog
https://databricks.com/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html