天天看點

深度剖析Spark分布式執行原理

讓代碼分布式運作是所有分布式計算架構需要解決的最基本的問題。

Spark是大資料領域中相當火熱的計算架構,在大資料分析領域有一統江湖的趨勢,網上對于Spark源碼分析的文章有很多,但是介紹Spark如何處理代碼分布式執行問題的資料少之又少,這也是我撰寫文本的目的。

Spark運作在JVM之上,任務的執行依賴序列化及類加載機制,是以本文會重點圍繞這兩個主題介紹Spark對代碼分布式執行的處理。本文假設讀者對Spark、Java、Scala有一定的了解,代碼示例基于Scala,Spark源碼基于2.1.0版本。閱讀本文你可以了解到:

  • Java對象序列化機制
  • 類加載器的作用
  • Spark對closure序列化的處理
  • Spark Application的class是如何加載的
  • Spark REPL(spark-shell)中的代碼是如何分布式執行的

根據以上内容,讀者可以基于JVM相關的語言建構一個自己的分布式計算服務架構。

Java對象序列化

序列化(Serialization)是将對象的狀态資訊轉換為可以存儲或傳輸的形式的過程。所謂的狀态資訊指的是對象在記憶體中的資料,Java中一般指對象的字段資料。我們開發Java應用的時候或多或少都處理過對象序列化,對象常見的序列化形式有JSON、XML等。

JDK中内置一個

ObjectOutputStream

類可以将對象序列化為二進制資料,使用

ObjectOutputStream

序列化對象時,要求對象所屬的類必須實作

java.io.Serializable

接口,否則會報

java.io.NotSerializableException

的異常。

基本的概念先介紹到這。接下來我們一起探讨一個問題:Java的方法能否被序列化?

假設我們有如下的

SimpleTask

類(Java類):

import java.io.Serializable;

public abstract class Task implements Serializable {
    public void run() {
        System.out.println("run task!");
    }
}

public class SimpleTask extends Task {
    @Override
    public void run() {
        System.out.println("run simple task!");
    }
}
           

還有一個用于将對象序列化到檔案的工具類

FileSerializer

import java.io.{FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}

object FileSerializer {

  def writeObjectToFile(obj: Object, file: String) = {
    val fileStream = new FileOutputStream(file)
    val oos = new ObjectOutputStream(fileStream)
    oos.writeObject(obj)
    oos.close()
  }

  def readObjectFromFile(file: String): Object = {
    val fileStream = new FileInputStream(file)
    val ois = new ObjectInputStream(fileStream)
    val obj = ois.readObject()
    ois.close()
    obj
  }
}
           

簡單起見,我們采用将對象序列化到檔案,然後通過反序列化執行的方式來模拟代碼的分布式執行。SimpleTask就是我們需要模拟分布式執行的代碼。我們先将

SimpleTask

序列化到檔案中:

val task = new SimpleTask()
FileSerializer.writeObjectToFile(task, "task.ser")
           

然後将

SimpleTask

類從我們的代碼中删除,此時隻有

task.ser

檔案中含有task對象的序列化資料。接下來我們執行下面的代碼:

val task = FileSerializer.readObjectFromFile("task.ser").asInstanceOf[Task]
task.run()
           

請各位讀者思考,上面的代碼執行後會出現什麼樣的結果?

  • 輸出:

    run simple task!

    ?
  • run task!

  • 還是會報錯?

實際執行會出現形如下面的異常:

Exception in thread "main" java.lang.ClassNotFoundException: site.stanzhai.serialization.SimpleTask
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:628)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
	at site.stanzhai.serialization.FileSerializer$.readObjectFromFile(FileSerializer.scala:20)
           

從異常資訊來看,反序列過程中找不到

SimpleTask

類。由此可以推斷序列化後的資料是不包含類的定義資訊的。那麼,

ObjectOutputStream

到底序列化了哪些資訊呢?

ObjectOutputStream

實作機制感興趣的同學可以去看下JDK中這個類的實作,

ObjectOutputStream

序列化對象時,從父類的資料開始序列化到子類,如果override了writeObject方法,會反射調用writeObject來序列化資料。序列化的資料會按照以下的順序以二進制的形式輸出到OutputStream中:

  1. 類的descriptor(僅僅是類的描述資訊,不包含類的定義)
  2. 對象的primitive類型資料(int,boolean等,String和Array是特殊處理的)
  3. 對象的其他obj資料

