天天看點

Spark 源碼(8) - Master配置設定資源并在Worker上啟動Executor ,逐行代碼注釋版

一、回顧一下之前的内容

上一次閱讀到了 SparkContext 初始化,繼續往下之前,先溫故一下之前的内容。

這裡有個假設是:Spark 叢集以 Standalone 的方式來啟動的,作業也是送出到 Spark standalone 叢集。

首先需要啟動 Spark 叢集,使用 start-all.sh 腳本依次啟動 Master (主備) 和多個 Worker。

啟動好之後,開始送出作業,使用 spark-submit 指令來送出。

  • 首先在送出任務的機器上使用 java 指令啟動了一個虛拟機,并且執行了主類 SparkSubmit 的 main 方法作為入口。
  • 然後根據送出到不同的叢集,來 new 不同的用戶端類,如果是 standalone 的話,就 new 了一個 ClientApp;然後把 java DriverWrapper 這個指令封裝到 RequestSubmmitDriver 消息中,把這個消息發送給 Master;
  • Master 随機找一個滿足資源條件的 Worker 來啟動 Driver,實際上是在虛拟機裡執行 DriverWrapper 的 main 方法;
  • 然後 Worker 開始啟動 Driver,啟動的時候會執行使用者送出的 java 包裡的 main 方法,然後開始執行 SparkContext 的初始化,依次在 Driver 中建立了 DAGScheduler、TaskScheduler、SchedulerBackend 三個重要的執行個體。并且啟動了 DriverEndpoint 和 ClientEndpoint ,用來和 Worker、Master 通信。

二、Master 處理應用的注冊

接着上次 ClientEndpoint 啟動之後,會向 Master 發送一個 RegisterApplication 消息,Master 開始處理這個消息。

然後看到 Matster 類處理 RegisterApplication 消息的地方:

Spark 源碼(8) - Master配置設定資源并在Worker上啟動Executor ,逐行代碼注釋版

可以看到,用應用程式的描述和 Driver 的引用建立了一個 Application,然後開始注冊這個 Application。

注冊 Application 很簡單,就是往 Master 的記憶體中加入各種資訊,重點來了,把 ApplicationInfo 加入到了 waitingApps 這個結構裡,然後 schedule() 方法會周遊這個清單,為 Application 配置設定資源,并排程起來。

Spark 源碼(8) - Master配置設定資源并在Worker上啟動Executor ,逐行代碼注釋版
Spark 源碼(8) - Master配置設定資源并在Worker上啟動Executor ,逐行代碼注釋版

然後往 zk 中寫入了 Application 的資訊,并且往 Driver 發送了一個 RegisteredApplication 應用已經注冊的消息。

接着開始 schedule(),這個方法上次講過,它會周遊兩個清單,一個是周遊 waitingDrivers 來啟動 Driver,一個是周遊 waitingApps,來啟動 Application。

waitingDrivers 清單在用戶端請求啟動 Driver 的時候就處理過了,本次重點看這個方法:

startExecutorsOnWorkers()
           

複制

三、Master 對資源的排程

有以下幾個步驟:

  1. 周遊 waitingApps 的所有 app;
  2. 如果 app 需要的核數小于一個 Executor 可以提供的核數,就不為 app 配置設定新的 Executor;
  3. 過濾出還有可供排程的 cpu 和 memory 的 workers,并按照 cores 的大小降序排序,作為 usableWorkers;
  4. 計算所有 usableWorkers 上要配置設定多少 CPU;
  5. 然後周遊可用的 Workers,配置設定資源并執行排程,啟動 Executor。

源碼從 Master 類的 schedule() 方法的最後一行 startExecutorsOnWorkers() 開始:

這個方法主要作用是計算 worker 的 executor 數量和配置設定的資源并啟動 executor。

