
Spark Distributed Deployment

In big-data scenarios that demand real-time computation, stream processing, and lower computation latency, the Apache Spark compute engine meets these needs well. Spark is an in-memory distributed computing framework whose core abstraction is the Resilient Distributed Dataset (RDD). RDDs support multiple data sources, provide fault tolerance, can be cached, and support parallel operations, which makes Spark well suited to data mining and machine learning.

Spark is a fast, general-purpose compute engine designed for processing data at scale. It supports multiple programming languages (Java, Scala, Python, R) and offers high computation speed: according to the official site, by keeping data in memory Spark can run up to 100x faster than Hadoop MapReduce.

Installation, Deployment, and Usage

Installing and deploying a Spark cluster is not complicated and requires relatively little configuration. By following this section, you can build a distributed Spark cluster from scratch.

When choosing a Spark package from the official site, pay attention to matching the Spark and Hadoop versions.

Environment preparation

  • Server cluster

I use four CentOS 6.6 virtual machines with hostnames hadoop01, hadoop02, hadoop03, and hadoop04, and I build the cluster as the hadoop user (in production the root user cannot be used freely).

  • Spark package

Download: https://mirrors.aliyun.com/apache/spark/

I use spark-2.2.0-bin-hadoop2.7.tgz.

Choose the Spark build that matches the Hadoop version installed on your machines.
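As a quick sanity check, the `hadoopX.Y` suffix of the tarball can be derived from the local Hadoop version. A minimal sketch (the string "Hadoop 2.7.4" here is a stand-in for the first line of `hadoop version` output on your machine):

```shell
# Derive the matching Spark package name from a Hadoop version string.
# "Hadoop 2.7.4" stands in for: hadoop version | head -1
hv=$(echo "Hadoop 2.7.4" | awk '{print $2}' | cut -d. -f1,2)
echo "spark-2.2.0-bin-hadoop${hv}.tgz"   # prints spark-2.2.0-bin-hadoop2.7.tgz
```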

1. Cluster planning

2. Detailed steps

(1) Upload the package to hadoop01 and extract it

[hadoop@hadoop01 soft]$ tar zxvf spark-2.2.0-bin-hadoop2.7.tgz -C /home/hadoop/apps/
# If the extracted directory name feels too long, rename it
[hadoop@hadoop01 soft]$ cd /home/hadoop/apps/
[hadoop@hadoop01 apps]$ mv spark-2.2.0-bin-hadoop2.7 spark-2.2.0

(2) Edit the spark-env.sh configuration file

# Rename SPARK_HOME/conf/spark-env.sh.template to spark-env.sh
[hadoop@hadoop01 apps]$ cd spark-2.2.0/conf
[hadoop@hadoop01 conf]$ mv spark-env.sh.template spark-env.sh
# Edit spark-env.sh and add the following
[hadoop@hadoop01 conf]$ vim spark-env.sh
# Set JAVA_HOME. It usually works without it, but problems can occur, so set it anyway
export JAVA_HOME=/usr/local/java/jdk1.8.0_73
# A Spark job will very likely read files from HDFS, so configure this.
# If your jobs only read local files and you do not need YARN, it can be left out
export HADOOP_CONF_DIR=/home/hadoop/apps/hadoop-2.7.4/etc/hadoop
# Hostname of the Master
export SPARK_MASTER_HOST=hadoop01
# Port for submitting applications; 7077 is the default, change it here if needed
export SPARK_MASTER_PORT=7077
# Maximum number of CPU cores each Worker may use; my VM has just one.
# On a real server with 32 cores you could set 32
export SPARK_WORKER_CORES=1
# Maximum memory each Worker may use; my VM has only 2 GB.
# On a real server with 128 GB you could set 100g
export SPARK_WORKER_MEMORY=1g

(3) Edit the slaves configuration file and add the list of Worker hosts

[hadoop@hadoop01 conf]$ mv slaves.template slaves
[hadoop@hadoop01 conf]$ vim slaves
# Replace the original content (localhost) with:
hadoop01
hadoop02
hadoop03
hadoop04

(4) Rename the start-all.sh and stop-all.sh scripts under SPARK_HOME/sbin

For example, rename them to start-spark-all.sh and stop-spark-all.sh.

Reason:

If HADOOP_HOME is also configured on the cluster, then HADOOP_HOME/sbin contains its own start-all.sh and stop-all.sh, and when you run either script the system cannot tell whether you mean the Hadoop cluster or the Spark cluster. Renaming removes the ambiguity. Of course, if you do not rename them, you can instead cd into the relevant sbin directory and run the scripts there, which also avoids the conflict. We configure SPARK_HOME mainly to make running the other Spark commands convenient.
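The clash is easy to demonstrate: when two sbin directories on PATH both provide a start-all.sh, whichever directory comes first wins. A local simulation using throwaway stub scripts (not the real Hadoop/Spark scripts):

```shell
# Simulate two sbin directories that both provide start-all.sh.
tmp=$(mktemp -d)
mkdir -p "$tmp/hadoop/sbin" "$tmp/spark/sbin"
printf '#!/bin/sh\necho hadoop\n' > "$tmp/hadoop/sbin/start-all.sh"
printf '#!/bin/sh\necho spark\n'  > "$tmp/spark/sbin/start-all.sh"
chmod +x "$tmp/hadoop/sbin/start-all.sh" "$tmp/spark/sbin/start-all.sh"
# PATH order decides which script runs -- here Hadoop's copy shadows Spark's
PATH="$tmp/hadoop/sbin:$tmp/spark/sbin:$PATH" start-all.sh   # prints "hadoop"
rm -rf "$tmp"
```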

[hadoop@hadoop01 conf]$ cd ../sbin
[hadoop@hadoop01 sbin]$ mv start-all.sh start-spark-all.sh
[hadoop@hadoop01 sbin]$ mv stop-all.sh stop-spark-all.sh

(5) Distribute the Spark installation to the other nodes

[hadoop@hadoop01 apps]$ scp -r spark-2.2.0 hadoop02:`pwd`
[hadoop@hadoop01 apps]$ scp -r spark-2.2.0 hadoop03:`pwd`
[hadoop@hadoop01 apps]$ scp -r spark-2.2.0 hadoop04:`pwd`
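With many workers, the same distribution step is easier written as a loop. A sketch (the `echo` is left in so the commands are only printed; remove it to actually copy, and it assumes passwordless ssh to each node is already set up):

```shell
# Print the scp command for each worker; drop `echo` to run them for real.
for host in hadoop02 hadoop03 hadoop04; do
  echo scp -r spark-2.2.0 "$host:/home/hadoop/apps/"
done
```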

(6) Configure the SPARK_HOME environment variable on every node in the cluster

[hadoop@hadoop01 conf]$ vim ~/.bash_profile
export SPARK_HOME=/home/hadoop/apps/spark-2.2.0
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
[hadoop@hadoop01 conf]$ source ~/.bash_profile
# Repeat on every other node

(7) Start the Spark cluster on the Spark master node

# Note: if you skipped step (4), be sure to run this from the SPARK_HOME/sbin directory,
# or run start-master.sh and start-slaves.sh separately on the Master node
[hadoop@hadoop01 conf]$ start-spark-all.sh

Note:

  • If you configured HADOOP_CONF_DIR, start the Hadoop cluster before starting the Spark cluster.

(8) Verify

The fully distributed Spark cluster is up and running!

### Hands-on test ###

[hadoop@big-master2 ~]$ cat /etc/hosts

127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4

::1 localhost localhost.localdomain localhost6 localhost6.localdomain6

## bigdata cluster ##

192.168.41.20 big-master1 #bigdata1 namenode1,zookeeper,resourcemanager Haproxy-master

192.168.41.21 big-master2 #bigdata2 namenode2,zookeeper,slave,resourcemanager haproxy-standby spark0-master

192.168.41.22 big-slave01 #bigdata3 datanode1,zookeeper,slave hive1 spark1-work

192.168.41.25 big-slave02 #bigdata4 datanode2,zookeeper,slave hive2 spark2-work

192.168.41.27 big-slave03 #bigdata5 datanode3,zookeeper,slave hive3 spark3-work

192.168.41.17 tidb05.500.com #hive mysql

######

[hadoop@big-master2 ~]$ cat /etc/profile

## scala ##

export SCALA_HOME=/usr/local/scala-2.13.3

export PATH=$SCALA_HOME/bin:$PATH

## Python3.7 ##

export PYTHON3_HOME=/usr/local/python3

export PATH=$PYTHON3_HOME/bin:$PATH

## spark ##

export SPARK_HOME=/usr/local/spark

export PATH=$SPARK_HOME/bin:$PATH

######

[hadoop@big-master2 ~]$ cd /usr/local/spark/

[hadoop@big-master2 spark]$ pwd

/usr/local/spark

[hadoop@big-master2 spark]$ ls

bin conf data examples jars kubernetes LICENSE licenses logs NOTICE python R README.md RELEASE sbin yarn

[hadoop@big-master2 spark]$ cd conf/

[hadoop@big-master2 conf]$ ls

docker.properties.template log4j.properties.template slaves spark-defaults.conf spark-env.sh

fairscheduler.xml.template metrics.properties.template slaves.template spark-defaults.conf.template spark-env.sh.template

[hadoop@big-master2 conf]$ cat slaves

# (Apache License 2.0 header omitted)

# A Spark Worker will be started on each of the machines listed below.

big-slave01

big-slave02

big-slave03

[hadoop@big-master2 conf]$ cat spark-env.sh

#!/usr/bin/env bash

export JAVA_HOME=/usr/local/jdk1.8.0_251

export HADOOP_HOME=/usr/local/hadoop

export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop

export JAVA_LIBRARY_PATH=/usr/local/hadoop/lib/native

export SCALA_HOME=/usr/local/scala-2.13.3

export PYTHON3_HOME=/usr/local/python3

export SPARK_MASTER_HOST=big-master2

export SPARK_LOCAL_DIRS=/usr/local/spark

export SPARK_EXECUTOR_MEMORY=2g

export SPARK_WORKER_MEMORY=2g

export SPARK_WORKER_CORES=2

export SPARK_WORKER_INSTANCES=1

[hadoop@big-master2 conf]$ cat spark-defaults.conf

# (Apache License 2.0 header omitted)

# Default system properties included when running spark-submit.

# This is useful for setting default environmental settings.

# Example:

# spark.master spark://master:7077

# spark.eventLog.enabled true

# spark.eventLog.dir hdfs://namenode:8021/directory

# spark.serializer org.apache.spark.serializer.KryoSerializer

# spark.driver.memory 5g

# spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

spark.eventLog.enabled true

spark.eventLog.dir hdfs://big-master1:9000/historyserverforSpark

spark.yarn.historyServer.address big-master2:18080

spark.history.fs.logDirectory hdfs://big-master1:9000/historyserverforSpark

spark.speculation true

-- From the master2 node, copy all of these files to the slave nodes.

###

Start:

cd /usr/local/spark/sbin

[hadoop@big-master2 sbin]$ ls

slaves.sh start-all.sh start-mesos-shuffle-service.sh start-thriftserver.sh stop-mesos-dispatcher.sh stop-slaves.sh

spark-config.sh start-history-server.sh start-shuffle-service.sh stop-all.sh stop-mesos-shuffle-service.sh stop-thriftserver.sh

spark-daemon.sh start-master.sh start-slave.sh stop-history-server.sh stop-shuffle-service.sh

spark-daemons.sh start-mesos-dispatcher.sh start-slaves.sh stop-master.sh stop-slave.sh

Verify:

[hadoop@big-master2 ~]$ jps

20032 NameNode

20116 JournalNode

20324 DFSZKFailoverController

31540 HMaster

2988 Master

7788 Jps

18830 QuorumPeerMain

2462 ResourceManager

[root@big-slave01 ~]# jps

10161 NodeManager

28338 HRegionServer

10546 RunJar

8196 Jps

7702 QuorumPeerMain

8583 DataNode

8108 Worker

8686 JournalNode

10638 RunJar

[root@big-slave02 ~]# jps

7168 Worker

8322 Jps

5187 DataNode

8581 RunJar

6697 NodeManager

4362 QuorumPeerMain

5290 JournalNode

25869 HRegionServer

[root@big-slave03 ~]# jps

4562 QuorumPeerMain

5442 DataNode

26004 HRegionServer

6389 RunJar

6903 NodeManager

5545 JournalNode

26895 Worker

27375 Jps

Browse to: http://big-master2:8088

Problem 1:

Running [hadoop@big-master2 ~]$ spark-submit --master spark://big-master2:7077 --class org.apache.spark.examples.SparkPi /usr/local/spark/examples/jars/spark-examples_2.11-2.4.5.jar 500 raised the error:

java.io.FileNotFoundException: File does not exist: hdfs://big-master1:9000/historyserverforSpark

Fix:

Log in to the Hadoop system and create the corresponding directory:

[hadoop@big-master2 ~]$ hdfs dfs -mkdir -p /historyserverforSpark

[hadoop@big-master2 ~]$ hdfs dfs -chmod 777 /historyserverforSpark

[hadoop@big-master2 ~]$ hdfs dfs -ls /

Found 11 items

drwxr-xr-x - hadoop supergroup 0 2020-05-26 16:17 /data

drwxr-xr-x - hadoop supergroup 0 2020-06-04 23:41 /hbase

drwxrwxrwx - hadoop supergroup 0 2020-08-07 16:11 /historyserverforSpark

drwxr-xr-x - hadoop supergroup 0 2020-05-24 02:53 /sqoop-mysql

drwxr-xr-x - hadoop supergroup 0 2020-05-24 03:04 /sqoop-mysql11

drwxr-xr-x - hadoop supergroup 0 2020-05-24 02:59 /sqoop-mysql22

drwxr-xr-x - hadoop supergroup 0 2020-05-15 14:59 /test

drwxr-xr-x - hadoop supergroup 0 2020-05-18 17:15 /test01

drwx------ - hadoop supergroup 0 2020-06-11 12:22 /tmp

drwxr-xr-x - hadoop supergroup 0 2020-06-11 12:21 /user

drwxr-xr-x - root supergroup 0 2020-05-26 23:08 /var

Run again: [hadoop@big-master2 ~]$ spark-submit --master spark://big-master2:7077 --class org.apache.spark.examples.SparkPi /usr/local/spark/examples/jars/spark-examples_2.11-2.4.5.jar 500

.......

.......

20/08/07 16:13:09 INFO scheduler.TaskSetManager: Finished task 495.0 in stage 0.0 (TID 495) in 170 ms on 192.168.41.22 (executor 1) (500/500)

20/08/07 16:13:09 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool

20/08/07 16:13:09 INFO scheduler.DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 22.098 s

20/08/07 16:13:09 INFO scheduler.DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 22.793146 s

Pi is roughly 3.1413011028260223

20/08/07 16:13:10 INFO server.AbstractConnector: Stopped Spark@...{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}

20/08/07 16:13:10 INFO ui.SparkUI: Stopped Spark web UI at http://big-master2:4040

20/08/07 16:13:10 INFO cluster.StandaloneSchedulerBackend: Shutting down all executors

20/08/07 16:13:10 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down

20/08/07 16:13:11 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

20/08/07 16:13:11 INFO memory.MemoryStore: MemoryStore cleared

20/08/07 16:13:11 INFO storage.BlockManager: BlockManager stopped

20/08/07 16:13:11 INFO storage.BlockManagerMaster: BlockManagerMaster stopped

20/08/07 16:13:11 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

20/08/07 16:13:12 INFO spark.SparkContext: Successfully stopped SparkContext

20/08/07 16:13:12 INFO util.ShutdownHookManager: Shutdown hook called

20/08/07 16:13:12 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-1f127bd1-743a-472f-8a6f-c65256e6cae8

20/08/07 16:13:12 INFO util.ShutdownHookManager: Deleting directory /usr/local/spark/spark-805632c9-774a-4e29-9481-ed0a071094fc

-- Command completed successfully

[hadoop@big-master2 ~]$ spark-shell

-- local mode

20/08/07 16:20:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

Spark context Web UI available at http://big-master2:4040

Spark context available as 'sc' (master = local[*], app id = local-1596788482324).

Spark session available as 'spark'.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_251)

Type in expressions to have them evaluated.

Type :help for more information.

scala>

[hadoop@big-master2 ~]$ spark-shell --master spark://big-master2:7077

-- running on the distributed cluster

20/08/07 16:24:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

Spark context Web UI available at http://big-master2:4040

Spark context available as 'sc' (master = spark://big-master2:7077, app id = app-20200807162442-0004).

Spark session available as 'spark'.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_251)

Type in expressions to have them evaluated.

Type :help for more information.

scala>

------- Editor's notes ------

Two ways to run a Spark job: spark-submit and spark-shell

1. spark-submit: upload the jar to the cluster, then execute the Spark job from the bin/ directory via spark-submit:

Format:

spark-submit --master <spark-master-URL> --class <fully-qualified-main-class> <path-to-jar> <args>

Example: run the test program that ships with Spark to estimate pi:

./spark-submit --master spark://node3:7077 --class org.apache.spark.examples.SparkPi /usr/local/spark-2.1.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.0.jar 500

Result:

Pi is roughly 3.1414508628290174

2. spark-shell: effectively a REPL (command-line) tool, and itself a Spark application

2.1 Local mode: runs locally without connecting to a Spark cluster; useful for testing

Start command: bin/spark-shell with no arguments runs in local mode:

(The startup output is identical to the local-mode spark-shell session shown earlier.)

2.2 Cluster mode

Start command: bin/spark-shell --master spark://.....

[hadoop@big-master2 ~]$ spark-shell --master spark://big-master2:7077

(The startup output is identical to the cluster-mode spark-shell session shown earlier.)

Notes:

Spark context available as 'sc' (master = spark://big-master2:7077, app id = app-20200807162442-0004).

Spark session available as 'spark'.

Spark session: provided since Spark 2.0; through the session you can access all Spark components (Core, SQL, ...).

The 'spark' and 'sc' objects can both be used directly.

Example: develop a WordCount program in the Spark shell

(*) Read a local file and print the result to the screen.

Note: this example requires exactly one worker, and the local file must reside on the same server as that worker.

scala> sc.textFile("/usr/local/tmp_files/test_WordCount.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

Result:

res0: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (hehehehehe,1), (I,2), (of,1), (the,1))
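For comparison, the flatMap/map/reduceByKey pipeline above has a classic shell equivalent: split into one word per line, then count duplicates. The two input lines below are made-up sample text, not the actual contents of test_WordCount.txt:

```shell
# Split on spaces (like flatMap), then count identical words (like reduceByKey).
printf 'I love Beijing\nI love China\n' | tr ' ' '\n' | sort | uniq -c | sort -rn
```

This yields counts such as "2 love" and "1 Beijing"; ordering among equal counts may vary.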

(*) Read an HDFS file, run WordCount on it, and write the result back to HDFS

scala> sc.textFile("hdfs://bigdata111:9000/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://bigdata111:9000/result")

Note: the path passed to textFile() here is an HDFS path.

After the Spark job finishes, the result is stored in the result directory on HDFS:

Check:

[hadoop@big-master2 ~]$ hdfs dfs -ls /historyserverforSpark

Found 3 items

-rwxrwx--- 3 hadoop supergroup 1063090 2020-08-07 16:13 /historyserverforSpark/app-20200807161241-0003

-rwxrwx--- 3 hadoop supergroup 293 2020-08-07 16:24 /historyserverforSpark/app-20200807162442-0004.inprogress

-rwxrwx--- 3 hadoop supergroup 19771 2020-08-07 16:22 /historyserverforSpark/local-1596788482324.inprogress
