天天看点

Spark权威指南(中文版)----第2章 Spark简介

Spark The Definitive Guide(Spark权威指南) 中文版。本书详细介绍了Spark2.x版本的各个模块,目前市面上最好的Spark2.x学习书籍!!!

扫码关注公众号:登峰大数据,阅读中文Spark权威指南(完整版),系统学习Spark大数据框架!

Spark权威指南(中文版)----第2章 Spark简介
上一张我们学习了Apache Spark是什么,现在是开始使用和应用它的时候了! 本章对Spark进行了一个简要的介绍,其中我们将介绍集群的核心架构、Spark应用程序和使用DataFrames和SQL的Spark的结构化api。在此过程中,我们将接触Spark的核心术语和概念,以便您可以立即使用Spark。让我们从一些基本的背景信息开始。

2.1.  Spark的基本架构

通常,当你想到一台“计算机”时,你会想到一台机器在你家里或工作的桌子上。这台机器非常适合看电影或使用电子表格软件。但是,许多用户可能在某个时间点上体验过,有些东西是您的计算机没有足够的能力来运行的。一个特别具有挑战性的领域是数据处理。单个机器没有足够的能力和资源来执行大量的信息(或者用户可能没有时间等待计算完成)。计算机的集群或组,将许多计算机的资源集合在一起,使我们能够像使用一台计算机一样使用所有的累积资源。现在,一组机器本身并不强大,您需要一个框架来协调它们之间的工作。Spark就是这样做的,它管理和协调跨集群计算机上的数据执行任务。

Spark将用于执行任务的集群交由集群管理器管理,如Spark Standalone集群管理器、YARN集群管理器、Mesos集群管理器。然后,我们向这些集群管理器提交Spark应用程序,它将为我们的应用程序提供资源,以便我们能够完成我们的工作。

2.1.1.  Spark应用程序

Spark应用程序由一个driver进程(驱动程序)和一组Executor进程组成。driver进程负责运行你的main函数,此进程位于集群中的一个节点上。负责三件事:维护有关Spark应用程序的信息; 响应用户的程序或输入; 分析、分配和调度executor的工作(稍后讨论)。driver驱动程序是非常重要的。它是Spark应用程序的核心,并在应用程序的生命周期内维护所有相关信息。

executors进程实际执行driver分配给他们的工作。这意味着每个executor只负责两件事: 执行由驱动程序分配给它的代码,并将执行器executor的计算状态报告给驱动(driver)节点。

图2-1演示了集群管理器如何控制物理机器并分配资源给spark应用程序。这可以是三个核心集群管理器之一: Spark  standalone cluster manager, YARN, 或者 Mesos.。这意味着可以同时在集群上运行多个Spark应用程序。我们将在第四部分讨论集群管理器。

Spark权威指南(中文版)----第2章 Spark简介

在图2-1中,我们可以看到左边的驱动程序和右边的四个执行器。在这个图中,我们删除了集群节点的概念。用户可以通过配置指定每个节点上有多少个执行器。

                                         注意
Spark除了集群cluster模式之外,还具有本地local模式。driver驱动程序和executor执行器是简单的进程,这意味着它们可以在同一台机器或不同的机器上运行。在本地local模式中,驱动程序和执行程序(作为线程)在您的个人计算机上运行,而不是集群。我们写这本书时考虑了本地模式,所以你应该能够在一台机器上运行所有的东西。      

以下是当前需要掌握的理解Spark应用程序的要点:

  • Spark使用一个集群管理器来跟踪可用的资源。
  • driver驱动程序负责执行驱动程序的命令,在executor执行器中完成给定的任务。

在大多数情况下,executor执行器将始终运行Spark代码。但是,通过Spark的语言api,驱动程序可以由许多不同的语言“驱动”。让我们来看看下一节的内容。

2.2.  Spark语言API

Spark的语言api使您可以使用各种编程语言运行Spark代码。在很大程度上,Spark在每种语言中都呈现了一些核心的“概念”;然后将这些概念转换成在机器集群上运行的Spark代码。如果只使用结构化api,那么所有语言都具有类似的性能特征。这里有一个简短的纲要:

