天天看点

Spark 任务调度之启动CoarseGrainedExecutorBackend概要1. CoarseGrainedExecutorBackend简介2. 启动CoarseGrainedExecutorBackend总结致谢附录

文章目录

  • 概要
  • 1. CoarseGrainedExecutorBackend简介
  • 2. 启动CoarseGrainedExecutorBackend
      • 2.1 main()
          • 2.1.1 定义变量用以保存命令行参数
          • 2.1.2 解析命令行参数
          • 2.1.3 调用run()来执行主要逻辑
      • 2.2 run()
          • 2.2.1 run()获取到的参数
          • 2.2.2 从Driver获取properties
          • 2.2.3 创建ExecutorEnv
          • 2.2.3 创建ExecutorRpcEndpoint
      • 2.3 new CoarseGrainedExecutorBackend之onStart()
      • 2.4 处理返回消息RegisteredExecutor
  • 总结
  • 致谢
  • 附录

概要

本文基于Spark之Launch Executor一文中的启动流程之Worker端receive消息,延伸讲解了

Worker

执行的

Command

中调用的

CoarseGrainedExecutorBackend类

1. CoarseGrainedExecutorBackend简介

在介绍

CoarseGrainedExecutorBackend的启动流程

前,先了解下CoarseGrainedExecutorBackend。

我们知道Executor负责计算任务,即执行task,而Executor对象的创建及维护是由CoarseGrainedExecutorBackend负责的,CoarseGrainedExecutorBackend在spark运行期是一个单独的进程,在Worker节点可以通过Java的jps命令查看,如下

Spark 任务调度之启动CoarseGrainedExecutorBackend概要1. CoarseGrainedExecutorBackend简介2. 启动CoarseGrainedExecutorBackend总结致谢附录

如何启动上图中的CoarseGrainedExecutorBackend进程是我们后面介绍的重点,先看下其定义及UML

package org.apache.spark.scheduler.cluster.CoarseGrainedExecutorBackend.scala

Spark 任务调度之启动CoarseGrainedExecutorBackend概要1. CoarseGrainedExecutorBackend简介2. 启动CoarseGrainedExecutorBackend总结致谢附录
Spark 任务调度之启动CoarseGrainedExecutorBackend概要1. CoarseGrainedExecutorBackend简介2. 启动CoarseGrainedExecutorBackend总结致谢附录

从上图可以得到如下信息

  1. CoarseGrainedExecutorBackend

    RpcEndpoint的子类

    ,能够和Driver进行RPC通信,其生命周期方法

    onStart

    一定要关注,看执行了哪些动作。
  2. CoarseGrainedExecutorBackend

    维护了两个属性

    executor

    driver

    : executor负责运行task,driver是自己所属的Driver的Ref,用于向Driver发消息。
  3. ExecutorBackend

    有抽象方法

    statusUpdate

    ,负责将Executor的计算结果返回给Driver。

    最后,CoarseGrainedExecutorBackend是spark运行期的一个进程,Executor运行在该进程内。

除此之外,

CoarseGrainedExecutorBackend

有自己的

NettyRpcEndpoint

,基于此与

Driver

进行通信。

2. 启动CoarseGrainedExecutorBackend

接着Spark 任务调度之Launch Executor介绍,最后讲到

  • Worker进程收到LaunchExecutor消息,然后,Worker将收到的消息封装为

    ExecutorRunner

    对象,调用其

    start

    方法。
  • start

    方法启动线程,调用

    ExecutorRunner

    fetchAndRunExecutor

    方法,

    fetchAndRunExecutor

    方法中将收到的信息

    拼接为Linux命令

    ,然后使用

    ProcessBuilder

    执行Linux命令启动

    CoarseGrainedExecutorBackend

    (和启动Driver的方式如出一辙),执行的Linux命令大致如下 :
    Spark 任务调度之启动CoarseGrainedExecutorBackend概要1. CoarseGrainedExecutorBackend简介2. 启动CoarseGrainedExecutorBackend总结致谢附录

    上面的java命令会调用CoarseGrainedExecutorBackend的main方法,main方法中处理命令行传入的参数,然后创建RpcEnv,并注册RpcEndpoint等。

    接下来看看main()方法的具体

2.1 main()

从上图可以看出来,命令行会给main方法传很多参数。因此,这些参数需要解析并保存起来:

