简单研究下Flink的任务部署。我们在IDEA 开发工具中用代码跑Flink 的时候,实际是会虚拟出一个小型的Flink 集群,当执行execute 的时候是将上面的代码作为一个job 提交到Flink 的JobManager中。
参考: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/overview/
1.简介
Flink 提交作业和执行任务,需要以下几个组件:客户端(Client)、作业管理器(JobManager)、任务管理器(TaskManager)。我们的代码由客户端获取,之后提交给JobManager,做进一步转换处理后,然后分发给任务众多的TaskManager。这里的TaskManager是真正干活的人,数据处理操作是他们处理的。
2. 单机版部署
1. 资源下载:
https://archive.apache.org/dist/flink/flink-1.13.0/ 下载scala_2.12.tgz
2. 开始启动集群
下载下来,直接启动即可。就回启动一个单节点的集群。JobManager和TaskManager 都在同一个机子。
[root@k8smaster01 flink-1.13.0]# ./bin/start-cluster.sh
然后查看Java 相关进程
[root@k8smaster01 flink-1.13.0]# jps -l | grep -v jps
4362 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
2991 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
3. 启动后会启动一个webui 控制台。默认端口是8081,这些默认的端口都是可以修改的。在配置文件 conf/flink-conf.yaml
注意这里有个可用的任务槽, 可以理解为Worker 节点数量。(后面跑任务的并行度需要小于等于该值)
4. 将之前的项目打包成jar 包
(1)修改类监听端口为 192.168.13.107
package cn.qz;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class SocketStreamWordCount {
public static void main(String[] args) throws Exception {
// 1. 创建执行环境(流处理执行环境)
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取文件
DataStreamSource<String> txtDataSource = executionEnvironment.socketTextStream("192.168.13.107", 7777);
// 3. 转换数据格式
SingleOutputStreamOperator<Tuple2<String, Long>> singleOutputStreamOperator = txtDataSource
.flatMap((String line, Collector<String> words) -> {
Arrays.stream(line.split(" ")).forEach(words::collect);
})
.returns(Types.STRING)
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));// lambda 使用泛型,由于泛型擦除,需要显示的声明类型信息
// 4. 分组
KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = singleOutputStreamOperator.keyBy(t -> t.f0);
// 5. 求和
SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
// 6. 打印
sum.print();
// 7. 执行
executionEnvironment.execute();
}
}
View Code
(2) 打包
5. 上传jar包然后提交任务(107 服务器先nc -l 7777 监听端口)
可以选择入口类,1 是并行度(任务槽)。 也可以输入程序参数和保存点保存的路径。(这里的操作应该是客户端先跑一遍代码,把任务摘出来,然后交给TaskManager,TaskManager 分发给JobManager)
6. 测试:
发送数据:
hello flink
hello china
7. 查看flink 控制台
上面的日志实际会输出到 workers 服务器的std. 这里就是107 服务器的。
8. 停止任务(Running Jobs -》 选中任务-》Cancel Job)
9. 历史任务可以到 Completed Jobs 查看执行过的任务的信息
10. 命令行提交任务和取消任务
(1) 提交任务
[root@k8smaster01 flink]# ./flink-1.13.0/bin/flink run -c cn.qz.SocketStreamWordCount -p 1 ./study-flink-1.0-SNAPSHOT.jar
Job has been submitted with JobID 46b0c1daf825694fbc1692add3477ba9
(2) webui 查看任务
(3) nc -l 窗口输入信息进行测试
(4) 取消任务
[root@k8smaster01 flink]# ./flink-1.13.0/bin/flink cancel 46b0c1daf825694fbc1692add3477ba9
Cancelling job 46b0c1daf825694fbc1692add3477ba9.
Cancelled job 46b0c1daf825694fbc1692add3477ba9.
11. 关闭集群
[root@k8smaster01 flink]# ./flink-1.13.0/bin/stop-cluster.sh
3. 集群版部署
1. 节点规划
redisnode01 192.168.13.111 worker
redisnode2 192.168.13.112 worker
k8smaster01 192.168.13.107 masters+workers
2. 三个节点配置SSH免密登录
3. 三个节点配置hosts 文件,通过主机名称可以访问
[root@k8smaster01 flink-1.13.0]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.13.107 k8smaster01
192.168.13.111 redisnode01
192.168.13.112
通过scp 将三个文件复制到其他节点:
scp /etc/hosts 192.168.13.107:/etc/
4. 主节点修改flink配置
主要是修改管理节点和工作节点的信息
(1) 修改conf/flink-con.yaml中如下:
jobmanager.rpc.address: 192.168.13.107
(2) 修改masters内容为主节点加WEBUI端口
192.168.13.107:8081
(3) 修改workers(也可以是主机名称,自己配置了hosts 文件)
192.168.13.107
192.168.13.111
192.168.13.112
5. 将主节点的配置拷贝到另外两个机子(scp 复制)=三个节点的配置保持一致
scp -r ./flink-1.13.0 192.168.13.111:/opt/flink/
6. 三个节点都配置环境变量
其实master 节点可以不配置,master 节点可以通过全路径进行启动集群。workers 节点必须配置环境变量。
/etc/profile 增加如下配置:
export JAVA_HOME=/home/javatest/jdk8/jdk1.8.0_291/
export PATH=$PATH:$JAVA_HOME/bin:/opt/flink/flink-1.13.0/bin
7. 启动集群
启动集群的时候, 在主节点启动即可。workers 节点不需要再次启动,主节点启动后会通过网络去自动启动worker 节点
[root@k8smaster01 flink-1.13.0]# ./bin/start-cluster.sh
(1) 主节点查看进程
[root@k8smaster01 flink-1.13.0]# jps -l | grep -v jps
11860 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
12187
(2) 工作节点111 查看jps 进程
[root@redisnode01 log]# jps -l | grep -v jps
17122
8. web 查看信息,可以看出有三个任务槽
(1)可以看出三个workers 节点信息
(2)点进去可以看到相关的内存信息
9. 107 节点nc -l 7777 监听端口,然后从web界面再次添加任务
将上面的并行度设置为2,也就是说会占用两个任务槽。
10. 测试
(1) 输入如下信息:
-[root@k8smaster01 flink-1.13.0]#nc -l 7777
hello china this is new task test !!!
(2) 查看输出。 可以看出上面的輸出是在两个worker 节点的标准输出
(3)查看输出详情
107:
111:
(4) 也可以从日志文件查看输出。 下面以111 为例子
[root@redisnode01 log]# ll -tr
total 112
-rw-r--r--. 1 root root 20231 Jun 14 09:47 flink-root-taskexecutor-0-redisnode01.log.1
-rw-r--r--. 1 root root 19747 Jun 14 09:55 flink-root-taskexecutor-0-redisnode01.log.2
-rw-r--r--. 1 root root 0 Jun 14 11:04 flink-root-taskexecutor-1-redisnode01.out
-rw-r--r--. 1 root root 19747 Jun 14 11:05 flink-root-taskexecutor-1-redisnode01.log
-rw-r--r--. 1 root root 19628 Jun 14 11:05 flink-root-taskexecutor-0-redisnode01.log.3
-rw-r--r--. 1 root root 25746 Jun 14 11:10 flink-root-taskexecutor-0-redisnode01.log
-rw-r--r--. 1 root root 38 Jun 14 11:11 flink-root-taskexecutor-0-redisnode01.out
[root@redisnode01 log]# cat flink-root-taskexecutor-0-redisnode01.out
1> (hello,1)
1> (china,1)
1> (this,1)
11. 再次查看首页的任务槽 发现为1,也就是有任务占着的时候,其任务槽不会释放。也就是说不能同时使用(需要将任务cancel)
12. 终止集群
和之前一样用 stop-cluster 脚本即可。
4. 集群部署模式-standalone模式
Flink 提供了三种模式, 三种模式的区别主要是对资源的竞争以及main 方法的执行(是在client 还是JobManager)。总的来说三个都属于独立模式(Standalone-不借助于其他组件,一个Flink 集群就搞定),这种模式有个缺点就是资源不足或者出现故障,需要手动处理。这种模式一般在开发或者作业较少的情况下使用。
1. 会话模式(Session Mode)
上面的部署可以理解为一个会话模式。TaskManager 和 JonManager 在启动后会立即创建。下面两种模式都是作业提交才会创建TaskManager 和 JonManager 。
首先启动一个集群保持一个会话,在这个会话中通过客户端提交作业。上面启动后查看JVM 进程也可以看到有一个主类为StandaloneSessionClusterEntrypoint。 该模式下作业需要提交上去,且作业之间抢占资源,且作业运行的时候不释放资源。 该模式适合单个规模小、执行时间段的大量作业。(这种模式一般会结合其他部署平台使用,比如k8s)。
2. 每作业模式(Per-Task),已经为过时的模式
每个作业一个微型的集群(提交作业才会启动集群,每个作业一个集群(一个主类可能会拆成多个作业,比如从两个流读入数据)),作业完成后集群就会关闭。 FLink 本身不提供单作业模式,需要结合其他资源管理平台,比如k8s。
3. 应用模式(Application Mode)
该模式类似于上面的单作业模式,都是先提交作业才会创建集群。只是单作业模式是客户端先执行一遍代码将作业拆分出来,每个作业一个集群。这种模式是JobManager 执行应用程序,并且一段代码即使包含了多个作业,也只创建一个集群(可以理解为一个jar 包一个集群)。Flink 提供这种模式的提交。
1. 拷贝lib到flink安装目录的lib 目录
2. 启动JobManager
./bin/standalone-job.sh start --job-classname cn.qz.SocketStreamWordCount
3. 启动TaskManager
./bin/taskmanager.sh start
4. nc -l 创建输入信息
5. 查看输出
[root@k8smaster01 log]# cat flink-root-taskexecutor-0-k8smaster01.out
(this,1)
(is,1)
(application,1)
(test,1)
6. 停止集群
[root@k8smaster01 flink-1.13.0]# ./bin/standalone-job.sh stop
Stopping standalonejob daemon (pid: 32587) on host k8smaster01.
[root@k8smaster01 flink-1.13.0]# ./bin/taskmanager.sh stop
Stopping taskexecutor daemon (pid: 32924) on host k8smaster01.
结合K8S部署参考: https://github.com/tkestack/flink-on-k8s-operator