天天看点

mapreduce工作流程_MapReduce(1)——MapReduce是如何实现计算向数据迁移的

mapreduce工作流程_MapReduce(1)——MapReduce是如何实现计算向数据迁移的

前言

最近一直在做项目之余研究MapReduce的资料,自己刚开始几天尝试直接读源码进行分析,但是尝试了几天之后发现非常的艰难,因为整个代码还是比较多的,如果直接读很容易抓不住重点,被淹没了在了海一般的代码中。所以又开始找相关的资料。比较幸运的是,再看了很多个资料之后,发现了下面这本我认为是目前自己看到的写的最清晰的一本关于MapReduce的书。这本书的作者目前是Hulu的架构师,如果大家对MapReduce感兴趣的话,强烈推荐一下。需要说明的是,这本书成书时间比较早,当时还处于Hadoop2.x出来没多久的时候,所以这本书中分析的代码还主要是以1.x版本为主。但我觉得这个不影响对整个MR架构和设计的理解,如果想要对着源码去学习的同学,可以去下载下来代码。作为参考,我个人是对着hadoop-1.2.1版本的源码来阅读这本书的。

《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》(董西成)电子书下载、在线阅读、内容简介、评论 - 京东电子书频道​e.jd.com

mapreduce工作流程_MapReduce(1)——MapReduce是如何实现计算向数据迁移的

MapReduce代码太多了,轮廓性质的内容我觉得这本书写的已经很不错了,感兴趣可以直接读。这篇专栏的系列文章更倾向于从一个个小问题入手,把每个小问题说明白,并希望借此能对整个MapReduce的部分懂得更加明白。

我们都知道MapReduce是计算向数据移动的, 任务分配时是会考虑数据本地性的(locality)。MR把用户提交得Job文件划分成了很多个部分(splits),每一个数据块(split)对应一个Map任务(Task),然后MR会把这些Map任务分配给集群中的机器进行执行。所谓计算向数据移动就是我们尽可能的把Map任务分配到该任务所需要处理的数据块(split)所在的机器节点上,这样该任务执行时就可以尽可能地从本地进行数据读取,从而避免了跨机器地数据传输。那么本篇文章,我们就从用户提交任务,一直到JobTracker调度任务来介绍MapReduce是如何实现计算向数据迁移的。

同时,我们需要注意的是,计算本地性考虑只是针对Map任务的,Reduce任务调度不需要考虑数据的本地性。

MR1.0核心组件介绍

由于接下来会可能涉及到许多的概念,所以先在这里把MR1.0的整体架构和一些核心概念介绍下。

1.0的架构中三个核心组件为

JobTracker

TaskTracker

TaskScheduler

JobTracker

是整个MR的核心组件,用户会向

JobTracker

提交作业(Job),

JobTracker

会将用户的作业(Job)分解成一个个任务(Task)。

TaskTracker

部署运行在集群中的机器节点上,并定时向

JobTracker

通过心跳信息汇报自己目前的情况,包括自己所在节点的资源信息和自己所负责的所有任务的运行状态信息。常规情况下,一般是一个机器上部署运行一个TaskTracker。除了汇报信息之外,

TaskTracker

也会在自己还有空余资源来运行任务的时候,向

JobTracker

请求新的任务。当

JobTracker

接收到请求之后,会借助

TaskScheduler

来为该

TaskTracker

调度指派任务。

mapreduce工作流程_MapReduce(1)——MapReduce是如何实现计算向数据迁移的

除了分解作业、处理TaskTracker的心跳信息之外,JobTracker很重要的工作就是监控、维护各种状态。包括每一个作业的状态,每一个任务的状态,每一个TaskTracker的状态。在JobTracker中,每一个作业的所有状态信息用

JobInProgress

类来表示(JIP), 每一个任务的所有状态信息用

TaskInProgress

来表示(TIP)。显然,一个JIP里面应该包含很多个TIP,因为一个作业里面包含很多个任务。同时为了更好的容错性,Hadoop种引入了

TaskAttempt

(TA)来表示每一个任务的执行示例。

TaskInProgress

TaskAttempt的关系可以理解为程序和进程的关系。

即一个

TaskInProgress

对象可以对应多个

TaskAttempt

对象。每启动一个执行该Task的实例,就会多一个

TaskAttempt

。不管其中哪一个

TaskAttempt

先执行完,该Task被认为执行结束。

实际上真正在机器上运行的是TaskAttempt

mapreduce工作流程_MapReduce(1)——MapReduce是如何实现计算向数据迁移的

那么有了上面的知识,我们就知道在Hadoop中

一台机器上

机器节点-TaskTracker-Task-Job的对应关系了,

我们用Node-TaskTracker-TA-TIP-JIP来进行表示。