回到我們的問題上:Java的方法能否被序列化?通過我們代碼示例及分析,想必大家對這個問題應該清楚了。通過

ObjectOutputStream

序列化對象,僅包含類的描述(而非定義),對象的狀态資料,由于缺少類的定義,也就是缺少

SimpleTask

的位元組碼,反序列化過程中就會出現ClassNotFound的異常。

如何讓我們反序列化的對象能正常使用呢?我們還需要了解類加載器。

類加載器:ClassLoader

ClassLoader在Java中是一個抽象類,ClassLoader的作用是加載類,給定一個類名,ClassLoader會嘗試查找或生成類的定義,一種典型的加載政策是将類名對應到檔案名上,然後從檔案系統中加載class file。

在我們的示例中,反序列化

SimpleTask

失敗,是因為JVM找不到類的定義,是以要確定正常反序列化,我們必須将

SimpleTask

的class檔案儲存下來,反序列化的時候能夠讓ClassLoader加載到

SimpleTask

的class。

接下來,我們對代碼做一些改造,添加一個

ClassManipulator

類,用于将對象的class檔案導出到目前目錄的檔案中,預設的檔案名就是對象的類名(不含包名):

object ClassManipulator {
  def saveClassFile(obj: AnyRef): Unit = {
    val classLoader = obj.getClass.getClassLoader
    val className = obj.getClass.getName
    val classFile = className.replace('.', '/') + ".class"
    val stream = classLoader.getResourceAsStream(classFile)

    // just use the class simple name as the file name
    val outputFile = className.split('.').last + ".class"
    val fileStream = new FileOutputStream(outputFile)
    var data = stream.read()
    while (data != -1) {
      fileStream.write(data)
      data = stream.read()
    }
    fileStream.flush()
    fileStream.close()
  }
}
           

按照JVM的規範,假設對

package.Simple

這樣的一個類編譯,編譯後的class檔案為

package/Simple.class

,是以我們可以根據路徑規則,從目前JVM程序的Resource中得到指定類的class資料。

在删除

SimpleTask

前,我們除了将task序列化到檔案外,還需要将task的class檔案儲存起來,執行完下面的代碼,

SimpleTask

類就可以從代碼中剔除了:

val task = new SimpleTask()
FileSerializer.writeObjectToFile(task, "task.ser")
ClassManipulator.saveClassFile(task)
           

由于我們儲存class檔案的方式比較特殊,既不在jar包中,也不是按package/ClassName.class這種标準的儲存方式,是以還需要實作一個自定義的

FileClassLoader

按照我們儲存class檔案的方式來加載所需的類:

class FileClassLoader() extends ClassLoader {
  override def findClass(fullClassName: String): Class[_] = {
    val file = fullClassName.split('.').last + ".class"
    val in = new FileInputStream(file)
    val bos = new ByteArrayOutputStream
    val bytes = new Array[Byte](4096)
    var done = false
    while (!done) {
      val num = in.read(bytes)
      if (num >= 0) {
        bos.write(bytes, 0, num)
      } else {
        done = true
      }
    }
    val data = bos.toByteArray
    defineClass(fullClassName, data, 0, data.length)
  }
}
           

ObjectInputStream

類用于對象的反序列化,在反序列化過程中,它根據序列化資料中類的descriptor資訊,調用

resolveClass

方法加載對應的類,但是通過

Class.forName

加載class使用的并不是我們自定義的

FileClassLoader

,是以如果直接使用

ObjectInputStream

進行反序列,依然會因為找不到類而報錯,下面是

resolveClass

的源碼:

protected Class<?> resolveClass(ObjectStreamClass desc)
    throws IOException, ClassNotFoundException
{
    String name = desc.getName();
    try {
        return Class.forName(name, false, latestUserDefinedLoader());
    } catch (ClassNotFoundException ex) {
        Class<?> cl = primClasses.get(name);
        if (cl != null) {
            return cl;
        } else {
            throw ex;
        }
    }
}
           

為了能讓

ObjectInputStream

在序列化的過程中使用我們自定義的ClassLoader,我們還需要對

FileSerializer

中的

readObjectFromFile

方法做些改造,修改的代碼如下:

def readObjectFromFile(file: String, classLoader: ClassLoader): Object = {
  val fileStream = new FileInputStream(file)
  val ois = new ObjectInputStream(fileStream) {
    override def resolveClass(desc: ObjectStreamClass): Class[_] =
      Class.forName(desc.getName, false, classLoader)
  }
  val obj = ois.readObject()
  ois.close()
  obj
}
           

最後,我們将反序列化的代碼調整為:

val fileClassLoader = new FileClassLoader()
val task = FileSerializer.readObjectFromFile("task.ser", fileClassLoader).asInstanceOf[Task]
task.run()
           

反序列化的過程中能夠通過fileClassLoader加載到所需的類,這樣我們在執行就不會出錯了,最終的執行結果為:

run simple task!

。到此為止,我們已經完整地模拟了代碼分布式執行的過程。完整的示例代碼,請參閱:https://github.com/stanzhai/jvm-exercise/tree/master/src/main/scala/site/stanzhai/exercise/serialization

我們依然通過一個示例,快速了解下Scala對閉包的處理,下面是從Scala的REPL中執行的代碼:

scala> val n = 2
n: Int = 2

scala> val f = (x: Int) => x * n
f: Int => Int = <function1>

scala> Seq.range(0, 5).map(f)
res0: Seq[Int] = List(0, 2, 4, 6, 8)
           

f

是采用Scala的

=>

文法糖定義的一個閉包,為了弄清楚Scala是如何處理閉包的,我們繼續執行下面的代碼:

scala> f.getClass
res0: Class[_ <: Int => Int] = class $anonfun$1

scala> f.isInstanceOf[Function1[Int, Int]]
res1: Boolean = true

scala> f.isInstanceOf[Serializable]
res2: Boolean = true
           

可以看出

f

對應的類為

$anonfun$1

Function1[Int, Int]

的子類,而且實作了

Serializable

接口,這說明

f

是可以被序列化的。

Spark對于資料的處理基本都是基于閉包,下面是一個簡單的Spark分布式處理資料的代碼片段:

val spark = SparkSession.builder().appName("demo").master("local").getOrCreate()
val sc = spark.sparkContext
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
val sum = distData.map(x => x * 2).sum()
println(sum)  // 30.0
           

對于

distData.map(x => x * 2)

,map中傳的一個匿名函數,也是一個非常簡單的閉包,對

distData

中的每個元素*2,我們知道對于這種形式的閉包,Scala編譯後是可以序列化的,是以我們的代碼能正常執行也合情合理。将入我們将處理函數的閉包定義到一個類中,然後将代碼改造為如下形式:

class Operation {
  val n = 2
  def multiply = (x: Int) => x * n
}
...
val sum = distData.map(new Operation().multiply).sum()
...
           

我們在去執行,會出現什麼樣的結果呢?實際執行會出現這樣的異常:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
	...
Caused by: java.io.NotSerializableException: Operation
           

Scala在構造閉包的時候會确定他所依賴的外部變量,并将它們的引用存到閉包對象中,這樣能保證在不同的作用域中調用閉包不出現問題。

出現

Task not serializable

的異常,是由于我們的

multiply

函數依賴

Operation

類的變量

n

,雖然multiply是支援序列化的,但是

Operation

不支援序列化,這導緻

multiply

函數在序列化的過程中出現了

NotSerializable

的異常,最終導緻我們的Task序列化失敗。為了確定

multiply

能被正常序列化,我們需要想辦法去除對

Operation

的依賴,我們将代碼做如下修改,在去執行就可以了:

class Operation {
  def multiply = (x: Int) => x * 2
}
...
val sum = distData.map(new Operation().multiply).sum()
...
           

Spark對閉包序列化前,會通過工具類

org.apache.spark.util.ClosureCleaner

嘗試clean掉閉包中無關的外部對象引用,

ClosureCleaner

對閉包的處理是在運作期間,相比Scala編譯器,能更精準的去除閉包中無關的引用。這樣做,一方面可以盡可能保證閉包可被序列化,另一方面可以減少閉包序列化後的大小,便于網絡傳輸。

我們在開發Spark應用的時候,如果遇到

Task not serializable

的異常,就需要考慮下,閉包中是否或引用了無法序列化的對象,有的話,嘗試去除依賴就可以了。

Spark中實作的序列化工具有多個:

深度剖析Spark分布式執行原理

SparkEnv

類的實作來看,用于閉包序列化的是