2.1.1 定义变量用以保存命令行参数
Spark 任务调度之启动CoarseGrainedExecutorBackend概要1. CoarseGrainedExecutorBackend简介2. 启动CoarseGrainedExecutorBackend总结致谢附录
2.1.2 解析命令行参数
Spark 任务调度之启动CoarseGrainedExecutorBackend概要1. CoarseGrainedExecutorBackend简介2. 启动CoarseGrainedExecutorBackend总结致谢附录
2.1.3 调用run()来执行主要逻辑

  见下一节的----2.2 run()

2.2 run()

2.2.1 run()获取到的参数

查看run()函数的定义:

Spark 任务调度之启动CoarseGrainedExecutorBackend概要1. CoarseGrainedExecutorBackend简介2. 启动CoarseGrainedExecutorBackend总结致谢附录

可以看到,run()函数得到的信息中,包含了

DriverUrl

ExecutorId(由Master指定的)

WorkerUrl

等等。

2.2.2 从Driver获取properties

既然知道了Driver的地址,那么就作为SparkUser去从Driver拿到Spark的properties

Spark 任务调度之启动CoarseGrainedExecutorBackend概要1. CoarseGrainedExecutorBackend简介2. 启动CoarseGrainedExecutorBackend总结致谢附录
2.2.3 创建ExecutorEnv

通过从

Driver拿到的properties

来为Executor创建ExecutorEnv

在内部,会去创建属于此Executor的NettyRpcEnv,也就意味着它有自己的TransportContext、TransportServer等等组件了,可以进行通信。

Spark 任务调度之启动CoarseGrainedExecutorBackend概要1. CoarseGrainedExecutorBackend简介2. 启动CoarseGrainedExecutorBackend总结致谢附录
2.2.3 创建ExecutorRpcEndpoint

通过ExecutorNettyRpcEnv来创建ExecutorRpcEndpoint,

- 通过传入一个

CoarseGrainedExecutorBackend

来创建的。

- 同时,会创建一个

WorkerWatcher

–RpcEndpoint,用于连接到Worker进程,并在连接断开是终止JVM

Spark 任务调度之启动CoarseGrainedExecutorBackend概要1. CoarseGrainedExecutorBackend简介2. 启动CoarseGrainedExecutorBackend总结致谢附录

到这里,我们知道了,run()方法会去创建CoarseGrainedExecutorBackend。

也就是说,

  • 在Object的main()中调用run()
  • run()中创建了一个Class CoarseGrainedExecutorBackend

下面我们来看看,在new CoarseGrainedExecutorBackend时候会执行的onStart()方法

2.3 new CoarseGrainedExecutorBackend之onStart()

  

onStart()

方法中主要是

获取到DriverRpcEndpointRef

,从而

向Driver

发送

ask(RegisterExecutor)

。到Driver去

注册此Executor

,Driver会去做执行注册等操作,参见Launch Executor之Driver端。这里虽然向Driver注册了Executor,但是实际上只是将地址信息先告诉了Driver,然后等待Driver返回RegisteredExecutor,Executor才被真正的创建出来。

  值得注意的是由于Master只是起到一个资源调度和管理的作用,因此,这里Executor并不会去向Master注册,并且Master也不会像维护Driver和Worker那样去维护Executor的信息。

Spark 任务调度之启动CoarseGrainedExecutorBackend概要1. CoarseGrainedExecutorBackend简介2. 启动CoarseGrainedExecutorBackend总结致谢附录

既然是ask,那么就会有异步的返回消息。他的返回消息就是RegisteredExecutor

2.4 处理返回消息RegisteredExecutor

Spark 任务调度之启动CoarseGrainedExecutorBackend概要1. CoarseGrainedExecutorBackend简介2. 启动CoarseGrainedExecutorBackend总结致谢附录

既然

Driver返回了注册成功

的消息,那么就要去

真正的创建出来这个Executor

了。

  • 83行中,创建了一个Executor,executorId是之前由Master指定的。

至此,

CoarseGrainedExecutorBackend就已经成功启动

,并且

创建了Executor

,Executor创建时,会创建ThreadPool,并且去获取数据,从而执行Tasks。这部分我们放在Executor执行Task部分来进行剖析。

总结

本文基于Spark之Launch Executor一文中的启动流程之Worker端receive消息,延伸讲解了Worker执行的Command中调用的CoarseGrainedExecutorBackend类。

  • CoarseGrainedExecutorBackend是Executor的直接创建者

    ,也就是说,它拥有一个Executor。CoarseGrainedExecutorBackend有自己的executorRpcEndpoint,Executor基于此进行通信
  • CoarseGrainedExecutorBackend在创建Executor之前,才会去向Driver注册Executor,直到Driver返回注册成功,才会去真正创建Executor

    ,之前只是准备好了注册信息等。
