- Flink DataStream
- DataStream相關概念
5.1.1 ExecutionEnvironment執行環境
- 執行環境建立方式
和Flink互動需要一個入口,這個入口就是ExecutionEnvironment執行環境。在Stream API中,它的執行環境就使用StreamExecutionEnvironment來建立,裡面包含了建立各種執行環境的靜态方法。
這裡這些靜态方法都可以建立執行環境,我們最常用的就是getExecutionEnvironment方法,它會根據實際的執行環境來判斷是運作在Local還是叢集上。
// 建立流處理環境 StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); // 建立一個本地運作環境 StreamExecutionEnvironment.createLocalEnvironment(2,new Configuration()); // 建立一個本地帶WebUI的執行環境,需要引入flink-runtime-web的依賴 StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); // 建立遠端執行環境,給定遠端位址将任務直接送出到叢集上,需要提供jar包 StreamExecutionEnvironment.createRemoteEnvironment("hadoop01",6123); |
- 遠端執行RemoteEnv
示例:
// 因為下面用到了一個類AddKeyMapFunction這是使用者自己定義的一個類,這個類預設Flink的jar包環境裡面是沒有的,現在需要先将這個類打成一個jar包, // 下面建立環境第三個參數填入這個jar包的路徑,運作時就可以将jar包上傳給Flink,任務加載到就可以開始運作了 StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( "hadoop01", 8081, "com.example.flink.datastream/target/com.example.flink.datastream-0.0.1-SNAPSHOT.jar" ); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); source.map(new AddKeyMapFunction()).print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } |
- 異步送出任務
DataStream的内部操作是懶執行的,要想觸發執行動作需要執行execute動作。這裡execute()方法是線程阻塞的,可以通過executeAsync()來異步送出任務,後續可以通過其傳回值jobClient來監控任務狀态。
try { // 如果是execution()程式就會卡在這裡 // 可以使用executeAsync()異步送出,這樣程式就不會卡在這裡,後續可以通過jobClient來監控任務狀态 JobClient jobClient = env.executeAsync(); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.execute(() -> { while (true) { try { JobStatus jobStatus = jobClient.getJobStatus().get(); if (!JobStatus.RUNNING.equals(jobStatus)) break; else { TimeUnit.SECONDS.sleep(1); System.out.println("運作狀态:" + jobStatus.name()); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }); } catch (Exception e) { e.printStackTrace(); } |
5.1.2 什麼是DataStream
DataStream API 得名于一個特殊的DataStream類,該類用于表示 Flink 程式中的資料集合。您可以将它們視為可以包含重複項的不可變資料集合。這些資料可以是有限的,也可以是無限的,用于處理它們的 API 是相同的。
在用法上DataStream與正常 Java 相似,Collection但在某些關鍵方面卻大不相同。它們是不可變的,這意味着一旦它們被建立,你就不能添加或删除元素。您也不能簡單地檢查内部元素,而隻能使用DataStreamAPI 操作(也稱為轉換)處理它們。
大概的意思就是DataStream就是一個大的資料集向Collection一樣,但是這個資料集内的資料不能被改變隻能用它提供出來的API進行轉換。
-
- DataStreamAPI操作
5.2.1 Connector連接配接器
一些比較基本的 Source 和 Sink 已經内置在 Flink 裡。 預定義 data sources 支援從檔案、目錄、socket,以及 collections 和 iterators 中讀取資料。 預定義 data sinks 支援把資料寫入檔案、标準輸出(stdout)、标準錯誤輸出(stderr)和 socket。
以下是1.12版本Flink社群已經開發完成的連接配接器source是輸入sink是輸出。
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- FileSystem(包括 Hadoop ) - 僅支援流 (sink)
- FileSystem(包括 Hadoop ) - 流批統一 (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
- Google PubSub (source/sink)
- JDBC (sink)
-
-
- Source
-
-
- 内部資料源讀取
以上都是可以從内部資料源讀取資料的API
- 文本讀取
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = env.readTextFile("data/streaming/AFINN-111.txt"); source.print(); env.execute(); |
- Socket讀取
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); source.print(); env.execute(); |
- Kafka讀取
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String topic= "sensor"; Properties consumerConfig = new Properties(); consumerConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092"); consumerConfig.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerConfig.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest"); consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"flink_consumer"); DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),consumerConfig)); source.print(); env.execute(); |
- 自定義Source
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String topic = "sensor"; Properties consumerConfig = new Properties(); consumerConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop01:9092"); consumerConfig.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerConfig.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_consumer"); DataStreamSource<String> source = env.addSource(new MySource(topic, consumerConfig)); source.print(); env.execute(); } static class MySource implements SourceFunction<String> { private final String topic; private final Properties config; private volatile boolean run = true; public MySource(String topic, Properties config) { this.topic = topic; this.config = config; this.run = true; } @Override public void run(SourceContext<String> ctx) throws Exception { KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config); consumer.subscribe(Collections.singletonList(this.topic)); while (run) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(10)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { ctx.collect(consumerRecord.value()); } } } @Override public void cancel() { this.run = false; } } |
-
-
-
- Sink
-
-
- Kafka Sink
// 從Socket接收資料發送到Kafka StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); String brokerList = "hadoop01:9092"; String sendTopic = "sensor"; source.addSink(new FlinkKafkaProducer<String>(brokerList, sendTopic, new SimpleStringSchema())); env.execute(); |
- JDBC Sink
// 從Socket接收資料寫入到mysql StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); String writeSql = "INSERT INTO tbl2(value) VALUES(?)"; source.addSink(JdbcSink.sink(writeSql, (JdbcStatementBuilder<String>) (preparedStatement, s) -> { preparedStatement.setObject(1, s); }, // JDBC是以批的方式寫入的,這裡改下批次大小好看到效果 new JdbcExecutionOptions.Builder().withBatchSize(1) .build() , new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withDriverName("com.mysql.jdbc.Driver") .withUsername("root") .withPassword("") .withUrl("jdbc:mysql:///test") .build() )); env.execute(); |
-
-
- Operators
- Transform資料流轉換
- Operators
-
- Map & FatMap
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 在接收到的消息前加個字首列印出來 SingleOutputStreamOperator<String> result = source.map(new MapFunction<String, String>() { private static final String PREFIX = "input message is : "; @Override public String map(String value) throws Exception { return PREFIX.concat(value); } }); result.print("map result:"); SingleOutputStreamOperator<String> flatMapResult = source.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { for (String word : value.split(" ")) { out.collect(word); } } }); flatMapResult.print("flatmap result:"); env.execute(); |
- Fiter
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 在接收到的消息前加個字首列印出來 SingleOutputStreamOperator<String> result = source.filter(new FilterFunction<String>() { @Override public boolean filter(String value) throws Exception { // 對内容進行過濾,如果消息的長度超過10就被濾掉 return StringUtils.length(value) <= 10; } }); result.print("長度不超過10的消息:"); env.execute(); |
- Union
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<String> hadoop01Stream = env.socketTextStream("hadoop01", 9999); DataStreamSource<String> hadoop02Stream = env.socketTextStream("hadoop02", 9999); // 将兩個相同泛型的DataStream Union到一起聯合處理 DataStream<String> unionResult = hadoop01Stream.union(hadoop02Stream); unionResult.print("union stream:"); env.execute(); |
- Connect
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<String> hadoop01Stream = env.socketTextStream("hadoop01", 9999); DataStream<Integer> hadoop02Stream = env.socketTextStream("hadoop02", 9999).map(Integer::parseInt); // 将兩個流connect到一起,不管泛型一不一緻 ConnectedStreams<String, Integer> connect = hadoop01Stream.connect(hadoop02Stream); // 後續對兩個流的操作就是兩個方法單獨處理都是Co開頭的Function最終整合成一個流 SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<String, Integer, String>() { @Override public String map1(String value) throws Exception { return value; } @Override public String map2(Integer value) throws Exception { return value.toString(); } }); result.print("connect stream : "); env.execute(); |
- KeyBy
keyBy算子是用來分組的算子,傳回結果就是KeyedStream鍵控流,後面的Reduce算子Aggravate算子都是需要分組之後才能操作。
// 對source進行分組,WordCount統計單詞個數 KeyedStream<Tuple2<String, Long>, String> wordKeyedStream = wordStream.keyBy(new KeySelector<Tuple2<String, Long>, String>() { @Override public String getKey(Tuple2<String, Long> tuple2) throws Exception { return tuple2.f0; } }); |
- Reduce
Reduce累計,接收的是KeyedStream,按照什麼Key來彙總,group by後的聚合操作。
SingleOutputStreamOperator<Tuple2<String, Long>> reduceStream = wordKeyedStream.reduce(new ReduceFunction<Tuple2<String, Long>>() { @Override public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception { return new Tuple2<>(value1.f0, value1.f1 + value2.f1); } }); |
- Aggregations
Aggregations操作就是分組之後的聚合操作,簡單的聚合例如sum,count,min,max之類的函數。
KeyedStream<Word, String> wordSumKeyedStream = wordStream.keyBy(new KeySelector<Word, String>() { @Override public String getKey(Word value) throws Exception { return "長度比較"; } }); // 再看哪些單詞長度長 SingleOutputStreamOperator<Word> minByResult = wordSumKeyedStream.minBy("len"); SingleOutputStreamOperator<Word> maxByResult = wordSumKeyedStream.maxBy("len"); minByResult.print("minByResult:"); maxByResult.print("maxByResult:"); env.execute(); |
-
-
-
- 分區
-
-
前面運作架構中有資料從上遊到下遊的發送方式,可以是一對一可以是重新分發。
重新分發又有幾種分發方式,使用者定義分發方式、随機分發、平衡随機分發、組内随機、下遊每個分區一份。
- RebalancePartitioner
Rebalance方式将資料發送至下遊
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
return nextChannelToSendTo;
}
通過這種方式來得出是發往哪個分區的,并不是随機而是一種輪詢的方式發送。
API操作:
source.rebalance().print("rebalance").setParallelism(6);
- RescalePartitioner
這個是個限制版的rebalence,和它很像,但是rescale是會對下遊或上遊進行分組,rebalance不分組就是直接輪詢假如下遊有4個分區也不管上遊幾個分區,向下遊發送時就是固定的一個順序1,2,3,4,1,2,3,4,1,2,3,4....但是Rescale就不會這麼粗暴的輪詢,而是上遊和下遊進行一個對應分組,假如上遊有2個分區,下遊有4個分區那麼,上遊的0分區就會在1,2之間輪詢,1分區就會在3,4之間輪詢。如果上遊是4個下遊是2個那麼就是1,2向下遊的0發送,3,4向下遊的1發送。
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
if (++nextChannelToSendTo >= numberOfChannels) {
nextChannelToSendTo = 0;
}
return nextChannelToSendTo;
}
API操作
.rescale().print("rescale").setParallelism(4);
- GlobalPartitioner
簡單粗暴,直接給到0分區,不管怎麼來資料都是給到下遊的ID=0分區。
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
return 0;
}
API操作:
source.global().print("global").setParallelism(4);
- KeyGroupStreamPartitioner
通過Key的Hash值判斷發送給下遊的哪個分區
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
K key;
try {
key = keySelector.getKey(record.getInstance().getValue());
} catch (Exception e) {
throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
}
return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
}
// KeyGroupRangeAssignment中的方法
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}
// KeyGroupRangeAssignment中的方法
public static int assignToKeyGroup(Object key, int maxParallelism) {
return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}
// KeyGroupRangeAssignment中的方法
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
return MathUtils.murmurHash(keyHash) % maxParallelism;
}
API操作
.KeyBy(new KeySelector<String, String>() {
@Override
public String getKey(String value) throws Exception {
return value;
}
});
- ForwardPartitioner
僅将元素轉發到本地運作的下遊操作的分區器,将記錄輸出到下遊本地的operator執行個體。ForwardPartitioner分區器要求上下遊算子并行度一樣,上下遊Operator同屬一個SubTasks。
注意上下遊并行度一定得是對等的,否則會運作就會報錯: Forward partitioning does not allow change of parallelism. Upstream operation: Map-15 parallelism: 2, downstream operation: Sink: Print to Std. Out-17 parallelism: 4 You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.意思就是使用Forward分區就得一緻,不然你用其他的分區政策。
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
return 0;
}
API操作:
.forward().print(“forward”);
- ShufflePartitioner
也不是輪詢就是從下遊通道中随機選一個分區。
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
return random.nextInt(numberOfChannels);
}
API操作
source.shuffle().print("shuffle").setParallelism(4);
- CustomPartitionerWrapper
使用者自定義分區,使用者指定這條資料要發送的分區編号。
public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
this.partitioner = partitioner;
this.keySelector = keySelector;
}
API操作:
source.partitionCustom(new Partitioner<String>() {
@Override
public int partition(String key, int numPartitions) {
int mid = numPartitions / 2;
return key.length() > 5 ? RandomUtils.nextInt(0, mid) : RandomUtils.nextInt(mid, numPartitions);
}
}, new KeySelector<String, String>() {
@Override
public String getKey(String value) throws Exception {
return value;
}
}).map(Object::toString).print("custom").setParallelism(4);
- BroadcastPartitioner
下遊分區每個分區都分發。
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
}
@Override
public boolean isBroadcast() {
return true;
}
API操作
source.broadcast().print("broadcast").setParallelism(4);
-
-
-
- 共享組
-
-
禁用算子鍊操作
算子鍊就是Operator chain,運作架構中介紹了操作鍊,可以将相同并行度的算子放到同一個Slot中執行,這樣能避免資料在多個task之間流轉來提高性能。
操作鍊可以通過env.disableChaining()全局禁用,也可以在某個算子操作後對後面的算子禁用。
Source -> map -> sink
- 并行度始終為1,預設開啟效果
- 并行度始終為1,全局禁用效果
- 并行度始終為1,Source禁用算子鍊效果
- Map并行度增大,任何階段不禁用算子鍊效果
由此可見并行度并不是随着上個算子傳遞下來的,而是沒有設定就是預設,最進階别依次是 算子後設定并行度 > env設定并行度 > 任務送出參數設定的并行度 > flink-conf.yaml配置檔案中的并行度。
設定共享組操作
設定操作的槽位共享組。Flink 會将具有相同槽共享組的操作放在同一個槽中,而将沒有槽共享組的操作保留在其他槽中。這可用于隔離插槽。如果所有輸入操作都在同一個槽共享組中,則槽共享組從輸入操作繼承。預設插槽共享組的名稱為“default”,可以通過調用 slotSharingGroup(“default”) 将操作顯式放入該組。