天天看点

二、Flink运行架构一、Flink运行组件二、任务执行流程三、程序与数据流

Flink运行架构

  • 一、Flink运行组件
    • 1、作业管理器(JobManager)
    • 2、任务管理器(TaskManager)
    • 3、资源管理器(ResourceManager)
    • 4、分发器(Dispatcher)
  • 二、任务执行流程
    • 1.任务提交流程
    • 2.任务提交流程(with Yarn)
    • 3、任务调度原理
    • 4、TaskManager和Taskslot关系
      • 4.1、对slot(插槽),以及core,partition,Parallelism(并行度)理解。
  • 三、程序与数据流
    • 1、Dataflow
    • 2、执行图(ExecutionGraph)
    • 3、任务链(Operator Chains)
    • 4、并行度(Parallelism)

一、Flink运行组件

  • 作业管理器(JobManager)
  • 任务管理器(TaskManager)
  • 资源管理器(ResourceManager)
  • 分发器(Dispacher)

同时运行在一个Master进程中,以上管理器都为线程。

1、作业管理器(JobManager)

控制一个应用程序执行的主进程,

  • 每一个flink程序都会被一个不同的JobManager所控制执行。相当于一段flink的程序代码。
  • 一个JobManager会接收一个需要执行的应用程序,这个应用程序中包含:作业图(JobGraph),逻辑数据流图(Local dataFlow graph),和打包的所有的类,库,以及其他所需要对应的jar包。
  • JobManager会将获取的JobGraph转换为对应的物理层面的数据流图,也就是ExecutionGraph(执行图),执行图中包含了所有可以并行执行的任务。
  • JobManager会向ResourceManager申请此次任务所需要的资源,该资源即为TaskManager上的插槽(slot),获取资源之后,会将得到的ExecutionGraph发送到对应的TaskManager上。
  • JobManager在发送执行图之后,会负责对中央协调的操作。例如对于检查点checkpoints的协调。

2、任务管理器(TaskManager)

是Flink运行中,执行任务的工作进程。

  • flink中会有多个TaskManager运行。每一个TaskManager上的插槽(slot)限制着每个TaskManager能够执行任务的数量。
  • 启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。
  • 在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。

3、资源管理器(ResourceManager)

主要负责管理任务管理器(TaskManager)的插槽(slot)

  • TaskManager 插槽是Flink中定义的处理资源单元。
  • Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及Standalone部署,一般为yarn。
  • 当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果Flink的资源管理器没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台(YARN的资源管理器)发起会话,以提供启动TaskManager进程的容器。

4、分发器(Dispatcher)

  • 可以跨作业运行,它为应用提交提供了REST接口。
  • 当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager。
  • 分发器也会启动一个Web UI,用来方便地展示和监控作业执行的信息。
  • 分发器在架构中可能并不是必需的,这取决于应用提交运行的方式。

二、任务执行流程

1.任务提交流程

二、Flink运行架构一、Flink运行组件二、任务执行流程三、程序与数据流

个人理解:

  1. 首先App提交应用,两种方式。①通过flink run命令提交,该提交方式不经过Dispatcher,直接提交。②通过flink-web页面8081端口进行提交,会经过Dispatcher。
  2. 提交的应用会提交到JobManager上,JobManager收到应用后,向ResourceManager请求资源(slots),同时TaskManager启动,并向ResourceManager上请求注册slots。
  3. TaskManager向JobManager提供执行该应用所需要的资源。
  4. JobManager将该任务分配到对应的TaskManager的插槽中(slots),任务在TaskManagre的插槽中执行。

2.任务提交流程(with Yarn)

二、Flink运行架构一、Flink运行组件二、任务执行流程三、程序与数据流

每次JobManager收到作业时,都会向ResourceManager请求资源,当ResourceManager资源不够时,会向yarn集群申请资源。

3、任务调度原理

二、Flink运行架构一、Flink运行组件二、任务执行流程三、程序与数据流

代码(流程图) -> StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图

  1. 我们写的flink代码会在打包编译的过程中,会存在优化,生成一个作业图。
  2. 然后通过分发器(Dispatcher)将作业图提交到JobManager。
  3. 通过调度器(scheduler)将任务调度到任务管理器(TaskManager)上执行。
  4. 每一个任务管理器都有对应的插槽(配置文件可以指定),每个任务管理器都是一个jvm进程,而每个任务插槽为一个线程。
  5. flink中通信模块用的是Actor System。

4、TaskManager和Taskslot关系

二、Flink运行架构一、Flink运行组件二、任务执行流程三、程序与数据流
  • Flink 中每一个 TaskManager 都是一个JVM进程,它中的一个或多个 subtask可能会在独立的线程上执行。
  • 为了控制一个 TaskManager 能接收多少个 task, TaskManager 通过 task slot 来进行控制(一个 TaskManager 至少有一个 slot)
    二、Flink运行架构一、Flink运行组件二、任务执行流程三、程序与数据流
  • 默认情况下,Flink 允许子任务共享 slot。 这样的结果是,一个 slot 可以保存作业的整个管道。
  • Task Slot 是静态的概念,是指 TaskManager 具有的并发执行能力

4.1、对slot(插槽),以及core,partition,Parallelism(并行度)理解。

  • 对全局设置并行度就是全局的并行度,对单个算子设置并行度,只作用于单个算子,一次程序申请的插槽的个数是max(全局的并行度,单个算子的并行度)。

三、程序与数据流

1、Dataflow