Spark 任务调度之启动CoarseGrainedExecutorBackend概要1. CoarseGrainedExecutorBackend简介2. 启动CoarseGrainedExecutorBackend总结致谢附录

致谢

  • Spark 任务调度之创建Executor
  • Spark 任务调度之启动CoarseGrainedExecutorBackend

附录

package org.apache.spark.executor

--------------------------CoarseGrainedExecutorBackend.scala:----------------------------
private[spark] object CoarseGrainedExecutorBackend extends Logging {

  private def run(
      driverUrl: String,
      executorId: String,
      hostname: String,
      cores: Int,
      appId: String,
      workerUrl: Option[String],
      userClassPath: Seq[URL]) {

    Utils.initDaemon(log)

    SparkHadoopUtil.get.runAsSparkUser { () =>
      // Debug code
      Utils.checkHost(hostname)
      
      // 1. 作为SparkUser去拿到Spark的properties。
      val executorConf = new SparkConf
      
          // 1.1 fetcher是由RpcEnv.create()创建的一个临时的NettyRpcEnv
         // 注意这个NettyRpcEnv只是用来获取Executor和App相关的配置信息等,获取完就会shutdown.
      val fetcher = RpcEnv.create(
        "driverPropsFetcher",
        hostname,
        -1,
        executorConf,
        new SecurityManager(executorConf),
        clientMode = true)
      val driver = fetcher.setupEndpointRefByURI(driverUrl)
      val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
      val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
      fetcher.shutdown()

      // 2. 通过从Driver拿到的properties来创建ExecutorEnv,
      //    在内部,会去创建属于此Executor的NettyRpcEnv,也就意味着它有自己的TransportContext、TransportServer等等组件了,可以进行通信。
      val driverConf = new SparkConf()
      for ((key, value) <- props) {
        // this is required for SSL in standalone mode
        if (SparkConf.isExecutorStartupConf(key)) {
          driverConf.setIfMissing(key, value)
        } else {
          driverConf.set(key, value)
        }
      }

      cfg.hadoopDelegationCreds.foreach { tokens =>
        SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
      }

      val env = SparkEnv.createExecutorEnv(
        driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
        
      // 3. 通过ExecutorNettyRpcEnv来创建ExecutorRpcEndpoint,
            // 通过传入一个CoarseGrainedExecutorBackend来创建的。
      env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
        env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
      workerUrl.foreach { url =>
        // 3.* 创建一个WorkerWatcher--RpcEndpoint,用于连接到Worker进程并在连接断开时终止jvm
        env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
      }
      env.rpcEnv.awaitTermination()
    }
  }

  def main(args: Array[String]) {
    var driverUrl: String = null
    var executorId: String = null
    var hostname: String = null
    var cores: Int = 0
    var appId: String = null
    var workerUrl: Option[String] = None
    val userClassPath = new mutable.ListBuffer[URL]()

    var argv = args.toList
    while (!argv.isEmpty) {
      argv match {
        case ("--driver-url") :: value :: tail =>
          driverUrl = value
          argv = tail
        case ("--executor-id") :: value :: tail =>
          executorId = value
          argv = tail
        case ("--hostname") :: value :: tail =>
          hostname = value
          argv = tail
        case ("--cores") :: value :: tail =>
          cores = value.toInt
          argv = tail
        case ("--app-id") :: value :: tail =>
          appId = value
          argv = tail
        case ("--worker-url") :: value :: tail =>
          // Worker url is used in spark standalone mode to enforce fate-sharing with worker
          workerUrl = Some(value)
          argv = tail
        case ("--user-class-path") :: value :: tail =>
          userClassPath += new URL(value)
          argv = tail
        case Nil =>
        case tail =>
          // scalastyle:off println
          System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
          // scalastyle:on println
          printUsageAndExit()
      }
    }

    if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
      appId == null) {
      printUsageAndExit()
    }

    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
    System.exit(0)
  }
-------------------------------------onStart()------------
 override def onStart() {
    logInfo("Connecting to driver: " + driverUrl)
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      driver = Some(ref)
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
    }(ThreadUtils.sameThread).onComplete {
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) =>
        // Always receive `true`. Just ignore it
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }


------------------------------receive()-------------------
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }
           

继续阅读