Scala

Spark主要是用Scala编写的,这使它成为Spark的“默认”语言。本书将包括Scala代码示例。

Java

尽管Spark是用Scala编写的,Spark的作者们还是小心翼翼地确保您可以在Java中编写Spark代码。本书将主要关注Scala,但将提供与之相关的Java示例。

Python

几乎所有的特性都支持Python语言。只要我们包含Scala代码示例和Python API,本书就会包含Python代码示例。

R语言

Spark有两个常用的R库。一个作为Spark core (SparkR)的一部分,另一个作为R社区驱动的包(sparklyr)。我们在第32章中讨论了这两个库的集成。

图2-2给出了这种关系的简单说明。

Spark权威指南(中文版)----第2章 Spark简介

每个语言API都维护我们前面描述的相同的核心概念。用户可以使用SparkSession对象,这是运行Spark代码的入口点。当使用来自Python或R的Spark时,您不会编写显式的JVM指令; 相反,您编写的Python和R代码可以将Spark转换为代码,然后可以在executor jvm上运行。

Spark’s APIs

尽管您可以从各种语言中驱动Spark,但它在这些语言中提供的功能还是值得一提的。Spark有两个基本的api集: low-level的“非结构化”api,以及higher-level高级的结构化api。我们将在本书中讨论这两个问题,但是这些介绍性的章节将主要讨论更高层次的结构化api。

2.3.  启动Spark

到目前为止,我们讨论了Spark应用程序的基本概念。这在本质上都是概念性的。当我们真正着手编写Spark应用程序时,我们需要一种方法来将用户命令和数据发送给spark,让其为我们计算结果。我们首先创建一个SparkSession。

                                        注意
为此,我们将启动Spark的本地模式,就像我们在第1章中所做的那样。这意味着运行./bin/spark-shell访问Scala控制台以启动交互式会话。您还可以使用./bin/pyspark启动Python控制台。这将启动一个交互式Spark应用程序。还有一个方式,可以提交独立的应用程序到Spark,称为Spark -submit,这样您就可以提交一个预编译的应用程序到spark集群。我们会在第三章中告诉你们怎么做。      

当您在此交互模式中启动Spark时,您将隐式地创建一个SparkSession来管理Spark应用程序。当您通过一个独立的应用程序启动它时,您必须在应用程序代码中创建SparkSession对象。

2.4.  SparkSession

正如本章开头所讨论的,您通过一个名为SparkSession的驱动程序控制您的Spark应用程序。SparkSession实例是Spark在集群中执行用户定义操作的方式。在Scala和Python中,当您启动控制台时,SparkSession被实例化为spark变量,可以直接使用。让我们来看看Scala和/或Python中的SparkSession:

在刚启动的Scala控制台中输入spark,您应该看到如下内容:

res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@...           

在刚启动的Python控制台中输入spark,您应该看到如下内容:

<pyspark.sql.session.SparkSession at 0x7efda4c1ccd0>           

现在让我们执行创建一系列数字的简单任务。这一系列数字就像电子表格中的一个命名列:

// in Scala              val myRange = spark.range(1000).toDF("number")              # in Python              myRange = spark.range(1000).toDF("number")           

你刚刚运行了你的第一行spark代码! 我们创建了一个DataFrame,其中一个列包含1000行,值从0到999。这一系列数字代表一个分布式集合。当在一个集群上运行时,这个范围的每个部分都存在于一个不同的executor上。这是一个Spark DataFrame。

2.5.  DataFrames

DataFrame是最常见的结构化API,它只是表示包含行和列的数据表。定义列和列类型的列表称为schema(模式)。您可以将DataFrame看作是带有指定列的电子表格。

图2-3说明了基本的区别: 位于一台计算机上的电子表格,存在一个特定位置上。而Spark DataFrame可以跨越数千台计算机。把数据放在一台以上电脑上的原因应该是直观的: 要么是数据太大而无法安装在一台机器上,要么就是花费太长时间在一台机器上执行计算。

