天天看点

大数据开发之Spark 基础入门学习

集群相关

Cluster Manager指的是在集群上获取资源的外部服务,为每个spark application在集群中调度和分配资源的组件,目前有三种类型:

  • Standalone:Spark 原生的资源管理,由 Master 负责资源的分配
  • Apache Mesos:与 Hadoop MapReduce 兼容性良好的一种资源调度框架
  • Hadoop Yarn:主要是指的 Yarn 中的 ResourceManager

Worker 指集群中的工作节点,启动并运行executor进程,运行大数据培训​作业代码的节点

  • standalone模式下:Worker进程所在节点
  • yarn模式下: yarn的nodemanager进程所在的节点

Deploy Mode 分为两种模式,client和cluster,区别在于driver运行的位置

  • Client模式下driver运行在提交spark作业的机器上,
  • 可以实时看到详细的日志信息
  • 方便追踪和排查错误,用于测试
  • cluster模式下,spark application提交到cluster manager,cluster manager(比如master)负责在集群中某个节点上,启动driver进程,用于生产环境
  • 通常情况下driver和worker在同一个网络中是最好的,而client很可能就是driver worker分开布置,这样网络通信很耗时,cluster没有这样的问题

Spark分布式计算组成

Application

  • 用户编写的Spark程序,通过一个有main方法的类执行,完成一个计算任务的处理。
  • 它是由一个Driver程序和一组运行于Spark集群上的Executor组成

Driver

  • 运行main方法的Java虚拟机进程负责
  • 监听spark application的executor进程发来的通信和连接
  • 将工程jar发送到所有的executor进程中
  • driver调度task给executor执行
  • Driver与Cluster Manager、Worker协作完成
  • Application进程的启动
  • DAG划分
  • 计算任务封装
  • 分配task到executor上
  • 计算资源的分配等调度执行作业等
  • Driver调度task给executor执行,所以driver最好和spark集群在一片网络内,便以通信

Executor

  • 运行在worker节点上,负责执行作业的任务,并将数据保存在内存或磁盘中
  • 每个spark application,都有属于自己的executor进程,spark application不会共享一个executor进程
  • executor在整个spark application运行的生命周期内,executor可以动态增加/释放
  • executor使用多线程运行SparkContext分配过来的task,来一批task就执行一批

用户操作spark的入口

SparkContext是Spark的入口,负责连接Spark集群,创建RDD,累积量和广播量等

  • SparkContext是Spark的对外接口,负责向调用者提供Spark的各种功能
  • driver program通过SparkContext连接到集群管理器来实现对集群中任务的控制
  • 每个JVM只有一个SparkContext,一台服务器可以启动多个JVM

SparkSession

  • The entry point to programming Spark with the Dataset and DataFrame API.
  • 包含了SQLContext、HiveContext、sparkcontext

Spark计算切分相关

Job

  • 一个spark application可能会被分为多个job,每次调用Action时,逻辑上会生成一个Job
  • 一个Job包含了一个或多个Stage

Stage

  • 每个job都会划分为一个或多个stage(阶段),每个stage都会有对应的一批task(即一个taskset),分配到executor上去执行
  • Stage包括两类
  • ShuffleMapStage
  • ResultStage

TaskSet

  • 一组关联的,但相互之间没有Shuffle依赖关系的Task集合
  • Stage可以直接映射为TaskSet,一个TaskSet封装了一次需要运算的、具有相同处理逻辑的Task,
  • 这些Task可以并行计算,粗粒度的调度是以TaskSet为单位的。
  • 一个stage对应一个taskset

Task

  • driver发送到executor上执行的计算单元,每个task负责在一个阶段(stage),处理一小片数据,计算出对应的结果
  • Task是在物理节点上运行的基本单位,Task包含两类
  • ShuffleMapTask => 对应于Stage中ShuffleMapStage中的一个执行基本单元
  • ResultTask => 对应于Stage中ResultStage中的一个执行基本单元

Spark其他重要部分

数据结构

  • RDD
  • DataFrame 引入了schema和off-heap
  • DataSet 整合了rdd和dataframe的优点,支持结构化和非结构化数据,采用堆外内存存储,gc友好

核心调度器

  • DAGScheduler 根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler。
  • SparkEnv Spark 公共服务们相互交互,用来给 Spark 应用程序建立分布式计算平台的运行时环境
  • TaskScheduler 将Taskset提交给Worker node集群运行并返回结果。

Spark调度

提交方式

  • yarn-cluster模式,用于生产模式,driver运行在nodeManager,没有网卡流量激增问题,但查看log麻烦,调试不方便
  • Yarn-client模式,yarn-client用于测试,driver运行在本地客户端,负责调度application,会与yarn集群产生超大量的网络通信,从而导致网卡流量激增,yarn-client可以在本地看到所有log,方便调试
  • yarn-client vs yarn-cluster
  • yarn-client下,driver运行在spark-submit提交的机器上,ApplicationMaster只是相当于一个ExecutorLauncher,仅仅负责申请启动executor;driver负责具体调度
  • yarn-cluster下,ApplicationMaster是driver,ApplicationMaster负责具体调度

Spark通用任务执行过程

  • 用户通过 spark-submit 脚本提交应用
  • spark-submit 脚本启动Driver,调用用户定义的 main() 方法构建sparkConf和sparkContext对象,在sparkContext入口做了三件事,创建了
  • sparkEnv对象
  • TaskScheduler
  • DAGScheduler
  • Driver与cluster manager通信,申请资源以启动Executor;
  • cluster manager为Driver启动Executor;
  • 在用户应用中代码遇到对RDD的action算子操作的时候,触发一个job,这时就会
  • 调用DAGScheduler对象进行Stage划分,将划分好的stage按照分区生成一个一个的task,并且封装到TaskSet对象中
  • TaskSet提交到TaskScheduler,TaskScheduler按照提交过来的TaskSet,拿到一个序列化器,将TaskSet序列化,将序列化好的Task封装并且提交到worker
  • 任务在Executor中进行计算并保存结果;
  • 如果Driver的 main() 方法退出,或者调用了SparkContext.stop(),Driver会终止Executor,并且通过集群管理器释放资源。

spark 提交过程分析standalone

  • SparkContext连接到Master,向Master注册并申请资源
  • Master根据资源申请要求和Worker心跳周期内报告的信息决定在哪个Worker上分配资源,然后在该Worker上获取资源,然后启动StandaloneExecutorBackend
  • StandaloneExecutorBackend向SparkContext注册,建立Executor线程池
  • SparkContext将Applicaiton代码发送给StandaloneExecutorBackend
  • SparkContext解析Applicaiton代码,构建DAG图,
  • 提交给DAG Scheduler分解成Stage(当碰到Action操作时,就会催生Job;每个Job中含有1个或多个Stage,Stage一般在获取外部数据和shuffle之前产生)
  • 以Stage(或者称为TaskSet)提交给Task Scheduler,
  • Task Scheduler负责将Task分配到相应的Worker,最后提交给StandaloneExecutorBackend执行
  • StandaloneExecutorBackend会建立Executor线程池,开始执行Task,并向SparkContext报告,直至Task完成。
  • 所有Task完成后,SparkContext向Master注销,释放资源。

继续阅读