天天看点

Flink集群部署-standalone部署模式

  简单研究下Flink的任务部署。我们在IDEA 开发工具中用代码跑Flink 的时候,实际是会虚拟出一个小型的Flink 集群,当执行execute 的时候是将上面的代码作为一个job 提交到Flink 的JobManager中。

  参考: ​​https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/overview/​​

1.简介

  Flink 提交作业和执行任务,需要以下几个组件:客户端(Client)、作业管理器(JobManager)、任务管理器(TaskManager)。我们的代码由客户端获取,之后提交给JobManager,做进一步转换处理后,然后分发给任务众多的TaskManager。这里的TaskManager是真正干活的人,数据处理操作是他们处理的。

2. 单机版部署

1. 资源下载:

​​https://archive.apache.org/dist/flink/flink-1.13.0/​​  下载scala_2.12.tgz

2. 开始启动集群

  下载下来,直接启动即可。就回启动一个单节点的集群。JobManager和TaskManager 都在同一个机子。

[root@k8smaster01 flink-1.13.0]# ./bin/start-cluster.sh      

然后查看Java 相关进程

[root@k8smaster01 flink-1.13.0]# jps -l | grep -v jps
4362 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
2991 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint      

3. 启动后会启动一个webui 控制台。默认端口是8081,这些默认的端口都是可以修改的。在配置文件 conf/flink-conf.yaml

  注意这里有个可用的任务槽, 可以理解为Worker 节点数量。(后面跑任务的并行度需要小于等于该值)

Flink集群部署-standalone部署模式

4. 将之前的项目打包成jar 包

(1)修改类监听端口为 192.168.13.107

Flink集群部署-standalone部署模式
Flink集群部署-standalone部署模式
package cn.qz;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class SocketStreamWordCount {

    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境(流处理执行环境)
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. 读取文件
        DataStreamSource<String> txtDataSource = executionEnvironment.socketTextStream("192.168.13.107", 7777);
        // 3. 转换数据格式
        SingleOutputStreamOperator<Tuple2<String, Long>> singleOutputStreamOperator = txtDataSource
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));// lambda 使用泛型,由于泛型擦除,需要显示的声明类型信息
        // 4. 分组
        KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = singleOutputStreamOperator.keyBy(t -> t.f0);
        // 5. 求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
        // 6. 打印
        sum.print();
        // 7. 执行
        executionEnvironment.execute();
    }
}      

View Code

(2) 打包

5. 上传jar包然后提交任务(107 服务器先nc -l 7777 监听端口)

Flink集群部署-standalone部署模式

  可以选择入口类,1 是并行度(任务槽)。 也可以输入程序参数和保存点保存的路径。(这里的操作应该是客户端先跑一遍代码,把任务摘出来,然后交给TaskManager,TaskManager 分发给JobManager)

6. 测试: 

发送数据:

hello flink
hello china      

7. 查看flink 控制台

  上面的日志实际会输出到 workers 服务器的std. 这里就是107 服务器的。

Flink集群部署-standalone部署模式

 8. 停止任务(Running Jobs -》 选中任务-》Cancel Job)

Flink集群部署-standalone部署模式

9. 历史任务可以到 Completed Jobs 查看执行过的任务的信息

10. 命令行提交任务和取消任务 

(1) 提交任务

[root@k8smaster01 flink]# ./flink-1.13.0/bin/flink run -c cn.qz.SocketStreamWordCount -p 1 ./study-flink-1.0-SNAPSHOT.jar 
Job has been submitted with JobID 46b0c1daf825694fbc1692add3477ba9      

(2) webui 查看任务

Flink集群部署-standalone部署模式

(3) nc -l 窗口输入信息进行测试

(4) 取消任务

[root@k8smaster01 flink]# ./flink-1.13.0/bin/flink cancel 46b0c1daf825694fbc1692add3477ba9
Cancelling job 46b0c1daf825694fbc1692add3477ba9.
Cancelled job 46b0c1daf825694fbc1692add3477ba9.      

11. 关闭集群

[root@k8smaster01 flink]# ./flink-1.13.0/bin/stop-cluster.sh      

3. 集群版部署

1. 节点规划

redisnode01 192.168.13.111 worker
redisnode2 192.168.13.112 worker
k8smaster01 192.168.13.107 masters+workers      

2. 三个节点配置SSH免密登录

3. 三个节点配置hosts 文件,通过主机名称可以访问

[root@k8smaster01 flink-1.13.0]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.13.107 k8smaster01
192.168.13.111 redisnode01
192.168.13.112      

通过scp 将三个文件复制到其他节点:

scp /etc/hosts 192.168.13.107:/etc/      

4. 主节点修改flink配置

主要是修改管理节点和工作节点的信息

(1) 修改conf/flink-con.yaml中如下:

jobmanager.rpc.address: 192.168.13.107      

(2) 修改masters内容为主节点加WEBUI端口

192.168.13.107:8081      

(3) 修改workers(也可以是主机名称,自己配置了hosts 文件)

192.168.13.107
192.168.13.111
192.168.13.112      

5. 将主节点的配置拷贝到另外两个机子(scp 复制)=三个节点的配置保持一致

scp -r ./flink-1.13.0 192.168.13.111:/opt/flink/      

6. 三个节点都配置环境变量

其实master 节点可以不配置,master 节点可以通过全路径进行启动集群。workers 节点必须配置环境变量。

/etc/profile 增加如下配置:

export JAVA_HOME=/home/javatest/jdk8/jdk1.8.0_291/
export PATH=$PATH:$JAVA_HOME/bin:/opt/flink/flink-1.13.0/bin      

