In big-data scenarios with requirements such as real-time computation, processing streaming data, and reducing computation time, the compute engine provided by Apache Spark meets these needs well. Spark is a memory-based distributed computing framework whose core abstraction is the Resilient Distributed Dataset (RDD). RDDs support multiple data sources, come with a fault-tolerance mechanism, can be cached, and support parallel operations, which makes Spark well suited to data mining and machine learning.
Spark is a fast, general-purpose compute engine designed for processing massive data sets. It supports multiple programming languages (Java, Scala, Python, R) and offers high computation speed: according to the official site, by performing computation in memory Spark can run up to 100 times faster than Hadoop MapReduce.
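To make the RDD idea concrete, here is a minimal sketch (assuming a spark-shell session where the SparkContext is already available as sc, just like in the examples later in this article) that builds an RDD from an in-memory collection, caches it, and runs a couple of parallel operations:

// Build an RDD from a local collection; RDDs can also come from text files, HDFS, and other sources
val nums = sc.parallelize(1 to 1000, 4)
// Mark the RDD for caching so repeated actions reuse the in-memory partitions
nums.cache()
// Transformations (filter, map, ...) are lazy; actions (count, reduce, ...) run in parallel
val evens = nums.filter(_ % 2 == 0)
println(evens.count())      // 500
println(nums.reduce(_ + _)) // 500500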
Installation, Deployment, and Usage
Installing and deploying a Spark cluster is not complicated and requires relatively little configuration; by working through this section you can build a distributed Spark cluster.
When choosing a Spark package on the official site, pay attention to matching the Spark version with your Hadoop version.
Environment Preparation
- Server cluster
I used four CentOS 6.6 virtual machines with the hostnames hadoop01, hadoop02, hadoop03, and hadoop04, and I set up the cluster as the hadoop user (in a production environment the root user cannot be used at will).
- Spark package
Download URL: https://mirrors.aliyun.com/apache/spark/
I used spark-2.2.0-bin-hadoop2.7.tgz.
Choose the Spark build that matches the Hadoop version on your machines.
1. Cluster planning
2. Detailed steps
(1) Upload the package to the hadoop01 server and extract it
[hadoop@hadoop01 soft]$ tar zxvf spark-2.2.0-bin-hadoop2.7.tgz -C /home/hadoop/apps/
# If the extracted directory name feels too long, you can rename it
[hadoop@hadoop01 soft]$ cd /home/hadoop/apps/
[hadoop@hadoop01 apps]$ mv spark-2.2.0-bin-hadoop2.7 spark-2.2.0
(2) Edit the spark-env.sh configuration file
# Rename (or copy) SPARK_HOME/conf/spark-env.sh.template to spark-env.sh
[hadoop@hadoop01 apps]$ cd spark-2.2.0/conf
[hadoop@hadoop01 conf]$ mv spark-env.sh.template spark-env.sh
# Edit spark-env.sh and add the following
[hadoop@hadoop01 conf]$ vim spark-env.sh
# Set JAVA_HOME; it often works without it, but problems can occur, so set it anyway
export JAVA_HOME=/usr/local/java/jdk1.8.0_73
# Spark jobs will very likely read files from HDFS, so configure this as well
# If your Spark jobs only read local files and you do not need YARN, it can be omitted
export HADOOP_CONF_DIR=/home/hadoop/apps/hadoop-2.7.4/etc/hadoop
# Hostname of the Master
export SPARK_MASTER_HOST=hadoop01
# Port for submitting applications; 7077 is the default, change it here if you must
export SPARK_MASTER_PORT=7077
# Maximum number of CPU cores each Worker may use; my VM has only one...
# On a real server with 32 cores you could set this to 32
export SPARK_WORKER_CORES=1
# Maximum amount of memory each Worker may use; my VM has only 2g
# On a real server with 128G you could set this to 100G
export SPARK_WORKER_MEMORY=1g
(3) Edit the slaves configuration file and add the list of Worker hosts
[hadoop@hadoop01 conf]$ mv slaves.template slaves
[hadoop@hadoop01 conf]$ vim slaves
# The file originally contains localhost; replace it with:
hadoop01
hadoop02
hadoop03
hadoop04
(4) Rename the start-all.sh and stop-all.sh scripts under SPARK_HOME/sbin
For example, rename them to start-spark-all.sh and stop-spark-all.sh.
Reason:
If HADOOP_HOME is also configured on the cluster, then HADOOP_HOME/sbin also contains start-all.sh and stop-all.sh, and when you run either script the system cannot tell whether you mean the Hadoop cluster or the Spark cluster. Renaming them avoids the conflict. If you do not rename them, you can instead cd into the respective sbin directory and run the scripts from there, which likewise avoids the conflict. Configuring SPARK_HOME is mainly for the convenience of running other Spark commands.
[hadoop@hadoop01 conf]$ cd ../sbin
[hadoop@hadoop01 sbin]$ mv start-all.sh start-spark-all.sh
[hadoop@hadoop01 sbin]$ mv stop-all.sh stop-spark-all.sh
(5) Distribute the Spark installation to the other nodes
[hadoop@hadoop01 apps]$ scp -r spark-2.2.0 hadoop02:`pwd`
[hadoop@hadoop01 apps]$ scp -r spark-2.2.0 hadoop03:`pwd`
[hadoop@hadoop01 apps]$ scp -r spark-2.2.0 hadoop04:`pwd`
(6) Configure the SPARK_HOME environment variable on every node in the cluster
[hadoop@hadoop01 conf]$ vim ~/.bash_profile
export SPARK_HOME=/home/hadoop/apps/spark-2.2.0
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
[hadoop@hadoop01 conf]$ source ~/.bash_profile
# Configure the other nodes in the same way...
(7) Start the Spark cluster on the Spark master node
# Note: if you skipped step (4), be sure to run this command from the SPARK_HOME/sbin directory
# Alternatively, run start-master.sh and start-slaves.sh separately on the Master node
[hadoop@hadoop01 conf]$ start-spark-all.sh
Note:
- If you configured HADOOP_CONF_DIR, start the Hadoop cluster before starting the Spark cluster.
(8) Verification
The fully distributed Spark cluster has been set up successfully!
### Hands-on Test ###
[hadoop@big-master2 ~]$ cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
## bigdata cluster ##
192.168.41.20 big-master1 #bigdata1 namenode1,zookeeper,resourcemanager Haproxy-master
192.168.41.21 big-master2 #bigdata2 namenode2,zookeeper,slave,resourcemanager haproxy-standby spark0-master
192.168.41.22 big-slave01 #bigdata3 datanode1,zookeeper,slave hive1 spark1-work
192.168.41.25 big-slave02 #bigdata4 datanode2,zookeeper,slave hive2 spark2-work
192.168.41.27 big-slave03 #bigdata5 datanode3,zookeeper,slave hive3 spark3-work
192.168.41.17 tidb05.500.com #hive mysql
######
[hadoop@big-master2 ~]$ cat /etc/profile
## scala ##
export SCALA_HOME=/usr/local/scala-2.13.3
export PATH=$SCALA_HOME/bin:$PATH
## Python3.7 ##
export PYTHON3_HOME=/usr/local/python3
export PATH=$PYTHON3_HOME/bin:$PATH
## spark ##
export SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH
######
[hadoop@big-master2 ~]$ cd /usr/local/spark/
[hadoop@big-master2 spark]$ pwd
/usr/local/spark
[hadoop@big-master2 spark]$ ls
bin conf data examples jars kubernetes LICENSE licenses logs NOTICE python R README.md RELEASE sbin yarn
[hadoop@big-master2 spark]$ cd conf/
[hadoop@big-master2 conf]$ ls
docker.properties.template log4j.properties.template slaves spark-defaults.conf spark-env.sh
fairscheduler.xml.template metrics.properties.template slaves.template spark-defaults.conf.template spark-env.sh.template
[hadoop@big-master2 conf]$ cat slaves
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# A Spark Worker will be started on each of the machines listed below.
big-slave01
big-slave02
big-slave03
[hadoop@big-master2 conf]$ cat spark-env.sh
#!/usr/bin/env bash
export JAVA_HOME=/usr/local/jdk1.8.0_251
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
export JAVA_LIBRARY_PATH=/usr/local/hadoop/lib/native
export SCALA_HOME=/usr/local/scala-2.13.3
export PYTHON3_HOME=/usr/local/python3
export SPARK_MASTER_HOST=big-master2
export SPARK_LOCAL_DIRS=/usr/local/spark
export SPARK_EXECUTOR_MEMORY=2g
export SPARK_WORKER_MEMORY=2g
export SPARK_WORKER_CORES=2
export SPARK_WORKER_INSTANCES=1
[hadoop@big-master2 conf]$ cat spark-defaults.conf
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.
# Example:
# spark.master spark://master:7077
# spark.eventLog.enabled true
# spark.eventLog.dir hdfs://namenode:8021/directory
# spark.serializer org.apache.spark.serializer.KryoSerializer
# spark.driver.memory 5g
# spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
spark.eventLog.enabled true
spark.eventLog.dir hdfs://big-master1:9000/historyserverforSpark
spark.yarn.historyServer.address big-master2:18080
spark.history.fs.logDirectory hdfs://big-master1:9000/historyserverforSpark
spark.speculation true
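The same property keys can also be set per application through the SparkSession builder instead of globally in spark-defaults.conf; a minimal sketch (the application name below is only illustrative, not taken from this cluster):

import org.apache.spark.sql.SparkSession

// Per-application equivalent of the spark-defaults.conf entries above
val spark = SparkSession.builder()
  .appName("event-log-demo") // illustrative name
  .config("spark.eventLog.enabled", "true")
  .config("spark.eventLog.dir", "hdfs://big-master1:9000/historyserverforSpark")
  .config("spark.speculation", "true")
  .getOrCreate()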
-- On the master2 node, copy all of the configuration files above to the slave nodes.
###
Start:
cd /usr/local/spark/sbin
[hadoop@big-master2 sbin]$ ls
slaves.sh start-all.sh start-mesos-shuffle-service.sh start-thriftserver.sh stop-mesos-dispatcher.sh stop-slaves.sh
spark-config.sh start-history-server.sh start-shuffle-service.sh stop-all.sh stop-mesos-shuffle-service.sh stop-thriftserver.sh
spark-daemon.sh start-master.sh start-slave.sh stop-history-server.sh stop-shuffle-service.sh
spark-daemons.sh start-mesos-dispatcher.sh start-slaves.sh stop-master.sh stop-slave.sh
Verification:
[hadoop@big-master2 ~]$ jps
20032 NameNode
20116 JournalNode
20324 DFSZKFailoverController
31540 HMaster
2988 Master
7788 Jps
18830 QuorumPeerMain
2462 ResourceManager
[root@big-slave01 ~]# jps
10161 NodeManager
28338 HRegionServer
10546 RunJar
8196 Jps
7702 QuorumPeerMain
8583 DataNode
8108 Worker
8686 JournalNode
10638 RunJar
[root@big-slave02 ~]# jps
7168 Worker
8322 Jps
5187 DataNode
8581 RunJar
6697 NodeManager
4362 QuorumPeerMain
5290 JournalNode
25869 HRegionServer
[root@big-slave03 ~]# jps
4562 QuorumPeerMain
5442 DataNode
26004 HRegionServer
6389 RunJar
6903 NodeManager
5545 JournalNode
26895 Worker
27375 Jps
Visit: http://big-master2:8088
Issue 1:
When running [hadoop@big-master2 ~]$ spark-submit --master spark://big-master2:7077 --class org.apache.spark.examples.SparkPi /usr/local/spark/examples/jars/spark-examples_2.11-2.4.5.jar 500, the following error was reported:
java.io.FileNotFoundException: File does not exist: hdfs://big-master1:9000/historyserverforSpark
Solution:
Log in to the Hadoop system and create the corresponding directory:
[hadoop@big-master2 ~]$ hdfs dfs -mkdir -p /historyserverforSpark
[hadoop@big-master2 ~]$ hdfs dfs -chmod 777 /historyserverforSpark
[hadoop@big-master2 ~]$ hdfs dfs -ls /
Found 11 items
drwxr-xr-x - hadoop supergroup 0 2020-05-26 16:17 /data
drwxr-xr-x - hadoop supergroup 0 2020-06-04 23:41 /hbase
drwxrwxrwx - hadoop supergroup 0 2020-08-07 16:11 /historyserverforSpark
drwxr-xr-x - hadoop supergroup 0 2020-05-24 02:53 /sqoop-mysql
drwxr-xr-x - hadoop supergroup 0 2020-05-24 03:04 /sqoop-mysql11
drwxr-xr-x - hadoop supergroup 0 2020-05-24 02:59 /sqoop-mysql22
drwxr-xr-x - hadoop supergroup 0 2020-05-15 14:59 /test
drwxr-xr-x - hadoop supergroup 0 2020-05-18 17:15 /test01
drwx------ - hadoop supergroup 0 2020-06-11 12:22 /tmp
drwxr-xr-x - hadoop supergroup 0 2020-06-11 12:21 /user
drwxr-xr-x - root supergroup 0 2020-05-26 23:08 /var
Run it again: [hadoop@big-master2 ~]$ spark-submit --master spark://big-master2:7077 --class org.apache.spark.examples.SparkPi /usr/local/spark/examples/jars/spark-examples_2.11-2.4.5.jar 500
.......
.......
20/08/07 16:13:09 INFO scheduler.TaskSetManager: Finished task 495.0 in stage 0.0 (TID 495) in 170 ms on 192.168.41.22 (executor 1) (500/500)
20/08/07 16:13:09 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
20/08/07 16:13:09 INFO scheduler.DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 22.098 s
20/08/07 16:13:09 INFO scheduler.DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 22.793146 s
Pi is roughly 3.1413011028260223
20/08/07 16:13:10 INFO server.AbstractConnector: Stopped [email protected]{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
20/08/07 16:13:10 INFO ui.SparkUI: Stopped Spark web UI at http://big-master2:4040
20/08/07 16:13:10 INFO cluster.StandaloneSchedulerBackend: Shutting down all executors
20/08/07 16:13:10 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
20/08/07 16:13:11 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/08/07 16:13:11 INFO memory.MemoryStore: MemoryStore cleared
20/08/07 16:13:11 INFO storage.BlockManager: BlockManager stopped
20/08/07 16:13:11 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
20/08/07 16:13:11 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/08/07 16:13:12 INFO spark.SparkContext: Successfully stopped SparkContext
20/08/07 16:13:12 INFO util.ShutdownHookManager: Shutdown hook called
20/08/07 16:13:12 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-1f127bd1-743a-472f-8a6f-c65256e6cae8
20/08/07 16:13:12 INFO util.ShutdownHookManager: Deleting directory /usr/local/spark/spark-805632c9-774a-4e29-9481-ed0a071094fc
-- The command completed successfully
[hadoop@big-master2 ~]$ spark-shell
-- local mode
20/08/07 16:20:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://big-master2:4040
Spark context available as 'sc' (master = local[*], app id = local-1596788482324).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.5
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_251)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
[hadoop@big-master2 ~]$ spark-shell --master spark://big-master2:7077
-- running on the distributed cluster
20/08/07 16:24:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://big-master2:4040
Spark context available as 'sc' (master = spark://big-master2:7077, app id = app-20200807162442-0004).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.5
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_251)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
------- Additional Notes -------
There are two ways to run Spark jobs: spark-submit and spark-shell.
1. spark-submit: upload the jar to the cluster, then execute the Spark job via spark-submit from the bin directory:
Format:
spark-submit --master <Spark master URL> --class <fully-qualified class name> <jar path> <arguments>
For example: run the SparkPi test program bundled with Spark to compute an approximation of pi
./spark-submit --master spark://node3:7077 --class org.apache.spark.examples.SparkPi /usr/local/spark-2.1.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.0.jar 500
Result:
Pi is roughly 3.1414508628290174
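For reference, here is a minimal sketch of what such a submittable application might look like in Scala (this is not the bundled SparkPi source; the object name and jar name are only illustrative):

import org.apache.spark.sql.SparkSession

// Package this object into a jar, then run it with, e.g.:
//   spark-submit --master spark://node3:7077 --class MyPi my-pi.jar 500
object MyPi {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MyPi").getOrCreate()
    val slices = if (args.nonEmpty) args(0).toInt else 2
    val n = 100000L * slices
    // Monte Carlo estimate of pi, computed in parallel across the cluster
    val inside = spark.sparkContext.parallelize(1L to n, slices).map { _ =>
      val x = math.random * 2 - 1
      val y = math.random * 2 - 1
      if (x * x + y * y <= 1) 1 else 0
    }.reduce(_ + _)
    println(s"Pi is roughly ${4.0 * inside / n}")
    spark.stop()
  }
}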
2. spark-shell: essentially a REPL / command-line tool, and itself a Spark application
2.1 Local mode: runs locally without connecting to a Spark cluster, useful for testing
Start command: bin/spark-shell with no further arguments, which means local mode:
-- local mode
20/08/07 16:20:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://big-master2:4040
Spark context available as 'sc' (master = local[*], app id = local-1596788482324).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.5
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_251)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
2.2 Cluster mode
Start command: bin/spark-shell --master spark://.....
[hadoop@big-master2 ~]$ spark-shell --master spark://big-master2:7077
-- running on the distributed cluster
20/08/07 16:24:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://big-master2:4040
Spark context available as 'sc' (master = spark://big-master2:7077, app id = app-20200807162442-0004).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.5
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_251)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
Notes:
Spark context available as 'sc' (master = spark://big-master2:7077, app id = app-20200807162442-0004).
Spark session available as 'spark'.
Spark session: introduced in Spark 2.0; through the session you can access all Spark components (Core, SQL, ...).
The 'spark' and 'sc' objects are both pre-built and can be used directly.
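A small sketch of using both pre-built objects in the shell session above (the sample data is made up for illustration):

// sc gives you the RDD API ...
val rdd = sc.parallelize(Seq("spark", "hadoop", "spark"))
println(rdd.countByValue()) // e.g. Map(spark -> 2, hadoop -> 1)
// ... while spark (the SparkSession) gives access to Spark SQL / DataFrames
import spark.implicits._
val df = rdd.toDF("word")
df.groupBy("word").count().show()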
Example: develop a word-count program in the Spark shell
(*) Read a local file and print the result to the screen.
Note: this example requires a single worker, and the local file must live on the same server as that worker.
scala> sc.textFile("/usr/local/tmp_files/test_WordCount.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
Result:
res0: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (hehehehehe,1), (I,2), (of,1), (the,1))
(*) Read a file from HDFS, run the word count, and write the result back to HDFS
scala> sc.textFile("hdfs://bigdata111:9000/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://bigdata111:9000/result")
Note: the path passed to textFile() here is an HDFS path.
After the Spark job finishes, the result is stored in the result directory on HDFS:
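If you want to inspect that output from within the same shell session, a small sketch (using the same HDFS path as in the example above):

// Read the word-count output back from HDFS and print each (word, count) line
sc.textFile("hdfs://bigdata111:9000/result").collect().foreach(println)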
Check:
[hadoop@big-master2 ~]$ hdfs dfs -ls /historyserverforSpark
Found 3 items
-rwxrwx--- 3 hadoop supergroup 1063090 2020-08-07 16:13 /historyserverforSpark/app-20200807161241-0003
-rwxrwx--- 3 hadoop supergroup 293 2020-08-07 16:24 /historyserverforSpark/app-20200807162442-0004.inprogress
-rwxrwx--- 3 hadoop supergroup 19771 2020-08-07 16:22 /historyserverforSpark/local-1596788482324.inprogress