In big-data scenarios that demand real-time computation, stream processing, and lower compute latency, the engine provided by Apache Spark meets these needs well. Spark is a memory-based distributed computing framework whose core abstraction is the Resilient Distributed Dataset (RDD). RDDs support many data sources, come with a fault-tolerance mechanism, can be cached, and support parallel operations, which makes them a good fit for data mining and machine learning.
Spark is a fast, general-purpose engine designed for processing data at scale. It supports several programming languages (Java, Scala, Python, R) and offers high computation speed: according to the official site, by keeping data in memory Spark can run up to 100x faster than Hadoop MapReduce.
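As a quick taste of the RDD API described above, the one-liner below pipes a tiny job into spark-shell in local mode (a minimal sketch; it assumes spark-shell is on the PATH, and the numbers are purely illustrative):
# Build an RDD from a local collection, cache it, and count the even numbers in parallel
echo 'val nums = sc.parallelize(1 to 1000); nums.cache(); println(nums.filter(_ % 2 == 0).count())' | spark-shell --master 'local[*]'
# Should print 500 somewhere in the REPL output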
Installation, Deployment, and Usage
Installing and deploying a Spark cluster is not complicated and requires little configuration. By working through this section you can build a fully distributed Spark cluster.
When choosing a Spark package on the official site, pay attention to matching the Spark and Hadoop versions.
Environment preparation
- Server cluster
I used four CentOS 6.6 virtual machines with hostnames hadoop01, hadoop02, hadoop03, and hadoop04, and built the cluster as the hadoop user (in production, the root account is not something you can use freely).
- Spark installation package
Download: https://mirrors.aliyun.com/apache/spark/
I used spark-2.2.0-bin-hadoop2.7.tgz.
Choose the Spark build that matches the Hadoop version on your machines (a quick check is sketched below).
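One way to confirm which Hadoop build you need to match (a sketch, assuming the hadoop binary is on the PATH):
# Print the installed Hadoop version before picking a Spark package
hadoop version    # the first output line looks like: Hadoop 2.7.4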
1. Cluster planning
2. Detailed steps
(1) Upload the package to the hadoop01 server and extract it
[hadoop@hadoop01 soft]$ tar zxvf spark-2.2.0-bin-hadoop2.7.tgz -C /home/hadoop/apps/
# If the extracted directory name feels too long, you can rename it
[hadoop@hadoop01 soft]$ cd /home/hadoop/apps/
[hadoop@hadoop01 apps]$ mv spark-2.2.0-bin-hadoop2.7 spark-2.2.0
(2) Edit the spark-env.sh configuration file
# Rename the spark-env.sh.template file under SPARK_HOME/conf/ to spark-env.sh
[hadoop@hadoop01 apps]$ cd spark-2.2.0/conf
[hadoop@hadoop01 conf]$ mv spark-env.sh.template spark-env.sh
# Edit spark-env.sh and add the following:
[hadoop@hadoop01 conf]$ vim spark-env.sh
# Set JAVA_HOME. It usually works without this, but problems can occur, so set it anyway
export JAVA_HOME=/usr/local/java/jdk1.8.0_73
# Spark jobs will very likely read files from HDFS, so configure this.
# If your Spark jobs only read local files and you don't need YARN, it can be omitted
export HADOOP_CONF_DIR=/home/hadoop/apps/hadoop-2.7.4/etc/hadoop
# Hostname of the Master
export SPARK_MASTER_HOST=hadoop01
# Port for submitting Applications; 7077 is the default, change it here if you ever need to
export SPARK_MASTER_PORT=7077
# Maximum number of CPU cores each Worker may use; my VM has just one...
# On a real server with 32 cores you could set this to 32
export SPARK_WORKER_CORES=1
# Maximum memory each Worker may use; my VM has 2 GB
# On a real server with 128 GB you could set this to 100g
export SPARK_WORKER_MEMORY=1g
(3) Edit the slaves configuration file and add the list of Worker hosts
[hadoop@hadoop01 conf]$ mv slaves.template slaves
[hadoop@hadoop01 conf]$ vim slaves
# The file originally contains just localhost; replace it with:
hadoop01
hadoop02
hadoop03
hadoop04
(4) Rename the start-all.sh and stop-all.sh files under SPARK_HOME/sbin
For example, rename them to start-spark-all.sh and stop-spark-all.sh.
Reason:
If HADOOP_HOME is also configured on the cluster, HADOOP_HOME/sbin contains its own start-all.sh and stop-all.sh, so when you run these scripts the system cannot tell whether you mean the Hadoop cluster or the Spark cluster. Renaming removes the conflict. Of course, if you don't rename them, you have to cd into the respective sbin directory to run them, which also avoids the clash (an absolute-path alternative is sketched after the commands below). We configure SPARK_HOME mainly for the convenience of running other Spark commands.
[hadoop@hadoop01 conf]$ cd ../sbin
[hadoop@hadoop01 sbin]$ mv start-all.sh start-spark-all.sh
[hadoop@hadoop01 sbin]$ mv stop-all.sh stop-spark-all.sh
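If you prefer to leave the script names alone, calling them by absolute path (directories as laid out in this tutorial) is equally unambiguous:
/home/hadoop/apps/spark-2.2.0/sbin/start-all.sh     # clearly Spark's script
/home/hadoop/apps/hadoop-2.7.4/sbin/start-all.sh    # clearly Hadoop's script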
(5) Distribute the Spark installation directory to the other nodes
[hadoop@hadoop01 apps]$ scp -r spark-2.2.0 hadoop02:`pwd`
[hadoop@hadoop01 apps]$ scp -r spark-2.2.0 hadoop03:`pwd`
[hadoop@hadoop01 apps]$ scp -r spark-2.2.0 hadoop04:`pwd`
(6) Configure the SPARK_HOME environment variable on every node in the cluster
[hadoop@hadoop01 conf]$ vim ~/.bash_profile
export SPARK_HOME=/home/hadoop/apps/spark-2.2.0
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
[hadoop@hadoop01 conf]$ source ~/.bash_profile
# Configure all the other nodes the same way... (one way is sketched below)
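One hedged way to automate the "..." above from hadoop01, assuming passwordless SSH is already configured for the hadoop user:
# Append the same two lines to each remaining node's profile
for host in hadoop02 hadoop03 hadoop04; do
  ssh ${host} 'cat >> ~/.bash_profile' <<'EOF'
export SPARK_HOME=/home/hadoop/apps/spark-2.2.0
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
EOF
done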
(7) Start the Spark cluster on the Spark master node
# Note: if you skipped step (4), make sure to run this from the SPARK_HOME/sbin directory
# Alternatively, run start-master.sh and then start-slaves.sh on the Master node
[hadoop@hadoop01 conf]$ start-spark-all.sh
Note:
- If you configured HADOOP_CONF_DIR, start the Hadoop cluster before starting the Spark cluster (see the sketch below).
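A typical startup order, sketched with the script names used in this tutorial:
start-dfs.sh          # bring up HDFS first (NameNode and DataNodes)
start-yarn.sh         # only if your Spark jobs will also run on YARN
start-spark-all.sh    # then start the Spark standalone Master and Workers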
(8) Verification
The fully distributed Spark cluster is up and running!
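A quick sanity check, assuming the layout from this section: run jps on each node and open the master's web UI.
[hadoop@hadoop01 ~]$ jps    # should list a Master (and a Worker, since hadoop01 is also in slaves)
[hadoop@hadoop02 ~]$ jps    # each worker node should list a Worker process
# The standalone master web UI defaults to http://hadoop01:8080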
### Hands-on Test ###
[hadoop@big-master2 ~]$ cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
## bigdata cluster ##
192.168.41.20 big-master1 #bigdata1 namenode1,zookeeper,resourcemanager Haproxy-master
192.168.41.21 big-master2 #bigdata2 namenode2,zookeeper,slave,resourcemanager haproxy-standby spark0-master
192.168.41.22 big-slave01 #bigdata3 datanode1,zookeeper,slave hive1 spark1-work
192.168.41.25 big-slave02 #bigdata4 datanode2,zookeeper,slave hive2 spark2-work
192.168.41.27 big-slave03 #bigdata5 datanode3,zookeeper,slave hive3 spark3-work
192.168.41.17 tidb05.500.com #hive mysql
######
[hadoop@big-master2 ~]$ cat /etc/profile
## scala ##
export SCALA_HOME=/usr/local/scala-2.13.3
export PATH=$SCALA_HOME/bin:$PATH
## Python3.7 ##
export PYTHON3_HOME=/usr/local/python3
export PATH=$PYTHON3_HOME/bin:$PATH
## spark ##
export SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH
######
[hadoop@big-master2 ~]$ cd /usr/local/spark/
[hadoop@big-master2 spark]$ pwd
/usr/local/spark
[hadoop@big-master2 spark]$ ls
bin conf data examples jars kubernetes LICENSE licenses logs NOTICE python R README.md RELEASE sbin yarn
[hadoop@big-master2 spark]$ cd conf/
[hadoop@big-master2 conf]$ ls
docker.properties.template log4j.properties.template slaves spark-defaults.conf spark-env.sh
fairscheduler.xml.template metrics.properties.template slaves.template spark-defaults.conf.template spark-env.sh.template
[hadoop@big-master2 conf]$ cat slaves
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# A Spark Worker will be started on each of the machines listed below.
big-slave01
big-slave02
big-slave03
[hadoop@big-master2 conf]$ cat spark-env.sh
#!/usr/bin/env bash
export JAVA_HOME=/usr/local/jdk1.8.0_251
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
export JAVA_LIBRARY_PATH=/usr/local/hadoop/lib/native
export SCALA_HOME=/usr/local/scala-2.13.3
export PYTHON3_HOME=/usr/local/python3
export SPARK_MASTER_HOST=big-master2
export SPARK_LOCAL_DIRS=/usr/local/spark
export SPARK_EXECUTOR_MEMORY=2g
export SPARK_WORKER_MEMORY=2g
export SPARK_WORKER_CORES=2
export SPARK_WORKER_INSTANCES=1
[hadoop@big-master2 conf]$ cat spark-defaults.conf
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.
# Example:
# spark.master spark://master:7077
# spark.eventLog.enabled true
# spark.eventLog.dir hdfs://namenode:8021/directory
# spark.serializer org.apache.spark.serializer.KryoSerializer
# spark.driver.memory 5g
# spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
spark.eventLog.enabled true
spark.eventLog.dir hdfs://big-master1:9000/historyserverforSpark
spark.yarn.historyServer.address big-master2:18080
spark.history.fs.logDirectory hdfs://big-master1:9000/historyserverforSpark
spark.speculation true
-- On the master2 node, copy all of these files to the slave nodes.
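A sketch of that sync step (hostnames from the /etc/hosts above; passwordless SSH assumed):
for host in big-slave01 big-slave02 big-slave03; do
  scp -r /usr/local/spark ${host}:/usr/local/
done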
###
Start:
cd /usr/local/spark/sbin
[hadoop@big-master2 sbin]$ ls
slaves.sh start-all.sh start-mesos-shuffle-service.sh start-thriftserver.sh stop-mesos-dispatcher.sh stop-slaves.sh
spark-config.sh start-history-server.sh start-shuffle-service.sh stop-all.sh stop-mesos-shuffle-service.sh stop-thriftserver.sh
spark-daemon.sh start-master.sh start-slave.sh stop-history-server.sh stop-shuffle-service.sh
spark-daemons.sh start-mesos-dispatcher.sh start-slaves.sh stop-master.sh stop-slave.sh
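The start command itself is not captured in the transcript; on this cluster it would look roughly like the following sketch:
[hadoop@big-master2 sbin]$ ./start-all.sh              # starts the Master on big-master2 and the Workers listed in conf/slaves
[hadoop@big-master2 sbin]$ ./start-history-server.sh   # serves the event logs configured in spark-defaults.conf at big-master2:18080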
Verify:
[hadoop@big-master2 ~]$ jps
20032 NameNode
20116 JournalNode
20324 DFSZKFailoverController
31540 HMaster
2988 Master
7788 Jps
18830 QuorumPeerMain
2462 ResourceManager
[root@big-slave01 ~]# jps
10161 NodeManager
28338 HRegionServer
10546 RunJar
8196 Jps
7702 QuorumPeerMain
8583 DataNode
8108 Worker
8686 JournalNode
10638 RunJar
[root@big-slave02 ~]# jps
7168 Worker
8322 Jps
5187 DataNode
8581 RunJar
6697 NodeManager
4362 QuorumPeerMain
5290 JournalNode
25869 HRegionServer
[root@big-slave03 ~]# jps
4562 QuorumPeerMain
5442 DataNode
26004 HRegionServer
6389 RunJar
6903 NodeManager
5545 JournalNode
26895 Worker
27375 Jps
Log in at: http://big-master2:8088 (the YARN ResourceManager web UI; the standalone Spark master UI defaults to port 8080)
Problem 1:
Running [hadoop@big-master2 ~]$ spark-submit --master spark://big-master2:7077 --class org.apache.spark.examples.SparkPi /usr/local/spark/examples/jars/spark-examples_2.11-2.4.5.jar 500 raised the error:
java.io.FileNotFoundException: File does not exist: hdfs://big-master1:9000/historyserverforSpark
Solution:
Log in to the Hadoop system and create the corresponding directory:
[hadoop@big-master2 ~]$ hdfs dfs -mkdir -p /historyserverforSpark
[hadoop@big-master2 ~]$ hdfs dfs -chmod 777 /historyserverforSpark
[hadoop@big-master2 ~]$ hdfs dfs -ls /
Found 11 items
drwxr-xr-x - hadoop supergroup 0 2020-05-26 16:17 /data
drwxr-xr-x - hadoop supergroup 0 2020-06-04 23:41 /hbase
drwxrwxrwx - hadoop supergroup 0 2020-08-07 16:11 /historyserverforSpark
drwxr-xr-x - hadoop supergroup 0 2020-05-24 02:53 /sqoop-mysql
drwxr-xr-x - hadoop supergroup 0 2020-05-24 03:04 /sqoop-mysql11
drwxr-xr-x - hadoop supergroup 0 2020-05-24 02:59 /sqoop-mysql22
drwxr-xr-x - hadoop supergroup 0 2020-05-15 14:59 /test
drwxr-xr-x - hadoop supergroup 0 2020-05-18 17:15 /test01
drwx------ - hadoop supergroup 0 2020-06-11 12:22 /tmp
drwxr-xr-x - hadoop supergroup 0 2020-06-11 12:21 /user
drwxr-xr-x - root supergroup 0 2020-05-26 23:08 /var
Run it again: [hadoop@big-master2 ~]$ spark-submit --master spark://big-master2:7077 --class org.apache.spark.examples.SparkPi /usr/local/spark/examples/jars/spark-examples_2.11-2.4.5.jar 500
.......
.......
20/08/07 16:13:09 INFO scheduler.TaskSetManager: Finished task 495.0 in stage 0.0 (TID 495) in 170 ms on 192.168.41.22 (executor 1) (500/500)
20/08/07 16:13:09 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
20/08/07 16:13:09 INFO scheduler.DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 22.098 s
20/08/07 16:13:09 INFO scheduler.DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 22.793146 s
Pi is roughly 3.1413011028260223
20/08/07 16:13:10 INFO server.AbstractConnector: Stopped Spark@…{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
20/08/07 16:13:10 INFO ui.SparkUI: Stopped Spark web UI at http://big-master2:4040
20/08/07 16:13:10 INFO cluster.StandaloneSchedulerBackend: Shutting down all executors
20/08/07 16:13:10 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
20/08/07 16:13:11 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/08/07 16:13:11 INFO memory.MemoryStore: MemoryStore cleared
20/08/07 16:13:11 INFO storage.BlockManager: BlockManager stopped
20/08/07 16:13:11 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
20/08/07 16:13:11 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/08/07 16:13:12 INFO spark.SparkContext: Successfully stopped SparkContext
20/08/07 16:13:12 INFO util.ShutdownHookManager: Shutdown hook called
20/08/07 16:13:12 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-1f127bd1-743a-472f-8a6f-c65256e6cae8
20/08/07 16:13:12 INFO util.ShutdownHookManager: Deleting directory /usr/local/spark/spark-805632c9-774a-4e29-9481-ed0a071094fc
-- The command ran successfully
[hadoop@big-master2 ~]$ spark-shell
-- Local mode
20/08/07 16:20:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://big-master2:4040
Spark context available as 'sc' (master = local[*], app id = local-1596788482324).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.5
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_251)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
[hadoop@big-master2 ~]$ spark-shell --master spark://big-master2:7077
-- Running on the distributed cluster
20/08/07 16:24:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://big-master2:4040
Spark context available as 'sc' (master = spark://big-master2:7077, app id = app-20200807162442-0004).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.5
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_251)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
------- Additional Notes -------
Two ways to run Spark jobs: spark-submit and spark-shell
1. spark-submit: upload the jar to the cluster, then execute the Spark job via spark-submit from the bin/ directory:
Format:
spark-submit --master <spark master URL> --class <fully qualified main class> <jar path> <arguments>
Example: run the test program that ships with Spark to estimate the value of pi
./spark-submit --master spark://node3:7077 --class org.apache.spark.examples.SparkPi /usr/local/spark-2.1.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.0.jar 500
Result:
Pi is roughly 3.1414508628290174
2. spark-shell: a REPL-style command-line tool, itself also an Application
2.1 Local mode: runs locally without connecting to a Spark cluster; used for testing
Launch command: bin/spark-shell with no arguments means local mode:
-- Local mode
20/08/07 16:20:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://big-master2:4040
Spark context available as 'sc' (master = local[*], app id = local-1596788482324).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.5
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_251)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
2.2 Cluster mode
Launch command: bin/spark-shell --master spark://...
[hadoop@big-master2 ~]$ spark-shell --master spark://big-master2:7077
-- Running on the distributed cluster
20/08/07 16:24:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://big-master2:4040
Spark context available as 'sc' (master = spark://big-master2:7077, app id = app-20200807162442-0004).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.5
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_251)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
Notes:
Spark context available as 'sc' (master = spark://big-master2:7077, app id = app-20200807162442-0004).
Spark session available as 'spark'.
Spark session: introduced in Spark 2.0; through the session you can access all Spark components (Core, SQL, ...).
The two objects 'spark' and 'sc' can be used directly.
Example: write a WordCount program in the Spark shell
(*) Read a local file and print the result to the screen.
Note: this example assumes a single worker, with the local file on the same server as that worker.
scala> sc.textFile("/usr/local/tmp_files/test_WordCount.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
Result:
res0: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (hehehehehe,1), (I,2), (of,1), (the,1))
(*) Read a file from HDFS, perform the WordCount, and write the result back to HDFS
scala> sc.textFile("hdfs://bigdata111:9000/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://bigdata111:9000/result")
Note: the path passed to textFile() here is an HDFS path.
After the Spark job finishes, the result is stored in the result directory on HDFS (a way to inspect it is sketched below):
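A hedged way to inspect that output (the /result path comes from the example above):
hdfs dfs -ls /result                 # one part-NNNNN file per partition, plus _SUCCESS
hdfs dfs -cat /result/part-00000     # print the first partition's word counts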
Check the Spark event-log directory:
[hadoop@big-master2 ~]$ hdfs dfs -ls /historyserverforSpark
Found 3 items
-rwxrwx--- 3 hadoop supergroup 1063090 2020-08-07 16:13 /historyserverforSpark/app-20200807161241-0003
-rwxrwx--- 3 hadoop supergroup 293 2020-08-07 16:24 /historyserverforSpark/app-20200807162442-0004.inprogress
-rwxrwx--- 3 hadoop supergroup 19771 2020-08-07 16:22 /historyserverforSpark/local-1596788482324.inprogress