
深入理解Spark:核心思想与源码分析, 3.7 Creating and Starting DAGScheduler

<b>3.7 Creating and Starting DAGScheduler</b>

DAGScheduler does the preparatory work before tasks are formally handed to TaskSchedulerImpl for submission: it creates jobs, divides the RDDs of the DAG into different stages, submits stages, and so on. The code that creates DAGScheduler is as follows.
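The idea of dividing the RDD DAG into stages can be illustrated with a minimal, hypothetical sketch (Python here, for brevity; `RDDNode` and `build_stages` are invented names, not Spark APIs): narrow dependencies stay in the same stage, while each shuffle dependency starts a new upstream stage.

```python
# Hypothetical sketch: split an RDD lineage into stages at shuffle boundaries,
# as DAGScheduler conceptually does. Not Spark code.

class RDDNode:
    def __init__(self, name, narrow_parents=(), shuffle_parents=()):
        self.name = name                        # identifier for this RDD
        self.narrow_parents = narrow_parents    # same-stage dependencies
        self.shuffle_parents = shuffle_parents  # dependencies across a shuffle

def build_stages(final_rdd):
    """Walk backwards from the final RDD; each shuffle dependency
    produces a separate upstream stage (appended before its child)."""
    stages = []

    def new_stage(rdd):
        members = []
        frontier = [rdd]
        while frontier:
            node = frontier.pop()
            members.append(node.name)
            frontier.extend(node.narrow_parents)   # stays in this stage
            for parent in node.shuffle_parents:    # boundary: new stage
                new_stage(parent)
        stages.append(members)

    new_stage(final_rdd)
    return stages

# textFile -> map (narrow) -> reduceByKey (shuffle)
lines = RDDNode("textFile")
mapped = RDDNode("map", narrow_parents=(lines,))
reduced = RDDNode("reduceByKey", shuffle_parents=(mapped,))
stages = build_stages(reduced)
# upstream map stage comes first, then the final result stage
```

Upstream (shuffle-map) stages are emitted before the stage that consumes their output, mirroring how Spark must run a ShuffleMapStage before its ResultStage.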

@volatile private[spark] var dagScheduler: DAGScheduler = _
dagScheduler = new DAGScheduler(this)

The data structures maintained by DAGScheduler mainly track the relationship between job IDs and stage IDs, the stages themselves, ActiveJobs, and the cached locations of RDD partitions, as shown in Listing 3-32.

Listing 3-32 Data structures maintained by DAGScheduler

    private[scheduler] val nextJobId = new AtomicInteger(0)
    private[scheduler] def numTotalJobs: Int = nextJobId.get()
    private val nextStageId = new AtomicInteger(0)
    private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
    private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
    private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]
    private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
    // Stages we need to run whose parents aren't done
    private[scheduler] val waitingStages = new HashSet[Stage]
    // Stages we are running right now
    private[scheduler] val runningStages = new HashSet[Stage]
    // Stages that must be resubmitted due to fetch failures
    private[scheduler] val failedStages = new HashSet[Stage]
    private[scheduler] val activeJobs = new HashSet[ActiveJob]
    // Contains the locations that each RDD's partitions are cached on
    private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
    private val failedEpoch = new HashMap[String, Long]
    private val dagSchedulerActorSupervisor =
        env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))
    private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
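The bookkeeping above is essentially a set of maps and sets keyed by job and stage IDs. As an illustrative sketch only (plain Python dicts and sets; `SchedulerState` and `register_job` are invented names), the core relationships can be mimicked like this:

```python
# Hypothetical sketch of DAGScheduler's bookkeeping, using plain dicts/sets.
from collections import defaultdict
from itertools import count

class SchedulerState:
    def __init__(self):
        self.next_job_id = count(0)                  # mirrors nextJobId (AtomicInteger)
        self.job_id_to_stage_ids = defaultdict(set)  # mirrors jobIdToStageIds
        self.stage_id_to_stage = {}                  # mirrors stageIdToStage
        self.waiting_stages = set()                  # parents not done yet
        self.running_stages = set()                  # currently running
        self.failed_stages = set()                   # to resubmit after fetch failure
        self.cache_locs = {}                         # rddId -> per-partition locations

    def register_job(self, stage_ids):
        """Allocate a fresh job ID and record which stages belong to it."""
        job_id = next(self.next_job_id)
        self.job_id_to_stage_ids[job_id].update(stage_ids)
        return job_id

state = SchedulerState()
job = state.register_job({0, 1})   # first job gets ID 0 and owns stages 0 and 1
```

The one-to-many `jobIdToStageIds` map is what lets a job cancellation find and clean up every stage the job owns.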

When DAGScheduler is constructed, the initializeEventProcessActor method is called to create DAGSchedulerEventProcessActor, as shown in Listing 3-33.

Listing 3-33 Initialization of DAGSchedulerEventProcessActor

    private[scheduler] var eventProcessActor: ActorRef = _

    private def initializeEventProcessActor() {
        // blocking the thread until supervisor is started, which ensures
        // eventProcessActor is not null before any job is submitted
        implicit val timeout = Timeout(30 seconds)
        val initEventActorReply =
            dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
        eventProcessActor = Await.result(initEventActorReply, timeout.duration).
            asInstanceOf[ActorRef]
    }

    initializeEventProcessActor()
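The ask (`?`) plus `Await.result` pattern above is a blocking handshake: the constructor thread waits until the supervisor has actually created the child actor. A rough analogue, as a sketch only (Python threads and queues standing in for actors; all names are invented):

```python
# Hypothetical sketch of "ask the supervisor, block until the child exists",
# mimicking initializeEventProcessActor's handshake. Not Akka code.
import queue
import threading

def supervisor(inbox):
    """Supervisor thread: receives (request, reply_queue), creates the
    'actor', and replies with a handle, like `sender ! context.actorOf(p)`."""
    request, reply = inbox.get()
    handle = {"name": request}       # stand-in for the created ActorRef
    reply.put(handle)

inbox = queue.Queue()
threading.Thread(target=supervisor, args=(inbox,), daemon=True).start()

reply = queue.Queue()
inbox.put(("DAGSchedulerEventProcessActor", reply))
# Like Await.result with a 30-second timeout: block until the child is created,
# so the handle is guaranteed non-null before any job is submitted.
event_process_actor = reply.get(timeout=30)
```

The point of blocking here is ordering: no job submission can race ahead of actor creation.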

Here DAGSchedulerActorSupervisor serves mainly as the supervisor of DAGSchedulerEventProcessActor and is responsible for creating it. As Listing 3-34 shows, DAGSchedulerActorSupervisor applies Akka's one-for-one supervision strategy to DAGSchedulerEventProcessActor. Once DAGSchedulerActorSupervisor has created DAGSchedulerEventProcessActor and registered it with the ActorSystem, the ActorSystem invokes DAGSchedulerEventProcessActor's preStart, so that TaskScheduler comes to hold a reference to DAGScheduler, as shown in Listing 3-35. Listing 3-35 also reveals the message types DAGSchedulerEventProcessActor can handle, such as JobSubmitted, BeginEvent, and CompletionEvent; on receiving each of these messages it takes a different action. For this chapter it is enough to understand this much; later chapters analyze the details where they are used.

Listing 3-34 Supervision strategy of DAGSchedulerActorSupervisor

    private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
        extends Actor with Logging {

        override val supervisorStrategy =
            OneForOneStrategy() {
                case x: Exception =>
                    logError("eventProcesserActor failed; shutting down SparkContext", x)
                    try {
                        dagScheduler.doCancelAllJobs()
                    } catch {
                        case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
                    }
                    dagScheduler.sc.stop()
                    Stop
            }

        def receive = {
            case p: Props => sender ! context.actorOf(p)
            case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")
        }
    }
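The effect of the strategy in Listing 3-34 can be shown with a minimal sketch (Python, with invented names `FakeContext` and `supervise` standing in for SparkContext and the Akka supervisor): if the supervised child fails with an exception, all jobs are cancelled and the context is stopped.

```python
# Hypothetical sketch of the one-for-one failure handling in Listing 3-34:
# a child failure triggers cancel-all-jobs, then stops the context.

class FakeContext:
    """Stand-in for SparkContext plus DAGScheduler's job-cancellation hook."""
    def __init__(self):
        self.cancelled = False
        self.stopped = False

    def do_cancel_all_jobs(self):
        self.cancelled = True

    def stop(self):
        self.stopped = True

def supervise(ctx, child):
    """Run the child; on any exception, mirror the Scala strategy:
    try to cancel all jobs (logging failures), then stop the context."""
    try:
        child()
    except Exception:
        try:
            ctx.do_cancel_all_jobs()
        except Exception:
            pass          # the Scala code logs and carries on to sc.stop()
        ctx.stop()

def failing_child():
    raise RuntimeError("eventProcessActor failed")

ctx = FakeContext()
supervise(ctx, failing_child)   # failure path: cancel jobs, then stop
```

Note the design choice the sketch preserves: even if job cancellation itself throws, the context is still stopped, so a broken event-processing actor can never leave the application half-alive.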

Listing 3-35 Implementation of DAGSchedulerEventProcessActor

    private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
        extends Actor with Logging {

        override def preStart() {
            dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
        }

        /**
         * The main event loop of the DAG scheduler.
         */
        def receive = {
            case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
                    listener, properties) =>
                dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions,
                    allowLocal, callSite, listener, properties)

            case StageCancelled(stageId) =>
                dagScheduler.handleStageCancellation(stageId)

            case JobCancelled(jobId) =>
                dagScheduler.handleJobCancellation(jobId)

            case JobGroupCancelled(groupId) =>
                dagScheduler.handleJobGroupCancelled(groupId)

            case AllJobsCancelled =>
                dagScheduler.doCancelAllJobs()

            case ExecutorAdded(execId, host) =>
                dagScheduler.handleExecutorAdded(execId, host)

            case ExecutorLost(execId) =>
                dagScheduler.handleExecutorLost(execId, fetchFailed = false)

            case BeginEvent(task, taskInfo) =>
                dagScheduler.handleBeginEvent(task, taskInfo)

            case GettingResultEvent(taskInfo) =>
                dagScheduler.handleGetTaskResult(taskInfo)

            case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
                dagScheduler.handleTaskCompletion(completion)

            case TaskSetFailed(taskSet, reason) =>
                dagScheduler.handleTaskSetFailed(taskSet, reason)

            case ResubmitFailedStages =>
                dagScheduler.resubmitFailedStages()
        }

        override def postStop() {
            // Cancel any active jobs in postStop hook
            dagScheduler.cleanupAfterSchedulerStop()
        }
    }
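The receive block above is a dispatch table: each case-class message maps to one handler method. As a minimal sketch (Python dataclasses standing in for the Scala case classes; `receive`, `JobSubmitted`, and `JobCancelled` below are simplified stand-ins, not Spark's actual signatures), the same pattern looks like this:

```python
# Hypothetical sketch of the event-loop dispatch in Listing 3-35:
# match on the message type, call the corresponding handler.
from dataclasses import dataclass

@dataclass
class JobSubmitted:
    job_id: int

@dataclass
class JobCancelled:
    job_id: int

def receive(event, log):
    """Dispatch one event to its handler; `log` records which handler ran."""
    if isinstance(event, JobSubmitted):
        log.append(f"handleJobSubmitted({event.job_id})")
    elif isinstance(event, JobCancelled):
        log.append(f"handleJobCancellation({event.job_id})")
    else:
        log.append("unknown event")

log = []
receive(JobSubmitted(0), log)
receive(JobCancelled(0), log)
```

Because the actor processes one message at a time, the handlers never run concurrently, which is why DAGScheduler's internal maps in Listing 3-32 need no locking.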
