天天看点

Spark源码分析之二:Job的调度模型与运行反馈

        1、Job的调度模型与运行反馈;

        2、Stage划分;

        3、Stage提交:对应TaskSet的生成。

        今天,我们就结合源码来分析下第一个小阶段:Job的调度模型与运行反馈。

Spark源码分析之二:Job的调度模型与运行反馈

        首先由DAGScheduler负责将Job提交到事件队列eventProcessLoop中,等待调度执行。入口方法为DAGScheduler的runJon()方法。代码如下:

        runJob()方法就做了三件事:

        首先,获取开始时间,方便最后计算Job执行时间;

        其次,调用submitJob()方法,提交Job,返回JobWaiter类型的对象waiter;

        最后,waiter调用JobWaiter的awaitResult()方法等待Job运行结果,这个运行结果就俩:JobSucceeded代表成功,JobFailed代表失败。

        awaitResult()方法通过轮询标志位_jobFinished,如果为false,则调用this.wait()继续等待,否则说明Job运行完成,返回JobResult,其代码如下:

        而这个标志位_jobFinished是在Task运行完成后,如果已完成Task数目等于总Task数目时,或者整个Job运行失败时设置的,随着标志位的设置,Job运行结果jobResult也同步进行设置,代码如下:

        接下来,看看submitJob()方法,代码定义如下:

        submitJob()方法一共做了5件事情:

        第一,数据检测,检测rdd分区以确保我们不会在一个不存在的partition上launch一个task,并且,如果partitions大小为0,即没有需要执行任务的分区,快速返回;

        第二,为Job生成一个jobId,该jobId为AtomicInteger类型,getAndIncrement()确保了原子操作性,每次生成后都自增;

        第三,将func转化下,否则JobSubmitted无法接受这个func参数,T转变为_;

        第四,创建一个JobWaiter对象waiter,该对象会在方法结束时返回给上层方法,以用来监测Job运行结果;

        第五,将一个JobSubmitted事件加入到事件队列eventProcessLoop中,等待工作线程轮询调度(速度很快)。

        这里,我们有必要研究下事件队列eventProcessLoop,eventProcessLoop为DAGSchedulerEventProcessLoop类型的,在DAGScheduler初始化时被定义并赋值,代码如下:

        DAGSchedulerEventProcessLoop继承自EventLoop,我们先来看看这个EventLoop的定义。

        我们可以看到,EventLoop实际上就是一个任务队列及其对该队列一系列操作的封装。在它内部,首先定义了一个LinkedBlockingDeque类型的事件队列,队列元素为E类型,其中DAGSchedulerEventProcessLoop存储的则是DAGSchedulerEvent类型的事件,代码如下:

        并提供了一个后台线程,专门对事件队列里的事件进行监控,并调用onReceive()方法进行处理,代码如下:

        那么如何向队列中添加事件呢?调用其post()方法,传入事件即可。如下:

        言归正传,上面提到,submitJob()方法利用eventProcessLoop的post()方法加入一个JobSubmitted事件到事件队列中,那么DAGSchedulerEventProcessLoop对于JobSubmitted事件是如何处理的呢?我们看它的onReceive()方法,源码如下:

        继续看doOnReceive()方法,代码如下:

        对于JobSubmitted事件,我们通过调用DAGScheduler的handleJobSubmitted()方法来处理。

        好了,到这里,第一阶段Job的调度模型与运行反馈大体已经分析完了,至于后面的第二、第三阶段,留待后续博文继续分析吧~

继续阅读