XGBoost4J-Spark配置全解
- 0.前言
- 1.确認版本
- 2.Maven配置
- 3.代碼
- 4.運作
- 5.評價
0.前言
XGBoost4J-Spark能夠讓我們在Spark上玩XGBoost,對于海量資料來說應該是很有用的。這篇文章主要介紹了将其官網提供的Demo部署在Linux上運作的全過程。通過此Demo在Spark上用XGBoost完成iris資料集的多分類任務。
此次選用的是最新的穩定版XGBoost4J-Spark 1.0.0,之前0.81版本的Windows部署可以參考這裡。不過其實都是互通的,兩邊都看看也無妨。我這裡會将我選擇、填坑時的思路全部記錄下來,需要注意的地方我會用這樣進行标注,以供你參考,(●´З`●)。
1.确認版本
作為最基礎的demo,要考慮的版本包括:
- XGBoost4J-Spark
- Scala
- Spark
- Python
- Hadoop
- Java
在安裝之前一定要根據自己的配置選擇合适的版本,確定兩兩之間版本契合,否則後面會報一堆錯。我選擇的版本如下:
子產品 | 版本号 |
---|---|
XGBoost4J-Spark | 1.0.0 |
Scala | 2.11.8 |
Spark | 2.4.5 |
Python | 3.5.2 |
Hadoop | 2.6 |
Java | 1.8.0 |
具體考量如下:首先是選擇了目前的最新穩定版——1.0.0的XGBoost4J-Spark,然後在它的官網文檔裡發現0.9之後的版本僅支援Spark2.4+了,具體原因摘抄在下面:
XGBoost4J-Spark now requires Apache Spark 2.4+. Latest versions of XGBoost4J-Spark uses facilities of org.apache.spark.ml.param.shared extensively to provide for a tight integration with Spark MLLIB framework, and these facilities are not fully available on earlier versions of Spark.
但我Spark是卑微的2.2.0,隻能重新裝一個高版本了。然後選擇了Spark 2.4.5 Prebuild for Apache Hadoop 2.6。這裡Hadoop的版本2.6.0就确定好了,好在Hadoop和Spark互通這塊網上排坑指南比較多,在實際操作時這塊也沒出問題,甚至我用的是hadoop-2.6.0-cdh5.7.0也沒啥問題,好評。然後這裡有個點請注意一下,在Spark下載下傳界面有這麼一段話:
Note that, Spark is pre-built with Scala 2.11 except version 2.4.2, which is pre-built with Scala 2.12.
是以對于Spark 2.4.5來說,給咱們的jars是基于Scala 2.11的(當然你也可以選擇 Pre-built with Scala 2.12 and user-provided Apache Hadoop這個版本,然而我試了一下發現在spark-submit這個版本的時候坑太多了,比起填這個坑還是轉Scala2.11簡單點,遂棄之)。
于是Scala的大版本也确定好了,在選擇Scala的時候小心每個小版本裡也有很多更疊,導緻一些方法找不到,最後在嘗試過很多小版本後敲定了Scala 2.11.8,至少在這個Demo裡沒有表現出特别的問題。
另外提一嘴Python,版本需要2.7+,盡量就Python 3+就完事了,不然會在訓練的時候報錯
ImportError: No module named argparse
,進而導緻出現
XGBoostError: XGBoostModel training failed
這種詭異的錯誤。
JDK就是1.8.0,沒啥好說的,也沒出問題,就寫在這以防有老哥要看。
2.Maven配置
要注意各個版本的一一對應,要查包的對應版本的話請在MvnRepository裡查找關鍵字。
這裡直接放整體的pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.taipark.xgboost</groupId>
<artifactId>sonarAnalysis</artifactId>
<version>1.0</version>
<inceptionYear>2008</inceptionYear>
<properties>
<scala.version>2.11.8</scala.version>
</properties>
<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>ml.dmlc</groupId>
<artifactId>xgboost4j-spark_2.11</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.4.5</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<args>
<arg>-target:jvm-1.5</arg>
</args>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<configuration>
<downloadSources>true</downloadSources>
<buildcommands>
<buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
</buildcommands>
<additionalProjectnatures>
<projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
</additionalProjectnatures>
<classpathContainers>
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
<classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
</classpathContainers>
</configuration>
</plugin>
</plugins>
</build>
<reporting>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</reporting>
</project>
3.代碼
配置好之後就要開始寫代碼了,代碼在xgboost的文檔裡說的其實還是蠻清楚的,這裡把完整的能運作的代碼放出來了解一下:
注:代碼是用Scala編寫的。
首先是import:
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
建立SparkSession:
根據資料集設定Schema:
val schema = new StructType(Array(
StructField("sepal length", DoubleType, true),
StructField("sepal width", DoubleType, true),
StructField("petal length", DoubleType, true),
StructField("petal width", DoubleType, true),
StructField("class", StringType, true)
))
導入資料,使其與Schema一一對應,這裡注意資料在轉為csv的時候不要使用UTF-8之類的編碼,而是使用Windows預設編碼,不然會導緻XGBoost識别分類數量錯誤進而報有關
num_class
的錯誤:
val dataPath = args(0)
val rawInput = spark.read.schema(schema).csv(dataPath)
這裡的class列是用String表示的,我們要将其轉化為Double類型,可以使用StringIndexer來轉換:
val stringIndexer = new StringIndexer()
.setInputCol("class")
.setOutputCol("classIndex")
.fit(rawInput)
val labelTransformed = stringIndexer.transform(rawInput).drop("class")
将轉換後的列classIndex替換之前的列class:
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("sepal length","sepal width","petal length","petal width"))
.setOutputCol("features")
val xgbInput = vectorAssembler.transform(labelTransformed)
.select("features","classIndex")
将資料八二分為訓練集與測試集:
val splitXgbInput = xgbInput.randomSplit(Array(0.8, 0.2))
val trainXgbInput = splitXgbInput(0)
val testXgbInput = splitXgbInput(1)
設定XGBoost的參數,需要特别注意的是num_workers的值需要小于等于之後傳進spark-submit的master的值,即XGBoost用到的線程要小于Spark啟的線程:
val xgbParam = Map(
"eta" -> 0.1f,
"max_depth" -> 2,
"objective" -> "multi:softprob",
"num_class" -> 3,
"num_round" -> 100,
"num_workers" -> 2
)
建立XGBoost函數,并且展示了另一種設定參數的方法,這裡需要注意隻要你是用分布式進行訓練,那麼setTreeMethod(“approx”)是一定要設定的,不然最後在訓練的時候JVM會報
Core dump written
:
//建立xgb函數,指定特征向量與标簽
val xgbClassifier = new XGBoostClassifier(xgbParam)
.setFeaturesCol("features")
.setLabelCol("classIndex")
xgbClassifier.setMaxDepth(2)
xgbClassifier.setTreeMethod("approx")
參數設定完畢,開始訓練:
進行預測:
展示并停止:
result.show(1000)
spark.stop()
最後再放一個完整的代碼:
package com.taipark.xgboost
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
object IrisAnalysis {
def main(args: Array[String]): Unit = {
//建立SparkSession
val spark = SparkSession.builder()
.getOrCreate()
//設定schema
val schema = new StructType(Array(
StructField("sepal length", DoubleType, true),
StructField("sepal width", DoubleType, true),
StructField("petal length", DoubleType, true),
StructField("petal width", DoubleType, true),
StructField("class", StringType, true)
))
//導入資料
val dataPath = args(0)
val rawInput = spark.read.schema(schema).csv(dataPath)
//将class由string轉為double
val stringIndexer = new StringIndexer()
.setInputCol("class")
.setOutputCol("classIndex")
.fit(rawInput)
val labelTransformed = stringIndexer.transform(rawInput).drop("class")
//拼合組成整體資料集
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("sepal length","sepal width","petal length","petal width"))
.setOutputCol("features")
val xgbInput = vectorAssembler.transform(labelTransformed)
.select("features","classIndex")
//将資料切分為訓練集與測試集
val splitXgbInput = xgbInput.randomSplit(Array(0.8, 0.2))
val trainXgbInput = splitXgbInput(0)
val testXgbInput = splitXgbInput(1)
//設定xgb參數
val xgbParam = Map(
"eta" -> 0.1f,
"max_depth" -> 2,
"objective" -> "multi:softprob",
"num_class" -> 3,
"num_round" -> 100,
"num_workers" -> 2
)
//建立xgb函數,指定特征向量與标簽
val xgbClassifier = new XGBoostClassifier(xgbParam)
.setFeaturesCol("features")
.setLabelCol("classIndex")
xgbClassifier.setMaxDepth(2)
xgbClassifier.setTreeMethod("approx")
//開始訓練
val xgbClassificationModel = xgbClassifier.fit(trainXgbInput)
//預測
val result = xgbClassificationModel.transform(testXgbInput)
//展示
result.show(1000)
spark.stop()
}
}
4.運作
打包後上傳Linux伺服器,需要一并上傳的内容包括:
- 資料集.csv
- 打包好的jar包
- xgboost4j_2.11-1.0.0.jar
- xgboost4j-spark_2.11-1.0.0.jar
後面兩個jar包需要單獨上傳上去,這兩個檔案應該在Maven的倉庫裡,可以用IDEA找,或者用Everything直接在Windows裡找。
首先需要将資料集放在HDFS裡,然後運作spark-submit:
./spark-submit \
--master local[2] \
--class com.taipark.xgboost.IrisAnalysis \
--jars /home/hadoop/lib/xgboost4j-spark_2.11-1.0.0.jar,/home/hadoop/lib/xgboost4j_2.11-1.0.0.jar \
/home/hadoop/lib/sonarAnalysis-1.0.jar hdfs://hadoop000:8020/tai/iris.csv
其中傳入的第一個local[2]的2要大于等于之前設定的num_workers,這個之前說過了。其他的根據你自己放的位置引就可以了。
最後給個跑出來的結果:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiclRnblN2XjlGcjAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL6NGVPVTS61kMNpHW4Z0MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL5MjM4MDO0EjMwMDNwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
5.評價
用AUC來對模型進行評價,在預測和展示之後、
spark.stop()
之前加入如下代碼段即可:
//評價
val scoreAndLabels = result.select(xgbClassificationModel.getPredictionCol,xgbClassificationModel.getLabelCol)
.rdd.map{case Row(score: Double,lable: Double) => (score,lable)}
val metric = new BinaryClassificationMetrics(scoreAndLabels)
val auc = metric.areaUnderROC()
println("auc:" + auc)
如果對你有用,就給個關注吧,(●´З`●)