天天看點

XGBoost4J-Spark 1.0.0運作流程與排坑指南0.前言1.确認版本2.Maven配置3.代碼4.運作5.評價

XGBoost4J-Spark配置全解

  • 0.前言
  • 1.确認版本
  • 2.Maven配置
  • 3.代碼
  • 4.運作
  • 5.評價

0.前言

XGBoost4J-Spark能夠讓我們在Spark上玩XGBoost,對于海量資料來說應該是很有用的。這篇文章主要介紹了将其官網提供的Demo部署在Linux上運作的全過程。通過此Demo在Spark上用XGBoost完成iris資料集的多分類任務。

此次選用的是最新的穩定版XGBoost4J-Spark 1.0.0,之前0.81版本的Windows部署可以參考這裡。不過其實都是互通的,兩邊都看看也無妨。我這裡會将我選擇、填坑時的思路全部記錄下來,需要注意的地方我會用這樣進行标注,以供你參考,(●´З`●)。

1.确認版本

作為最基礎的demo,要考慮的版本包括:

  • XGBoost4J-Spark
  • Scala
  • Spark
  • Python
  • Hadoop
  • Java

在安裝之前一定要根據自己的配置選擇合适的版本,確定兩兩之間版本契合,否則後面會報一堆錯。我選擇的版本如下:

子產品 版本号
XGBoost4J-Spark 1.0.0
Scala 2.11.8
Spark 2.4.5
Python 3.5.2
Hadoop 2.6
Java 1.8.0

具體考量如下:首先是選擇了目前的最新穩定版——1.0.0的XGBoost4J-Spark,然後在它的官網文檔裡發現0.9之後的版本僅支援Spark2.4+了,具體原因摘抄在下面:

XGBoost4J-Spark now requires Apache Spark 2.4+. Latest versions of XGBoost4J-Spark uses facilities of org.apache.spark.ml.param.shared extensively to provide for a tight integration with Spark MLLIB framework, and these facilities are not fully available on earlier versions of Spark.

但我Spark是卑微的2.2.0,隻能重新裝一個高版本了。然後選擇了Spark 2.4.5 Prebuild for Apache Hadoop 2.6。這裡Hadoop的版本2.6.0就确定好了,好在Hadoop和Spark互通這塊網上排坑指南比較多,在實際操作時這塊也沒出問題,甚至我用的是hadoop-2.6.0-cdh5.7.0也沒啥問題,好評。然後這裡有個點請注意一下,在Spark下載下傳界面有這麼一段話:

Note that, Spark is pre-built with Scala 2.11 except version 2.4.2, which is pre-built with Scala 2.12.

是以對于Spark 2.4.5來說,給咱們的jars是基于Scala 2.11的(當然你也可以選擇 Pre-built with Scala 2.12 and user-provided Apache Hadoop這個版本,然而我試了一下發現在spark-submit這個版本的時候坑太多了,比起填這個坑還是轉Scala2.11簡單點,遂棄之)。

于是Scala的大版本也确定好了,在選擇Scala的時候小心每個小版本裡也有很多更疊,導緻一些方法找不到,最後在嘗試過很多小版本後敲定了Scala 2.11.8,至少在這個Demo裡沒有表現出特别的問題。

另外提一嘴Python,版本需要2.7+,盡量就Python 3+就完事了,不然會在訓練的時候報錯

ImportError: No module named argparse

,進而導緻出現

XGBoostError: XGBoostModel training failed

這種詭異的錯誤。

JDK就是1.8.0,沒啥好說的,也沒出問題,就寫在這以防有老哥要看。

2.Maven配置

要注意各個版本的一一對應,要查包的對應版本的話請在MvnRepository裡查找關鍵字。

這裡直接放整體的pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.taipark.xgboost</groupId>
  <artifactId>sonarAnalysis</artifactId>
  <version>1.0</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.11.8</scala.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>ml.dmlc</groupId>
      <artifactId>xgboost4j-spark_2.11</artifactId>
      <version>1.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
           

3.代碼

配置好之後就要開始寫代碼了,代碼在xgboost的文檔裡說的其實還是蠻清楚的,這裡把完整的能運作的代碼放出來了解一下:

注:代碼是用Scala編寫的。

首先是import:

import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
           

建立SparkSession:

根據資料集設定Schema:

val schema = new StructType(Array(
	      StructField("sepal length", DoubleType, true),
	      StructField("sepal width", DoubleType, true),
	      StructField("petal length", DoubleType, true),
	      StructField("petal width", DoubleType, true),
	      StructField("class", StringType, true)
	    ))
           

導入資料,使其與Schema一一對應,這裡注意資料在轉為csv的時候不要使用UTF-8之類的編碼,而是使用Windows預設編碼,不然會導緻XGBoost識别分類數量錯誤進而報有關

num_class

的錯誤:

val dataPath = args(0)
    val rawInput = spark.read.schema(schema).csv(dataPath)
           

這裡的class列是用String表示的,我們要将其轉化為Double類型,可以使用StringIndexer來轉換:

val stringIndexer = new StringIndexer()
      .setInputCol("class")
      .setOutputCol("classIndex")
      .fit(rawInput)
   val labelTransformed = stringIndexer.transform(rawInput).drop("class")
           

将轉換後的列classIndex替換之前的列class:

val vectorAssembler = new VectorAssembler()
      .setInputCols(Array("sepal length","sepal width","petal length","petal width"))
      .setOutputCol("features")
    val xgbInput = vectorAssembler.transform(labelTransformed)
      .select("features","classIndex")
           

将資料八二分為訓練集與測試集:

val splitXgbInput = xgbInput.randomSplit(Array(0.8, 0.2))
    val trainXgbInput = splitXgbInput(0)
    val testXgbInput = splitXgbInput(1)
           

設定XGBoost的參數,需要特别注意的是num_workers的值需要小于等于之後傳進spark-submit的master的值,即XGBoost用到的線程要小于Spark啟的線程:

val xgbParam = Map(
	      "eta" -> 0.1f,
	      "max_depth" -> 2,
	      "objective" -> "multi:softprob",
	      "num_class" -> 3,
	      "num_round" -> 100,
	      "num_workers" -> 2
	    )
           

建立XGBoost函數,并且展示了另一種設定參數的方法,這裡需要注意隻要你是用分布式進行訓練,那麼setTreeMethod(“approx”)是一定要設定的,不然最後在訓練的時候JVM會報

Core dump written

//建立xgb函數,指定特征向量與标簽
    val xgbClassifier = new XGBoostClassifier(xgbParam)
      .setFeaturesCol("features")
      .setLabelCol("classIndex")
    xgbClassifier.setMaxDepth(2)
    xgbClassifier.setTreeMethod("approx")
           

參數設定完畢,開始訓練:

進行預測:

展示并停止:

result.show(1000)

    spark.stop()
           

最後再放一個完整的代碼:

package com.taipark.xgboost

import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

object IrisAnalysis {
  def main(args: Array[String]): Unit = {
    //建立SparkSession
    val spark = SparkSession.builder()
      .getOrCreate()

    //設定schema
    val schema = new StructType(Array(
      StructField("sepal length", DoubleType, true),
      StructField("sepal width", DoubleType, true),
      StructField("petal length", DoubleType, true),
      StructField("petal width", DoubleType, true),
      StructField("class", StringType, true)
    ))

    //導入資料
    val dataPath = args(0)
    val rawInput = spark.read.schema(schema).csv(dataPath)

    //将class由string轉為double
    val stringIndexer = new StringIndexer()
      .setInputCol("class")
      .setOutputCol("classIndex")
      .fit(rawInput)
    val labelTransformed = stringIndexer.transform(rawInput).drop("class")

    //拼合組成整體資料集
    val vectorAssembler = new VectorAssembler()
      .setInputCols(Array("sepal length","sepal width","petal length","petal width"))
      .setOutputCol("features")
    val xgbInput = vectorAssembler.transform(labelTransformed)
      .select("features","classIndex")

    //将資料切分為訓練集與測試集
    val splitXgbInput = xgbInput.randomSplit(Array(0.8, 0.2))
    val trainXgbInput = splitXgbInput(0)
    val testXgbInput = splitXgbInput(1)

    //設定xgb參數
    val xgbParam = Map(
      "eta" -> 0.1f,
      "max_depth" -> 2,
      "objective" -> "multi:softprob",
      "num_class" -> 3,
      "num_round" -> 100,
      "num_workers" -> 2
    )

    //建立xgb函數,指定特征向量與标簽
    val xgbClassifier = new XGBoostClassifier(xgbParam)
      .setFeaturesCol("features")
      .setLabelCol("classIndex")
    xgbClassifier.setMaxDepth(2)
    xgbClassifier.setTreeMethod("approx")

    //開始訓練
    val xgbClassificationModel = xgbClassifier.fit(trainXgbInput)

    //預測
    val result = xgbClassificationModel.transform(testXgbInput)

    //展示
    result.show(1000)

    spark.stop()
  }

}
           

4.運作

打包後上傳Linux伺服器,需要一并上傳的内容包括:

  • 資料集.csv
  • 打包好的jar包
  • xgboost4j_2.11-1.0.0.jar
  • xgboost4j-spark_2.11-1.0.0.jar

後面兩個jar包需要單獨上傳上去,這兩個檔案應該在Maven的倉庫裡,可以用IDEA找,或者用Everything直接在Windows裡找。

首先需要将資料集放在HDFS裡,然後運作spark-submit:

./spark-submit \
--master local[2] \
--class com.taipark.xgboost.IrisAnalysis \
--jars /home/hadoop/lib/xgboost4j-spark_2.11-1.0.0.jar,/home/hadoop/lib/xgboost4j_2.11-1.0.0.jar \
 /home/hadoop/lib/sonarAnalysis-1.0.jar hdfs://hadoop000:8020/tai/iris.csv
           

其中傳入的第一個local[2]的2要大于等于之前設定的num_workers,這個之前說過了。其他的根據你自己放的位置引就可以了。

最後給個跑出來的結果:

XGBoost4J-Spark 1.0.0運作流程與排坑指南0.前言1.确認版本2.Maven配置3.代碼4.運作5.評價

5.評價

用AUC來對模型進行評價,在預測和展示之後、

spark.stop()

之前加入如下代碼段即可:

//評價
    val scoreAndLabels = result.select(xgbClassificationModel.getPredictionCol,xgbClassificationModel.getLabelCol)
        .rdd.map{case Row(score: Double,lable: Double) => (score,lable)}
    val metric = new BinaryClassificationMetrics(scoreAndLabels)
    val auc = metric.areaUnderROC()
    println("auc:" + auc)
           

如果對你有用,就給個關注吧,(●´З`●)

繼續閱讀