而在JobTracker中也用了很多的Map结构体来保存这样的对应关系,用于快速查询。

mapreduce工作流程_MapReduce(1)——MapReduce是如何实现计算向数据迁移的

MapReduce整体任务调度介绍

接下来我们进入真正的任务调度部分。首先我们需要了解Hadoop中机器节点的三层拓扑结构表示:数据中心-机架-节点。比如下图中的H1节点,其拓扑结构地址为/D1/R1/H1,

即1号数据中心的1号机架中的1号节点

。同时下图中的拓扑结构为树结构,

R1为H1节点的父节点。

一般而言我们都使用机架-机器两层架构,任务在同一个数据中心内进行分配、调度执行。

mapreduce工作流程_MapReduce(1)——MapReduce是如何实现计算向数据迁移的

Hadoop根据输入数据与实际分配的计算资源之间的距离将任务分成三类:

node-local

(输入数据与计算资源同节点),

rack-local

(同机架)和

off-switch

(跨机架)。当输入数据与计算资源位于不同节点上时,Hadoop需将输入数据远程复制到计算资源所在节点进行处理。两者距离越远,需要的网络开销越大,因此调度器进行任务分配时尽量选择离输入数据近的节点资源。任务选择的优先级从高到低依次为:

node-local

rack-local

off-switch

我们以上面的拓扑图为例说明三种调度优先级。假设某一时刻,TaskTracker X出现空闲的计算资源,向JobTracker汇报心跳请求新的任务,调度器根据一定的调度策略为之选择了任务Y。

  • 场景1 如果X是H1,任务Y输入数据块为b1,则该任务为node-local。
  • 场景2 如果X是H1,任务Y输入数据块为b2,则该任务为rack-local。
  • 场景3 如果X是H1,任务Y输入数据块为b4,则该任务为off-switch。

接下来我们从一个作业提交到调度执行的流程来介绍任务调度是如何考虑本地性的。

1、作业提交时的Split生成

在Hadoop中,用户提交任务之前会先在本地进行数据的逻辑切分成一个个的Split。我们以Hadoop自带的FileSplit为例。一个FileSplit包含的信息就是文件名(

file

)、split数据在整个文件中的开始位置的偏移(

start

), split数据的长度(

length

), split所在的机器节点列表(

hosts

)。

public 
           

一个可能的FileSplit例子如下。

这里面hosts信息的确定对后续的计算本地性实现至关重要。
file 
           

我们提过hosts是该split所在的机器节点位置信息,但是事实上Split是可能跨节点的。比如HDFS中的block大小是64M,而该Split大小是128M, 那么该Split是一定会跨block的。跨block也就大概率会跨机器的,那么这个时候hosts列表该是哪些节点信息呢?Hadoop选择的策略是

选择包含该Split数据量最大的前K个节点列入hosts信息中。

那么在到时候分配该split对应的任务的时候,Hadoop更倾向于分配给hosts中包含的节点来执行,这样可以尽可能的减少数据地网络传输。

为此,FileInputFormat设计了一个简单有效的启发式算法:

首先按照rack包含的数据量对rack进行排序,然后在rack内部按照每个node包含的数据量对node排序,最后取前N个node的host作为InputSplit的host列表,这里的N为block副本数。

这样,当任务调度器调度Task时,只要将Task调度给位于host列表的节点,就认为该Task满足本地性。

举个例子

, 某个Hadoop集群的网络拓扑结构如图所示,HDFS中block副本数为3,某个InputSplit包含3个block,大小依次是100、150和75,很容易计算,4个rack包含的(该InputSplit的)数据量分别是175、250、150和75。rack2中的node3和node4,rack1中的node1将被添加到该InputSplit的host列表中。

mapreduce工作流程_MapReduce(1)——MapReduce是如何实现计算向数据迁移的

具体Hadoop中对于一个Split的hosts信息确定主要借助的就是下面的NodeInfo结构体。其中

value

就代表该节点包含该split的数据的总量。前面提过,节点拓扑结构构成一个树结构,

Hadoop中父节点所包含的split的数据总量为所有子节点包含的Split数据量总和。

统计完每个节点的包含该Split的数据总量之后,我们先对所有的机架节点进行排序,然后对包含数据量最多的机架节点的所有叶子节点(leaves)按照其包含Split的数据量进行排序,最后取出排序最靠前的N个节点。具体的代码比较长,就不贴在这里了,如果要去读,就去读

FileInputFormat.java
// FileInputFormat.java
           

2. 作业分解——任务生成

当用户把作业提交给JobTracker之后,JobTracker会根据用户提供的所有split信息来预先生成所有需要执行任务,每个split对应生成一个Map任务。在Hadoop中,所有的任务都用TaskInProgress类来表示。根据上述过程中生成的split信息,Hadoop用一个

