天天看點

源碼- Spark中Worker源碼分析(二)

繼續前一篇的内容。前一篇内容為:

Spark中Worker源碼分析(一)http://www.cnblogs.com/yourarebest/p/5300202.html

4.receive方法,

receive方法主要分為以下14種情況:

(1)worker向master注冊成功後,詳見代碼

(2)worker向master發送心跳消息,如果還沒有注冊到master上,該消息将被忽略,詳見代碼

(3)worker的工作空間的清理,詳見代碼

(4)更換master,詳見代碼

(5)worker注冊失敗,詳見代碼

(6)再次連接worker,詳見代碼

(7)建立executor,詳見代碼

(8)executor的狀態發生改變時,詳見代碼

(9)kill executor,詳見代碼

(10)建立driver,詳見代碼

(11)kill driver,詳見代碼

(12)driver的狀态發生變化時,詳見代碼

(13)将worker注冊到master上,詳見代碼

(14)app執行完畢,詳見代碼

worker與master相關的互動為(1)(2)(4)(6)(13)

worker與driver相關的互動為(10)(11)(12)

worker與executor相關的互動為(3)(7)(8)(9)(14),需要說明的是(3)(14)它們的完成都與executor有着密切的聯系。

<code>

/**
 * Message handler for the Worker RPC endpoint. Dispatches on the 14 message
 * types the worker can receive from the master, from itself (scheduled
 * messages), or from its executor/driver runners.
 */
override def receive: PartialFunction[Any, Unit] = {
    // (1) This worker has been successfully registered with the master.
    case RegisteredWorker(masterRef, masterWebUiUrl) =>
      logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
      registered = true
      changeMaster(masterRef, masterWebUiUrl)
      // Schedule a daemon task that tells this endpoint to send a heartbeat
      // every HEARTBEAT_MILLIS (15s by default).
      forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(SendHeartbeat)
        }
      }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
      // If cleanup is enabled, also schedule periodic cleanup of old
      // application directories under the work dir.
      if (CLEANUP_ENABLED) {
        logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
        forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            // Trigger app-dir cleanup every CLEANUP_INTERVAL_MILLIS (30min by default).
            self.send(WorkDirCleanup)
          }
        }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
      }

    // (2) Send a heartbeat to the master; ignored if we have not registered yet.
    case SendHeartbeat =>
      if (connected) { sendToMaster(Heartbeat(workerId, self)) }

    // (3) Clean up the worker's work directory.
    case WorkDirCleanup =>
      // Run the cleanup in a separate thread so we do not block the worker
      // rpcEndpoint; copy the app ids first so the cleanup thread does not
      // need to touch the worker's mutable state.
      val appIds = executors.values.map(_.appId).toSet
      val cleanupFuture = concurrent.future {
        val appDirs = workDir.listFiles()
        if (appDirs == null) {
          // NOTE: the original snippet interpolated `appDirs` here, which is
          // always null on this branch; report the directory that failed instead.
          throw new IOException("ERROR: Failed to list files in " + workDir)
        }
        appDirs.filter { dir =>
          // Directory is named after an app id - only delete it if the app is
          // no longer running and it has no recent files.
          val appIdFromDir = dir.getName
          val isAppStillRunning = appIds.contains(appIdFromDir)
          dir.isDirectory && !isAppStillRunning &&
          !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
        }.foreach { dir =>
          logInfo(s"Removing directory: ${dir.getPath}")
          Utils.deleteRecursively(dir)
        }
      }(cleanupThreadExecutor)
      cleanupFuture.onFailure {
        case e: Throwable =>
          logError("App dir cleanup failed: " + e.getMessage, e)
      }(cleanupThreadExecutor)

    // (4) The active master has changed (e.g. after a master failover):
    // switch to the new master and report our current executors/drivers.
    case MasterChanged(masterRef, masterWebUiUrl) =>
      logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
      changeMaster(masterRef, masterWebUiUrl)
      val execs = executors.values.
        map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
      masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))

    // (5) Registration with the master failed; fatal unless we already registered.
    case RegisterWorkerFailed(message) =>
      if (!registered) {
        logError("Worker registration failed: " + message)
        System.exit(1)
      }

    // (6) The master asked this worker to reconnect.
    case ReconnectWorker(masterUrl) =>
      logInfo(s"Master with url $masterUrl requested this worker to reconnect.")
      // Re-register this worker with the masters.
      registerWithMaster()

    // (7) Launch an executor for the given application.
    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
          // Create the executor's working directory.
          val executorDir = new File(workDir, appId + "/" + execId)
          if (!executorDir.mkdirs()) {
            throw new IOException("Failed to create directory " + executorDir)
          }
          // Create local dirs for the executor; these are passed to it via the
          // SPARK_EXECUTOR_DIRS environment variable and deleted when the
          // application finishes.
          val appLocalDirs = appDirectories.get(appId).getOrElse {
            Utils.getOrCreateLocalRootDirs(conf).map { dir =>
              val appDir = Utils.createDirectory(dir, namePrefix = "executor")
              Utils.chmod700(appDir)
              appDir.getAbsolutePath()
            }.toSeq
          }
          appDirectories(appId) = appLocalDirs
          val manager = new ExecutorRunner(
            appId,
            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            workerUri,
            conf,
            appLocalDirs, ExecutorState.LOADING)
          executors(appId + "/" + execId) = manager
          manager.start()
          // Account for the resources this executor consumes.
          coresUsed += cores_
          memoryUsed += memory_
          sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
        } catch {
          case e: Exception => {
            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
            // Roll back: kill and forget the runner if it was registered.
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
              Some(e.toString), None))
          }
        }
      }

    // (8) An executor's state has changed.
    case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
      handleExecutorStateChanged(executorStateChanged)

    // (9) Kill the given executor.
    case KillExecutor(masterUrl, appId, execId) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor " + execId)
      } else {
        val fullId = appId + "/" + execId
        executors.get(fullId) match {
          case Some(executor) =>
            logInfo("Asked to kill executor " + fullId)
            executor.kill()
          case None =>
            logInfo("Asked to kill unknown executor " + fullId)
        }
      }

    // (10) Launch a driver on this worker.
    case LaunchDriver(driverId, driverDesc) => {
      logInfo(s"Asked to launch driver $driverId")
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
      drivers(driverId) = driver
      // FIX: the snippet had `driver.start(` with an unbalanced parenthesis,
      // which would not compile.
      driver.start()
      // Account for the resources this driver consumes.
      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
    }

    // (11) Kill the given driver.
    case KillDriver(driverId) => {
      logInfo(s"Asked to kill driver $driverId")
      drivers.get(driverId) match {
        case Some(runner) =>
          runner.kill()
        case None =>
          logError(s"Asked to kill unknown driver $driverId")
      }
    }

    // (12) A driver's state has changed.
    case driverStateChanged @ DriverStateChanged(driverId, state, exception) => {
      handleDriverStateChanged(driverStateChanged)
    }

    // (13) Re-register this worker with the master.
    case ReregisterWithMaster =>
      reregisterWithMaster()

    // (14) The application has finished: record it and clean up the local
    // files it created while running.
    case ApplicationFinished(id) =>
      finishedApps += id
      maybeCleanupApplication(id)
  }

</code>

繼續閱讀