天天看点

spark源码分析之Checkpoint的过程概述checkpoint流程分析

checkpoint 的机制保证了需要访问重复数据的应用 spark 的dag执行行图可能很庞大,task 中计算链可能会很长,这时如果 task 中途运行出错,那么 task 的整个需要重算非常耗时,因此,有必要将计算代价较大的 rdd checkpoint 一下,当下游 rdd 计算出错时,可以直接从 checkpoint 过的 rdd 那里读取数据继续算。

<b>我们先来看一个例子,checkpoint的使用</b>

我们可以看到最先调用了<code>sparkcontext</code>的<code>setcheckpointdir</code> 设置了一个checkpoint 目录

我们跟进这个方法看一下

这个方法挺简单的,就创建了一个目录,接下来我们看rdd核心的<code>checkpoint</code> 方法,跟进去

这个方法没有返回值,逻辑只有一个判断,<code>checkpointdir</code>刚才设置过了,不为空,然后创建了一个<code>reliablerddcheckpointdata</code>,我们来看<code>reliablerddcheckpointdata</code>

这个<code>reliablerddcheckpointdata</code>的父类<code>rddcheckpointdata</code>我们再继续看它的父类

rdd 需要经过

[ initialized --&gt; checkpointinginprogress--&gt; checkpointed ]

这几个阶段才能被 checkpoint。

这类里面有一个枚举来标识checkpoint的状态,第一次初始化时是initialized。

checkpoint这个一步已经完成了,回到我们的rdd成员变量里<code>checkpointdata</code>这个变量指向的<code>rddcheckpointdata</code>的实例。

<b>checkpoint初始化时序图</b>

spark源码分析之Checkpoint的过程概述checkpoint流程分析

我们知道一个spark job运行最终会调用<code>sparkcontext</code>的<code>runjob</code>方法将任务提交给executor去执行,我们来看<code>runjob</code>

最后一行代码调用了<code>docheckpoint</code>,在<code>dagscheduler</code>将任务提交给集群运行之后,我来看这个<code>docheckpoint</code>方法

这个是一个递归,遍历rdd依赖链条,当rdd是<code>checkpointdata</code>不为空时,调用<code>checkpointdata</code>的<code>checkpoint()</code>方法。还记得<code>checkpointdata</code>类型是什么吗?就是<code>rddcheckpointdata</code> ,我们来看它的<code>checkpoint</code>方法,以下

这个方法开始做checkpoint操作了,将<code>docheckpoint</code>交给子类去实现checkponit的逻辑,我们去看子类怎么实现<code>docheckpoint</code>

上面的代码最终会返回新的<code>checkpointrdd</code> ,父类将它复值给成员变量<code>cprdd</code>,最终标记当前状态为checkpointed并清空当rdd的依赖链。到此checkpoint的数据就被序列化到hdfs上了。

<b> checkpoint 写数据时序图</b>

spark源码分析之Checkpoint的过程概述checkpoint流程分析

我们知道task是saprk运行任务的最小单元,当task执行失败的时候spark会重新计算,这里task进行计算的地方就是读取checkpoint的入口。我们可以看一下<code>shufflemaptask</code> 里的计算方法<code>runtask</code>,如下

这是spark真正调用计算方法的逻辑<code>runtask</code>调用 <code>rdd.iterator()</code> 去计算该 rdd 的 partition 的,我们来看rdd的<code>iterator()</code>

这里会继续调用<code>computeorreadcheckpoint</code>,我们看该方法

当调用<code>rdd.iterator()</code>去计算该 rdd 的 partition 的时候,会调用 <code>computeorreadcheckpoint(split: partition)</code>去查看该 rdd 是否被 checkpoint 过了,如果是,就调用该 rdd 的 parent rdd 的 iterator() 也就是 checkpointrdd.iterator(),否则直接调用该rdd的<code>compute</code>, 那么我们就跟进<code>checkpointrdd</code>的<code>compute</code>

这里就两行代码,意思是从path上读取我们的checkpoint数据,看一下<code>readcheckpointfile</code>

<code>checkpointrdd</code> 负责读取文件系统上的文件,生成该 rdd 的 partition。这就解释了为什么要为调用了<code>checkpoint</code>的rdd 添加一个 <code>parent checkpointrdd</code>的原因。

到此,整个checkpoint的流程就结束了。

<b>checkpoint 读取数据时序图</b>

spark源码分析之Checkpoint的过程概述checkpoint流程分析

继续阅读