/**
     * Schedule and launch executors on workers
     */
    private def startExecutorsOnWorkers(): Unit = {
        // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
        // in the queue, then the second app, etc.

        for (app <- waitingApps) {
            val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
            // If the cores left is less than the coresPerExecutor,the cores left will not be allocated
            if (app.coresLeft >= coresPerExecutor) {
                // 1. 剩餘記憶體大于單個 executor 需要的記憶體
                // 2. 剩餘的核心數大于單個 executor 需要的核心數
                // 3. 按照核心數從大到小排序
                // Filter out workers that don't have enough resources to launch an executor
                val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
                    .filter(canLaunchExecutor(_, app.desc))
                    .sortBy(_.coresFree).reverse
                val appMayHang = waitingApps.length == 1 &&
                    waitingApps.head.executors.isEmpty && usableWorkers.isEmpty
                if (appMayHang) {
                    logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
                }
                // 計算每個 Worker 上可用的 cores
                val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
                
                // Now that we've decided how many cores to allocate on each worker, let's allocate them
                for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
                    allocateWorkerResourceToExecutors(
                        app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
                }
            }
        }
    }
    
           

複制

(1)周遊 waitingApps,如果 app 還需要的 cpu 核數大于每個執行器的核數,才繼續配置設定。

(2)過濾可用的 worker,條件一:該 worker 剩餘記憶體大于單個 executor 需要的記憶體;條件二:該 worker 剩餘 cpu 核數大于單個 executor 需要的核數;然後按照可用 cpu核數從大到小排序。

(3)下面兩個方法是關鍵的方法

scheduleExecutorsOnWorkers(),用來計算每個 Worker 上可用的 cpu 核數;

allocateWorkerResourceToExecutors() 用來真正在 Worker 上配置設定 Executor。

四、scheduleExecutorsOnWorkers 計算每個 Worker 可用的核數

這個方法很長,首先看方法注釋,大緻翻譯了一下:

當執行器配置設定的 cpu 核數(spark.executor.cores)被顯示設定的時候,如果這個 worker 上有足夠的核數和記憶體的話,那麼每個 worker 上可以執行多個執行器;反之,沒有設定的時候,每個 worker 上隻能啟動一個執行器;并且,這個執行器會使用 worker 能提供出來的盡可能多的核數;

appA 和 appB 都有一個執行器運作在 worker1 上。但是 appA 還需要一些 cpu 核,當 appB 執行結束,釋放了它在 worker1 上的核數時, 下一次排程的時候,appA 會新啟動一個 executor 獲得了 worker1 上所有的可用的核心,是以 appA 就在 worker1 上啟動了多個執行器。

設定 coresPerExecutor (spark.executor.cores)很重要,考慮下面的例子:叢集有4個worker,每個worker有16核;使用者請求 3 個執行器(spark.cores.max = 48,spark.executor.cores=16)。如果不設定這個參數,那麼每次配置設定 1 個 cpu核心,每個 worker 輪流配置設定一個 cpu核,最終 4 個執行器配置設定 12 個核心給每個 executor,4 個 worker 也同樣配置設定了48個核心,但是最終每個 executor 隻有 12核 < 16 核,是以最終沒有執行器被啟動。

如果看我的翻譯還是很費勁,我就再精簡下:

  • 如果沒有設定 spark.executor.cores,那麼每個 Worker 隻能啟動一個 Executor,并且這個 Executor 會占用所有 Worker 能提供的 cpu核數;
  • 如果顯示設定了,那麼每個 Worker 可以啟動多個 Executor;

下面是源碼,每句都有挨個注釋過,中間有一個方法是判斷這個 Worker 上還能不能再配置設定 Executor 了。

重點是中間方法後面那一段,周遊每個 Worker 配置設定 cpu,如果不是 Spend Out 模式,則在一個 Worker 上一直配置設定,直到 Worker 資源配置設定完畢。

