天天看点

Spark 源码(8) - Master分配资源并在Worker上启动Executor ,逐行代码注释版

一、回顾一下之前的内容

上一次阅读到了 SparkContext 初始化,继续往下之前,先温故一下之前的内容。

这里有个假设是:Spark 集群以 Standalone 的方式来启动的,作业也是提交到 Spark standalone 集群。

首先需要启动 Spark 集群,使用 start-all.sh 脚本依次启动 Master (主备) 和多个 Worker。

启动好之后,开始提交作业,使用 spark-submit 命令来提交。

  • 首先在提交任务的机器上使用 java 命令启动了一个虚拟机,并且执行了主类 SparkSubmit 的 main 方法作为入口。
  • 然后根据提交到不同的集群,来 new 不同的客户端类,如果是 standalone 的话,就 new 了一个 ClientApp;然后把 java DriverWrapper 这个命令封装到 RequestSubmmitDriver 消息中,把这个消息发送给 Master;
  • Master 随机找一个满足资源条件的 Worker 来启动 Driver,实际上是在虚拟机里执行 DriverWrapper 的 main 方法;
  • 然后 Worker 开始启动 Driver,启动的时候会执行用户提交的 java 包里的 main 方法,然后开始执行 SparkContext 的初始化,依次在 Driver 中创建了 DAGScheduler、TaskScheduler、SchedulerBackend 三个重要的实例。并且启动了 DriverEndpoint 和 ClientEndpoint ,用来和 Worker、Master 通信。

二、Master 处理应用的注册

接着上次 ClientEndpoint 启动之后,会向 Master 发送一个 RegisterApplication 消息,Master 开始处理这个消息。

然后看到 Matster 类处理 RegisterApplication 消息的地方:

Spark 源码(8) - Master分配资源并在Worker上启动Executor ,逐行代码注释版

可以看到,用应用程序的描述和 Driver 的引用创建了一个 Application,然后开始注册这个 Application。

注册 Application 很简单,就是往 Master 的内存中加入各种信息,重点来了,把 ApplicationInfo 加入到了 waitingApps 这个结构里,然后 schedule() 方法会遍历这个列表,为 Application 分配资源,并调度起来。

Spark 源码(8) - Master分配资源并在Worker上启动Executor ,逐行代码注释版
Spark 源码(8) - Master分配资源并在Worker上启动Executor ,逐行代码注释版

然后往 zk 中写入了 Application 的信息,并且往 Driver 发送了一个 RegisteredApplication 应用已经注册的消息。

接着开始 schedule(),这个方法上次讲过,它会遍历两个列表,一个是遍历 waitingDrivers 来启动 Driver,一个是遍历 waitingApps,来启动 Application。

waitingDrivers 列表在客户端请求启动 Driver 的时候就处理过了,本次重点看这个方法:

startExecutorsOnWorkers()
           

复制

三、Master 对资源的调度

有以下几个步骤:

  1. 遍历 waitingApps 的所有 app;
  2. 如果 app 需要的核数小于一个 Executor 可以提供的核数,就不为 app 分配新的 Executor;
  3. 过滤出还有可供调度的 cpu 和 memory 的 workers,并按照 cores 的大小降序排序,作为 usableWorkers;
  4. 计算所有 usableWorkers 上要分配多少 CPU;
  5. 然后遍历可用的 Workers,分配资源并执行调度,启动 Executor。

源码从 Master 类的 schedule() 方法的最后一行 startExecutorsOnWorkers() 开始:

这个方法主要作用是计算 worker 的 executor 数量和分配的资源并启动 executor。

/**
     * Schedule and launch executors on workers
     */
    private def startExecutorsOnWorkers(): Unit = {
        // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
        // in the queue, then the second app, etc.

        for (app <- waitingApps) {
            val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
            // If the cores left is less than the coresPerExecutor,the cores left will not be allocated
            if (app.coresLeft >= coresPerExecutor) {
                // 1. 剩余内存大于单个 executor 需要的内存
                // 2. 剩余的内核数大于单个 executor 需要的内核数
                // 3. 按照内核数从大到小排序
                // Filter out workers that don't have enough resources to launch an executor
                val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
                    .filter(canLaunchExecutor(_, app.desc))
                    .sortBy(_.coresFree).reverse
                val appMayHang = waitingApps.length == 1 &&
                    waitingApps.head.executors.isEmpty && usableWorkers.isEmpty
                if (appMayHang) {
                    logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
                }
                // 计算每个 Worker 上可用的 cores
                val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
                
                // Now that we've decided how many cores to allocate on each worker, let's allocate them
                for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
                    allocateWorkerResourceToExecutors(
                        app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
                }
            }
        }
    }
    
           