Spark权威指南(中文版)----第2章 Spark简介

DataFrame的概念并不是Spark特有的。R和Python都有类似的概念。然而,Python/R DataFrames(有一些例外)存在于一台机器上,而不是多台机器上。这限制了给定的DataFrame只能使用某一台特定机器上存在的资源。但是,因为Spark具有Python和R的语言接口。很容易将Pandas(Python)的DataFrames、R DataFrames转换为Spark DataFrames。

                                       注意
Spark有几个核心抽象: Datasets、DataFrames、SQL表和弹性分布式数据集(RDDs)。这些不同的抽象都表示数据的分布式集合。最简单和最有效的是DataFrames,它在所有语言中都可用。我们在第二部分的末尾学习dataset,在第三部分中学习RDDs。      
2.5.1.  Partitions

为了使每个executor执行器并行执行任务,Spark将数据分解成块,这些数据块称为partition(分区)。一个分区是集群中的一个物理机器上的行集合。DataFrame的分区表示了在执行过程中数据是如何在机器集群中物理分布的。如果您有一个分区,Spark将只有一个并行任务,即使您有数千个Executor执行器。如果您有多个分区,但只有一个Executor执行器,Spark仍然只有一个并行任务,因为只有一个计算资源。

需要注意的一件重要的事情是,对于DataFrames,您不(大多数情况下)手动或单独操作分区。您只需在物理分区中指定数据的高级转换,Spark将确定该工作将如何在集群上执行。底层api确实存在(通过RDD接口),我们将在第3部分中介绍这些api。

2.6.  Transformations

在Spark中,核心数据结构是不可变的,这意味着它们在创建之后无法更改。乍一看,这似乎是个奇怪的概念: 如果你不能改变它,你应该如何使用它? 要“更改”一个DataFrame,您需要指导Spark如何修改它以实现您想要的功能。这些指导指令称为Transformations转换。让我们执行一个简单的转换,以在当前的DataFrame中找到所有偶数:

// in Scala              val divisBy2 = myRange.where("number % 2 = 0")              # in Python              divisBy2 = myRange.where("number % 2 = 0")           

注意,这些返回没有输出。这是因为我们只声明了一个抽象转换where。Spark将不会对转换进行操作,直到我们调用一个action操作(我们将在稍后讨论它)。Transformations转换是使用Spark表达业务逻辑的核心方法。有两种类型的转换:窄依赖的转换、宽依赖的转换。

窄依赖的转换是每个输入数据分区只对一个数据输出分区。在前面的代码片段中,where语句指定了一个窄依赖。其中一个输入分区最多一个输出分区,如图2-4所示:

Spark权威指南(中文版)----第2章 Spark简介

宽依赖的转换,一个输入数据分区对应多个输出分区。您经常会听到shuffle这样的说法,即Spark将在集群中交换分区中的数据。窄依赖转换,Spark将自动执行称为流水线pipeline的操作。这意味着如果我们在DataFrames上指定多个过滤器。它们都将在内存中执行,不会产生shuffle。当我们进行shuffle洗牌时,Spark将结果写入磁盘。图2-5显示宽依赖转换:

您在网络上会看到许多关于shuffle优化的讨论,因为这是一个重要的主题,但是现在,您需要了解的是,有两种类型的转换。现在,您可以看到转换是如何简单地指定不同的数据操作。这就引出了一个叫Lazy Evaluation延迟计算的话题。

2.6.1.  Lazy Evaluation

延迟evaulation意味着Spark将等到最后一刻才执行计算一些列指令。在Spark中,不会在执行某个转换操作时立即修改数据,spark会构建了一个您想要应用于您的源数据的转换计划。通过等待直到最后一分钟执行代码,Spark将这个计划从原始的DataFrame转换到一个流线型的物理计划,该计划将在整个集群中尽可能高效地运行。这提供了巨大的好处,因为Spark可以从端到端优化整个数据流。其中一个例子是DataFrames上的谓词下推pushdown。如果我们构建一个大型的Spark作业,但在最后指定一个过滤器,只需要我们从源数据中获取一行。最有效的执行方式是访问我们需要的单个记录。Spark实际上是通过自动将过滤器向下推来优化它的。

