Flink 有非常靈活的分層API設計,其中的核心層就是DataSet/DataStream API。新版本中DataSetAPI 被棄用,使用DataStream 即可實作流批一體。
DataStream(資料流)本身是Flink 中一個用來表示集合的類,我們編寫的Flink 代碼其實就是基于這種資料類型的處理,是以這套核心API就以DataStream 命名。對于流處理和批處理,都可以用這一套API 實作。
一個Flink 程式,其實幾亅對DataStream的各種轉換。具體來說,代碼基本上由一下幾部分組成:
1.擷取執行環境(execution environment)
2.讀取資料源(source)
3.定義基于資料的轉換操作(transformation)
4.定義計算結果的輸出位置(sink)
5.觸發程式執行(execute)
擷取執行環境和觸發執行,都可以認為是針對執行環境的操作。接下來從執行環境、資料源、轉換、輸出 四部分進行研究。
1. 執行環境
1. 執行環境
編寫Flink 程式的第一步就是建立執行環境。我們要擷取的執行環境,是StreamExecutionEnvironment。
建立環境有下面三種方式:
1.建立本地環境
StreamExecutionEnvironment.createLocalEnvironment();
// 預設并行度是CPU 核數
// localEnvironment.setParallelism(3);
檢視源碼,預設的并行度為CPU核數:org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#createLocalEnvironment()
private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
public static LocalStreamEnvironment createLocalEnvironment() {
return createLocalEnvironment(defaultLocalParallelism);
}
另外建立本地環境也可以建立webui 界面:
(1)pom引入web依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
(2)代碼建立webui,預設端口是8081
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new
webui 類似于叢集的webui,可以通路叢集的任務資訊和資源資訊。
檢視org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#createLocalEnvironmentWithWebUI 源碼:
public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
checkNotNull(conf, "conf");
if (!conf.contains(RestOptions.PORT)) {
// explicitly set this option so that it's not set to 0 later
conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
}
return createLocalEnvironment(conf);
}
2. 建立遠端環境
package cn.qz;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class SocketStreamWordCount {
public static void main(String[] args) throws Exception {
// 1. 建立執行環境(流處理執行環境)
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("192.168.13.107", 8081, "E:/study-flink-1.0-SNAPSHOT.jar");
// 2. 讀取檔案
DataStreamSource<String> txtDataSource = executionEnvironment.socketTextStream("192.168.13.107", 7777);
// 3. 轉換資料格式
SingleOutputStreamOperator<Tuple2<String, Long>> singleOutputStreamOperator = txtDataSource
.flatMap((String line, Collector<String> words) -> {
Arrays.stream(line.split(" ")).forEach(words::collect);
}).slotSharingGroup("1")
.returns(Types.STRING)
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));// lambda 使用泛型,由于泛型擦除,需要顯示的聲明類型資訊
// 4. 分組
KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = singleOutputStreamOperator.keyBy(t -> t.f0);
// 5. 求和
SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
// 6. 列印
sum.print();
// 7. 執行
executionEnvironment.execute();
}
}
這時候實際上資料将任務送出到遠端位址上。這裡需要指定jar包的位置,并且中間的代碼也不能省,必須有算子才能執行。
3.getExecutionEnvironment選擇執行環境(一般用這個API)
這個方法會根據目前的上下文進行選擇,如果在叢集環境就傳回叢集環境;否則就建立本地環境。檢視源碼org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#getExecutionEnvironment(org.apache.flink.configuration.Configuration):
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
.map(factory -> factory.createExecutionEnvironment(configuration))
.orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}
2. 執行模式
DataStream API預設的執行模式是流模式(STREAMING), 也可以選擇批處理模式(BATCH), 自動模式(AUTOMIC) 根據輸入資料源是否有界自動選擇執行模式。參考類: org.apache.flink.api.common.RuntimeExecutionMode。這裡需要注意并不是可以任意指定,無界資料流就不能使用BATCH 模式。
預設就是STREAMING模式。是以重點介紹一下BATCH模式的配置。
1. BATCH模式的配置方法
(1)通過指令行配置
bin/flink -Dexecution.runtime-mode=BATCH ...
(2)代碼指定
executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
測試:
檔案内容:
hello world
hello flink
hello java
java nb
java pl
比如對于如下程式:
package cn.qz;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class BoundedStreamWordCount {
public static void main(String[] args) throws Exception {
// 1. 建立執行環境(流處理執行環境)
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 讀取檔案
DataStreamSource<String> txtDataSource = executionEnvironment.readTextFile("file/words.txt");
// 3. 轉換資料格式
SingleOutputStreamOperator<Tuple2<String, Long>> singleOutputStreamOperator = txtDataSource
.flatMap((String line, Collector<String> words) -> {
Arrays.stream(line.split(" ")).forEach(words::collect);
})
.returns(Types.STRING)
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));// lambda 使用泛型,由于泛型擦除,需要顯示的聲明類型資訊
// 4. 分組
KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = singleOutputStreamOperator.keyBy(t -> t.f0);
// 5. 求和 (sum、min、max 可以用字段名稱,也可以用字段順序)
SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
// 6. 列印
sum.print();
// 7. 執行
executionEnvironment.execute();
}
}
預設結果:(STREAMING)
2> (java,1)
2> (pl,1)
4> (nb,1)
3> (hello,1)
5> (world,1)
2> (java,2)
3> (hello,2)
3> (hello,3)
2> (java,3)
7> (flink,1)
BATCH 結果:
7> (flink,1)
3> (hello,3)
2> (pl,1)
2> (java,3)
5> (world,1)
4> (nb,1)
AUTOMATIC結果:
4> (nb,1)
7> (flink,1)
2> (pl,1)
3> (hello,3)
2> (java,3)
5> (world,1)
2. 什麼時候選擇BATCH
簡單的原則:BATCH處理批量資料,STREAMING 處理流式資料。 資料有界的時候輸出結果更加高效;資料無界的時候隻能選擇STREAMING流處理。
3. 程式執行
flink 是事件驅動的,隻有資料真正到來才會進行計算,這也被稱為延遲執行或懶執行。我們需要顯示地執行execute()方法,來觸發程式執行。execute 方法将一直等待作業完成,然後傳回一個JobExecutionResult(内部包含JobId 等資訊)。
2. Source
Flink 可以從各種來源擷取資料,然後建構DataStream 進行轉換處理。 一般資料的輸入來源稱為資料源(data source), 而讀取的算子就是源算子(source operator)。Flink 中通用的添加source 的方式,是調用執行環境的addSource 方法, 且方法的入參是一個SourceFunction 接口,傳回的是DataStreamSource。
1.自定義資料源的用法
參考: org.apache.flink.streaming.api.functions.source.FromElementsFunction
兩個核心方法:
run: 通過sourceContext 向上下遊發送資料。
cancel: 通過辨別位控制退出循環,來達到中斷資料源的效果。
1. 串行資料源
資料源:
package cn.qz.source;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@Slf4j
public class MySourceFunction implements SourceFunction<String> {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
log.info("cn.qz.source.MySourceFunction.run start");
for (int index = 0; index < 100 && isRunning; index++) {
Thread.sleep(2 * 1000);
ctx.collect(RandomStringUtils.randomAlphanumeric(4));
}
log.info("cn.qz.source.MySourceFunction.run end");
}
/**
* 界面點取消任務的時候執行
*/
@Override
public void cancel() {
isRunning = false;
log.info("cn.qz.source.MySourceFunction.cancel");
}
}
測試類:
package cn.qz.source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CustomSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
executionEnvironment.setParallelism(1);
DataStreamSource<String> stringDataStreamSource = executionEnvironment.addSource(new MySourceFunction());
stringDataStreamSource.print();
executionEnvironment.execute();
}
}
這種資料源的并行度隻能設定為1,否則會報錯如下:
Exception in thread "main" java.lang.IllegalArgumentException: The parallelism of non parallel operator must be 1.
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
at org.apache.flink.api.common.operators.util.OperatorValidationUtils.validateParallelism(OperatorValidationUtils.java:35)
at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:114)
2. 并行資料源
資料源:
package cn.qz.source;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import scala.collection.Parallel;
@Slf4j
public class MySourceFunction2 implements ParallelSourceFunction<User> {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<User> ctx) throws Exception {
log.info("cn.qz.source.MySourceFunction2.run start");
for (int index = 0; index < 3 && isRunning; index++) {
String s = RandomStringUtils.randomAlphabetic(4);
User user = new User(s, s, 100 + index);
ctx.collect(user);
}
log.info("cn.qz.source.MySourceFunction2.run end");
}
@Override
public void cancel() {
log.info("cn.qz.source.MySourceFunction2.run cancel");
isRunning = false;
}
}
測試類:
package cn.qz.source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CustomSource2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
executionEnvironment.setParallelism(1);
DataStreamSource<User> stringDataStreamSource = executionEnvironment.addSource(new MySourceFunction2());
stringDataStreamSource.setParallelism(3);
stringDataStreamSource.print();
executionEnvironment.execute();
}
}
結果:
2022-06-23 19:28:36 [Legacy Source Thread - Source: Custom Source (1/3)#0] [cn.qz.source.MySourceFunction2]-[INFO] cn.qz.source.MySourceFunction2.run start
2022-06-23 19:28:36 [Legacy Source Thread - Source: Custom Source (3/3)#0] [cn.qz.source.MySourceFunction2]-[INFO] cn.qz.source.MySourceFunction2.run start
2022-06-23 19:28:36 [Legacy Source Thread - Source: Custom Source (2/3)#0] [cn.qz.source.MySourceFunction2]-[INFO] cn.qz.source.MySourceFunction2.run start
2022-06-23 19:28:36 [Legacy Source Thread - Source: Custom Source (3/3)#0] [cn.qz.source.MySourceFunction2]-[INFO] cn.qz.source.MySourceFunction2.run end
2022-06-23 19:28:36 [Legacy Source Thread - Source: Custom Source (1/3)#0] [cn.qz.source.MySourceFunction2]-[INFO] cn.qz.source.MySourceFunction2.run end
2022-06-23 19:28:36 [Legacy Source Thread - Source: Custom Source (2/3)#0] [cn.qz.source.MySourceFunction2]-[INFO] cn.qz.source.MySourceFunction2.run end
User(username=jCxn, fullname=jCxn, age=100)
User(username=SNjM, fullname=SNjM, age=101)
User(username=Wfwj, fullname=Wfwj, age=102)
User(username=bMnT, fullname=bMnT, age=100)
User(username=YkrF, fullname=YkrF, age=101)
User(username=CXsf, fullname=CXsf, age=102)
User(username=STOo, fullname=STOo, age=100)
User(username=kuCI, fullname=kuCI, age=101)
User(username=EJMx, fullname=EJMx, age=102)
2. 取集合的資料源
pojo
package cn.qz.source;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 使用者資訊, 有如下要求:
* 1. 類是公有的
* 2. 有一個無參的構造方法
* 3. 所有屬性都是公有的
* 4. 所有屬性的類型都是可以序列化的
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
public String username;
public String fullname;
public int age;
}
測試代碼
package cn.qz.source;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;
public class Client1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<User> dataStreamSource = executionEnvironment.fromCollection(listUsers());
dataStreamSource.print();
executionEnvironment.execute();
}
private static List<User> listUsers() {
List<User> users = new ArrayList<>();
users.add(new User("zs", "張三", 22));
users.add(new User("ls", "李四", 23));
users.add(new User("ww", "王五", 21));
return users;
}
}
3. 從檔案讀取
package cn.qz;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class BoundedStreamWordCount {
public static void main(String[] args) throws Exception {
// 1. 建立執行環境(流處理執行環境)
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 讀取檔案
DataStreamSource<String> txtDataSource = executionEnvironment.readTextFile("file/words.txt");
// 3. 轉換資料格式
SingleOutputStreamOperator<Tuple2<String, Long>> singleOutputStreamOperator = txtDataSource
.flatMap((String line, Collector<String> words) -> {
Arrays.stream(line.split(" ")).forEach(words::collect);
})
.returns(Types.STRING)
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));// lambda 使用泛型,由于泛型擦除,需要顯示的聲明類型資訊
// 4. 分組
KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = singleOutputStreamOperator.keyBy(t -> t.f0);
// 5. 求和 (sum、min、max 可以用字段名稱,也可以用字段順序)
SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
// 6. 列印
sum.print();
// 7. 執行
executionEnvironment.execute();
}
}
4. 從socket 讀取
package cn.qz;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class SocketStreamWordCount {
public static void main(String[] args) throws Exception {
// 1. 建立執行環境(流處理執行環境)
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 讀取檔案
DataStreamSource<String> txtDataSource = executionEnvironment.socketTextStream("localhost", 7777);
// 3. 轉換資料格式
SingleOutputStreamOperator<Tuple2<String, Long>> singleOutputStreamOperator = txtDataSource
.flatMap((String line, Collector<String> words) -> {
Arrays.stream(line.split(" ")).forEach(words::collect);
})
.returns(Types.STRING)
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));// lambda 使用泛型,由于泛型擦除,需要顯示的聲明類型資訊
// 4. 分組
KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = singleOutputStreamOperator.keyBy(t -> t.f0);
// 5. 求和
SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
// 6. 列印
sum.print();
// 7. 執行
executionEnvironment.execute();
}
}
5. 支援的資料類型
為了友善地處理資料,Flink 有一整套類型系統。Flink 使用"類型資訊"(TypeInformation)來統一表示資料類型。TypeInformation類是Flink 中所有類型描述符的基類。它涵蓋了類型的一些基本屬性,并為每個資料類型生成特定的序列化器、反序列化器和比較器。
1.flink支援的資料類型
簡單來說,常見的java和scala資料類型,flink都是支援的。flink内部對支援不同的類型進行了劃分,這些類型可以在org.apache.flink.api.common.typeinfo.Types工具類中找到:
(1)基本類型
所有Java基本類型及其包裝類,再加上Void、String、Date、BigDecimal、BigInteger。
(2)數組類型
包括基本類型資料數組(PRIMITIVE_ARRAY)和對象數組(OBJECT_ARRAY)
(3)複合資料類型
Java元組類型(TUPLE): 這是Flink 内置的元祖類型,是Java API的一部分。最多25個字段,也就是Tuple0-Tuple25,不支援空字段。
Scala樣例類以及Scale 元組:不支援空字段
行類型(ROW):可認為是具有任意個字段的元祖,并支援空字段
POJO類:Flink自定義的類似于Javabean 模式的類。要求如下:
類是公告的和獨立的(沒有非靜态的内部類)
類有一個公共的無參構造方法
類中的所有字段是public且非final 的;或者有一個公共的getter、setter 方法。這些方法符合javabean的命名規範。
(4)輔助類型
Option、Either、List、Map等
(5)泛型類型
如果不是上面類型定義的類型,Flink會當做泛型來處理。Flink會把泛型當做黑盒子,無法擷取它們内部的屬性;它們也不是由Flink本身序列化的,而是由Kryo 序列化的。
2.類型提示
Flink 還具有一個類型提取系統,可以分析函數的輸入和傳回類型,自動擷取類型資訊,進而擷取序列化器和反序列化器。由于泛型擦除,有時候需要顯示地提供類型資訊,才能使應用程式正常工作或提高性能。為了解決這類問題,JavaAPI提供了類提示(type hints)。
(1)方式一:
.returns(Types.TUPLE(Types.STRING, Types.LONG))
(2)方式二:
.returns(new
3. 轉換算子
一個Flink程式的核心,其實就是所有的轉換操作,它們決定了處理的業務邏輯。下面研究常見的轉換算子。
1. 常見轉換
map\filter\flatMap
package cn.qz.transformation;
import cn.qz.source.User;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class MapTrans {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<User> dataStreamSource = executionEnvironment.fromElements(
new User("zs", "張三", 22),
new User("ls", "李四", 23),
new User("ww", "王五", 21),
new User("ww", "王五", 22),
new User("zl", "趙六", 27)
);
// map 映射轉換
SingleOutputStreamOperator<String> map = dataStreamSource.map(User::getUsername);
// filter 過濾
SingleOutputStreamOperator<String> filter = map.filter(str -> "ww".equals(str));
// flatMap 扁平映射, 主要是将資料流中的整體(一般是集合類型)拆分成一個一個的個體
// 接收的參數是一個FlatMapFunction,泛型第一個實際類型是傳入的實際類型,第二個是傳回的時候傳回的類型。Collector 用于收集傳回結果。
SingleOutputStreamOperator<String> stringSingleOutputStreamOperator = filter.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
out.collect(value);
}
});
executionEnvironment.execute();
}
}
2.聚合
在實際應用中,經常需要對資料進行統計或整合。進而提煉出更有用的資訊。這就需要聚合。
(1)按鍵分區(keyBy)
keyBy是聚合前必須要用到的一個算子。keyBy通過指定鍵(key),可以将一條資料分成不同的分區(partitions)。這裡所說的分區實際就是并行處理的子任務,也就是對應不同的任務槽。
在keyBy 内部,是通過計算key的哈希值,對分區數進行取模運算來實作。是以這裡的key 如果是POJO的話,必須要重寫 hashCode() 方法。
需要注意,keyBy 得到的結果将不再是DataStream,而是會将DataStream 轉換為KeyedStream(分區流、鍵控流),繼承自DataStream,是以其操作也是基于DataStream API。KeyedStream 是一個非常重要的資料結構,可以做後續的聚合,比如sum、reduce等; 而且它可以将目前子任務的狀态也按照key 進行劃分、先定位僅對目前key 有效。keyBy 可以傳入可變數組(int、String)、也可以傳入一個KeySelector 。
// 使用匿名内部類
KeyedStream<User, String> userStringKeyedStream = dataStreamSource.keyBy(User::getUsername);
// 傳入KeySelector
/*KeyedStream<User, String> userStringKeyedStream = dataStreamSource.keyBy(new KeySelector<User, String>() {
@Override
public String getKey(User value) throws Exception {
return value.getUsername();
}
});*/
(2)簡單聚合
sum: 在輸入流上,對指定的字段做疊加求和的操作
min:在輸入流上,對指定的字段求最小值
max:在輸入流上,對指定字段求最大值
minBy: 與min 類似。 不同的是,min 隻計算指定字段的最小值,其他字段會保留最初第一個資料的值; 而minBy 則會傳回包含字段最小值的整條資料
maxBy:與max類似。差別同上
上面方法api 可需要指定指定的字段。可以指定位置和指定名稱。(POJO類型隻能指定名稱,不能指定位置)
簡單測試以及結果如下:
package cn.qz.transformation;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MapTrans {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2> dataStreamSource = executionEnvironment.fromElements(
Tuple2.of("a", 1),
Tuple2.of("b", 5),
Tuple2.of("c", 3),
Tuple2.of("d", 4),
Tuple2.of("d", 7)
);
// dataStreamSource.keyBy(r -> r.f0).sum(1).print();
/**
* 5> (d,4)
* 5> (d,11)
* 4> (c,3)
* 2> (b,5)
* 6> (a,1)
*/
// dataStreamSource.keyBy(r -> r.f0).sum("f1").print(); // 結果同上
// dataStreamSource.keyBy(r -> r.f0).maxBy("f1").print();
/**
* 4> (c,3)
* 2> (b,5)
* 5> (d,4)
* 6> (a,1)
* 5> (d,7)
*/
// dataStreamSource.keyBy(r -> r.f0).max(1).print();
/**
* 4> (c,3)
* 5> (d,4)
* 2> (b,5)
* 5> (d,7)
* 6> (a,1)
*/
// dataStreamSource.keyBy(r -> r.f0).min(1).print();
/**
* 6> (a,1)
* 2> (b,5)
* 5> (d,4)
* 4> (c,3)
* 5> (d,4)
*/
dataStreamSource.keyBy(r -> r.f0).minBy(1).print();
/**
* 2> (b,5)
* 6> (a,1)
* 5> (d,4)
* 4> (c,3)
* 5> (d,4)
*/
executionEnvironment.execute();
}
}
一個聚合算子,會為每一個key 儲存一個聚合的值,在Flink 中我們把它叫做"狀态"(state)。是以沒當有一個新的資料輸入,算子就回更新儲存的聚合結果,并發送一個帶有更新後聚合值的事件到下遊算子。
并且經過簡單聚合之後的資料流,元素的資料類型保持不變。
(3)歸約聚合 reduce
reduce 操作主要是一個ReduceFunction, 其接口如下:
@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {
/**
* The core method of ReduceFunction, combining two values into one value of the same type. The
* reduce function is consecutively applied to all values of a group until only a single value
* remains.
*
* @param value1 The first value to combine.
* @param value2 The second value to combine.
* @return The combined value of both input values.
* @throws Exception This method may throw exceptions. Throwing an exception will cause the
* operation to fail and may trigger recovery.
*/
T reduce(T value1, T value2) throws Exception;
}
接收兩個輸入資料,經過轉換後傳回一個相同類型的資料。與簡單聚合類型,它不會改變流的元素資料類型,也就是輸出類型和輸入類型是一樣的。
比如: 求一個相同名字的人的年齡之和,且保留最大的
package cn.qz.transformation;
import cn.qz.source.User;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Reduce {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(1);
executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStreamSource<User> dataStreamSource = executionEnvironment.fromElements(
new User("zs", "張三", 22),
new User("ls", "李四", 23),
new User("ww", "王五1", 21),
new User("ww", "王五", 22),
new User("zl", "趙六", 27)
);
dataStreamSource
// 第一次分組,按username 分組
.keyBy(User::getUsername)
// 第一次reduce, 将相同的年領求和,求和之後設定到相同組的第一個使用者的user 身上
.reduce(new ReduceFunction<User>() {
@Override
public User reduce(User value1, User value2) throws Exception {
value1.setAge(value1.getAge() + value2.getAge());
return value1;
}
})
// 為每條資料配置設定同一個key,劃分到同一個組
.keyBy(r -> true)
// 找出年齡最大的,可以用maxBy。
// .maxBy("age")
// 找出年齡最大的也可以用reduce 再次實作
.reduce((User u1, User u2) -> {
if (u1.getAge() > u2.getAge()) {
return u1;
}
return u2;
})
// 列印
.print();
executionEnvironment.execute();
}
}
3. 使用者自定義函數(UDF-user definied function)
Flink的DataStream API 程式設計風格其實是一緻的:全部以算子操作名稱 + Function 指令,比如源算子SourceFunction、MapFunction、FilterFunction 等,這些都繼承Function(一個空接口)。
上面的操作可以用自定義類,也可以用lambda表達式。如果是自定義的類,不存在泛型引起的指定傳回類型的問題;如果是lambda 有可能需要指定傳回類型(簡單類型不需要指定,如果是複雜的類型或者POJO類型需要指定)。
比如:
package cn.qz.transformation;
import cn.qz.source.User;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class UDFTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(1);
executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStreamSource<User> dataStreamSource = executionEnvironment.fromElements(
new User("zs", "張三", 22),
new User("ls", "李四", 23),
new User("ww", "王五1", 21),
new User("ww", "王五", 22),
new User("zl", "趙六", 27)
);
// 傳入的參數表示需要加密的username
dataStreamSource.map(new MyMapFunction("ww")).print();
executionEnvironment.execute();
}
private static class MyMapFunction implements MapFunction<User, String> {
private String username;
public MyMapFunction(String username) {
this.username = username;
}
@Override
public String map(User value) throws Exception {
if (value.getUsername().equals(username)) {
return "******";
}
return value.getUsername();
}
}
}
4.富函數類
富函數類也是DataStream API 提供 的一個函數的接口,所有的Flink 函數類都有其Rich版本。富函數類以抽象類的形式出現。例如:RichMapFunction、RichReduceFunction 等。與正常函數類的不同主要在于,富函數類可以擷取運作環境的上下文,并擁有一些生命周期方法,是以可以實作更複雜的功能。
RichFunction 有生命周期的概念。典型的生命周期方法有:
open 方法:RichFunction 的初始化方法,會開啟一個算子的生命周期。當一個算子的實際工作方法例如map、filter 被調用之前,open 首先會被調用。可以用于資料庫連接配接等。
close:生命周期中的最後一個調用的方法。一般用于清理工作。
這裡的生命周期的方法,對于一個并行子任務來說隻會調用一次;而對應的實際工作方法,比如map 等每條資料到來後都會觸發一次調用。另外富函數提供了getRuntimeContext 擷取上下文執行環境。
package cn.qz.transformation;
import cn.qz.source.User;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class RichMapTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(2);
DataStreamSource<User> dataStreamSource = executionEnvironment.fromElements(
new User("zs", "張三", 22),
new User("ls", "李四", 23),
new User("ww", "王五1", 21),
new User("ww", "王五", 22),
new User("zl", "趙六", 27)
);
SingleOutputStreamOperator<String> map = dataStreamSource.map(new RichMapFunction<User, String>() {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
System.out.println("任務索引為(start):" + getRuntimeContext().getIndexOfThisSubtask());
}
@Override
public String map(User value) throws Exception {
return value.getUsername();
}
@Override
public void close() throws Exception {
super.close();
System.out.println("任務索引為(close):" + getRuntimeContext().getIndexOfThisSubtask());
}
});
map.print();
executionEnvironment.execute();
}
}
結果:
任務索引為(start):1
任務索引為(start):0
2> ls
1> zs
2> ww
1> ww
1> zl
任務索引為(close):0
任務索引為(close):1
4. 輸出sink
所有的算子都可以自定義函數來實作,是以都可以通過實作函數來自定義處理邏輯,是以隻要有讀寫用戶端,與外部系統的互動在任何一個算子中都可以實作。例如可以考慮用RichMapFunction,在open() 做連接配接操作、close() 做資源清理操作。但是,這種做法的缺點就是Flink 提供的一緻性檢查點(checkpoint) 無法有效利用,發生故障很難恢複到從前。
Flink的DataStream API提供了專門向外部寫入資料的方法:addSink。 接受的參數是一個SinkFunction。比如我們常見的print 函數就是一個SinkFunction。
org.apache.flink.streaming.api.datastream.DataStream#print() 源碼如下:
public DataStreamSink<T> print() {
PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
return addSink(printFunction).name("Print to Std. Out");
}
package org.apache.flink.api.common.functions.util;
import org.apache.flink.annotation.Internal;
import java.io.PrintStream;
import java.io.Serializable;
/** Print sink output writer for DataStream and DataSet print API. */
@Internal
public class PrintSinkOutputWriter<IN> implements Serializable {
private static final long serialVersionUID = 1L;
private static final boolean STD_OUT = false;
private static final boolean STD_ERR = true;
private final boolean target;
private transient PrintStream stream;
private final String sinkIdentifier;
private transient String completedPrefix;
public PrintSinkOutputWriter() {
this("", STD_OUT);
}
public PrintSinkOutputWriter(final boolean stdErr) {
this("", stdErr);
}
public PrintSinkOutputWriter(final String sinkIdentifier, final boolean stdErr) {
this.target = stdErr;
this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier);
}
public void open(int subtaskIndex, int numParallelSubtasks) {
// get the target stream
stream = target == STD_OUT ? System.out : System.err;
completedPrefix = sinkIdentifier;
if (numParallelSubtasks > 1) {
if (!completedPrefix.isEmpty()) {
completedPrefix += ":";
}
completedPrefix += (subtaskIndex + 1);
}
if (!completedPrefix.isEmpty()) {
completedPrefix += "> ";
}
}
public void write(IN record) {
stream.println(completedPrefix + record.toString());
}
@Override
public String toString() {
return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
}
}
// 然後輸出交給 java.io.PrintStream
像kafka之類的流式系統,Flink 提供了完美對接,source/sink 兩段都能連接配接,可讀可寫; 而對于ES、FileSystem、JDBC 等則隻提供了輸出寫入的sink 連接配接器。另外Apache 提供了redis的輸出以及ActiveMQ的輸入等。除此之外就需要自定義實作sink 連接配接器了。
1.輸出到檔案
flink 本來提供了簡單的輸出到檔案的方法 writeAsText()、writeAsCsv()等,但是這種sink 的并行度隻能為1,而且對于故障恢複也沒有任何保證。是以被棄用。
Flink 為此專門提供了一個流式檔案系統的連接配接器:StreamingFileSink,繼承自RichSinkFunction,而且內建了Flink的檢查點。org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink 關系圖如下:
public class StreamingFileSink<IN> extends RichSinkFunction<IN>
implements
1. 輸出到檔案
package cn.qz.sink;
import cn.qz.source.User;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.util.concurrent.TimeUnit;
public class FileSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(2);
DataStreamSource<User> dataStreamSource = executionEnvironment.fromElements(
new User("zs1", "張三", 22),
new User("ls2", "李四", 23),
new User("ww3", "王五1", 21),
new User("ww4", "王五", 22),
new User("ww5", "王五", 22),
new User("ww6", "王五", 22),
new User("ww7", "王五", 22),
new User("zl8", "趙六", 27)
);
StreamingFileSink<String> sink = StreamingFileSink
// 指定輸出的目錄路徑以及編碼
.<String>forRowFormat(new Path("./output"), new SimpleStringEncoder<>("UTF-8"))
/**
*指定滾動政策。 下面三種情況會進行分區檔案
* 至少包含15 分支的資料
* 最近五分鐘沒有收到新的資料
* 檔案大小已達到1GB
*/
.withRollingPolicy(DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024 * 1024 * 1024)
.build()).build();
dataStreamSource.map(User::toString).addSink(sink);
executionEnvironment.execute();
}
}
結果:兩個檔案分别對應兩個任務槽,均等平分任務。
如果想輸出到一個檔案,可以用global 将分區數設定為1, 然後進行輸出:
dataStreamSource.map(User::toString).global().addSink(sink);
2.輸出到redis
(1)pom新增
<!--redis-->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
(2)測試代碼
将使用者的username 作為key, fullname 作為value 存入到key 為users的hash 資料結構中。
package cn.qz.sink;
import cn.qz.source.User;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
/**
* @author 喬利強
* @date 2022/6/30 19:42
* @description
*/
public class RedisSinkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(2);
DataStreamSource<User> dataStreamSource = executionEnvironment.fromElements(
new User("zs1", "張三", 22),
new User("ls2", "李四", 23),
new User("ww3", "王五1", 21),
new User("ww4", "王五", 22),
new User("ww5", "王五", 22),
new User("ww6", "王五", 22),
new User("ww7", "王五", 22),
new User("zl8", "趙六", 27)
);
// 建立一個到redis連接配接的配置
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
.setHost("localhost")
.build();
dataStreamSource.addSink(new RedisSink<User>(conf, new MyRedisMapper()));
executionEnvironment.execute();
}
private static class MyRedisMapper implements RedisMapper<User> {
/**
* 傳回目前redis 操作指令的描述:表示将所有的使用者資訊放入hset 存入
*
* @return
*/
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "users");
}
/**
* hash 中的 key
*
* @param data
* @return
*/
@Override
public String getKeyFromData(User data) {
return data.getUsername();
}
/**
* 傳回的值
*
* @param data
* @return
*/
@Override
public String getValueFromData(User data) {
return data.getFullname();
}
}
}
(3)redis檢視
127.0.0.1:6379> keys *
1) "users"
127.0.0.1:6379> type users
hash
127.0.0.1:6379> hgetall users
1) "ls2"
2) "\xe6\x9d\x8e\xe5\x9b\x9b"
3) "zs1"
4) "\xe5\xbc\xa0\xe4\xb8\x89"
5) "ww3"
6) "\xe7\x8e\x8b\xe4\xba\x941"
7) "ww4"
8) "\xe7\x8e\x8b\xe4\xba\x94"
9) "ww5"
10) "\xe7\x8e\x8b\xe4\xba\x94"
11) "ww6"
12) "\xe7\x8e\x8b\xe4\xba\x94"
13) "ww7"
14) "\xe7\x8e\x8b\xe4\xba\x94"
15) "zl8"
16) "\xe8\xb5\xb5\xe5\x85\xad"
3.輸出到mysql
(1)pom新增
5.1.47</version>
</dependency>
(2)測試代碼
package cn.qz.sink;
import cn.qz.source.User;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MysqlSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(2);
DataStreamSource<User> dataStreamSource = executionEnvironment.fromElements(
new User("zs1", "張三", 22),
new User("ls2", "李四", 23),
new User("ww3", "王五1", 21),
new User("ww4", "王五", 22),
new User("ww5", "王五", 22),
new User("ww6", "王五", 22),
new User("ww7", "王五", 22),
new User("zl8", "趙六", 27)
);
dataStreamSource.addSink(
JdbcSink.sink(
"INSERT INTO user (username, fullname, age) VALUES (?, ?, ?)",
(statement, r) -> {
statement.setString(1, r.getUsername());
statement.setString(2, r.getFullname());
statement.setInt(3, r.getAge());
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/flink")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("123456")
.build()
)
);
executionEnvironment.execute();
}
}
4.自定義輸出實作輸出到mysql
package cn.qz.sink;
import cn.qz.source.User;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
public class CustomSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(2);
DataStreamSource<User> dataStreamSource = executionEnvironment.fromElements(
new User("zs1", "張三", 22),
new User("ls2", "李四", 23),
new User("ww3", "王五1", 21),
new User("ww4", "王五", 22),
new User("ww5", "王五", 22),
new User("ww6", "王五", 22),
new User("ww7", "王五", 22),
new User("zl8", "趙六", 27)
);
dataStreamSource.addSink(new RichSinkFunction<User>() {
Connection connection = null;
Statement statement = null;
private volatile int age = 0;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class<?> aClass = Class.forName("com.mysql.jdbc.Driver");
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink", "root", "123456");
statement = connection.createStatement();
System.out.println(Thread.currentThread().getName() + "\topen");
}
@Override
public void invoke(User value, Context context) throws Exception {
super.invoke(value, context);
++age;
statement.execute(String.format("INSERT INTO USER VALUES('%s', '%s', %s)", "username" + age, "使用者全名" + age, age));
}
@Override
public void close() throws Exception {
super.close();
statement.close();
connection.close();
System.out.println(Thread.currentThread().getName() + "\tclose");
}
}).setParallelism(2);
executionEnvironment.execute();
}
}
結果:
Sink: Unnamed (2/2)#0 open
Sink: Unnamed (1/2)#0 open
Sink: Unnamed (2/2)#0 close
Sink: Unnamed (1/2)#0 close