文章目录
- 摘要
- 1. 简介
- 2. 编程模型
-
- 2.1 示例
- 2.2 类型
- 2.3 更多示例
- 3. 实现
-
- 3.1 执行概述
- 3.2 主数据结构
- 3.3 容错
-
- Worker故障
- Master故障
- 存在失败的语义
- 3.4 本地化
- 3.5 任务粒度
- 3.6 备份任务
- 4. 优化
-
- 4.1 分区函数
- 4.2 排序保证
- 4.3 Combiner函数
- 4.4 输入和输出类型
- 4.5 副作用
- 4.6 跳过错误记录
- 4.7 本地执行
- 4.8 状态信息
- 4.9 计数器
- 5. 性能
-
- 5.1 集群配置
- 5.2 Grep
- 5.3 排序
- 5.4 备份任务的影响
- 5.5 机器故障
- 6. 经验
-
- 6.1 大规模索引
- 7. 相关工作
- 8. 总结
- 致谢
- 参考文献
- 附录A. 词频
摘要
MapReduce是一种编程模型,也是一种处理和生成大型数据集的相应实现。用户指定处理键/值对以生成一组中间键/值对的map函数,以及合并与相同中间键关联的所有中间值的reduce函数。如本文所示,该模型中可以表达许多现实的任务。
以此函数风格编写的程序将自动并行化并在大型商用机器集群上运行。运行时系统负责对输入数据分区的细节,调度程序在一组机器上的执行,处理机器故障以及管理所需的机器间通信。这使得没有任何并行和分布式系统经验的开发者可以轻松利用大型分布式系统的资源。
我们的MapReduce实现在大型商用机器集群上运行,并且具有高度可扩展性:典型的MapReduce计算可处理数千台机器上的数TB数据。开发者发现该系统易于使用:已经实施了数百个MapReduce程序,每天在Google的集群上执行超过一千个MapReduce作业。
1. 简介
在过去五年中,在谷歌工作的作者和许多其他人已经实施了数百个专业计算,这些计算处理大量原始数据,例如爬虫文档,Web请求日志等,以计算各种派生数据,例如作为反向索引,Web文档的图形结构的各种表示,每台机器爬取的页面数量的摘要,给定日期中最频繁查询的集合等。大多数这样的计算在概念上是直截了当的。但是,输入数据通常很大,并且计算必须分布在数百或数千台机器上,以便在合理的时间内完成。如何并行化计算、分配数据和处理故障的问题使得需要大量复杂代码处理这些问题,从而隐藏了原始的简单计算。
为应对这种复杂性,我们设计了一种新的抽象,它允许我们表达我们试图执行的简单计算,并且在库中隐藏了并行化、容错、数据分布和负载均衡的烦琐细节。这种抽象的灵感来自Lisp和许多其他函数式语言中的map和reduce原语。我们意识到我们的大多数计算都涉及将map操作应用于输入中的每个逻辑“记录”,以便计算出一组中间键/值对,然后对拥有相同键的所有值应用reduce操作,以适当地生成派生数据。我们使用具有用户指定的map和reduce操作的函数模型,使我们能够轻松地并行化大型计算并使用重新执行作为容错的主要机制。
这项工作的主要贡献是一个简单而强大的可实现大规模计算的自动并行化和分发的接口,并结合在大型商用PC集群上对该接口高性能的实现。
第2节描述了基本的编程模型,并给出了几个示例。第3节描述了针对基于集群的计算环境定制的MapReduce接口的实现。第4节描述了我们发现的针对编程模型有用的几个改进。第5节对各种任务的实现进行了性能测试。第6节探讨了MapReduce在Google中的使用,包括我们使用它作为重写线上索引系统基础的经验。第7部分讨论了相关的和未来的工作。
2. 编程模型
计算拿到一组输入键/值对,并产生一组输出键/值对。MapReduce库的用户将计算表达为两个函数:Map和Reduce。
由用户编写的Map获取到一个输入对并产生一组中间键/值对。MapReduce库将具有相同中间键I的所有中间值聚合在一起,并将它们传递给Reduce函数。
Reduce函数也是由用户编写的,它接受一个中间键I以及该键值对应的一组中间值。它将这些值合并在一起,形成一组可能较小的值。通常,每次Reduce调用只产生零个或一个输出值。中间值通过迭代器提供给用户的reduce函数。这允许我们可以处理太大而不适合放入内存中的值列表。
2.1 示例
考虑统计大量文档中每个单词出现次数的问题。用户将编写类似于以下伪代码的代码:
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
map函数产出每个单词加上相关的出现次数(在这个简单的例子中只是’1’)。reduce函数将特定单词生成的所有次数加总在一起。
此外,用户编写代码以输入和输出文件的名称以及可选的调整参数填充mapreduce规范对象。然后,用户调用MapReduce函数,并将规范对象传递给它。用户的代码与MapReduce库(用C ++实现)链接在一起。附录A包含此示例的完整程序文本。
2.2 类型
尽管先前的伪代码是根据字符串输入和输出编写的,但从概念上讲,用户提供的map和reduce函数具有相关的类型:
map(k1,v1) -> list(k2,v2)
reduce(k2,list(v2)) -> list(v2)
即,输入键和值是来自与输出键和值不同的域中。此外,中间键和值来自与输出键和值相同的域。
我们的C++实现将字符串传递给用户定义的函数以及从中获取,并将自定义函数留给用户代码实现以在字符串和适当的类型之间进行转换。
2.3 更多示例
以下是一些有趣程序的简单示例,可以很容易地表示为MapReduce计算。
- 分布式Grep:如果map函数与提供的模式匹配,则会产出一行。reduce函数是一个恒等函数,它只是将提供的中间数据复制到输出。
- URL访问频率计数:map函数处理网页请求日志并输出<URL,1>。reduce函数将同一URL的所有值相加,并输出<URL,total count>对。
- 反向Web链接图:map函数将每个链接输出<target,source>对,表示在名为source的页面中找到了target链接。reduce函数将与给定target链接关联的所有source链接的列表拼接,并输出一对:<target,list(source)>。
- 每个主机的关键词向量:关键词向量将文档或一组文档中出现的最重要的单词概括为<word,frequency>对的列表。map函数为每个输入文档输出<hostname,term vector>对(其中主机名是从文档的URL中提取的)。将给定主机的所有文档关键词向量传递给reduce函数。它将这些关键词向量加到一起,丢弃不常用的关键词,然后得到一个最终的<hostname,term vector>对。
- 反向索引:map函数解析每个文档,并产出一系列<word,document ID>对。reduce函数接受给定单词的所有对,对相应的文档ID进行排序并输出<word,list(document ID)>对。所有输出对的集合形成简单的倒排索引。很容易扩大该计算以跟踪单词位置。
- 分布式排序:map函数从每个记录中提取key,并产出<key,record>对。reduce函数未加改变地输出所有对。此计算依赖于第4.1节中描述的分区工具和第4.2节中描述的排序属性。
3. 实现
MapReduce接口可能有许多不同的实现。正确的选择取决于环境。例如,一种实现方式可能适用于小型共享内存机器,另一种实现方式适用于大型NUMA多核处理器,而另一种实现方式适用于甚至更大型的联网机器集合。
本节描述的一种实现是针对Google广泛使用的计算环境:使用交换式以太网连接在一起的大型商用PC集群[4]。在我们的环境中:
- 机器通常是运行Linux的双核x86处理器,每台机器有2-4GB的内存。
- 使用商用网络硬件 - 通常在机器级别上为100Mb/s或1Gb/s,但在总体双向带宽下平均下来要少得多。
- 集群由数百或数千台机器组成,因此机器故障很常见。
- 存储由直接连接到各个机器的廉价IDE磁盘提供。使用内部开发的分布式文件系统[8]来管理存储在这些磁盘上的数据。文件系统使用复本在不可靠的硬件之上提供可用性和可靠性。
- 用户将作业提交给调度系统。每个作业由一组任务组成,并由调度器划分到集群中的一组可用机器上。
3.1 执行概述
通过自动将输入数据分区为一组M个分片,在多台机器上分布式调用Map。输入分片可以由不同的机器并行处理。通过使用分区函数(例如,
hash(key) mod R
),将中间键key空间划分为R片来分发给reduce调用。分区数(R)和分区函数由用户指定。
图1显示了我们实现的MapReduce操作的总体流程。当用户程序调用MapReduce函数时,会发生以下操作序列(图1中的编号标签对应于下面列表中的数字):
- 用户程序中的MapReduce库首先将输入文件划分为每片通常16MB到64MB的M个分片(可由用户通过可选参数控制)。然后它在一组机器上启动程序的许多副本。
- 该程序的其中一个副本是特殊的 - master。其余的是由master分配工作的worker。有M个map任务和R个reduce任务要分配。Master挑选闲置的worker并为每一个分配一个map任务或reduce任务。
- 分配了map任务的worker将读取相应输入分片的内容。它从输入数据中解析键/值对,并将每个键/值对传递给用户定义的Map函数。Map函数生成的中间键/值对缓存在内存中。
- 缓存的中间键值对会被周期性地写入本地磁盘,通过分区函数划分为R个区域。这些缓存对在本地磁盘上的位置将回传给master,它负责将这些位置转发给reduce任务的worker。
- 当执行reduce任务的worker收到master关于这些位置的通知后,它使用远程过程调用从执行map任务的worker的本地磁盘中读取缓存数据。当reduce任务的worker读取了所有中间数据时,它会通过中间结果的键对其进行排序,从而将所有出现的相同键的键值对组合在一起。排序是必要的,因为通常许多不同的键映射到同一个reduce任务。如果中间结果数据量太大而无法容纳在内存中,则使用外部排序。
- reduce任务的worker遍历已排序的中间数据,并且对于遇到的每个唯一中间键,它将该键和相应的中间值集合传递给用户的Reduce函数。Reduce函数的输出附加到此reduce分区的最终输出文件中。
- 完成所有map任务和reduce任务后,master会唤醒用户程序。此时,用户程序中的MapReduce调用将返回到用户代码。
成功完成后,MapReduce执行的输出将在R个输出文件中可用(每个reduce任务一个,文件名由用户指定)。通常,用户不需要将R个输出文件合并到一个文件中 - 他们经常将这些文件作为输入传递给另一个MapReduce调用,或者从另一个能够处理输入是分区的多个文件的分布式应用程序中使用它们。
3.2 主数据结构
Master保存了几个数据结构。它存储每个map任务和reduce任务的状态(空闲,正在运行或已完成)以及worker机器的标识(用于非空闲任务)。
Master是管道,通过该管道,中间文件的区域的位置从map任务传播到reduce任务。因此,对于每个完成的map任务,master保存了由map任务产生的R个中间文件区域的位置和大小。map任务完成时,将收到对此位置和大小信息的更新。该信息将增量地推送给正在进行reduce任务的worker。
3.3 容错
由于MapReduce库是设计用来帮助使用数百或数千台机器处理大量数据,因此库必须能够优雅地容忍机器故障。
Worker故障
Master定期发消息给每个Worker。如果在一定时间内未收到worker的响应,则master将worker标记为失败。worker完成的任何map任务都将重置回其初始空闲状态,因此有资格在其他worker上调度。同样,在失败的worker上正在运行的任何map任务或reduce任务也将重置为空闲状态,并有资格重新调度。
完成了的map任务在发生故障时将重新执行,因为它们的输出存储在故障机器的本地磁盘上,因此无法访问。完成了的reduce任务不需要重新执行,因为它们的输出存储在全局文件系统中。
当一个map任务首先由worker A执行然后由worker B执行时(因为A失败),所有执行reduce任务的worker都会收到重新执行的通知。任何尚未从worker A读取数据的reduce任务将从worker B读取数据。
MapReduce可以抵御大规模的worker故障。例如,在一个MapReduce操作期间,运行集群上的网络维护导致一组80台机器同时持续几分钟无法访问。MapReduce的master简单地重新执行了无法访问的worker机器完成的工作,并继续运行,最终完成了MapReduce操作。
Master故障
让master周期性地写下上述主数据结构的检查点是容易的。如果master任务终止了,则新副本可以从上次检查点状态开始运行。但是,鉴于只有一个master,其失败的可能性不大;因此,如果master失败,我们当前的实现将中止MapReduce计算。客户端可以检查此情况并根据需要重试MapReduce操作。
存在失败的语义
当用户提供的map和reduce运算符是其输入值的确定性函数时,我们的分布式实现产生的输出与整个程序的顺序非错误执行产生的输出相同。
我们依靠map的原子提交和reduce任务输出来实现这个属性。每个正在运行的任务将其输出写入专用临时文件。reduce任务生成一个这样的文件,map任务生成R这样的文件(每个reduce任务一个)。当map任务完成时,worker向master发送消息,并在消息中包含R个临时文件的名称。如果master收到已完成的map任务的完成消息,则忽略该消息。否则,它会在主数据结构中记录这R个文件的名称。
当reduce任务完成时,reduce worker会将其临时输出文件原子重命名为最终输出文件。如果在多台机器上执行相同的reduce任务,则将对同一个最终输出文件执行多次重命名调用。我们依赖底层文件系统提供的原子重命名操作来保证最终文件系统状态只包含一次执行reduce任务所产生的数据。
绝大多数map和reduce运算符都是确定性的,并且在这种情况下,我们的语义等同于顺序执行,这使得开发者很容易推理出程序的行为。当map和/或reduce运算符不确定时,我们提供较弱但仍然合理的语义。在存在非确定性运算符的情况下,特定reduce任务R1的输出等同于由非确定性程序的顺序执行产生的R1的输出。然而,另一个reduce任务R2的输出可能对应于由非确定性程序的不同顺序执行产生的R2的输出。
考虑map任务M和reduce任务R1和R2。假设e(Ri)是提交的Ri的执行(只有一个这样的执行)。较弱的语义出现是因为e(R1)可能已经读取了M一次执行所产生的输出,而e(R2)可能已经读取了由M的不同执行产生的输出。
3.4 本地化
在我们的计算环境中,网络带宽是一种相对稀缺的资源。我们利用输入数据(由GFS[8]管理)存储在组成我们集群的机器的本地磁盘上,从而节省网络带宽。GFS将每个文件划分为64 MB块,并将每个块的多个副本(通常为3个副本)存储在不同的机器上。MapReduce的master会考虑输入文件的位置信息,并尝试在包含相应输入数据副本的机器调度map任务。如果做不到这一点,它会尝试在该任务的输入数据的副本附近调度map任务(例如,在与包含数据的机器位于同一网络交换机上的worker机器上)。在集群中的大部分worker上运行大型MapReduce操作时,大多数输入数据在本地读取,不会消耗网络带宽。
3.5 任务粒度
如上所述,我们将map阶段细分为M片并将reduce阶段细分为R片。理想情况下,M和R应远大于worker机器的数量。让每个worker执行许多不同的任务可以提高动态负载均衡,并且还可以在worker出现故障时加快恢复速度:已完成的许多map任务可以分布在所有其他worker机器上。
在我们的实现中,M和R可以有多大是有实际限定的,因为master必须做出O(M + R)调度决策并且如上所述在内存中保持O(M * R)状态。(但是,内存使用的常数因子很小:状态的O(M * R)部分由map任务/reduce任务对大约一个字节的数据组成。)
此外,R通常被用户限制,因为每个reduce任务的输出最终都在一个单独的输出文件中。在实践中,我们倾向于选择M使得每个单独的任务大约是16 MB到64 MB的输入数据(这样上面描述的局部优化是最有效的),并且我们使R成为我们期望使用的worker机器数量的一小部分。我们经常使用2000个worker机器执行M = 200000和R = 5000的MapReduce计算。
3.6 备份任务
延长MapReduce操作总时间的常见原因之一是“straggler”:一台机器需要花费非常长的时间来完成计算中最后几个map或reduce任务之一。Straggler可能出于各种原因而出现。例如,具有坏磁盘的机器可能会遇到频繁的可纠正错误,从而将其读取性能从30 MB/s降低到1 MB/s。集群调度系统可能已在机器上安排了其他任务,导致它由于CPU、内存、本地磁盘或网络带宽的竞争而更慢地执行MapReduce代码。我们遇到的最近一个问题是机器初始化代码中的一个错误导致处理器缓存被禁用:受影响的计算机上的计算速度减慢了一百多倍。
我们有一个缓解straggler问题的通用机制。当MapReduce操作接近完成时,master会调度剩余正在进行的任务的备份执行。无论原始执行还是备份执行完成,任务都会标记为已完成。我们已经调整了这种机制,因此它通常会将操作使用的计算资源增加不超过百分之几。我们发现这大大减少了完成大型MapReduce操作的时间。例如,当禁用备份任务机制时,第5.3节中描述的排序程序需要多花44%的时间才能完成。
4. 优化
虽然简单地编写Map和Reduce函数所提供的基本功能足以满足大多数需求,但我们发现一些扩展很有用。 本节将介绍这些内容。
4.1 分区函数
MapReduce的用户指定他们想要的reduce任务/输出文件的数量(R)。对中间键值使用分区函数,数据将在这些任务之间被分区。提供了使用散列的默认分区函数(例如“hash(key) mod R”)。这通常会产生比较均衡的分区。但是,在某些情况下,通过基于键的其他函数对数据进行分区是很有用的。例如,有时输出键是URL,我们希望单个主机的所有输入最终都在同一个输出文件中。为了支持这样的情况,MapReduce库的用户可以提供特殊的分区函数。例如,使用“hash(Hostname(urlkey)) mod R”作为分区函数会导致来自同一主机的所有URL最终出现在同一输出文件中。
4.2 排序保证
我们保证在给定分区内,中间键/值对是被按键递增地顺序处理的。这种排序保证可以很容易地为每个分区生成一个排序的输出文件,这在输出文件格式需要通过键值支持有效的随机访问查找时很有用,或者用户发现将输出的数据排序很方便。
4.3 Combiner函数
在某些情况下,每个map任务产生的中间键都会有大量的重复,而用户指定的Reduce函数是可交换和结合的。一个很好的例子是第2.1节中的单词计数示例。由于单词频率倾向于遵循Zipf分布,因此每个map任务将产生数百或数千个形式为<the,1>的记录。所有这些计数将通过网络发送到单个reduce任务,然后通过Reduce函数加总并产出一个数字。我们允许用户指定一个可选的Combiner函数,该函数在网络发送之前对该数据进行部分合并。
Combiner函数在执行map任务的每台机器上执行。通常,combiner和reduce函数使用相同的代码实现。reduce函数和combiner函数之间的唯一区别是MapReduce库如何处理函数的输出。reduce函数的输出被写入最终输出文件。combiner函数的输出被写入将发送到reduce任务的中间文件。
局部合并显著加快了某些种类的MapReduce操作。 附录A包含使用combiner的示例。
4.4 输入和输出类型
MapReduce库支持以多种不同格式读取输入数据。例如,“text”模式输入将每一行视为一个键/值对:键是文件中的偏移量,值是行的内容。另一种常见支持格式是存储按键排序的一系列键/值对。每个输入类型实现都知道如何将自身拆分为有意义的范围以便作为单独的map任务进行处理(例如,文本模式的范围拆分确保范围拆分仅在行边界处发生)。用户可以通过提供简单reader接口的实现来添加对新输入类型的支持,但大多数用户只需要使用少量预定义输入类型之一。
reader不一定需要提供从文件读取的数据。例如,很容易定义一个reader,它从数据库或映射到内存中的数据结构中读取记录。
以类似的方式,我们支持一组输出类型,用于生成不同格式的数据,用户代码很容易添加对新输出类型的支持。
4.5 副作用
在某些情况下,MapReduce的用户发现生成辅助文件作为其map和/或reduce运算符的附加输出很方便。我们依靠应用程序writer使这种副作用成为原子和幂等的。通常,应用程序写入临时文件,并在完全生成文件后以原子方式重命名该文件。
我们不支持单个任务生成的多个输出文件的原子两阶段提交。因此,生成具有跨文件一致性要求的多个输出文件的任务应该是确定性的。这种限制在实践中从未成为问题。
4.6 跳过错误记录
有时用户代码中存在错误导致Map或Reduce函数在某些记录上确定性地崩溃。此类错误会阻止MapReduce操作完成。通常的做法是修复bug,但有时这是不可行的;也许该错误发生在第三方库中,其源代码不可获得的。此外,有时可以忽略一些记录,例如在对大型数据集进行统计分析时。我们提供了一种可选的执行模式,其中MapReduce库检测哪些记录导致确定性崩溃并跳过这些记录以便继续运行。
每个worker进程都安装一个信号处理程序,用于捕获分段违例和总线错误。在调用用户Map或Reduce操作之前,MapReduce库将参数的序列号存储在全局变量中。如果用户代码产生一个信号,则信号处理程序将包含序列号的“last gasp”UDP数据包发送给MapReduce的master。当master在特定记录上看到多次故障时,它表示在下一次重新执行相应的Map或Reduce任务时应该跳过该记录。
4.7 本地执行
Map或Reduce函数的调试问题可能很棘手,因为实际计算发生在分布式系统中,通常在几千台机器上,master动态地做出任务分配决策。为了帮助简化调试、分析和小规模测试,我们开发了一种MapReduce库的替代实现,在本地计算机上顺序执行MapReduce操作的所有任务。向用户提供控件,使得计算可以限于特定的map任务。用户使用特殊标志调用他们的程序,然后可以轻松使用他们认为有用的任何调试或测试工具(例如gdb)。
4.8 状态信息
master运行了一个内部HTTP服务器并导出一组状态页以供人工使用。状态页面显示计算的进度,例如已完成的任务数,正在运行的数量,输入的字节数,中间数据的字节数、输出字节数和处理速率等。页面还包含指向每个任务生成的标准错误和标准输出文件链接。用户可以使用此数据来预测计算将花费多长时间,以及是否应将更多资源添加到计算中。这些页面还可用于判断何时计算速度比预期慢得多。
此外,顶级状态页面显示哪些worker失败了,以及他们在失败时处理的是哪个map或reduce任务。此信息对于尝试诊断用户代码中的错误非常有用。
4.9 计数器
MapReduce库提供计数器工具来计算各种事件的发生次数。例如,用户代码可能想要计算处理的单词总数或索引的德语文档数等。
要使用此工具,用户代码需要创建一个命名计数器对象,然后在Map和/或Reduce函数中相应地递增计数器。例如:
Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");
来自各个worker机器的计数器值定期传播给master(在ping响应中捎带发送)。master合计来自成功map和reduce任务的计数器值,并在MapReduce操作完成时将它们返回给用户代码。当前计数器值也显示在master状态页面上,以便人可以实时观察计算的进度。合计计数器值时,master消除了重复执行相同map或reduce任务的影响,以避免重复计数。(重复执行可能源于我们使用备份任务以及因故障而重新执行任务。)
某些计数器值由MapReduce库自动维护,例如处理的输入键/值对的数量以及生成的输出键/值对的数量。
用户发现计数器工具对于检查MapReduce操作的行为有用。例如,在某些MapReduce操作中,用户代码可能希望确保生成的输出对的数量完全等于处理的输入对的数量,或者处理的德语文档的比例在处理的文件总数的一些可容忍的范围内。
5. 性能
在本节中,我们将测量MapReduce在大型机器集群上运行的两个计算的性能。一个计算搜索大约1TB的数据以寻找特定模式。另一个计算对大约1TB的数据进行排序。
这两个程序是MapReduce用户编写的大部分实际程序中具有代表性的 - 一类程序将数据从一个表示重组为另一个表示,另一个类从大数据集中提取少量有趣数据。
5.1 集群配置
所有程序都在由大约1800台机器组成的集群上运行。每台机器都有两个2GHz Intel Xeon处理器,支持超线程,4GB内存,两个160GB IDE磁盘和一个千兆以太网链路。这些机器被安排在一个两级树形交换网络中,实际上有大约100-200 Gbps的总带宽。所有机器都在同一个托管设施中,因此任意两台机器之间的往返时间不到一毫秒。
在4GB内存中,大约1-1.5GB的内存保留给了集群上运行的其他任务。这些程序是在周末下午运行的,当时CPU、磁盘和网络大部分处于空闲状态。
5.2 Grep
grep程序扫描了10^10行100字节的记录,搜索相对罕见的三字符模式(该模式在92337条记录中出现了)。输入划分为大约64MB的分片(M = 15000),整个输出放在一个文件中(R = 1)。
图2显示了计算随时间的进度。Y轴显示扫描输入数据的速率。随着为此MapReduce计算分配更多机器,速率逐渐增加,当分配了1764个worker时,峰值超过30 GB/s。当map任务完成时,速率开始下降并在计算中大约80秒时达到零。整个计算从开始到结束大约需要150秒。这包括大约一分钟的启动开销。开销是由于程序传播到所有worker机器,并且与GFS交互以打开1000个输入文件集合及获得本地性优化所需的信息产生的延迟。
5.3 排序
排序程序对10^10行100字节的记录(大约1TB的数据)进行排序。 该程序以TeraSort基准[10]为模型。
排序程序包含少于50行的用户代码。三行Map函数从文本行中提取10字节的排序键,并将键和原始文本行作为中间键/值对输出。我们使用内置的Identity函数作为Reduce运算符。该函数将中间键/值对不变地作为输出键/值对传递。最终排序的输出被写入一组双向复制的GFS文件(即,2TB被写为程序的输出)。
和以前一样,输入数据被划分成64MB的分片(M = 15000)。我们将排序的输出分区为4000个文件(R = 4000)。分区函数使用键的原始字节将其分隔为R个分片之一。
我们对此基准测试的分区函数已经内置了键值分布的认识。在一般的排序程序中,我们将添加一个预传MapReduce操作,该操作将收集键的样本并使用采样键的分布来计算最终排序过程的分割点。
图3(a)显示了排序程序的正常执行进度。左上角的图表显示了读取输入的速率。由于所有map任务在200秒之前完成,因此速率达到约13 GB/s并且相当快地消失。请注意,该程序输入速率小于Grep。这是因为排序map任务花费大约一半的时间和I/O带宽将中间输出写入其本地磁盘。Grep相应的中间输出大小可以忽略不计。
左中图显示了数据通过网络从map任务发送到reduce任务的速率。第一个map任务一完成,这种数据混洗传输就会开始。图中的第一个驼峰是第一批大约1700个reduce任务(整个MapReduce分配了大约1700台机器,每台机器一次最多执行一个reduce任务)。计算大约300秒后,这些第一批reduce任务中的一些完成,我们开始为剩余的reduce任务混洗数据。所有的混洗在计算中花费了约600秒。
左下图显示了reduce任务将排序数据写入最终输出文件的速率。在第一次混洗期结束和写入期开始之间存在延迟,因为机器正忙于对中间数据进行排序。写入以一段时间内以大约2-4 GB/s的速率持续。所有写入在计算中花费了大约850秒完成。包括启动开销,整个计算需要891秒。这类似于TeraSort基准测试中当前最佳报告的1057秒结果[18]。
需要注意的一点是:由于我们的本地化优化,输入速率高于混洗速率和输出速率 - 大多数数据从本地磁盘读取并绕过我们相对带宽受限的网络。混洗速率高于输出速率,因为输出阶段写入已排序数据的两个副本(出于可靠性和可用性原因,我们输出了两个副本数据)。我们写两个副本数据,因为这是我们的底层文件系统提供的可靠性和可用性机制。如果底层文件系统使用擦除编码[14]而不是复制,则写入数据的网络带宽要求将会降低。
5.4 备份任务的影响
在图3(b)中,我们显示了禁用备份任务的排序程序的执行。执行流程类似于图3(a)中所示的执行流程,除了有很长的几乎没有任何写入活动发生的尾部。在960秒之后,除了5个reduce任务之外的所有任务都完成了。然而,这最后几个straggler直到300秒后才完成。整个计算需要1283秒,花费的时间增加了44%。
5.5 机器故障
在图3©中,我们展示了一个排序程序的执行,我们故意在计算中的几分钟内杀死了1746个worker进程中的200个。底层集群调度器立即在这些计算机上重新启动新worker进程(因为只有进程被终止,机器仍然正常运行)。
worker死亡显示为负输入率,因为一些先前完成的map工作消失了(因为相应的map任务的worker被杀掉了)并且需要重做。重新执行此map工作的速度相对较快。整个计算在933秒内完成,包括启动开销(只是比正常执行时间增加了5%)。
6. 经验
我们在2003年2月编写了MapReduce库的第一个版本,并在2003年8月对其进行了重大改进,包括本地化优化和跨worker机器的任务执行的动态负载均衡等。从那时起,我们对MapReduce库可广泛应用于我们所处理的各种问题的程度感到十分惊喜。它已在Google的各个领域中使用,包括:
- 大规模的机器学习问题;
- Google新闻和Froogle产品的集群问题;
- 提取用于生成热门查询报告的数据(例如Google Zeitgeist);
- 提取新实验和产品的网页属性(例如从大型网页集中提取地理位置以进行本地化搜索);
- 大规模图计算。
图4显示了随着时间的推移,加入到我们的主源代码管理系统中的独立MapReduce程序数量的显著增长,从2004年初的0到2004年9月底的近900个独立的实例。MapReduce非常成功,因为它使得编写一个简单的程序并在半小时内在一千台机器上高效运行成为可能,大大加快了开发和原型设计周期。此外,它允许没有分布式和/或并行系统经验的开发者轻松地利用大规模资源。
在每个作业结束时,MapReduce库会记录有关作业使用的计算资源的统计信息。在表1中,我们显示了2004年8月Google中运行的MapReduce作业子集的一些统计数据。
6.1 大规模索引
迄今为止,MapReduce最重要的用途之一是完全重写线上索引系统,该系统生成用于Google Web搜索服务的数据结构。索引系统将我们的爬虫系统检索到的大量文档作为输入,并存储为一组GFS文件。这些文档的原始内容超过20 TB的数据。索引过程以五到十个顺序MapReduce操作运行。使用MapReduce(而不是索引系统的先前版本中的ad-hoc分布式传递)提供了以下几个好处:
- 因为处理容错、分发和并行化的代码隐藏在MapReduce库中,索引代码更简单、更小且更易于理解。例如,当使用MapReduce表示时,一个计算阶段的大小从大约3800行C ++代码下降到大约700行。
- MapReduce库的性能足够好,我们可以将概念上不相关的计算分开,而不是将它们混合在一起以避免额外的数据传递。这样可以轻松更改索引过程。例如,在我们的旧索引系统中花费几个月时间进行的一项更改,在新系统中仅花费了几天时间就能实现。
- 索引过程变得更容易操作,因为由机器故障、慢速机器和网络卡顿引起的大多数问题由MapReduce库自动处理而无需操作员干预。此外,通过向索引集群添加新机器,可以轻松提高索引过程的性能。
7. 相关工作
许多系统提供了受限制的编程模型,并使用这些限制来自动并行化计算。例如,可以使用并行前缀计算[6, 9, 13]在N个处理器上以logN时间在N个元素阵列的所有前缀上计算关联函数。根据我们对大型实际计算的经验,MapReduce可以被认为是对这些模型中的一些模型的简化和升华。更重要的是,我们提供容错实现,并可扩展到数千个处理器。相比之下,大多数并行处理系统仅在较小规模上实施,并将处理机器故障的细节留给开发者。
批量同步编程[17]和一些MPI原语[11]提供了更高级的抽象,使开发者更容易编写并行程序。这些系统与MapReduce之间的关键区别在于MapReduce利用受限编程模型自动并行化用户程序并提供透明的容错。
我们的本地化优化从诸如活跃磁盘[12,15]等技术中汲取灵感,其中计算被推送到接近本地磁盘去处理元素,以减少通过I/O子系统或网络发送的数据量。我们在少量磁盘直接连接的商用处理器上运行,而不是直接在磁盘控制器处理器上运行,但其通常方法类似。
我们的备份任务机制类似于Charlotte System[3]中采用的热切调度机制。简单的热切调度的缺点之一是,如果给定任务导致重复失败,则整个计算无法完成。我们用跳过错误记录的机制修复了这个问题的一些实例。
MapReduce实现依赖于内部集群管理系统,该系统负责在大量共享机器上分发和运行用户任务。虽然不是本文的重点,但集群管理系统在本质上与其他系统类似,如Condor[16]。
作为MapReduce库一部分的排序功能在操作上类似于NOW-Sort[1]。源计算机(map worker)对要排序的数据进行分区,并将其发送给R个reduce worker之一。每个reduce worker在本地对其数据进行排序(如果可能,在内存中)。当然,NOW-Sort没有用户可定义的Map和Reduce函数,这些函数使我们的库得到广泛应用。
River[2]提供了一种编程模型,其中进程通过分布式队列发送数据来相互通信。与MapReduce一样,即使存在由异构硬件或系统扰动引入的非一致性,River系统也会尝试提供良好的平均案例性能。River通过细致调度磁盘和网络传输来实现这一目标,以实现平衡的完成时间。MapReduce有不同的方法。通过限制编程模型,MapReduce框架能够将问题划分为大量细粒度的任务。这些任务可以在可用的worker上动态调度,以便更快的worker可以处理更多的任务。受限制的编程模型还允许我们在作业临近结束时调度任务冗余执行,这大大减少了存在不一致性时的完成时间(例如worker缓慢或卡住)。
BAD-FS[5]与MapReduce有着截然不同的编程模型,与MapReduce不同,它的目标是在广域网上执行作业。但是,有两个基本的相似之处。(1) 两个系统都使用冗余执行来从故障引起的数据丢失中恢复。(2) 两者都使用本地感知调度来减少在拥塞的网络链路上发送的数据量。
TACC[7]是一个旨在简化高可用网络服务构建的系统。与MapReduce一样,它依赖于重新执行作为实现容错的机制。
8. 总结
MapReduce编程模型已在Google成功用于多种用途。我们将这一成功归结于几个原因。首先,该模型易于使用,即使对于没有并行和分布式系统经验的开发者也是如此,因为它隐藏了并行化、容错、本地化优化和负载均衡的细节。其次,作为MapReduce计算,很容易表达各种各样的问题。例如,MapReduce用于为Google的线上Web搜索服务生成数据、用于排序、数据挖掘、机器学习以及许多其他系统。第三,我们开发的MapReduce实现,可以扩展到包含数千台机器的大型机器集群。该实现有效地利用了这些机器资源,因此适用于Google遇到的许多大型计算问题。
我们从这项工作中学到了很多东西。首先,限制编程模型可以轻松地并行化和分发计算并使这些计算具有容错能力。其次,网络带宽是一种稀缺资源。因此,我们系统中的许多优化都旨在减少通过网络发送的数据量:位置优化允许我们从本地磁盘读取数据,并将中间数据的单个副本写入本地磁盘可节省网络带宽。第三,冗余执行可用于降低慢速机器的影响,并处理机器故障和数据丢失。
致谢
Josh Levenberg根据他使用MapReduce的经验和其他人的优化建议,在修改和扩展用户级MapReduce API方面发挥了重要作用。MapReduce读取其输入并将其输出写入Google文件系统[8]。我们要感谢Mohit Aron,Howard Gobioff,Markus Gutschke,David Kramer,Shun-Tak Leung和Josh Redstone在开发GFS方面所做的工作。我们还要感谢Percy Liang和Olcan Sercinoglu在开发MapReduce使用的集群管理系统方面所做的工作。Mike Burrows,Wilson Hsieh,Josh Levenberg,Sharon Perl,Rob Pike和Debby Wallach对本文的早期草稿提供了有用的评论。匿名的OSDI审稿人和我们的上级Eric Brewer为可以改进论文提供了许多有用的领域建议。最后,我们感谢Google工程师组织中MapReduce的所有用户提供有用的反馈、建议和错误报告。
参考文献
- Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, David E. Culler, Joseph M. Hellerstein, and David A. Patterson. High-performance sorting on networks of workstations. In Proceedings of the 1997 ACM SIGMOD International Conference on Management of Data, Tucson, Arizona, May 1997.
- Remzi H. Arpaci-Dusseau, Eric Anderson, Noah Treuhaft, David E. Culler, Joseph M. Hellerstein, David Patterson, and Kathy Yelick. Cluster I/O with River: Making the fast case common. In Proceedings of the Sixth Workshop on Input/Output in Parallel and Distributed Systems (IOPADS '99), pages 10-22, Atlanta, Georgia, May 1999.
- Arash Baratloo, Mehmet Karaul, Zvi Kedem, and Peter Wyckoff. Charlotte: Metacomputing on the web. In Proceedings of the 9th International Conference on Parallel and Distributed Computing Systems, 1996.
- Luiz A. Barroso, Jeffrey Dean, and Urs H¨olzle. Web search for a planet: The Google cluster architecture. IEEE Micro, 23(2):22-28, April 2003.
- John Bent, Douglas Thain, Andrea C.Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, and Miron Livny. Explicit control in a batch-aware distributed file system. In Proceedings of the 1st USENIX Symposium on Networked Systems Design and Implementation NSDI, March 2004.
- Guy E. Blelloch. Scans as primitive parallel operations. IEEE Transactions on Computers, C-38(11), November 1989.
- Armando Fox, Steven D. Gribble, Yatin Chawathe, Eric A. Brewer, and Paul Gauthier. Cluster-based scalable network services. In Proceedings of the 16th ACM Symposium on Operating System Principles, pages 78-91, Saint-Malo, France, 1997.
- Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google file system. In 19th Symposium on Operating Systems Principles, pages 29-43, Lake George, New York, 2003.
- S. Gorlatch. Systematic efficient parallelization of scan and other list homomorphisms. In L. Bouge, P. Fraigniaud, A. Mignotte, and Y. Robert, editors, Euro-Par’96. Parallel Processing, Lecture Notes in Computer Science 1124, pages 401-408. Springer-Verlag, 1996.
- Jim Gray. Sort benchmark home page. http://research.microsoft.com/barc/SortBenchmark/.
- William Gropp, Ewing Lusk, and Anthony Skjellum. Using MPI: Portable Parallel Programming with the Message-Passing Interface. MIT Press, Cambridge, MA, 1999.
- L. Huston, R. Sukthankar, R.Wickremesinghe, M. Satyanarayanan, G. R. Ganger, E. Riedel, and A. Ailamaki. Diamond: A storage architecture for early discard in interactive search. In Proceedings of the 2004 USENIX File and Storage Technologies FAST Conference, April 2004.
- Richard E. Ladner and Michael J. Fischer. Parallel prefix computation. Journal of the ACM, 27(4):831-838, 1980.
- Michael O. Rabin. Efficient dispersal of information for security, load balancing and fault tolerance. Journal of the ACM, 36(2):335-348, 1989.
- Erik Riedel, Christos Faloutsos, Garth A. Gibson, and David Nagle. Active disks for large-scale data processing. IEEE Computer, pages 68-74, June 2001.
- Douglas Thain, Todd Tannenbaum, and Miron Livny. Distributed computing in practice: The Condor experience. Concurrency and Computation: Practice and Experience, 2004.
- L. G. Valiant. A bridging model for parallel computation. Communications of the ACM, 33(8):103-111, 1997.
- Jim Wyllie. Spsort: How to sort a terabyte quickly. http://alme1.almaden.ibm.com/cs/spsort.pdf.
附录A. 词频
此部分包含一个程序,用于统计命令行中指定的一组输入文件中每个单词的出现次数。
#include "mapreduce/mapreduce.h"
// User's map function
class WordCounter : public Mapper {
public:
virtual void Map(const MapInput& input) {
const string& text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
// Skip past leading whitespace
while ((i < n) && isspace(text[i]))
i++;
// Find word end
int start = i;
while ((i < n) && !isspace(text[i]))
i++;
if (start < i)
Emit(text.substr(start,i-start),"1");
}
}
};
REGISTER_MAPPER(WordCounter);
// User's reduce function
class Adder : public Reducer {
virtual void Reduce(ReduceInput* input) {
// Iterate over all entries with the
// same key and add the values
int64 value = 0;
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
// Emit sum for input->key()
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
ParseCommandLineFlags(argc, argv);
MapReduceSpecification spec;
// Store list of input files into "spec"
for (int i = 1; i < argc; i++) {
MapReduceInput* input = spec.add_input();
input->set_format("text");
input->set_filepattern(argv[i]);
input->set_mapper_class("WordCounter");
}
// Specify the output files:
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// ...
MapReduceOutput* out = spec.output();
out->set_filebase("/gfs/test/freq");
out->set_num_tasks(100);
out->set_format("text");
out->set_reducer_class("Adder");
// Optional: do partial sums within map
// tasks to save network bandwidth
out->set_combiner_class("Adder");
// Tuning parameters: use at most 2000
// machines and 100 MB of memory per task
spec.set_machines(2000);
spec.set_map_megabytes(100);
spec.set_reduce_megabytes(100);
// Now run it
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();
// Done: 'result' structure contains info
// about counters, time taken, number of
// machines used, etc.
return 0;
}