<b>3.3 Creating MetadataCleaner</b>
To keep track of all persisted RDDs, SparkContext caches them in persistentRdds, whose type is TimeStampedWeakValueHashMap. The job of MetadataCleaner is to purge persisted RDDs whose entries have expired. The code that creates the MetadataCleaner is as follows.
private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]
private[spark] val metadataCleaner =
  new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
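To make the role of the persistentRdds cache concrete, here is a minimal sketch of the idea behind a timestamped weak-value map: each value is stored through a WeakReference together with its insertion time, so an entry disappears either when the value is garbage-collected or when it is older than a given threshold. The class and method names below (TimestampedWeakMapSketch, put, get) are illustrative assumptions, not Spark's actual implementation; only clearOldValues mirrors the real API.

```scala
import java.lang.ref.WeakReference
import scala.collection.concurrent.TrieMap

// Sketch only: values are held weakly, paired with their insertion time (ms).
class TimestampedWeakMapSketch[K, V <: AnyRef] {
  private val entries = TrieMap.empty[K, (WeakReference[V], Long)]

  def put(key: K, value: V, now: Long = System.currentTimeMillis()): Unit =
    entries.put(key, (new WeakReference(value), now))

  // Returns None if the entry is absent or the value was garbage-collected.
  def get(key: K): Option[V] =
    entries.get(key).flatMap { case (ref, _) => Option(ref.get()) }

  // Drop every entry inserted before threshTime, mirroring clearOldValues.
  def clearOldValues(threshTime: Long): Unit =
    entries.foreach { case (k, (_, ts)) => if (ts < threshTime) entries.remove(k) }

  def size: Int = entries.size
}
```

Because the values are weakly referenced, unpersisted RDDs that become unreachable can be reclaimed by the JVM even before the timestamp-based cleanup runs.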
Let us look more closely at the implementation of MetadataCleaner, shown in Listing 3-14.

Listing 3-14 Implementation of MetadataCleaner
private[spark] class MetadataCleaner(
    cleanerType: MetadataCleanerType.MetadataCleanerType,
    cleanupFunc: (Long) => Unit,
    conf: SparkConf)
  extends Logging
{
  val name = cleanerType.toString

  private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType)
  private val periodSeconds = math.max(10, delaySeconds / 10)
  private val timer = new Timer(name + " cleanup timer", true)

  private val task = new TimerTask {
    override def run() {
      try {
        cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
        logInfo("Ran metadata cleaner for " + name)
      } catch {
        case e: Exception => logError("Error running cleanup task for " + name, e)
      }
    }
  }

  if (delaySeconds > 0) {
    timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000)
  }

  def cancel() {
    timer.cancel()
  }
}
As the implementation shows, MetadataCleaner is essentially a timer built on TimerTask that repeatedly invokes the function passed in as cleanupFunc: (Long) => Unit. When the MetadataCleaner is constructed, this argument is cleanup, which clears expired entries from persistentRdds. Its code is as follows.
private[spark] def cleanup(cleanupTime: Long) {
  persistentRdds.clearOldValues(cleanupTime)
}
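The whole mechanism can be sketched end to end: a daemon Timer periodically calls a cleanup function with the threshold "now - delay", and the function removes every cache entry older than that threshold. The names below (cache, startCleaner) are illustrative assumptions, not Spark's; the delay/period rule and the delaySeconds > 0 guard follow Listing 3-14.

```scala
import java.util.{Timer, TimerTask}
import scala.collection.concurrent.TrieMap

// Sketch only: key -> insertion time (ms), standing in for persistentRdds.
val cache = TrieMap.empty[Int, Long]

// Remove every entry inserted before threshTime (the role of this.cleanup).
def cleanup(threshTime: Long): Unit =
  cache.foreach { case (k, ts) => if (ts < threshTime) cache.remove(k) }

def startCleaner(delaySeconds: Int): Timer = {
  val periodSeconds = math.max(10, delaySeconds / 10) // same rule as listing 3-14
  val timer = new Timer("cleanup timer", true)        // true = daemon thread
  val task = new TimerTask {
    override def run(): Unit =
      cleanup(System.currentTimeMillis() - delaySeconds * 1000L)
  }
  // A non-positive delay disables cleanup entirely, as in MetadataCleaner.
  if (delaySeconds > 0) {
    timer.schedule(task, delaySeconds * 1000L, periodSeconds * 1000L)
  }
  timer
}
```

Note that periodSeconds is a tenth of delaySeconds (but at least 10 seconds), so an entry is removed at most one period after it actually expires; using a daemon timer thread means the cleaner never keeps the JVM alive on its own.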