<b>3.7 Creating and Starting DAGScheduler</b>
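DAGScheduler does the preparatory work before tasks are formally handed to TaskSchedulerImpl for submission. This includes creating Jobs, dividing the RDDs in the DAG into different Stages, submitting Stages, and so on. The code that creates DAGScheduler is as follows.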
@volatile private[spark] var dagScheduler: DAGScheduler = _
dagScheduler = new DAGScheduler(this)
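The data structures of DAGScheduler mainly maintain the relationship between jobId and stageId, the Stages, the ActiveJobs, and the location information of the partitions of cached RDDs; see Listing 3-32.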
Listing 3-32 Data structures maintained by DAGScheduler
private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)
private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]
// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]
// Stages that must be resubmitted due to fetch failures
private[scheduler] val failedStages = new HashSet[Stage]
private[scheduler] val activeJobs = new HashSet[ActiveJob]
// Contains the locations that each RDD's partitions are cached on
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
private val failedEpoch = new HashMap[String, Long]
private val dagSchedulerActorSupervisor =
  env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))
private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
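To make the jobId-to-stageId bookkeeping concrete, here is a minimal sketch of how two such maps could be kept consistent. It is not Spark's implementation: `SketchStage`, `StageRegistry`, `register`, and `removeJob` are hypothetical names invented for illustration, and the cleanup rule (a stage is dropped once no job references it) is an assumption about the intent of these maps.

```scala
import scala.collection.mutable.{HashMap, HashSet}

// Hypothetical, simplified stand-in for Spark's Stage; only the fields needed here.
final case class SketchStage(id: Int, parentIds: Seq[Int])

// A toy registry mirroring the jobId -> stageIds and stageId -> Stage maps.
class StageRegistry {
  val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
  val stageIdToStage = new HashMap[Int, SketchStage]

  // Record a stage and associate it with the job that needs it.
  def register(jobId: Int, stage: SketchStage): Unit = {
    stageIdToStage(stage.id) = stage
    jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]) += stage.id
  }

  // Drop a job, then remove every stage that no remaining job references.
  def removeJob(jobId: Int): Unit = {
    val stageIds = jobIdToStageIds.remove(jobId).getOrElse(new HashSet[Int])
    for (sid <- stageIds if !jobIdToStageIds.values.exists(_.contains(sid))) {
      stageIdToStage.remove(sid)
    }
  }
}
```

Because one stage can serve several jobs, the sketch keeps a set of stage ids per job rather than a single owner per stage. That is one plausible reason for the `HashMap[Int, HashSet[Int]]` shape in the listing.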
When DAGScheduler is constructed, it calls the initializeEventProcessActor method to create DAGSchedulerEventProcessActor; see Listing 3-33.
Listing 3-33 Initialization of DAGSchedulerEventProcessActor
private[scheduler] var eventProcessActor: ActorRef = _
private def initializeEventProcessActor() {
  // blocking the thread until supervisor is started, which ensures eventProcessActor is
  // not null before any job is submitted
  implicit val timeout = Timeout(30 seconds)
  val initEventActorReply =
    dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
  eventProcessActor = Await.result(initEventActorReply, timeout.duration).
    asInstanceOf[ActorRef]
}
initializeEventProcessActor()
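The blocking initialization in Listing 3-33 follows a general pattern: send a request, receive a Future for the reply, and block on it with a timeout so that the field is assigned before construction finishes. The sketch below shows that pattern with only the Scala standard library, with no Akka involved. `BlockingInit`, `awaitReply`, and `Owner` are hypothetical names, and returning `None` on timeout is a choice made for this illustration (Listing 3-33 lets the exception propagate).

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BlockingInit {
  // Block until the reply arrives; return None if the timeout elapses first.
  def awaitReply[T](reply: Future[T], timeout: FiniteDuration): Option[T] =
    try Some(Await.result(reply, timeout))
    catch { case _: TimeoutException => None }

  // The field is assigned while the constructor runs, so no caller can ever
  // observe it as null. A timeout here fails construction with an exception.
  class Owner(reply: Future[String]) {
    val handle: String = Await.result(reply, 30.seconds)
  }
}
```

The trade-off is that the constructing thread stalls for up to the timeout, which is acceptable for one-time startup work but not for a hot path.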
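Here DAGSchedulerActorSupervisor acts as the supervisor of DAGSchedulerEventProcessActor and is responsible for creating it. Listing 3-34 shows that DAGSchedulerActorSupervisor applies Akka's one-for-one supervision strategy to DAGSchedulerEventProcessActor. Once DAGSchedulerActorSupervisor has created DAGSchedulerEventProcessActor and registered it with the ActorSystem, the ActorSystem calls the preStart method of DAGSchedulerEventProcessActor, and the TaskScheduler thereby holds a reference to the DAGScheduler; see Listing 3-35. Listing 3-35 also shows the message types that DAGSchedulerEventProcessActor can handle, such as JobSubmitted, BeginEvent, and CompletionEvent. DAGSchedulerEventProcessActor reacts to each of these messages with a different handler. For this chapter it is enough to understand this much; later chapters analyze the handlers in detail when they are needed.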
Listing 3-34 Supervision strategy of DAGSchedulerActorSupervisor
private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
  extends Actor with Logging {
  override val supervisorStrategy =
    OneForOneStrategy() {
      case x: Exception =>
        logError("eventProcesserActor failed; shutting down SparkContext", x)
        try {
          dagScheduler.doCancelAllJobs()
        } catch {
          case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
        }
        dagScheduler.sc.stop()
        Stop
    }
  def receive = {
    case p: Props => sender ! context.actorOf(p)
    case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")
  }
}
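The idea behind a one-for-one strategy is that the supervisor's decision applies only to the child that failed, never to its siblings. The following sketch imitates that behavior in plain Scala without Akka. All names (`OneForOneSketch`, `Supervisor`, `spawn`, `tell`, `isAlive`) are hypothetical, and stopping a child for an unmatched exception is a simplification of this sketch (Akka would escalate the failure instead).

```scala
import scala.collection.mutable

object OneForOneSketch {
  sealed trait Directive
  case object Resume extends Directive
  case object Stop extends Directive

  // A supervisor that applies its decider only to the child that failed.
  class Supervisor(decider: PartialFunction[Throwable, Directive]) {
    private val children = mutable.Map.empty[String, String => Unit]

    def spawn(name: String)(handler: String => Unit): Unit =
      children.update(name, handler)

    def isAlive(name: String): Boolean = children.contains(name)

    // Deliver a message; on failure consult the decider for that child alone.
    def tell(name: String, msg: String): Unit =
      children.get(name).foreach { handler =>
        try handler(msg)
        catch {
          case e: Exception =>
            // Simplification: unmatched exceptions stop the child.
            val directive = if (decider.isDefinedAt(e)) decider(e) else Stop
            directive match {
              case Stop   => children.remove(name) // siblings are untouched
              case Resume => ()                    // keep the child, drop the message
            }
        }
      }
  }
}
```

In Listing 3-34 the decider always answers `Stop` after cancelling all jobs and stopping the SparkContext, so a failure in the event-processing actor ends the whole scheduler rather than restarting it.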
Listing 3-35 Implementation of DAGSchedulerEventProcessActor
private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
  extends Actor with Logging {
  override def preStart() {
    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
  }
  /**
   * The main event loop of the DAG scheduler.
   */
  def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions,
        allowLocal, callSite, listener, properties)
    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)
    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)
    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)
    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()
    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)
    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)
    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)
    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)
    case completion @ CompletionEvent(task, reason, _, _, taskInfo,
        taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)
    case TaskSetFailed(taskSet, reason) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason)
    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }
  override def postStop() {
    // Cancel any active jobs in postStop hook
    dagScheduler.cleanUpAfterSchedulerStop()
  }
}
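The dispatch style of Listing 3-35, one case per message type with each case delegating to a handler, can be tried out without Akka. The sketch below runs a comparable loop over a blocking queue on the calling thread. The event classes are trimmed-down, hypothetical versions of the real ones, `Shutdown` is not a Spark event and exists only to end the loop, and the `log` buffer stands in for the real handler methods.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

object EventLoopSketch {
  // Hypothetical, trimmed-down event types; the real ones carry many more fields.
  sealed trait SchedulerEvent
  final case class JobSubmitted(jobId: Int) extends SchedulerEvent
  final case class JobCancelled(jobId: Int) extends SchedulerEvent
  case object AllJobsCancelled extends SchedulerEvent
  case object Shutdown extends SchedulerEvent // not a Spark event; ends the loop

  class Loop {
    private val queue = new LinkedBlockingQueue[SchedulerEvent]()
    val log = new ArrayBuffer[String] // records what each handler did

    def post(event: SchedulerEvent): Unit = queue.put(event)

    // Process events one at a time on the calling thread until Shutdown is seen.
    def run(): Unit = {
      var running = true
      while (running) {
        queue.take() match {
          case JobSubmitted(id) => log += s"submit $id"
          case JobCancelled(id) => log += s"cancel $id"
          case AllJobsCancelled => log += "cancel all"
          case Shutdown         => running = false
        }
      }
    }
  }
}
```

Handling events strictly one at a time means the handlers can mutate scheduler state such as the maps in Listing 3-32 without locks, which is the main appeal of funnelling everything through a single event processor.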