2.7.  Actions

transformation转换允许我们构建逻辑转换计划。为了触发计算,我们运行一个action操作。action操作指示Spark通过执行一系列transformation转换计算结果。最简单的action操作是count,它给出了DataFrame中记录的总数:

divisBy2.count()           

前面代码的输出应该是500。然而,count并不是唯一的action操作。有三种类型的action:

  • 在控制台中查看数据的action
  • 数据收集的action操作。
  • 输出到第三方存储系统的action操作。

在指定这个count操作时,我们启动了一个Spark job,运行我们的过滤器filter转换(一个窄依赖转换),然后是一个聚合(一个宽依赖转换),它在每个分区基础上执行计数,然后是一个收集action,它将我们的结果driver端。通过检查Spark UI,您可以看到所有这一切。Spark UI是一个包含在Spark中的工具,您可以通过它监控在集群上运行的Spark作业。

2.8.  Spark UI

您可以通过Spark web UI监控作业的进度。Spark UI在driver节点的4040端口上可用。如果在本地模式下运行,则将是http://localhost:4040。Spark UI显示关于您的Spark作业状态、环境和集群状态的信息。它非常有用,特别是对于调优和调试。图2-6展示了一个Spark job的示例UI,其中执行了包含9个任务的两个阶段。

Spark权威指南(中文版)----第2章 Spark简介

本章将不会详细讨论Spark作业执行和Spark UI。我们将在第18章讨论这个问题。此时,您需要了解的是,Spark job作业表示由单个action触发的一组转换,您可以从Spark UI监控该作业。

2.9.  一个相对完整的例子

在前面的示例中,我们创建了一个包含一系列数字的DataFrame;这并不是什么突破性的大数据。在这一节中,我们将用一个更现实的例子来巩固我们在本章中所学到的所有内容,并逐步解释在幕后发生的事情。我们将用Spark来分析美国运输统计局的一些飞行数据。

在CSV文件夹中,您将看到我们有许多文件。还有一些其他包含不同文件格式的文件夹,我们将在第9章中讨论。现在,让我们关注CSV文件。

每个文件都有许多行数据。这些文件都是CSV文件,这意味着它们是一种半结构化的数据格式,文件中的每一行表示将来的DataFrame中的一行:

$ head /data/flight-data/csv/2015-summary.csv              DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count              United States,Romania,15              United States,Croatia,1              United States,Ireland,344           

Spark具有从大量数据源中读写数据的能力。要读取这些数据,我们将使用与我们的SparkSession关联的DataFrameReader对象。在这样做时,我们将指定文件格式以及我们想要指定的任何选项。在我们的例子中,我们想做一个叫做schema inference(模式推理)的东西,这意味着我们希望Spark能够对DataFrame的schema(模式)进行最好的猜测。我们还希望指定第一行是文件的头,因此我们也将指定它作为一个选项。

为了获得模式信息,Spark会读取一些数据,然后根据Spark中可用的类型尝试解析这些行中的类型。在读取数据时,您还可以选择严格地指定模式(在生产场景中,我们建议这样做):

// in Scala              val flightData2015 = spark              .read              .option("inferSchema", "true")              .option("header", "true")              .csv("/data/flight-data/csv/2015-summary.csv")                  # in Python              flightData2015 = spark\              .read\              .option("inferSchema", "true")\              .option("header", "true")\              .csv("/data/flight-data/csv/2015-summary.csv")           

每个DataFrames(在Scala和Python中)都有一组列,其中列的数据行数不确定。行数不确定的原因是读取数据是一个transformation转换操作,因此是一个延迟操作。Spark只查看了几行数据,试图猜测每个列应该是什么类型。图2-7提供了将被读入DataFrame的CSV文件的示例,然后将其转换为本地数组或行列表。

Spark权威指南(中文版)----第2章 Spark简介

如果我们在DataFrame上执行操作,我们将能够看到我们在使用命令行之前看到的相同结果:

