一、回顧一下之前的内容
上一次閱讀到了 SparkContext 初始化,繼續往下之前,先溫故一下之前的内容。
這裡有個假設是:Spark 叢集以 Standalone 的方式來啟動的,作業也是送出到 Spark standalone 叢集。
首先需要啟動 Spark 叢集,使用 start-all.sh 腳本依次啟動 Master (主備) 和多個 Worker。
啟動好之後,開始送出作業,使用 spark-submit 指令來送出。
- 首先在送出任務的機器上使用 java 指令啟動了一個虛拟機,并且執行了主類 SparkSubmit 的 main 方法作為入口。
- 然後根據送出到不同的叢集,來 new 不同的用戶端類,如果是 standalone 的話,就 new 了一個 ClientApp;然後把 java DriverWrapper 這個指令封裝到 RequestSubmmitDriver 消息中,把這個消息發送給 Master;
- Master 随機找一個滿足資源條件的 Worker 來啟動 Driver,實際上是在虛拟機裡執行 DriverWrapper 的 main 方法;
- 然後 Worker 開始啟動 Driver,啟動的時候會執行使用者送出的 java 包裡的 main 方法,然後開始執行 SparkContext 的初始化,依次在 Driver 中建立了 DAGScheduler、TaskScheduler、SchedulerBackend 三個重要的執行個體。并且啟動了 DriverEndpoint 和 ClientEndpoint ,用來和 Worker、Master 通信。
二、Master 處理應用的注冊
接着上次 ClientEndpoint 啟動之後,會向 Master 發送一個 RegisterApplication 消息,Master 開始處理這個消息。
然後看到 Matster 類處理 RegisterApplication 消息的地方:
可以看到,用應用程式的描述和 Driver 的引用建立了一個 Application,然後開始注冊這個 Application。
注冊 Application 很簡單,就是往 Master 的記憶體中加入各種資訊,重點來了,把 ApplicationInfo 加入到了 waitingApps 這個結構裡,然後 schedule() 方法會周遊這個清單,為 Application 配置設定資源,并排程起來。
然後往 zk 中寫入了 Application 的資訊,并且往 Driver 發送了一個 RegisteredApplication 應用已經注冊的消息。
接着開始 schedule(),這個方法上次講過,它會周遊兩個清單,一個是周遊 waitingDrivers 來啟動 Driver,一個是周遊 waitingApps,來啟動 Application。
waitingDrivers 清單在用戶端請求啟動 Driver 的時候就處理過了,本次重點看這個方法:
startExecutorsOnWorkers()
複制
三、Master 對資源的排程
有以下幾個步驟:
- 周遊 waitingApps 的所有 app;
- 如果 app 需要的核數小于一個 Executor 可以提供的核數,就不為 app 配置設定新的 Executor;
- 過濾出還有可供排程的 cpu 和 memory 的 workers,并按照 cores 的大小降序排序,作為 usableWorkers;
- 計算所有 usableWorkers 上要配置設定多少 CPU;
- 然後周遊可用的 Workers,配置設定資源并執行排程,啟動 Executor。
源碼從 Master 類的 schedule() 方法的最後一行 startExecutorsOnWorkers() 開始:
這個方法主要作用是計算 worker 的 executor 數量和配置設定的資源并啟動 executor。
/**
* Schedule and launch executors on workers
*/
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
for (app <- waitingApps) {
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
// If the cores left is less than the coresPerExecutor,the cores left will not be allocated
if (app.coresLeft >= coresPerExecutor) {
// 1. 剩餘記憶體大于單個 executor 需要的記憶體
// 2. 剩餘的核心數大于單個 executor 需要的核心數
// 3. 按照核心數從大到小排序
// Filter out workers that don't have enough resources to launch an executor
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(canLaunchExecutor(_, app.desc))
.sortBy(_.coresFree).reverse
val appMayHang = waitingApps.length == 1 &&
waitingApps.head.executors.isEmpty && usableWorkers.isEmpty
if (appMayHang) {
logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
}
// 計算每個 Worker 上可用的 cores
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
// Now that we've decided how many cores to allocate on each worker, let's allocate them
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
}
}
}
}
複制
(1)周遊 waitingApps,如果 app 還需要的 cpu 核數大于每個執行器的核數,才繼續配置設定。
(2)過濾可用的 worker,條件一:該 worker 剩餘記憶體大于單個 executor 需要的記憶體;條件二:該 worker 剩餘 cpu 核數大于單個 executor 需要的核數;然後按照可用 cpu核數從大到小排序。
(3)下面兩個方法是關鍵的方法
scheduleExecutorsOnWorkers(),用來計算每個 Worker 上可用的 cpu 核數;
allocateWorkerResourceToExecutors() 用來真正在 Worker 上配置設定 Executor。
四、scheduleExecutorsOnWorkers 計算每個 Worker 可用的核數
這個方法很長,首先看方法注釋,大緻翻譯了一下:
當執行器配置設定的 cpu 核數(spark.executor.cores)被顯示設定的時候,如果這個 worker 上有足夠的核數和記憶體的話,那麼每個 worker 上可以執行多個執行器;反之,沒有設定的時候,每個 worker 上隻能啟動一個執行器;并且,這個執行器會使用 worker 能提供出來的盡可能多的核數;
appA 和 appB 都有一個執行器運作在 worker1 上。但是 appA 還需要一些 cpu 核,當 appB 執行結束,釋放了它在 worker1 上的核數時, 下一次排程的時候,appA 會新啟動一個 executor 獲得了 worker1 上所有的可用的核心,是以 appA 就在 worker1 上啟動了多個執行器。
設定 coresPerExecutor (spark.executor.cores)很重要,考慮下面的例子:叢集有4個worker,每個worker有16核;使用者請求 3 個執行器(spark.cores.max = 48,spark.executor.cores=16)。如果不設定這個參數,那麼每次配置設定 1 個 cpu核心,每個 worker 輪流配置設定一個 cpu核,最終 4 個執行器配置設定 12 個核心給每個 executor,4 個 worker 也同樣配置設定了48個核心,但是最終每個 executor 隻有 12核 < 16 核,是以最終沒有執行器被啟動。
如果看我的翻譯還是很費勁,我就再精簡下:
- 如果沒有設定 spark.executor.cores,那麼每個 Worker 隻能啟動一個 Executor,并且這個 Executor 會占用所有 Worker 能提供的 cpu核數;
- 如果顯示設定了,那麼每個 Worker 可以啟動多個 Executor;
下面是源碼,每句都有挨個注釋過,中間有一個方法是判斷這個 Worker 上還能不能再配置設定 Executor 了。
重點是中間方法後面那一段,周遊每個 Worker 配置設定 cpu,如果不是 Spend Out 模式,則在一個 Worker 上一直配置設定,直到 Worker 資源配置設定完畢。
private def scheduleExecutorsOnWorkers(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
// 每個 executor 的核數
val coresPerExecutor = app.desc.coresPerExecutor
// 每個 executor 的最小核數 為1
val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
// 每個Worker配置設定一個Executor? 這個參數可以控制這個行為
val oneExecutorPerWorker = coresPerExecutor.isEmpty
// 每個Executor的記憶體
val memoryPerExecutor = app.desc.memoryPerExecutorMB
val resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor
// 可用 Worker 的總數
val numUsable = usableWorkers.length
// 給每個Worker的cores數
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
// 給每個Worker上新的Executor數
val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
// app 需要的核心數 和 所有 worker 能提供的核心總數,取最小值
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
// 判斷指定的worker是否可以為這個app啟動一個executor
/** Return whether the specified worker can launch an executor for this app. */
def canLaunchExecutorForApp(pos: Int): Boolean = {
// 如果能提供的核心數 大于等 executor 需要的最小核心數,則繼續配置設定
val keepScheduling = coresToAssign >= minCoresPerExecutor
// 是否有足夠的核心:目前 worker 能提供的核數 減去 每個 worker 已配置設定的核心數 ,大于每個 executor最小的核心數
val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
// 目前 worker 新配置設定的 executor 個數
val assignedExecutorNum = assignedExecutors(pos)
// 如果每個worker允許多個executor,就能一直在啟動新的的executor
// 如果在這個worker上已經有executor,則給這個executor更多的core
// If we allow multiple executors per worker, then we can always launch new executors.
// Otherwise, if there is already an executor on this worker, just give it more cores.
// 如果一個 worker 上可以啟動多個 executor 或者 這個 worker 還沒配置設定 executor
val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0
if (launchingNewExecutor) {
// 總共已經配置設定的記憶體
val assignedMemory = assignedExecutorNum * memoryPerExecutor
// 是否有足夠的記憶體:目前worker 的剩餘記憶體 減去 已配置設定的記憶體 大于每個 executor需要的記憶體
val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
//
val assignedResources = resourceReqsPerExecutor.map {
req => req.resourceName -> req.amount * assignedExecutorNum
}.toMap
val resourcesFree = usableWorkers(pos).resourcesAmountFree.map {
case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0))
}
val enoughResources = ResourceUtils.resourcesMeetRequirements(
resourcesFree, resourceReqsPerExecutor)
// 所有已配置設定的核數+app需要的核數 小于 app的核數限制
val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit
} else {
// We're adding cores to an existing executor, so no need
// to check memory and executor limits
keepScheduling && enoughCores
}
}
// 不斷的啟動executor,直到不再有Worker可以容納任何Executor,或者達到了這個Application的要求
// Keep launching executors until no more workers can accommodate any
// more executors, or if we have reached this application's limits
// 過濾出可以啟動 executor 的 workers
var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp)
while (freeWorkers.nonEmpty) {
// 周遊每個 worker
freeWorkers.foreach { pos =>
var keepScheduling = true
while (keepScheduling && canLaunchExecutorForApp(pos)) {
coresToAssign -= minCoresPerExecutor
assignedCores(pos) += minCoresPerExecutor
// 如果我們在每個worker上啟動一個executor,每次疊代為每個executor增加一個core
// 否則,每次疊代都會為新的executor配置設定cores
// If we are launching one executor per worker, then every iteration assigns 1 core
// to the executor. Otherwise, every iteration assigns cores to a new executor.
if (oneExecutorPerWorker) {
assignedExecutors(pos) = 1
} else {
assignedExecutors(pos) += 1
}
// 如果不使用Spreading out方法,我們會在這個worker上繼續排程executor,直到使用它所有的資源
// 否則,就跳轉到下一個worker
// Spreading out an application means spreading out its executors across as
// many workers as possible. If we are not spreading out, then we should keep
// scheduling executors on this worker until we use all of its resources.
// Otherwise, just move on to the next worker.
if (spreadOutApps) {
keepScheduling = false
}
}
}
freeWorkers = freeWorkers.filter(canLaunchExecutorForApp)
}
assignedCores
}
複制
接着真正開始在 Worker 上啟動 Executor:
在 launchExecutor 在方法裡:
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id,
exec.application.desc, exec.cores, exec.memory, exec.resources))
exec.application.driver.send(
ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
複制
給 Worker 發送了一個 LaunchExecutor 消息。
然後給執行器對應的 Driver 發送了 ExecutorAdded 消息。
五、總結
本次我們講了 Master 處理應用的注冊,重點是把 app 資訊加入到 waitingApps 清單中,然後調用 schedule() 方法,計算每個 Worker 可用的 cpu核數,并且在 Worker 上啟動執行器。