复制

(1)遍历 waitingApps,如果 app 还需要的 cpu 核数大于每个执行器的核数,才继续分配。

(2)过滤可用的 worker,条件一:该 worker 剩余内存大于单个 executor 需要的内存;条件二:该 worker 剩余 cpu 核数大于单个 executor 需要的核数;然后按照可用 cpu核数从大到小排序。

(3)下面两个方法是关键的方法

scheduleExecutorsOnWorkers(),用来计算每个 Worker 上可用的 cpu 核数;

allocateWorkerResourceToExecutors() 用来真正在 Worker 上分配 Executor。

四、scheduleExecutorsOnWorkers 计算每个 Worker 可用的核数

这个方法很长,首先看方法注释,大致翻译了一下:

当执行器分配的 cpu 核数(spark.executor.cores)被显示设置的时候,如果这个 worker 上有足够的核数和内存的话,那么每个 worker 上可以执行多个执行器;反之,没有设置的时候,每个 worker 上只能启动一个执行器;并且,这个执行器会使用 worker 能提供出来的尽可能多的核数;

appA 和 appB 都有一个执行器运行在 worker1 上。但是 appA 还需要一些 cpu 核,当 appB 执行结束,释放了它在 worker1 上的核数时, 下一次调度的时候,appA 会新启动一个 executor 获得了 worker1 上所有的可用的核心,因此 appA 就在 worker1 上启动了多个执行器。

设置 coresPerExecutor (spark.executor.cores)很重要,考虑下面的例子:集群有4个worker,每个worker有16核;用户请求 3 个执行器(spark.cores.max = 48,spark.executor.cores=16)。如果不设置这个参数,那么每次分配 1 个 cpu核心,每个 worker 轮流分配一个 cpu核,最终 4 个执行器分配 12 个核心给每个 executor,4 个 worker 也同样分配了48个核心,但是最终每个 executor 只有 12核 < 16 核,所以最终没有执行器被启动。

如果看我的翻译还是很费劲,我就再精简下:

  • 如果没有设置 spark.executor.cores,那么每个 Worker 只能启动一个 Executor,并且这个 Executor 会占用所有 Worker 能提供的 cpu核数;
  • 如果显示设置了,那么每个 Worker 可以启动多个 Executor;

下面是源码,每句都有挨个注释过,中间有一个方法是判断这个 Worker 上还能不能再分配 Executor 了。

重点是中间方法后面那一段,遍历每个 Worker 分配 cpu,如果不是 Spend Out 模式,则在一个 Worker 上一直分配,直到 Worker 资源分配完毕。