flightData2015.take(3)              Array([United States,Romania,15], [United States,Croatia...           

让我们指定一些更多的转换!现在,让我们根据count列对数据进行排序,这是一个整数类型。图2-8说明了这一过程。

                                      注意
记住,sort不会修改DataFrame。我们使用sort作为transformation转换操作,通过转换之前的DataFrame返回一个新的DataFrame。让我们来说明当我们在新DataFrame上调用take转换方法时,发生了什么(图2-8)。      
Spark权威指南(中文版)----第2章 Spark简介

当我们调用sort时,数据不会发生任何变化,因为它只是一个转换。但是,我们可以看到Spark正在构建一个计划,通过explain查看计划,可以看到spark将如何跨集群执行这个计划。

flightData2015.sort("count").explain()                  == Physical Plan ==              *Sort [count#195 ASC NULLS FIRST], true, 0              +- Exchange rangepartitioning(count#195 ASC NULLS FIRST, 200)              +- *FileScan csv [DEST_COUNTRY_NAME#193,ORIGIN_COUNTRY_NAME#194,count#195] ...           

恭喜你,你刚刚读了你的第一个解释计划!解释计划有点神秘,但稍加练习就会变成第二技能。你可以从上到下阅读解释计划,顶部是最终结果,底部是数据的来源。在本例中,查看每行的第一个关键字。你将会看到sort, exchange, 和 FileScan三个关键字。因为数据排序实际上是一个宽依赖转换,因为位于不同分区中的数据行需要相互比较。在这一点上,不要过于担心理解所有的解释计划,它们只是帮助你调试和提高你的知识的工具。

现在,就像我们之前做的那样,我们可以指定一个action来启动这个执行计划。然而,在此之前,我们将设置一个配置项。默认情况下,当我们执行shuffle时,Spark会输出200个shuffle分区。让我们将这个值设为5,以减少来自shuffle的输出分区的数量:

spark.conf.set("spark.sql.shuffle.partitions", "5")              flightData2015.sort("count").take(2)              ... Array([United States,Singapore,1], [Moldova,United States,1])           

图2-9演示了这个操作。请注意,除了逻辑转换之外,我们还包括物理分区计数。

Spark权威指南(中文版)----第2章 Spark简介

我们构建的转换的逻辑计划为DataFrame定义了一个血统关系,以便在任何给定的时间点,Spark都知道如何通过执行之前在相同输入数据上执行的所有操作来重新计算任何分区。这是Spark编程模型-函数式编程的核心,当数据的转换保持不变时,相同的输入总是会导致相同的输出。

我们不操纵物理数据;相反,我们通过类似于前面设置的shuffle分区参数来配置物理执行特性。我们最后得到了5个输出分区,因为这是在shuffle分区中指定的值。您可以更改它以帮助控制您的Spark作业的物理执行特性。继续尝试不同的值,并查看您自己的分区数量。在尝试不同的值时,您应该看到截然不同的运行时。请记住,您可以通过导航到4040端口上的Spark UI来监控工作进度,以查看作业的物理和逻辑执行特性。

2.9.1.  DataFrames 和 SQL

我们在前面的示例中完成了一个简单的转换,现在让我们来学习一个更复杂的例子,并在DataFrames和SQL中进行跟踪。Spark可以使用完全相同的方式运行相同的转换,不管语言是什么。您可以在SQL或DataFrames(在R、Python、Scala或Java)中表达业务逻辑,Spark将在实际执行代码之前将该逻辑编译成一个底层计划(您可以在explain计划中看到)。使用Spark SQL,您可以将任何DataFrame注册为表或视图(临时表),并使用纯SQL查询它。在编写SQL查询或编写DataFrame代码之间没有性能差异,它们都“编译”到我们在DataFrame代码中指定的相同的底层计划。

您可以通过一个简单的方法调用将任何DataFrame转换为一个表或视图:

flightData2015.createOrReplaceTempView("flight_data_2015")           

现在我们可以用SQL查询我们的数据了。为此,我们将使用spark.sql函数(记住,spark是我们的SparkSession变量),它方便地返回一个新的DataFrame。这使得您可以在任何给定的时间点以最方便的方式指定转换,而不牺牲任何效率来这样做! 为了理解这一点,让我们来看看两个解释计划:

// in Scala              val sqlWay = spark.sql("""              SELECT DEST_COUNTRY_NAME, count(1)              FROM flight_data_2015              GROUP BY DEST_COUNTRY_NAME              """)                  val dataFrameWay = flightData2015              .groupBy('DEST_COUNTRY_NAME)              .count()                  sqlWay.explain              dataFrameWay.explain                  # in Python              sqlWay = spark.sql("""              SELECT DEST_COUNTRY_NAME, count(1)              FROM flight_data_2015              GROUP BY DEST_COUNTRY_NAME              """)                  dataFrameWay = flightData2015\              .groupBy("DEST_COUNTRY_NAME")\              .count()                  sqlWay.explain()              dataFrameWay.explain()              == Physical Plan ==              *HashAggregate(keys=[DEST_COUNTRY_NAME#182], functions=[count(1)])              +- Exchange hashpartitioning(DEST_COUNTRY_NAME#182, 5)              +- *HashAggregate(keys=[DEST_COUNTRY_NAME#182], functions=[partial_count(1)])              +- *FileScan csv [DEST_COUNTRY_NAME#182] ...              == Physical Plan ==              *HashAggregate(keys=[DEST_COUNTRY_NAME#182], functions=[count(1)])              +- Exchange hashpartitioning(DEST_COUNTRY_NAME#182, 5)              +- *HashAggregate(keys=[DEST_COUNTRY_NAME#182], functions=[partial_count(1)])              +- *FileScan csv [DEST_COUNTRY_NAME#182] ...           

请注意,这些计划编译成完全相同的底层计划!

让我们从数据中找出一些有趣的统计数据。要理解的一点是,Spark中的DataFrames(和SQL)已经有大量可用的操作。您可以使用和导入数百个函数来帮助您更快地解决大数据问题。我们将使用max函数,来确定进出任何给定位置的最大航班数。这只是扫描DataFrame中相关列中的每个值,并检查它是否大于前面所看到的值。这是一个transformation,因为我们可以有效地过滤到一行。让我们看看这是什么样子:

spark.sql("SELECT max(count) from flight_data_2015").take(1)              // in Scala              import org.apache.spark.sql.functions.max                  flightData2015.select(max("count")).take(1)              # in Python              from pyspark.sql.functions import max                  flightData2015.select(max("count")).take(1)           

很好,这是一个简单的例子,给出了370,002的结果。让我们执行一些更复杂的任务,并在数据中找到前五个目的地国家。这是我们的第一个多转换查询,因此我们将逐步进行。让我们从一个相当简单的SQL聚合开始:

// in Scala              val maxSql = spark.sql("""              SELECT DEST_COUNTRY_NAME, sum(count) as destination_total              FROM flight_data_2015              GROUP BY DEST_COUNTRY_NAME              ORDER BY sum(count) DESC              LIMIT 5              """)              maxSql.show()              # in Python              maxSql = spark.sql("""              SELECT DEST_COUNTRY_NAME, sum(count) as destination_total              FROM flight_data_2015              GROUP BY DEST_COUNTRY_NAME              ORDER BY sum(count) DESC              LIMIT 5              """)                  maxSql.show()              +-----------------+-----------------+              |DEST_COUNTRY_NAME|destination_total|              +-----------------+-----------------+              |    United States|           411352|              |           Canada|             8399|              |           Mexico|             7140|              |   United Kingdom|             2025|              |            Japan|             1548|              +-----------------+-----------------+           

现在,让我们移动到与语义相似但在实现和排序上略有不同的DataFrame语法。但是,正如我们提到的,两者的基本计划是一样的。让我们运行查询,并将其结果视为完整性检查:

// in Scala              import org.apache.spark.sql.functions.desc                  flightData2015              .groupBy("DEST_COUNTRY_NAME")              .sum("count")              .withColumnRenamed("sum(count)", "destination_total")              .sort(desc("destination_total"))              .limit(5)              .show()              # in Python              from pyspark.sql.functions import desc                  flightData2015\              .groupBy("DEST_COUNTRY_NAME")\              .sum("count")\              .withColumnRenamed("sum(count)", "destination_total")\              .sort(desc("destination_total"))\              .limit(5)\              .show()              +-----------------+-----------------+              |DEST_COUNTRY_NAME|destination_total|              +-----------------+-----------------+              |    United States|           411352|              |           Canada|             8399|              |           Mexico|             7140|              |   United Kingdom|             2025|              |            Japan|             1548|              +-----------------+-----------------+           

现在有7个步骤可以让我们回到源数据。您可以在这些DataFrames的解释计划中看到这一点。图2-10显示了我们在“代码”中执行的步骤。真正的执行计划(在explain中可见)将与图2-10所示不同,因为物理执行进行了优化。然而,这是一个很好的起点。这个执行计划是一个有向无环图(DAG)的transformation,每一个transformation都产生一个新的不可变的DataFrame,我们调用一个action来生成结果。

Spark权威指南(中文版)----第2章 Spark简介

第一步是读取数据。我们之前定义了DataFrame,但是,作为提醒,Spark实际上并没有读取它,直到在DataFrame上调用了一个action,或者从原始DataFrame派生出一个action。

第二步是分组;当我们调用groupBy,我们最终RelationalGroupedDataset,这是一个有趣的名称,用于DataFrame分组指定但需要用户指定一个聚合,才能进一步查询。我们基本上指定了我们将被一个键(或一组键)分组,现在我们要对这些键的每一个进行聚合。

因此,第三步是指定聚合。让我们使用sum聚合方法。这需要输入一个列表达式,或者,简单地说,是一个列名。sum方法调用的结果是一个新的DataFrame。您将看到它有一个新的模式,但是它知道每个列的类型。需要再次强调的是:截止目前还没有发生真正的计算。这只是已经表达的另一个transformation,Spark仅仅能够通过它跟踪我们的类型信息。

第四步是简单的重命名。我们使用withColumnRenamed方法,它接受两个参数,原始列名和新的列名。当然,此时仍然不会执行计算:这只是另一个transformation转换!

第五步对数据进行排序,这样如果我们从DataFrame的顶部获取结果,它们将在destination_total列中拥有最大的值。

那可能已经注意到,我们导入了一个函数desc,来做这个排序。您可能还注意到,desc不返回字符串,而是返回一个Column对象。通常,许多DataFrame方法将接受字符串(作为列名称)或Column类型或表达式。Column类型和表达式实际上是一样的。

最后,我们将指定一个limit。这只是指定我们只想在最后的DataFrame中返回前5个值,而不是返回所有数据。

最后一步是我们的action操作。现在,我们开始真正收集DataFrame的结果,Spark将返回我们正在执行的语言中的列表或数组。为了加强这一切,让我们看看前面的查询的解释计划:

# in Python              flightData2015\              .groupBy("DEST_COUNTRY_NAME")\              .sum("count")\              .withColumnRenamed("sum(count)", "destination_total")\              .sort(desc("destination_total"))\              .limit(5)\              .explain()              == Physical Plan ==              TakeOrderedAndProject(limit=5, orderBy=[destination_total#16194L DESC], outpu...              +- *HashAggregate(keys=[DEST_COUNTRY_NAME#7323], functions=[sum(count#7325L)])              +- Exchange hashpartitioning(DEST_COUNTRY_NAME#7323, 5)              +- *HashAggregate(keys=[DEST_COUNTRY_NAME#7323], functions=[partial_sum...              +- InMemoryTableScan [DEST_COUNTRY_NAME#7323, count#7325L]              +- InMemoryRelation [DEST_COUNTRY_NAME#7323, ORIGIN_COUNTRY_NA...              +- *Scan csv [DEST_COUNTRY_NAME#7578,ORIGIN_COUNTRY_NAME...           

2.10.  结束语