文章目录
- 概要
- 1. CoarseGrainedExecutorBackend简介
- 2. 启动CoarseGrainedExecutorBackend
-
-
- 2.1 main()
-
-
- 2.1.1 定义变量用以保存命令行参数
- 2.1.2 解析命令行参数
- 2.1.3 调用run()来执行主要逻辑
-
- 2.2 run()
-
-
- 2.2.1 run()获取到的参数
- 2.2.2 从Driver获取properties
- 2.2.3 创建ExecutorEnv
- 2.2.3 创建ExecutorRpcEndpoint
-
- 2.3 new CoarseGrainedExecutorBackend之onStart()
- 2.4 处理返回消息RegisteredExecutor
-
- 总结
- 致谢
- 附录
概要
本文基于Spark之Launch Executor一文中的启动流程之Worker端receive消息,延伸讲解了
Worker
执行的
Command
中调用的
CoarseGrainedExecutorBackend类
。
1. CoarseGrainedExecutorBackend简介
在介绍
CoarseGrainedExecutorBackend的启动流程
前,先了解下CoarseGrainedExecutorBackend。
我们知道Executor负责计算任务,即执行task,而Executor对象的创建及维护是由CoarseGrainedExecutorBackend负责的,CoarseGrainedExecutorBackend在spark运行期是一个单独的进程,在Worker节点可以通过Java的jps命令查看,如下
如何启动上图中的CoarseGrainedExecutorBackend进程是我们后面介绍的重点,先看下其定义及UML
package org.apache.spark.scheduler.cluster.CoarseGrainedExecutorBackend.scala
从上图可以得到如下信息
-
是CoarseGrainedExecutorBackend
,能够和Driver进行RPC通信,其生命周期方法RpcEndpoint的子类
一定要关注,看执行了哪些动作。onStart
-
维护了两个属性CoarseGrainedExecutorBackend
和executor
: executor负责运行task,driver是自己所属的Driver的Ref,用于向Driver发消息。driver
-
有抽象方法ExecutorBackend
statusUpdate
,负责将Executor的计算结果返回给Driver。
最后,CoarseGrainedExecutorBackend是spark运行期的一个进程,Executor运行在该进程内。
除此之外,
CoarseGrainedExecutorBackend
有自己的
NettyRpcEndpoint
,基于此与
Driver
进行通信。
2. 启动CoarseGrainedExecutorBackend
接着Spark 任务调度之Launch Executor介绍,最后讲到
- Worker进程收到LaunchExecutor消息,然后,Worker将收到的消息封装为
对象,调用其ExecutorRunner
方法。start
-
方法启动线程,调用start
的ExecutorRunner
方法,fetchAndRunExecutor
方法中将收到的信息fetchAndRunExecutor
,然后使用拼接为Linux命令
执行Linux命令启动ProcessBuilder
(和启动Driver的方式如出一辙),执行的Linux命令大致如下 :CoarseGrainedExecutorBackend
上面的java命令会调用CoarseGrainedExecutorBackend的main方法,main方法中处理命令行传入的参数,然后创建RpcEnv,并注册RpcEndpoint等。
接下来看看main()方法的具体
2.1 main()
从上图可以看出来,命令行会给main方法传很多参数。因此,这些参数需要解析并保存起来:
2.1.1 定义变量用以保存命令行参数
2.1.2 解析命令行参数
2.1.3 调用run()来执行主要逻辑
见下一节的----2.2 run()
2.2 run()
2.2.1 run()获取到的参数
查看run()函数的定义:
可以看到,run()函数得到的信息中,包含了
DriverUrl
、
ExecutorId(由Master指定的)
、
WorkerUrl
等等。
2.2.2 从Driver获取properties
既然知道了Driver的地址,那么就作为SparkUser去从Driver拿到Spark的properties
2.2.3 创建ExecutorEnv
通过从
Driver拿到的properties
来为Executor创建ExecutorEnv
在内部,会去创建属于此Executor的NettyRpcEnv,也就意味着它有自己的TransportContext、TransportServer等等组件了,可以进行通信。
2.2.3 创建ExecutorRpcEndpoint
通过ExecutorNettyRpcEnv来创建ExecutorRpcEndpoint,
- 通过传入一个
CoarseGrainedExecutorBackend
来创建的。
- 同时,会创建一个
WorkerWatcher
–RpcEndpoint,用于连接到Worker进程,并在连接断开是终止JVM
到这里,我们知道了,run()方法会去创建CoarseGrainedExecutorBackend。
也就是说,
- 在Object的main()中调用run()
- run()中创建了一个Class CoarseGrainedExecutorBackend
下面我们来看看,在new CoarseGrainedExecutorBackend时候会执行的onStart()方法
2.3 new CoarseGrainedExecutorBackend之onStart()
onStart()
方法中主要是
获取到DriverRpcEndpointRef
,从而
向Driver
发送
ask(RegisterExecutor)
。到Driver去
注册此Executor
,Driver会去做执行注册等操作,参见Launch Executor之Driver端。这里虽然向Driver注册了Executor,但是实际上只是将地址信息先告诉了Driver,然后等待Driver返回RegisteredExecutor,Executor才被真正的创建出来。
值得注意的是由于Master只是起到一个资源调度和管理的作用,因此,这里Executor并不会去向Master注册,并且Master也不会像维护Driver和Worker那样去维护Executor的信息。
既然是ask,那么就会有异步的返回消息。他的返回消息就是RegisteredExecutor
2.4 处理返回消息RegisteredExecutor
既然
Driver返回了注册成功
的消息,那么就要去
真正的创建出来这个Executor
了。
- 83行中,创建了一个Executor,executorId是之前由Master指定的。
至此,
CoarseGrainedExecutorBackend就已经成功启动
,并且
创建了Executor
,Executor创建时,会创建ThreadPool,并且去获取数据,从而执行Tasks。这部分我们放在Executor执行Task部分来进行剖析。
总结
本文基于Spark之Launch Executor一文中的启动流程之Worker端receive消息,延伸讲解了Worker执行的Command中调用的CoarseGrainedExecutorBackend类。
-
,也就是说,它拥有一个Executor。CoarseGrainedExecutorBackend有自己的executorRpcEndpoint,Executor基于此进行通信CoarseGrainedExecutorBackend是Executor的直接创建者
-
,之前只是准备好了注册信息等。CoarseGrainedExecutorBackend在创建Executor之前,才会去向Driver注册Executor,直到Driver返回注册成功,才会去真正创建Executor
致谢
- Spark 任务调度之创建Executor
- Spark 任务调度之启动CoarseGrainedExecutorBackend
附录
package org.apache.spark.executor
--------------------------CoarseGrainedExecutorBackend.scala:----------------------------
private[spark] object CoarseGrainedExecutorBackend extends Logging {
private def run(
driverUrl: String,
executorId: String,
hostname: String,
cores: Int,
appId: String,
workerUrl: Option[String],
userClassPath: Seq[URL]) {
Utils.initDaemon(log)
SparkHadoopUtil.get.runAsSparkUser { () =>
// Debug code
Utils.checkHost(hostname)
// 1. 作为SparkUser去拿到Spark的properties。
val executorConf = new SparkConf
// 1.1 fetcher是由RpcEnv.create()创建的一个临时的NettyRpcEnv
// 注意这个NettyRpcEnv只是用来获取Executor和App相关的配置信息等,获取完就会shutdown.
val fetcher = RpcEnv.create(
"driverPropsFetcher",
hostname,
-1,
executorConf,
new SecurityManager(executorConf),
clientMode = true)
val driver = fetcher.setupEndpointRefByURI(driverUrl)
val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
fetcher.shutdown()
// 2. 通过从Driver拿到的properties来创建ExecutorEnv,
// 在内部,会去创建属于此Executor的NettyRpcEnv,也就意味着它有自己的TransportContext、TransportServer等等组件了,可以进行通信。
val driverConf = new SparkConf()
for ((key, value) <- props) {
// this is required for SSL in standalone mode
if (SparkConf.isExecutorStartupConf(key)) {
driverConf.setIfMissing(key, value)
} else {
driverConf.set(key, value)
}
}
cfg.hadoopDelegationCreds.foreach { tokens =>
SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
}
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
// 3. 通过ExecutorNettyRpcEnv来创建ExecutorRpcEndpoint,
// 通过传入一个CoarseGrainedExecutorBackend来创建的。
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
workerUrl.foreach { url =>
// 3.* 创建一个WorkerWatcher--RpcEndpoint,用于连接到Worker进程并在连接断开时终止jvm
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
}
}
def main(args: Array[String]) {
var driverUrl: String = null
var executorId: String = null
var hostname: String = null
var cores: Int = 0
var appId: String = null
var workerUrl: Option[String] = None
val userClassPath = new mutable.ListBuffer[URL]()
var argv = args.toList
while (!argv.isEmpty) {
argv match {
case ("--driver-url") :: value :: tail =>
driverUrl = value
argv = tail
case ("--executor-id") :: value :: tail =>
executorId = value
argv = tail
case ("--hostname") :: value :: tail =>
hostname = value
argv = tail
case ("--cores") :: value :: tail =>
cores = value.toInt
argv = tail
case ("--app-id") :: value :: tail =>
appId = value
argv = tail
case ("--worker-url") :: value :: tail =>
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
workerUrl = Some(value)
argv = tail
case ("--user-class-path") :: value :: tail =>
userClassPath += new URL(value)
argv = tail
case Nil =>
case tail =>
// scalastyle:off println
System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
// scalastyle:on println
printUsageAndExit()
}
}
if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
appId == null) {
printUsageAndExit()
}
run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
System.exit(0)
}
-------------------------------------onStart()------------
override def onStart() {
logInfo("Connecting to driver: " + driverUrl)
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
// Always receive `true`. Just ignore it
case Failure(e) =>
exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
}(ThreadUtils.sameThread)
}
------------------------------receive()-------------------
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}