private def scheduleExecutorsOnWorkers(
        app: ApplicationInfo,
        usableWorkers: Array[WorkerInfo],
        spreadOutApps: Boolean): Array[Int] = {
        // 每個 executor 的核數
        val coresPerExecutor = app.desc.coresPerExecutor
        // 每個 executor 的最小核數 為1
        val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
        //  每個Worker配置設定一個Executor? 這個參數可以控制這個行為
        val oneExecutorPerWorker = coresPerExecutor.isEmpty
        //  每個Executor的記憶體
        val memoryPerExecutor = app.desc.memoryPerExecutorMB
        val resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor
        // 可用 Worker 的總數
        
        val numUsable = usableWorkers.length
        // 給每個Worker的cores數
        val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
        // 給每個Worker上新的Executor數
        val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
        // app 需要的核心數 和 所有 worker 能提供的核心總數,取最小值
        var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

        //  判斷指定的worker是否可以為這個app啟動一個executor
        /** Return whether the specified worker can launch an executor for this app. */
        def canLaunchExecutorForApp(pos: Int): Boolean = {
            // 如果能提供的核心數 大于等 executor 需要的最小核心數,則繼續配置設定
            val keepScheduling = coresToAssign >= minCoresPerExecutor
            // 是否有足夠的核心:目前 worker 能提供的核數 減去 每個 worker 已配置設定的核心數 ,大于每個 executor最小的核心數
            val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
            // 目前 worker 新配置設定的 executor 個數
            val assignedExecutorNum = assignedExecutors(pos)

            //  如果每個worker允許多個executor,就能一直在啟動新的的executor
            //  如果在這個worker上已經有executor,則給這個executor更多的core
            // If we allow multiple executors per worker, then we can always launch new executors.
            // Otherwise, if there is already an executor on this worker, just give it more cores.

            // 如果一個 worker 上可以啟動多個 executor  或者 這個 worker 還沒配置設定 executor
            val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0
            if (launchingNewExecutor) {
                // 總共已經配置設定的記憶體
                val assignedMemory = assignedExecutorNum * memoryPerExecutor
                // 是否有足夠的記憶體:目前worker 的剩餘記憶體 減去 已配置設定的記憶體 大于每個 executor需要的記憶體
                val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
                //
                val assignedResources = resourceReqsPerExecutor.map {
                    req => req.resourceName -> req.amount * assignedExecutorNum
                }.toMap
                val resourcesFree = usableWorkers(pos).resourcesAmountFree.map {
                    case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0))
                }
                val enoughResources = ResourceUtils.resourcesMeetRequirements(
                    resourcesFree, resourceReqsPerExecutor)
                // 所有已配置設定的核數+app需要的核數  小于 app的核數限制
                val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
                keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit
            } else {
                // We're adding cores to an existing executor, so no need
                // to check memory and executor limits
                keepScheduling && enoughCores
            }
        }

        // 不斷的啟動executor,直到不再有Worker可以容納任何Executor,或者達到了這個Application的要求
        // Keep launching executors until no more workers can accommodate any
        // more executors, or if we have reached this application's limits
        // 過濾出可以啟動 executor 的 workers
        var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp)

        while (freeWorkers.nonEmpty) {
            // 周遊每個 worker
            freeWorkers.foreach { pos =>
                var keepScheduling = true
                while (keepScheduling && canLaunchExecutorForApp(pos)) {
                    coresToAssign -= minCoresPerExecutor
                    assignedCores(pos) += minCoresPerExecutor

                    //  如果我們在每個worker上啟動一個executor,每次疊代為每個executor增加一個core
                    //  否則,每次疊代都會為新的executor配置設定cores
                    // If we are launching one executor per worker, then every iteration assigns 1 core
                    // to the executor. Otherwise, every iteration assigns cores to a new executor.
                    if (oneExecutorPerWorker) {
                        assignedExecutors(pos) = 1
                    } else {
                        assignedExecutors(pos) += 1
                    }

                    //  如果不使用Spreading out方法,我們會在這個worker上繼續排程executor,直到使用它所有的資源
                    //  否則,就跳轉到下一個worker
                    // Spreading out an application means spreading out its executors across as
                    // many workers as possible. If we are not spreading out, then we should keep
                    // scheduling executors on this worker until we use all of its resources.
                    // Otherwise, just move on to the next worker.
                    if (spreadOutApps) {
                        keepScheduling = false
                    }
                }
            }
            freeWorkers = freeWorkers.filter(canLaunchExecutorForApp)
        }
        assignedCores
    }
           

複制

接着真正開始在 Worker 上啟動 Executor:

Spark 源碼(8) - Master配置設定資源并在Worker上啟動Executor ,逐行代碼注釋版

在 launchExecutor 在方法裡:

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
        logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
        worker.addExecutor(exec)
        worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id,
            exec.application.desc, exec.cores, exec.memory, exec.resources))
        exec.application.driver.send(
            ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
    }
           

複制

給 Worker 發送了一個 LaunchExecutor 消息。

然後給執行器對應的 Driver 發送了 ExecutorAdded 消息。

五、總結

本次我們講了 Master 處理應用的注冊,重點是把 app 資訊加入到 waitingApps 清單中,然後調用 schedule() 方法,計算每個 Worker 可用的 cpu核數,并且在 Worker 上啟動執行器。