
深入理解Spark:核心思想与源码分析, 3.3 Creating metadataCleaner

3.3 Creating metadataCleaner

To keep track of all persisted RDDs, SparkContext uses the persistentRdds cache, whose type is TimeStampedWeakValueHashMap. The job of metadataCleaner is to remove persisted RDDs that have expired. The code that creates metadataCleaner is as follows.

private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]

private[spark] val metadataCleaner =
  new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
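To make the behavior of this cache more concrete, the following is a minimal, illustrative sketch of a map that records an insertion timestamp per entry and holds its values only through weak references, so that clearOldValues can drop entries older than a cut-off time while the garbage collector may reclaim values no longer referenced elsewhere. All names in this sketch are hypothetical; it is a simplification, not Spark's actual TimeStampedWeakValueHashMap (which, among other things, is thread-safe).

import java.lang.ref.WeakReference
import scala.collection.mutable

// Illustrative sketch only; Spark's TimeStampedWeakValueHashMap is richer than this.
class SimpleTimeStampedWeakValueMap[K, V <: AnyRef] {
  private case class Entry(timestamp: Long, value: WeakReference[V])
  private val map = mutable.HashMap.empty[K, Entry]

  // Record the value together with the time it was inserted.
  def update(key: K, value: V): Unit =
    map(key) = Entry(System.currentTimeMillis(), new WeakReference(value))

  // Return the value only if it is still in the map and not yet garbage-collected.
  def get(key: K): Option[V] =
    map.get(key).flatMap(e => Option(e.value.get()))

  // Drop every entry inserted before threshTime, mirroring clearOldValues.
  def clearOldValues(threshTime: Long): Unit = {
    val expired = map.collect { case (k, e) if e.timestamp < threshTime => k }
    expired.foreach(map.remove)
  }
}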

Let us take a closer look at the implementation of MetadataCleaner, shown in Listing 3-14.

Listing 3-14 Implementation of MetadataCleaner

private[spark] class MetadataCleaner(
    cleanerType: MetadataCleanerType.MetadataCleanerType,
    cleanupFunc: (Long) => Unit,
    conf: SparkConf)
  extends Logging {

  val name = cleanerType.toString

  private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType)
  private val periodSeconds = math.max(10, delaySeconds / 10)
  private val timer = new Timer(name + " cleanup timer", true)

  private val task = new TimerTask {
    override def run() {
      try {
        cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
        logInfo("Ran metadata cleaner for " + name)
      } catch {
        case e: Exception => logError("Error running cleanup task for " + name, e)
      }
    }
  }

  if (delaySeconds > 0) {
    timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000)
  }

  def cancel() {
    timer.cancel()
  }
}
From the implementation of MetadataCleaner we can see that it is essentially a timer built on a TimerTask, which repeatedly invokes the function passed in as the cleanupFunc: (Long) => Unit parameter. The function supplied when constructing metadataCleaner is cleanup, which clears expired entries from persistentRdds; its code is as follows.

private[spark] def cleanup(cleanupTime: Long) {
  persistentRdds.clearOldValues(cleanupTime)
}
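Putting these pieces together, the following self-contained sketch reproduces the same pattern outside of Spark: a daemon java.util.Timer periodically invokes a (Long) => Unit cleanup function with a cut-off of "now minus delaySeconds". The object name, the 30-second delay, and the println body are illustrative assumptions, not Spark's defaults or its actual cleanup logic.

import java.util.{Timer, TimerTask}

object TimerCleanerSketch {
  def main(args: Array[String]): Unit = {
    val delaySeconds  = 30                               // illustrative TTL, not a Spark default
    val periodSeconds = math.max(10, delaySeconds / 10)  // same formula as in MetadataCleaner

    // The cleanup callback: in SparkContext this is cleanup(), which calls
    // persistentRdds.clearOldValues; here we only print the cut-off time.
    val cleanupFunc: Long => Unit =
      threshTime => println(s"would drop entries older than $threshTime")

    val timer = new Timer("demo cleanup timer", true)    // true => daemon thread
    val task = new TimerTask {
      override def run(): Unit = {
        try {
          cleanupFunc(System.currentTimeMillis() - delaySeconds * 1000L)
        } catch {
          case e: Exception => System.err.println(s"Error running cleanup task: $e")
        }
      }
    }

    timer.schedule(task, delaySeconds * 1000L, periodSeconds * 1000L)

    Thread.sleep(2L * delaySeconds * 1000)               // keep the JVM alive so the task fires
    timer.cancel()
  }
}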
