推薦閱讀
《Spark源碼精度計劃1 | SparkConf》
《Spark Core源碼精讀計劃2 | SparkContext元件初始化》
《Spark Core源碼精讀計劃3 | SparkContext輔助屬性及後初始化》
《Spark Core源碼精讀計劃4 | SparkContext提供的其他功能》
《Spark Core源碼精讀計劃5 | 事件總線及ListenerBus》
《Spark Core源碼精讀計劃6 | AsyncEventQueue與LiveListenerBus》
《Spark Core源碼精讀計劃7 | Spark執行環境的初始化》
《Spark Core源碼精讀計劃8 | SparkEnv中RPC環境的基礎建構》
《Spark Core源碼精讀計劃9 | Spark RPC環境中的消息排程邏輯》
《Spark Core源碼精讀計劃10 | NettyRpcEnv用戶端消息發送邏輯》
《Spark Core源碼精讀計劃11 | Spark廣播機制的實作》
《Spark Core源碼精讀計劃12 | Spark序列化及壓縮機制淺析》
《Spark Core源碼精讀計劃13 | 度量系統MetricsSystem的建立》
目錄
- 前言
- 建立SparkUI
- SparkContext中的操作
- 初始化SparkUI
- WebUI的具體實作
- 屬性成員和Getter方法
- WebUI提供的attach/detach類方法
- 綁定WebUI到Jetty服務
- Spark Web UI的展示
- WebUITab與WebUIPage的定義
- 渲染Spark UI頁面
- 總結
前言
我們已經在SparkEnv的世界裡摸爬滾打了很長時間,對RPC環境、廣播變量、序列化和壓縮、度量系統這幾個相對獨立的元件有了一定的了解。現在是時候抽身出來,繼續move forward,跟着SparkContext初始化的流程走下去。按照順序,本文要講的是Spark Web UI。正好,上一篇文章剛剛講過度量系統,本文可以說是水到渠成了。
Spark Web UI主要依賴于流行的Servlet容器Jetty實作,本文為避免跑題,在涉及Jetty相關細節的時候都不會詳細地展開。
建立SparkUI
由于距離講SparkContext的初始化已經過去許久了,是以先看看SparkUI在SparkContext的建立流程。
SparkContext中的操作
代碼#14.1 - SparkContext中建立SparkUI的代碼
_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
startTime))
} else {
None
}
_ui.foreach(_.bind())
複制
其中,_statusStore是早先初始化的AppStatusStore,它是包裝過的KVStore和AppStatusListener,前者用于存儲監控資料,後者注冊到事件總線中的appStatus隊列中。_env.securityManager則是SparkEnv中初始化的安全管理器。
SparkContext通過調用SparkUI伴生對象中的create()方法來直接new出SparkUI執行個體,然後調用bind()方法将SparkUI綁定到Jetty服務。bind()方法之後再說,現在先來看SparkUI類的事情。
初始化SparkUI
以下是SparkUI類中的屬性成員,以及構造方法。
代碼#14.2 - o.a.s.ui.SparkUI類中的成員屬性和initialize()方法
private[spark] class SparkUI private (
val store: AppStatusStore,
val sc: Option[SparkContext],
val conf: SparkConf,
securityManager: SecurityManager,
var appName: String,
val basePath: String,
val startTime: Long,
val appSparkVersion: String)
extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf),
conf, basePath, "SparkUI")
with Logging with UIRoot {
val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)
var appId: String = _
private var streamingJobProgressListener: Option[SparkListener] = None
def initialize(): Unit = {
val jobsTab = new JobsTab(this, store)
attachTab(jobsTab)
val stagesTab = new StagesTab(this, store)
attachTab(stagesTab)
attachTab(new StorageTab(this, store))
attachTab(new EnvironmentTab(this, store))
attachTab(new ExecutorsTab(this))
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
attachHandler(ApiRootResource.getServletHandler(this))
attachHandler(createRedirectHandler(
"/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST")))
attachHandler(createRedirectHandler(
"/stages/stage/kill", "/stages/", stagesTab.handleKillRequest,
httpMethods = Set("GET", "POST")))
}
initialize()
}
複制
SparkUI類中有3個屬性成員:
- killEnabled由配置項spark.ui.killEnabled控制,如果為true,會在UI界面中展示強行殺掉Spark Job的開關。
- appId就是目前的Application ID。
- streamingJobProgressListener是用于Spark Streaming作業進度的監聽器。
在initialize()方法中,首先建立了5個Tab,并調用了attachTab()方法注冊到Web UI。所謂Tab就是Spark UI中的标簽頁,如下圖中最上面的一欄所示,名稱也是一一對應的。
圖#14.1 - Spark Web UI頁面
接下來,調用createStaticHandler()方法建立靜态資源的ServletContextHandler,又調用createRedirectHandler()建立一些重定向的ServletContextHandler。【插一句:ServletContextHandler是Jetty中一個功能完善的處理器,負責接收并處理HTTP請求,再投遞給Servlet。】最後,逐一調用attachHandler()方法注冊到Web UI。
那麼上面的這一系列方法(也包含上一節的bind()方法)是哪兒來的呢?答案是WebUI抽象類,也就是SparkUI的基類。下面來閱讀它的源碼。
WebUI的具體實作
WebUI是Spark裡所有可以在浏覽器中展示的内容的頂級元件,是以SparkUI類也會繼承它。
屬性成員和Getter方法
代碼#14.3 - o.a.s.ui.WebUI類的屬性成員和Getter方法
protected val tabs = ArrayBuffer[WebUITab]()
protected val handlers = ArrayBuffer[ServletContextHandler]()
protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
protected var serverInfo: Option[ServerInfo] = None
protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
conf.get(DRIVER_HOST_ADDRESS))
private val className = Utils.getFormattedClassName(this)
def getBasePath: String = basePath
def getTabs: Seq[WebUITab] = tabs
def getHandlers: Seq[ServletContextHandler] = handlers
def getSecurityManager: SecurityManager = securityManager
複制
屬性成員有以下6個。
- tabs:持有WebUITab(即圖#14.1中的标簽頁)的緩存。
- handlers:持有Jetty ServletContextHandler的緩存。
- pageToHandlers:儲存WebUIPage(WebUITab的下一級元件)與其對應的ServletContextHandler的映射關系。
- serverInfo:目前Web UI對應的Jetty伺服器資訊。
- publicHostName:目前Web UI對應的Jetty服務主機名。先通過系統環境變量SPARK_PUBLIC_DNS擷取,再通過spark.driver.host配置項擷取。
- className:目前類的名稱,用Utils.getFormattedClassName()方法格式化過。
Getter方法有4個,getTabs()和getHandlers()都是簡單地獲得對應屬性的值。getBasePath()取得構造參數中定義的Web UI基路徑,getSecurityManager()則取得構造參數中傳入的安全管理器。
WebUI提供的attach/detach類方法
這類方法都是成對的,一共有3對:attachTab()/detachTab(),用于注冊和移除WebUITab;attachPage()/detachPage(),用于注冊和移除WebUIPage;attachHandler()/detachHandler(),用于注冊和移除ServletContextHandler。以下是它們的代碼。
代碼#14.4 - WebUI提供的attach/detach類方法
def attachTab(tab: WebUITab) {
tab.pages.foreach(attachPage)
tabs += tab
}
def detachTab(tab: WebUITab) {
tab.pages.foreach(detachPage)
tabs -= tab
}
def detachPage(page: WebUIPage) {
pageToHandlers.remove(page).foreach(_.foreach(detachHandler))
}
def attachPage(page: WebUIPage) {
val pagePath = "/" + page.prefix
val renderHandler = createServletHandler(pagePath,
(request: HttpServletRequest) => page.render(request), securityManager, conf, basePath)
val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json",
(request: HttpServletRequest) => page.renderJson(request), securityManager, conf, basePath)
attachHandler(renderHandler)
attachHandler(renderJsonHandler)
val handlers = pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
handlers += renderHandler
}
def attachHandler(handler: ServletContextHandler) {
handlers += handler
serverInfo.foreach(_.addHandler(handler))
}
def detachHandler(handler: ServletContextHandler) {
handlers -= handler
serverInfo.foreach(_.removeHandler(handler))
}
複制
看起來并不難了解,我們就來讀讀其中最長的attachPage()方法。它的流程是:調用Jetty工具類JettyUtils的createServletHander()方法,為WebUIPage的兩個渲染方法render()和renderJson()建立ServletContextHandler,也就是一個WebUIPage需要對應兩個處理器。然後,調用上述attachHandler()方法向Jetty注冊處理器,并将映射關系寫入handlers結構中。綁定WebUI到Jetty服務
這裡就是在前一章節提到的bind()方法了。
代碼#14.5 - o.a.s.ui.WebUI.bind()方法
def bind(): Unit = {
assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!")
try {
val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
serverInfo = Some(startJettyServer(host, port, sslOptions, handlers, conf, name))
logInfo(s"Bound $className to $host, and started at $webUrl")
} catch {
case e: Exception =>
logError(s"Failed to bind $className", e)
System.exit(1)
}
}
複制
該方法調用了JettyUtils.startJettyServer()方法來啟動Jetty服務,具體不再贅述。
Spark Web UI的展示
Spark Web UI實際上是一個三層的樹形結構,根節點為WebUI,中層節點為WebUITab,葉子節點為WebUIPage。UI界面的展示就主要靠WebUITab與WebUIPage來實作。在Spark UI界面中,一個Tab可以包含一個或多個Page,并且Tab是可選的。
WebUITab與WebUIPage的定義
以下是WebUITab的代碼。
代碼#14.6 - o.a.s.ui.WebUITab抽象類
private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
val pages = ArrayBuffer[WebUIPage]()
val name = prefix.capitalize
def attachPage(page: WebUIPage) {
page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
pages += page
}
def headerTabs: Seq[WebUITab] = parent.getTabs
def basePath: String = parent.getBasePath
}
複制
由于一個Tab可以包含多個Page,是以pages數組就用來緩存該Tab下所有的Page。attachPage()方法就用于将Tab的路徑字首與Page的路徑字首拼合起來,并将其加入pages數組中。
WebUIPage抽象類的定義更加簡單,隻有兩個方法,前面已經出現過。render()方法用于渲染頁面,renderJson()方法則用于生成對應的JSON串,代碼就不再貼出來了。
WebUITab與WebUIPage各有很多的實作類,分别對應一個Tab或一個Page。本來想拿IDEA生成兩張類圖,但是不知為何,所有表示繼承關系的箭頭都顯示不出來(可能IDEA對Scala的支援仍然不是很好吧),隻得作罷。最後,我們來看看Spark UI上的内容是怎樣展示出來的。
渲染Spark UI頁面
我們以Environment這一頁為例來探索,因為它的頁面元素相當簡單,隻是展示許多環境資訊(如Spark配置、系統屬性、JVM資訊、Classpath等等)的表格,幹擾比較少。其頁面本身如下圖所示。
圖#14.2 - Spark UI Environment頁
首先來看EnvironmentTab的代碼,非常簡單。
代碼#14.7 - o.a.s.ui.env.EnvironmentTab類
private[ui] class EnvironmentTab(
parent: SparkUI,
store: AppStatusStore) extends SparkUITab(parent, "environment") {
attachPage(new EnvironmentPage(this, parent.conf, store))
}
複制
其中SparkUITab就是對WebUITab的簡單封裝,加上了Application名稱和Spark版本的屬性。EnvironmentTab類隻有構造方法,調用代碼#14.6中預先定義好的attachPage()方法,将EnvironmentPage加入。以下則是EnvironmentPage的具體實作。
代碼#14.8 - o.a.s.ui.env.EnvironmentPage類
private[ui] class EnvironmentPage(
parent: EnvironmentTab,
conf: SparkConf,
store: AppStatusStore) extends WebUIPage("") {
def render(request: HttpServletRequest): Seq[Node] = {
val appEnv = store.environmentInfo()
val jvmInformation = Map(
"Java Version" -> appEnv.runtime.javaVersion,
"Java Home" -> appEnv.runtime.javaHome,
"Scala Version" -> appEnv.runtime.scalaVersion)
val runtimeInformationTable = UIUtils.listingTable(
propertyHeader, jvmRow, jvmInformation, fixedWidth = true)
val sparkPropertiesTable = UIUtils.listingTable(propertyHeader, propertyRow,
Utils.redact(conf, appEnv.sparkProperties.toSeq), fixedWidth = true)
val systemPropertiesTable = UIUtils.listingTable(
propertyHeader, propertyRow, appEnv.systemProperties, fixedWidth = true)
val classpathEntriesTable = UIUtils.listingTable(
classPathHeaders, classPathRow, appEnv.classpathEntries, fixedWidth = true)
val content =
<span>
<h4>Runtime Information</h4> {runtimeInformationTable}
<h4>Spark Properties</h4> {sparkPropertiesTable}
<h4>System Properties</h4> {systemPropertiesTable}
<h4>Classpath Entries</h4> {classpathEntriesTable}
</span>
UIUtils.headerSparkPage("Environment", content, parent)
}
private def propertyHeader = Seq("Name", "Value")
private def classPathHeaders = Seq("Resource", "Source")
private def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
}
複制
render()方法用來渲染頁面内容,其流程如下:
- 從AppStatusStore中取得所有環境資訊。
- 調用UIUtils.listingTable()方法,将對應的表頭與添加了HTML标簽的行封裝成表格。
- 将4張表格排列好,調用UIUtils.headerSparkPage()方法,按照規定好的頁面布局展示在浏覽器上。
這樣,圖#14.2的頁面就顯示出來了。
總結
本文從SparkContext中對Spark UI的初始化入手,首先介紹了SparkUI類的具體構造。然後分析了SparkUI的基類WebUI的具體實作,明确了整個UI界面的組成部分。最後簡要介紹WebUITab與WebUIPage,并以Spark UI中的Environment頁為例,分析了頁面的展示流程。
— THE END —