二、Flink运行架构一、Flink运行组件二、任务执行流程三、程序与数据流
  • flink所有的程序都是由三部分组成:Source,transform,Sink。
  • Source:读取数据源。
  • transform:通过对应的算子对数据进行转换和处理。
  • Sink:对流的输出。

代码中还有,程序开始时的获取开发环境,程序完成后,需要对程序进行执行(对于代码而言)。

flink上运行的程序会被映射成一个逻辑数据流(dataflow),每一个dataflow包括三个部分source,transform,sink,

每一个dataflow以一个或多个source开始,以一个或多个sink结束。类似DAG(有向无环图)。

大部分情况是一对一。

2、执行图(ExecutionGraph)

flink执行图可以分为四层: StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图

  1. StreamGraph(流图):用户通过StreamAPI编写程序生成的最初的图,用来表示程序的拓扑结构。

    真实存在的数据结构:全类名:org.apache.flink.streaming.api.graph.StreamGraph

//源码中的StreamGraph(流图)

! wordcount => env.execute();
	@ 1681行 execute(DEFAULT_JOB_NAME);
		# 1699行 return execute(getStreamGraph(jobName));
			$ 1848行 return getStreamGraph(jobName, true);
				^ 1863行 StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();	

           
  1. JobGraph(工作图):在编译打包时生成。StreamGraph优化后生成JobGraph,提交到JobManager,主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点。也就是满足后面的任务链。

    真实存在的数据结构:全类名:org.apache.flink.runtime.jobgraph.JobGraph

//源码中的JobGraph(工作图)

! wordcount => env.execute();
	@ 1681行 execute(DEFAULT_JOB_NAME);
	 # 1699行 return execute(getStreamGraph(jobName)); 
	  $ 1713行 final JobClient jobClient = executeAsync(streamGraph);
	   ^ 1812行 .execute(streamGraph, configuration);  //将StreamGraph作为参数传入
	    * 15行    CompletableFuture<JobClient> execute(Pipeline var1, Configuration var2) throws Exception;的实现LocalExecutor.class中
	     《 51行 JobGraph jobGraph = this.getJobGraph(pipeline, effectiveConfig);//将StreamGraph转换为JobGraph
	      ; 63行 return PipelineExecutorUtils.getJobGraph(pipeline, configuration);
	       [ 27行 JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
	        ] 18行 return pipelineTranslator.translateToJobGraph(pipeline, optimizerConfiguration, defaultParallelism);//将StreamGraph转换为JobGraph(pipeline就是相当于StreamGraph,底层会强转至StreamGraph)
	         + 进入该方法的实现类:StreamGraphTranslator
	          - 25行  StreamGraph streamGraph = (StreamGraph)pipeline;//pipeline强制转换为StreamGraph
			   = 26行 return streamGraph.getJobGraph((JobID)null);
			   	/ 850行 return StreamingJobGraphGenerator.createJobGraph(this, jobID);
			   	 》 109行 return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph(); // 中进入createJobGraph
			   	  < 169行 setChaining(hashes, legacyHashes);//核心 
			   	  个人理解是将满足需求的算子的id赋予相同的hash-id值。


           
  1. ExecutionGraph(执行图):JobGraph通过JobManager生成ExecutionGraph。

    真实存在的数据结构:全类名:org.apache.flink.runtime.executiongraph.ExecutionGraph

  2. 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
    二、Flink运行架构一、Flink运行组件二、任务执行流程三、程序与数据流

3、任务链(Operator Chains)

  • Flink 采用了一种称为任务链的优化技术,可以在特定条件下减少本地通信的开销。为了满足任务链的要求,必须将两个或多个算子设为相同的并行度,并通过本地转发(local forward)的方式进行连接
  • 相同并行度的 one-to-one 操作,Flink 这样相连的算子链接在一起形成一个 task,原来的算子成为里面的 subtask
  • 并行度相同、并且是 one-to-one 操作,两个条件缺一不可(例如filter算子和map算子,并行度相同,没有shuffle操作,类似spark中的窄依赖,合成一起放到同一个taskslot中执行)
    二、Flink运行架构一、Flink运行组件二、任务执行流程三、程序与数据流

4、并行度(Parallelism)

二、Flink运行架构一、Flink运行组件二、任务执行流程三、程序与数据流
  • 一个特定算子的 子任务(subtask)的个数被称之为其并行度(parallelism)。一般情况下,一个 stream 的并行度,可以认为就是其所有算子中最大的并行度。
二、Flink运行架构一、Flink运行组件二、任务执行流程三、程序与数据流
  • 一个TaskManager上可以有多个Taskslot
  • 一个Taskslot上只能运行一个并行度的子任务
  • 多个子任务可以运行在多个TaskManager上。
  • 一个程序中,不同的算子可能具有不同的并行度
  • 算子之间传输数据的形式可以是 one-to-one (forwarding) 的模式也可以是redistributing 的模式,具体是哪一种形式,取决于算子的种类,类似spark中的宽依赖和窄依赖。
  • One-to-one:stream维护着分区以及元素的顺序(比如source和map之间)。这意味着map 算子的子任务看到的元素的个数以及顺序跟 source 算子的子任务生产的元素的个数、顺序相同。map、fliter、flatMap等算子都是one-to-one的对应关系。
  • Redistributing:stream的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy 基于 hashCode 重分区、而 broadcast 和 rebalance 会随机重新分区,这些算子都会引起redistribute过程,而 redistribute 过程就类似于 Spark 中的 shuffle 过程。