private def scheduleExecutorsOnWorkers(
        app: ApplicationInfo,
        usableWorkers: Array[WorkerInfo],
        spreadOutApps: Boolean): Array[Int] = {
        // 每个 executor 的核数
        val coresPerExecutor = app.desc.coresPerExecutor
        // 每个 executor 的最小核数 为1
        val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
        //  每个Worker分配一个Executor? 这个参数可以控制这个行为
        val oneExecutorPerWorker = coresPerExecutor.isEmpty
        //  每个Executor的内存
        val memoryPerExecutor = app.desc.memoryPerExecutorMB
        val resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor
        // 可用 Worker 的总数
        
        val numUsable = usableWorkers.length
        // 给每个Worker的cores数
        val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
        // 给每个Worker上新的Executor数
        val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
        // app 需要的核心数 和 所有 worker 能提供的核心总数,取最小值
        var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

        //  判断指定的worker是否可以为这个app启动一个executor
        /** Return whether the specified worker can launch an executor for this app. */
        def canLaunchExecutorForApp(pos: Int): Boolean = {
            // 如果能提供的核心数 大于等 executor 需要的最小核心数,则继续分配
            val keepScheduling = coresToAssign >= minCoresPerExecutor
            // 是否有足够的核心:当前 worker 能提供的核数 减去 每个 worker 已分配的核心数 ,大于每个 executor最小的核心数
            val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
            // 当前 worker 新分配的 executor 个数
            val assignedExecutorNum = assignedExecutors(pos)

            //  如果每个worker允许多个executor,就能一直在启动新的的executor
            //  如果在这个worker上已经有executor,则给这个executor更多的core
            // If we allow multiple executors per worker, then we can always launch new executors.
            // Otherwise, if there is already an executor on this worker, just give it more cores.

            // 如果一个 worker 上可以启动多个 executor  或者 这个 worker 还没分配 executor
            val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0
            if (launchingNewExecutor) {
                // 总共已经分配的内存
                val assignedMemory = assignedExecutorNum * memoryPerExecutor
                // 是否有足够的内存:当前worker 的剩余内存 减去 已分配的内存 大于每个 executor需要的内存
                val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
                //
                val assignedResources = resourceReqsPerExecutor.map {
                    req => req.resourceName -> req.amount * assignedExecutorNum
                }.toMap
                val resourcesFree = usableWorkers(pos).resourcesAmountFree.map {
                    case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0))
                }
                val enoughResources = ResourceUtils.resourcesMeetRequirements(
                    resourcesFree, resourceReqsPerExecutor)
                // 所有已分配的核数+app需要的核数  小于 app的核数限制
                val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
                keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit
            } else {
                // We're adding cores to an existing executor, so no need
                // to check memory and executor limits
                keepScheduling && enoughCores
            }
        }

        // 不断的启动executor,直到不再有Worker可以容纳任何Executor,或者达到了这个Application的要求
        // Keep launching executors until no more workers can accommodate any
        // more executors, or if we have reached this application's limits
        // 过滤出可以启动 executor 的 workers
        var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp)

        while (freeWorkers.nonEmpty) {
            // 遍历每个 worker
            freeWorkers.foreach { pos =>
                var keepScheduling = true
                while (keepScheduling && canLaunchExecutorForApp(pos)) {
                    coresToAssign -= minCoresPerExecutor
                    assignedCores(pos) += minCoresPerExecutor

                    //  如果我们在每个worker上启动一个executor,每次迭代为每个executor增加一个core
                    //  否则,每次迭代都会为新的executor分配cores
                    // If we are launching one executor per worker, then every iteration assigns 1 core
                    // to the executor. Otherwise, every iteration assigns cores to a new executor.
                    if (oneExecutorPerWorker) {
                        assignedExecutors(pos) = 1
                    } else {
                        assignedExecutors(pos) += 1
                    }

                    //  如果不使用Spreading out方法,我们会在这个worker上继续调度executor,直到使用它所有的资源
                    //  否则,就跳转到下一个worker
                    // Spreading out an application means spreading out its executors across as
                    // many workers as possible. If we are not spreading out, then we should keep
                    // scheduling executors on this worker until we use all of its resources.
                    // Otherwise, just move on to the next worker.
                    if (spreadOutApps) {
                        keepScheduling = false
                    }
                }
            }
            freeWorkers = freeWorkers.filter(canLaunchExecutorForApp)
        }
        assignedCores
    }
           

复制

接着真正开始在 Worker 上启动 Executor:

Spark 源码(8) - Master分配资源并在Worker上启动Executor ,逐行代码注释版

在 launchExecutor 在方法里:

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
        logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
        worker.addExecutor(exec)
        worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id,
            exec.application.desc, exec.cores, exec.memory, exec.resources))
        exec.application.driver.send(
            ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
    }
           

复制

给 Worker 发送了一个 LaunchExecutor 消息。

然后给执行器对应的 Driver 发送了 ExecutorAdded 消息。

五、总结

本次我们讲了 Master 处理应用的注册,重点是把 app 信息加入到 waitingApps 列表中,然后调用 schedule() 方法,计算每个 Worker 可用的 cpu核数,并且在 Worker 上启动执行器。