Map
DataStream → DataStream: takes one element and produces one element.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10)
val streamMap = stream.map { x => x * 2 }
streamMap.print()
env.execute("FirstJob")
Note: in the output of stream.print(), the number prefixed to each line indicates which parallel subtask produced that line.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;

import java.util.Random;

public class StuScore {
    private static Random rand = new Random();

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        DataSet<String> text;
        if (params.has("input")) {
            text = env.readTextFile("F:\\date\\flinkdata\\stu.txt");
        } else {
            System.out.println("Please check your input");
            return;
        }
        // Map each student name to a (name, random score) tuple
        DataSet<Tuple2<String, Integer>> stuscore = text.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return new Tuple2<>(s, rand.nextInt(100) + 1);
            }
        });
        if (params.has("output")) {
            stuscore.writeAsCsv("F:\\date\\flinkdata\\personinput\\A");
        } else {
            System.out.println("Printing to the console");
            stuscore.print();
        }
    }
}
FlatMap
DataStream → DataStream: takes one element and produces zero, one, or more elements.
import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("F:\\date\\flinkdata\\stu.tsv")
val streamFlatMap = stream.flatMap{
x => x.split(" ")
}
streamFlatMap.print()
env.execute("FirstJob")
Filter
DataStream → DataStream: evaluates a boolean for each element and keeps the elements for which the function returns true. The example below keeps only the odd numbers:
import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10)
val streamFilter = stream.filter{
//keep the odd numbers
x => (x % 2 != 0)
}
streamFilter.print()
env.execute("FirstJob")
Connect
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnL0UzM0ETOyIjM4EjNwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
DataStream, DataStream → ConnectedStreams: connects two data streams while preserving their types. After Connect, the two streams are merely placed inside one stream; internally each keeps its own data and form unchanged, and the two streams remain independent of each other.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("F:\\date\\flinkdata\\stu.tsv")
val streamMap = stream.flatMap(item => item.split(" ")).filter(item => item.equals("hadoop"))
val streamCollect = env.fromCollection(List(1,2,3,4))
//swapping streamMap and streamCollect does not affect the result
val streamConnect = streamMap.connect(streamCollect)
streamConnect.map(item=>println(item), item=>println(item))
env.execute("FirstJob")
CoMap,CoFlatMap
ConnectedStreams → DataStream: operates on a ConnectedStreams. Works the same as map and flatMap, applying a separate map/flatMap function to each of the two streams in the ConnectedStreams.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1 = env.readTextFile("F:\\date\\flinkdata\\stu.tsv")
val streamFlatMap = stream1.flatMap(x => x.split(" "))
val stream2 = env.fromCollection(List(1,2,3,4))
val streamConnect = streamFlatMap.connect(stream2)
val streamCoMap = streamConnect.map(
(str) => str + "connect",
(in) => in + 100
)
streamCoMap.print()
env.execute("FirstJob")
//========================
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1 = env.readTextFile("test.txt")
val stream2 = env.readTextFile("test1.txt")
val streamConnect = stream1.connect(stream2)
val streamCoMap = streamConnect.flatMap(
(str1) => str1.split(" "),
(str2) => str2.split(" ")
)
streamCoMap.print()
env.execute("FirstJob")
Split
DataStream → SplitStream: splits one DataStream into two or more DataStreams according to some characteristic. Note: this code alone does not produce any output; use it together with Select (below) to get a runnable result.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("F:\\date\\flinkdata\\stu.tsv")
val streamFlatMap = stream.flatMap(x => x.split(" "))
val streamSplit = streamFlatMap.split(
num =>
//strings equal to "hadoop" form one DataStream; everything else forms another
(num.equals("hadoop")) match{
case true => List("hadoop")
case false => List("other")
}
)
env.execute("FirstJob")
Select
SplitStream → DataStream: obtains one or more DataStreams from a SplitStream.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("F:\\date\\flinkdata\\stu.tsv")
val streamFlatMap = stream.flatMap(x => x.split(" "))
val streamSplit = streamFlatMap.split(
num =>
(num.equals("hadoop")) match{
case true => List("hadoop")
case false => List("other")
}
)
val hadoop = streamSplit.select("hadoop")
val other = streamSplit.select("other")
other.print()
env.execute("FirstJob")
Union
DataStream → DataStream: unions two or more DataStreams, producing a new DataStream that contains all elements of all input streams. Note: if you union a DataStream with itself, every element appears twice in the resulting stream.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1 = env.readTextFile("test.txt")
val streamFlatMap1 = stream1.flatMap(x => x.split(" "))
val stream2 = env.readTextFile("test1.txt")
val streamFlatMap2 = stream2.flatMap(x => x.split(" "))
val streamUnion = streamFlatMap1.union(streamFlatMap2)
env.execute("FirstJob")
KeyBy
DataStream → KeyedStream: the input must be of Tuple type (when keying by field position). Logically splits a stream into disjoint partitions; each partition contains the elements with the same key. Internally this is implemented with hash partitioning.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap{
x => x.split(" ")
}
val streamMap = streamFlatMap.map{
x => (x,1)
}
val streamKeyBy = streamMap.keyBy(0)
env.execute("FirstJob")
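The hash partitioning behind keyBy can be sketched in plain Java without a Flink dependency. The modulo scheme below is an illustration of the idea only, not Flink's actual key-group assignment algorithm, and all names in it are hypothetical:

```java
import java.util.*;

public class KeyBySketch {
    // Assign each word to one of `parallelism` partitions by key hash,
    // so that all records with the same key land in the same partition.
    static Map<Integer, List<String>> partitionByKey(List<String> words, int parallelism) {
        Map<Integer, List<String>> partitions = new HashMap<>();
        for (String w : words) {
            int p = Math.floorMod(w.hashCode(), parallelism); // non-negative partition index
            partitions.computeIfAbsent(p, k -> new ArrayList<>()).add(w);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("hadoop", "flink", "hadoop", "spark");
        // Both "hadoop" records are guaranteed to sit in the same partition.
        partitionByKey(words, 4).forEach((p, ws) -> System.out.println(p + " -> " + ws));
    }
}
```

Because the partition index depends only on the key, every downstream keyed operation (reduce, fold, sum, ...) sees all elements of one key on one subtask.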
Reduce
KeyedStream → DataStream: a rolling aggregation on a keyed data stream. It merges the current element with the last aggregated value and produces a new value; the returned stream contains the result of every aggregation step, not only the final result of the last aggregation.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0)
val streamReduce = stream.reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)
streamReduce.print()
env.execute("FirstJob")
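The "one result per aggregation step" behavior of reduce can be simulated with plain Java (no Flink needed; the helper below is a hypothetical illustration). For three occurrences of the same word, each mapped to a count of 1, a keyed reduce emits the running counts 1, 2, 3 rather than only the final 3:

```java
import java.util.*;

public class RollingReduce {
    // Simulate Flink's keyed reduce: merge each incoming count with the
    // previous aggregate for the key and emit the new running value.
    static List<Integer> rollingCounts(List<Integer> counts) {
        List<Integer> emitted = new ArrayList<>();
        Integer acc = null;
        for (int c : counts) {
            acc = (acc == null) ? c : acc + c; // reduce((a, b) -> a + b)
            emitted.add(acc);                  // every step is emitted downstream
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(rollingCounts(Arrays.asList(1, 1, 1))); // prints [1, 2, 3]
    }
}
```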
Fold
KeyedStream → DataStream: a rolling fold on a keyed data stream, seeded with an initial value. It merges the current element with the result of the previous fold and produces a new value; the returned stream contains the result of every fold step, not only the final result of the last fold.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0)
val streamFold = stream.fold(100)(
(begin, item) => (begin + item._2)
)
streamFold.print()
env.execute("FirstJob")
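Fold behaves like reduce except that the accumulator starts from the given initial value. A plain-Java sketch (hypothetical helper, no Flink needed) of the fold(100) example above, fed three counts of 1:

```java
import java.util.*;

public class RollingFold {
    // Simulate Flink's keyed fold: like reduce, but seeded with an initial value.
    static List<Integer> rollingFold(int initial, List<Integer> counts) {
        List<Integer> emitted = new ArrayList<>();
        int acc = initial;   // fold(100)(...) starts from 100, not from the first element
        for (int c : counts) {
            acc = acc + c;   // (begin, item) => begin + item._2
            emitted.add(acc); // one output per input element, as with reduce
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(rollingFold(100, Arrays.asList(1, 1, 1))); // prints [101, 102, 103]
    }
}
```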
Aggregations
KeyedStream → DataStream: rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, while minBy returns the element whose field contains the minimum value (the same applies to max and maxBy). The returned stream contains the result of every aggregation step, not only the final result of the last aggregation.
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test02.txt").map(item => (item.split(" ")(0), item.split(" ")(1).toLong)).keyBy(0)
val streamSum = stream.sum(1)
streamSum.print()
env.execute("FirstJob")
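The min vs minBy distinction can be sketched without Flink. Over elements with a name field and a value field (a stand-in for a Flink tuple; the record type and helpers below are hypothetical), min only guarantees the minimum of the aggregated field, while minBy returns the whole element that holds that minimum:

```java
import java.util.*;

public class MinVsMinBy {
    // An element with a name field and a value field, like a Flink tuple.
    static class Item {
        final String name;
        final int value;
        Item(String name, int value) { this.name = name; this.value = value; }
    }

    // minBy: returns the whole element that currently holds the smallest value.
    static Item minBy(List<Item> items) {
        Item best = items.get(0);
        for (Item it : items) if (it.value < best.value) best = it;
        return best;
    }

    // min: only the minimum of the aggregated field itself is guaranteed;
    // the other fields need not come from the same element.
    static int min(List<Item> items) {
        int m = items.get(0).value;
        for (Item it : items) m = Math.min(m, it.value);
        return m;
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(new Item("a", 5), new Item("b", 2), new Item("c", 9));
        System.out.println(min(items));        // prints 2
        System.out.println(minBy(items).name); // prints b
    }
}
```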
The operators before section 2.3.10 can be applied directly on a stream, because they are not aggregating operations. From 2.3.10 onward, however, you will notice that although we can apply aggregation operators directly to an unbounded stream, they record the result of every aggregation step, which is usually not what we want. In practice, the aggregating operators reduce, fold and the aggregations are used together with Window; only in combination with a Window do they produce the desired result.
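The effect of adding a window can be illustrated in plain Java: grouping a keyed stream into fixed-size count windows and reducing each window yields one result per window instead of one result per element. The window size and data below are illustrative; in real Flink code this would be expressed with something like keyBy(...).countWindow(n).reduce(...):

```java
import java.util.*;

public class WindowedReduce {
    // Reduce each fixed-size count window to a single sum: one output per
    // window, instead of one output per element as with a bare keyed reduce.
    static List<Integer> windowSums(List<Integer> counts, int windowSize) {
        List<Integer> results = new ArrayList<>();
        int acc = 0, inWindow = 0;
        for (int c : counts) {
            acc += c;
            if (++inWindow == windowSize) { // window is full: emit and reset
                results.add(acc);
                acc = 0;
                inWindow = 0;
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // Six counts of 1, windows of three: two results, not six.
        System.out.println(windowSums(Arrays.asList(1, 1, 1, 1, 1, 1), 3)); // prints [3, 3]
    }
}
```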