Map<Node, List<TaskInProgress>>

数据结构体来保存对于每一个Node来说符合计算本地性的所有任务。

比如对应下面所述的一个split,就会在三个节点的任务列表中都加入一个该split对应的Map 任务。

file 
           

同时,拓扑结构中机架节点的任务列表为其中所有的处于该机架的机器节点符合计算本地性的任务列表的并集。这样,当一个任务没办法分配给最适合他的节点之后,Hadoop倾向于把任务分配给和该节点处于相同机架的其他节点。

mapreduce工作流程_MapReduce(1)——MapReduce是如何实现计算向数据迁移的

有了上面的Map<Node, List<TaskInProgress>>,当一个机器节点向JobTracker请求任务的时候,JobTracker可以通过查询该构体查找该节点符合计算本地性的所有任务列表,如果适合该节点的所有任务已经执行完了,那么JobTracker就会从其父节点——机架节点获取所有的任务,然后将其中未执行的任务分配该机器节点。

该结构体建立的最核心的两个函数为initTasks()和createCache()函数

// JobTracker.java
           

3、任务调度

上述的作业分解完成后,当TaskTracker来请求任务时,JobTracker就会调度指派任务。我们以下图所示的例子来介绍任务调度的过程:

当运行在

Node1

上的TaskTracker向JobTracker请求任务的时候,JobTracker会查询步骤2中所构建的Map结构体,从Node1中的符合计算本地性的任务列表中选择一个任务分配给该TaskTracker,如果分配成功,此时该任务的本地性级别为

node-local

但如果该任务列表中没有任务可以被调度到Node1上时,JobTracker会查询Node1的父节点,即Rack节点。从Rack节点中的符合计算本地性的任务列表中选择一个任务分配给Node1上的TaskTracker,这时分配的任务的本地性级别就为

rack-local。
mapreduce工作流程_MapReduce(1)——MapReduce是如何实现计算向数据迁移的

上面的描述是一个很粗糙的图景描述,目的是为了让大家明白整个任务调度的全貌。知道如何按照本地性调度任务的。

接下来重点介绍下任务调度的细节。

作业调度主要分为下图所示的两层架构:

作业调度和任务调度

。TaskSchduler主要决定Job之间的调度顺序,比如按照FIFO顺序。

  • JobTracker通过调用 assignTasks(TaskTracker) 来获取为该TaskTracker分配的新的任务。
  • TaskTracker首先会计算该TaskTracker所在的节点还能够执行几个Map任务和几个Reduce任务。在1.0版本的Hadoop中,每个节点的计算资源被均匀划分成了一个个的槽(slot), 一般情况下一个Map任务占据一个Map slot, 一个Reduce任务占据一个Reduce slot。计算能分配给该TaskSchduler的任务数目取决于该taskSchduler总共的任务槽数和现有正在执行的任务数目。二者的差值便是我们还能向该TaskSchduler分配的任务最大数目。

比如我们现在计算得到请求的TaskTracker还能够执行两个Map任务,和一个Reduce任务。那么TaskSchduler会按照一定的顺序去遍历询问每个作业(Job)。常用的一个遍历顺序就是FIFO,即先到的作业更容易被第一个遍历询问。

当TaskSchduler遍历第一个Job的时候,会按照下图所示的顺序去分别调用Job的三个获取任务的接口。当获取Map任务的时候,首先调用

obtainNewNodeOrRackLocalMapTask()

询问该Job有没有符合该TaskTracker所在的节点的计算本地性的任务,这里的计算本地性包括

node-local

rack-local

。如果有而且该任务可以被分配给该TaskTracker,那么就分配给它。如果没有,那么就会调用obtainNewNonLocalMapTask()询问该Job是否有其他的Map任务可以分配给该TaskTracker,即使这个任务所需要的数据和当前TaskTracker所在的节点不在同一个机架内。

当TaskSchduler需要调度Reduce任务的时候,还是按照顺序去遍历所有的作业(Job), 然后调用每个Job的obtainNewReduceTask()来询问是否有可以分配给该TaskTracker的Reduce任务。如果有则分配给它。如果没有,那么TaskSchduler就会遍历下一个Job来进行询问。

mapreduce工作流程_MapReduce(1)——MapReduce是如何实现计算向数据迁移的

总结

本篇文章我们主要介绍了Hadoop1.0中是如何实现计算向数据移动的。需要再次说明的是,任务调度分配时的计算本地性考虑只是针对Map任务,对Reduce的任务分配,我们不需要考虑计算本地性。希望上述的文章,对大家了解MapReduce的任务调度能有个大致的了解。后续还会继续写一系列的MapReduce文章,如果感兴趣,欢迎关注我的专栏~