7. 启动集群

启动集群的时候, 在主节点启动即可。workers 节点不需要再次启动,主节点启动后会通过网络去自动启动worker 节点

[root@k8smaster01 flink-1.13.0]# ./bin/start-cluster.sh      

(1) 主节点查看进程

[root@k8smaster01 flink-1.13.0]# jps -l | grep -v jps
11860 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
12187      

(2) 工作节点111 查看jps 进程

[root@redisnode01 log]# jps -l | grep -v jps
17122      

8. web 查看信息,可以看出有三个任务槽

Flink集群部署-standalone部署模式

(1)可以看出三个workers 节点信息

Flink集群部署-standalone部署模式

(2)点进去可以看到相关的内存信息

Flink集群部署-standalone部署模式

 9. 107 节点nc -l 7777 监听端口,然后从web界面再次添加任务

Flink集群部署-standalone部署模式

将上面的并行度设置为2,也就是说会占用两个任务槽。

10. 测试

(1) 输入如下信息:

-[root@k8smaster01 flink-1.13.0]#nc -l 7777
hello china this is new task test !!!      

 (2) 查看输出。 可以看出上面的輸出是在两个worker 节点的标准输出

Flink集群部署-standalone部署模式

 (3)查看输出详情

107:

Flink集群部署-standalone部署模式

 111:

Flink集群部署-standalone部署模式

 (4) 也可以从日志文件查看输出。 下面以111 为例子

[root@redisnode01 log]# ll -tr
total 112
-rw-r--r--. 1 root root 20231 Jun 14 09:47 flink-root-taskexecutor-0-redisnode01.log.1
-rw-r--r--. 1 root root 19747 Jun 14 09:55 flink-root-taskexecutor-0-redisnode01.log.2
-rw-r--r--. 1 root root     0 Jun 14 11:04 flink-root-taskexecutor-1-redisnode01.out
-rw-r--r--. 1 root root 19747 Jun 14 11:05 flink-root-taskexecutor-1-redisnode01.log
-rw-r--r--. 1 root root 19628 Jun 14 11:05 flink-root-taskexecutor-0-redisnode01.log.3
-rw-r--r--. 1 root root 25746 Jun 14 11:10 flink-root-taskexecutor-0-redisnode01.log
-rw-r--r--. 1 root root    38 Jun 14 11:11 flink-root-taskexecutor-0-redisnode01.out
[root@redisnode01 log]# cat flink-root-taskexecutor-0-redisnode01.out
1> (hello,1)
1> (china,1)
1> (this,1)      

11. 再次查看首页的任务槽 发现为1,也就是有任务占着的时候,其任务槽不会释放。也就是说不能同时使用(需要将任务cancel)

Flink集群部署-standalone部署模式

 12. 终止集群

   和之前一样用 stop-cluster 脚本即可。

4. 集群部署模式-standalone模式

  Flink 提供了三种模式, 三种模式的区别主要是对资源的竞争以及main 方法的执行(是在client 还是JobManager)。总的来说三个都属于独立模式(Standalone-不借助于其他组件,一个Flink 集群就搞定),这种模式有个缺点就是资源不足或者出现故障,需要手动处理。这种模式一般在开发或者作业较少的情况下使用。

1. 会话模式(Session Mode)

  上面的部署可以理解为一个会话模式。TaskManager 和 JonManager 在启动后会立即创建。下面两种模式都是作业提交才会创建TaskManager 和 JonManager 。

  首先启动一个集群保持一个会话,在这个会话中通过客户端提交作业。上面启动后查看JVM 进程也可以看到有一个主类为StandaloneSessionClusterEntrypoint。 该模式下作业需要提交上去,且作业之间抢占资源,且作业运行的时候不释放资源。 该模式适合单个规模小、执行时间段的大量作业。(这种模式一般会结合其他部署平台使用,比如k8s)。

2. 每作业模式(Per-Task),已经为过时的模式

  每个作业一个微型的集群(提交作业才会启动集群,每个作业一个集群(一个主类可能会拆成多个作业,比如从两个流读入数据)),作业完成后集群就会关闭。 FLink 本身不提供单作业模式,需要结合其他资源管理平台,比如k8s。

3. 应用模式(Application Mode)

  该模式类似于上面的单作业模式,都是先提交作业才会创建集群。只是单作业模式是客户端先执行一遍代码将作业拆分出来,每个作业一个集群。这种模式是JobManager 执行应用程序,并且一段代码即使包含了多个作业,也只创建一个集群(可以理解为一个jar 包一个集群)。Flink 提供这种模式的提交。

1. 拷贝lib到flink安装目录的lib 目录
2. 启动JobManager
./bin/standalone-job.sh start --job-classname cn.qz.SocketStreamWordCount
3. 启动TaskManager
./bin/taskmanager.sh start
4. nc -l 创建输入信息
5. 查看输出
[root@k8smaster01 log]# cat flink-root-taskexecutor-0-k8smaster01.out
(this,1)
(is,1)
(application,1)
(test,1)
6. 停止集群
[root@k8smaster01 flink-1.13.0]# ./bin/standalone-job.sh stop
Stopping standalonejob daemon (pid: 32587) on host k8smaster01.
[root@k8smaster01 flink-1.13.0]# ./bin/taskmanager.sh stop
Stopping taskexecutor daemon (pid: 32924) on host k8smaster01.      

结合K8S部署参考: ​​https://github.com/tkestack/flink-on-k8s-operator​​