JavaSerializer

:

深度剖析Spark分布式執行原理

JavaSerializer

内部使用的是

ObjectOutputStream

将閉包序列化:

private[spark] class JavaSerializationStream(
    out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
  extends SerializationStream {
  private val objOut = new ObjectOutputStream(out)
  ...
}
           

将閉包反序列化的核心代碼為:

private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
  extends DeserializationStream {

  private val objIn = new ObjectInputStream(in) {
    override def resolveClass(desc: ObjectStreamClass): Class[_] =
      try {
        Class.forName(desc.getName, false, loader)
      } catch {
        case e: ClassNotFoundException =>
          JavaDeserializationStream.primitiveMappings.getOrElse(desc.getName, throw e)
      }
  }
  ...
}
           

關于

ObjectInputStream

我們前面已有介紹,

JavaDeserializationStream

有個關鍵的成員變量

loader

,它是個ClassLoader,可以讓Spark使用非預設的ClassLoader按照自定義的加載政策去加載class,這樣才能保證反序列化過程在其他節點正常進行。

通過前面的介紹,想要代碼在另一端執行,隻有序列化還不行,還需要保證執行端能夠加載到閉包對應的類。接下來我們探讨Spark加載class的機制。

通常情況下我們會将開發的Spark Application打包為jar包,然後通過

spark-submit

指令送出到叢集運作,下面是一個官網的示例:

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  ... \
  --jars /path/to/dep-libs.jar \
  /path/to/examples.jar \
           

此時,我們編寫的代碼中所包含的閉包,對應的類已經被編譯到jar包中了,是以Executor端隻要能加載到這個jar包,從jar包中定位閉包的class檔案,就可以将閉包反序列化了。事實上Spark也是這麼做的。

Spark Application的Driver端在運作的時候會基于netty建立一個檔案服務,我們運作的jar包,及

--jars

中指定的依賴jar包,會被添加到檔案伺服器中。這個過程在

SparkContext

的addJar方法中完成:

/**
 * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
 * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
 * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
 */
def addJar(path: String) {
  if (path == null) {
    logWarning("null specified as parameter to addJar")
  } else {
    var key = ""
    if (path.contains("\\")) {
      // For local paths with backslashes on Windows, URI throws an exception
      key = env.rpcEnv.fileServer.addJar(new File(path))
    } else {
      val uri = new URI(path)
      // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
      Utils.validateURL(uri)
      key = uri.getScheme match {
        // A JAR file which exists only on the driver node
        case null | "file" =>
          try {
            env.rpcEnv.fileServer.addJar(new File(uri.getPath))
          } catch {
            case exc: FileNotFoundException =>
              logError(s"Jar not found at $path")
              null
          }
        // A JAR file which exists locally on every worker node
        case "local" =>
          "file:" + uri.getPath
        case _ =>
          path
      }
    }
    if (key != null) {
      val timestamp = System.currentTimeMillis
      if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
        logInfo(s"Added JAR $path at $key with timestamp $timestamp")
        postEnvironmentUpdate()
      }
    }
  }
}
           

Executor端在執行任務的時候,會從任務資訊中得到依賴的jar包,然後

updateDependencies

從Driver端的檔案伺服器下載下傳缺失的jar包,并将jar包添加到URLClassLoader中,最後再将task反序列化,反序列化前所需的jar都已準備好,是以能夠将task中的閉包正常反序列化,核心代碼如下:

override def run(): Unit = {
  ...
  
  try {
    val (taskFiles, taskJars, taskProps, taskBytes) =
      Task.deserializeWithDependencies(serializedTask)

    // Must be set before updateDependencies() is called, in case fetching dependencies
    // requires access to properties contained within (e.g. for access control).
    Executor.taskDeserializationProps.set(taskProps)

    updateDependencies(taskFiles, taskJars)
    task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
    ...
  } finally {
    runningTasks.remove(taskId)
  }
}
           

這麼來看,整個Spark Application分布式加載class的機制就比較清晰了。Executor端能夠正常加載class,反序列化閉包,分布式執行代碼自然就不存在什麼問題了。

spark-shell

是Spark為我們提供的一個REPL的工具,可以讓我們非常友善的寫一些簡單的資料處理腳本。下面是一個運作在

spark-shell

的代碼:

scala> val f = (x: Int) => x + 1
f: Int => Int = <function1>

scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> distData.map(f).sum()
res0: Double = 20.0
           

我們已知,閉包

f

會被Scala編譯為匿名類,如果要将

f

序列化到Executor端執行,必須要加載

f

對應的匿名類的class資料,才能正常反序列化。

Spark是如何得到

f

的class資料的?Executor又是如何加載到的?

源碼面前,了無秘密。我們看一下Spark的repl項目的代碼入口,核心代碼如下:

object Main extends Logging {
  ...
  val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
  val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")

  def main(args: Array[String]) {
    doMain(args, new SparkILoop)
  }

  // Visible for testing
  private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
    interp = _interp
    val jars = Utils.getUserJars(conf, isShell = true).mkString(File.pathSeparator)
    val interpArguments = List(
      "-Yrepl-class-based",
      "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
      "-classpath", jars
    ) ++ args.toList

    val settings = new GenericRunnerSettings(scalaOptionError)
    settings.processArguments(interpArguments, true)

    if (!hasErrors) {
      interp.process(settings) // Repl starts and goes in loop of R.E.P.L
      Option(sparkContext).map(_.stop)
    }
  }
  ...
}
           

Spark2.1.0的REPL基于Scala-2.11的

scala.tools.nsc

編譯工具實作,代碼已經相當簡潔,Spark給

interp

設定了2個關鍵的配置

-Yrepl-class-based

-Yrepl-outdir

,通過這兩個配置,我們在shell中輸入的代碼會被編譯為class檔案輸出到執行的檔案夾中。如果指定了

spark.repl.classdir

配置,會用這個配置的路徑作為class檔案的輸出路徑,否則使用

SPARK_LOCAL_DIRS

對應的路徑。下面是我測試過程中輸出到檔案夾中的class檔案:

深度剖析Spark分布式執行原理

我們已經清楚Spark如何将shell中的代碼編譯為class了,那麼Executor端,如何加載到這些class檔案呢?在

org/apache/spark/executor/Executor.scala

中有段和REPL相關的代碼:

private val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)

/**
 * If the REPL is in use, add another ClassLoader that will read
 * new classes defined by the REPL as the user types code
 */
private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = {
  val classUri = conf.get("spark.repl.class.uri", null)
  if (classUri != null) {
    logInfo("Using REPL class URI: " + classUri)
    try {
      val _userClassPathFirst: java.lang.Boolean = userClassPathFirst
      val klass = Utils.classForName("org.apache.spark.repl.ExecutorClassLoader")
        .asInstanceOf[Class[_ <: ClassLoader]]
      val constructor = klass.getConstructor(classOf[SparkConf], classOf[SparkEnv],
        classOf[String], classOf[ClassLoader], classOf[Boolean])
      constructor.newInstance(conf, env, classUri, parent, _userClassPathFirst)
    } catch {
      case _: ClassNotFoundException =>
        logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")
        System.exit(1)
        null
    }
  } else {
    parent
  }
}

override def run(): Unit = {
  ...
  Thread.currentThread.setContextClassLoader(replClassLoader)
  val ser = env.closureSerializer.newInstance()
  ...
}
           

Executor啟動時會判斷是否為REPL模式,如果是的話會使用

ExecutorClassLoader

做為反序列閉包時所使用的ClassLoader,

ExecutorClassLoader

會通過網絡從Driver端(也就是執行

spark-shell

的節點)加載所需的class檔案。這樣我們在

spark-shell

中寫的代碼就可以分布式執行了。

總結

Spark實作代碼的分布式執行有2個關鍵點:

  1. 對象必須可序列化
  2. Executor端能夠加載到所需類的class檔案,保證反序列化過程不出錯,這點通過自定義的ClassLoader來保障

滿足以上2個條件,我們的代碼就可以分布式運作了。

當然,建構一個完整的分布式計算架構,還需要有網絡通信架構、RPC、檔案傳輸服務等作為支撐,在了解Spark代碼分布式執行原理的基礎上,相信讀者已有思路基于JVM相關的語言建構分布式計算服務。

類比其他非JVM相關的語言,實作一個分布式計算架構,依然是需要解決序列化,動态加載執行代碼的問題。

據說看到好文章不推薦的人,伺服器容易當機!

本文版權歸翟士丹(Stan Zhai)和部落格園共有,原創文章,未經允許不得轉載,否則保留追究法律責任的權利。