DataStream Programming Model
The DataStream API consists of three main modules:
- DataSource module
- Transformation module
- DataSink module
DataSources: Data Input
Built-in data sources
File
val textStream = env.readTextFile("data_example.log")
Socket
val socketDataStream = env.socketTextStream("localhost", 9999)
Collection types
fromElements
fromCollection (the Java API variant also accepts a java.util.List); see the sketch below
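A minimal sketch of the collection-based sources (the test name test09 is hypothetical; assumes org.apache.flink.streaming.api.scala._ is imported):
def test09(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // fromElements: build a stream from individual values
  val elementStream = env.fromElements(("a", 3), ("b", 4))
  // fromCollection: build a stream from a Scala collection
  // (the Java API variant accepts a java.util.List)
  val collectionStream = env.fromCollection(List(1, 2, 3))
  elementStream.print()
  collectionStream.print()
  env.execute()
}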
Third-party data sources
Overview: connectors define the interface logic for Flink to interact with external systems.
Kafka
POM dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
Code
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

def test10(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Kafka consumer configuration
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  properties.setProperty("group.id", "consumer-group")
  properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties.setProperty("auto.offset.reset", "latest")
  // Read the "sensor" topic as a stream of strings
  val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
  stream3.print()
  env.execute()
}
Transformations
Single-DataStream operators
- map
// map operator: add 1 to the second field of every tuple
def test11(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("c", 5), ("a", 5))
  val mapStream = dataStream.map(t => (t._1, t._2 + 1))
  mapStream.print()
  env.execute()
}
Implementing MapFunction[IN, OUT]
def test12(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("c", 5), ("a", 5))
  // Pass a MapFunction instance instead of a lambda
  val mapStream: DataStream[(String, Int)] = dataStream.map(
    new MapFunction[(String, Int), (String, Int)] {
      override def map(t: (String, Int)): (String, Int) =
        (t._1, t._2 + 1)
    })
  mapStream.print()
  env.execute()
} // this style is more commonly used from Java
- flatMap
Flattens each input element into zero or more output elements, then maps them (see the combined sketch after this list).
- filter
Filters the stream: keeps only the elements for which the predicate returns true.
- keyBy
Partitions the data by the specified key, so that records with the same key land in the same partition (logical grouping).
- reduce
Performs a rolling aggregation over the keyed stream.
- Aggregations
Convenience wrappers around the reduce operator:
sum
min: the minimum value
minBy: the element that holds the minimum value
max: the maximum value
maxBy: the element that holds the maximum value
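A combined sketch of these single-stream operators, following the env/tuple conventions of the earlier examples (the test name is hypothetical):
def testSingleStreamOps(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // flatMap: split each line into words, then map each word to (word, 1)
  val words = env.fromElements("a b", "a c")
    .flatMap(_.split(" "))
    .map(w => (w, 1))
  words
    .filter(_._1 != "c") // filter: drop tuples whose word is "c"
    .keyBy(0)            // keyBy: partition by the word
    .sum(1)              // Aggregations: rolling sum of the counts
    .print()
  env.execute()
}
Here .sum(1) behaves like the rolling .reduce((t1, t2) => (t1._1, t1._2 + t2._2)).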
Multi-DataStream
- Union
Merges two or more input datasets into a single dataset; all inputs must share the same format, and the output keeps that format (a short sketch follows).
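A minimal union sketch, assuming the same tuple type for both inputs:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1 = env.fromElements(("a", 1), ("b", 2))
val stream2 = env.fromElements(("c", 3))
// union requires identical element types and preserves that type
val unionStream: DataStream[(String, Int)] = stream1.union(stream2)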
- Connect
Combines two datasets of different types while preserving each dataset's original element type.
CoMapFunction
def test13(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val dataStream1 = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("c", 5), ("a", 5))
  val dataStream2 = env.fromElements(1, 2, 4, 5, 6)
  val connectStream = dataStream1.connect(dataStream2)
  // connectStream cannot be printed directly; it must first be converted with map or flatMap
  val resultStream: DataStream[(Int, String)] = connectStream.map(new CoMapFunction[(String, Int), Int, (Int, String)] {
    override def map1(in1: (String, Int)): (Int, String) = (in1._2, in1._1)
    override def map2(in2: Int): (Int, String) = (in2, "default")
  }) // map1 or map2 is invoked depending on which input stream an element arrives from, producing one merged result stream
  resultStream.print()
  env.execute()
}
CoFlatMapFunction works similarly (see the sketch below).
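A minimal CoFlatMapFunction sketch, continuing from the connectStream above; it additionally assumes imports of org.apache.flink.streaming.api.functions.co.CoFlatMapFunction and org.apache.flink.util.Collector:
val flatResultStream: DataStream[String] = connectStream.flatMap(
  new CoFlatMapFunction[(String, Int), Int, String] {
    // flatMap1/flatMap2 may emit zero or more elements via the Collector
    override def flatMap1(in1: (String, Int), out: Collector[String]): Unit =
      out.collect(in1._1)
    override def flatMap2(in2: Int, out: Collector[String]): Unit =
      out.collect(in2.toString)
  })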
- split
/**
 * Test split
 */
def test14(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Create the dataset
  val dataStream1: DataStream[(String, Int)] =
    env.fromElements(("a", 3), ("d", 4), ("c", 2), ("c", 5), ("a", 5))
  // Split the stream into "even" and "odd" outputs based on the second field
  val splitStream: SplitStream[(String, Int)] =
    dataStream1.split(t => if (t._2 % 2 == 0) Seq("even") else Seq("odd"))
  splitStream.select("even").print("even")
  env.execute()
}
- select
Picks one or more named outputs from a SplitStream, as shown by select("even") in the example above.
- Iterate
Creates a feedback loop in the stream: part of an operator's output is redirected back as its input (see the sketch below).
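A minimal iterate sketch (hypothetical test name): the step function returns a pair of streams, where the first is fed back into the loop and the second is emitted as the operator's output.
def test15(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1) // keep the feedback parallelism consistent with the loop head
  val dataStream = env.fromElements(5L, 10L, 20L)
  val result: DataStream[Long] = dataStream.iterate { iteration =>
    val minusOne = iteration.map(_ - 1)
    val feedback = minusOne.filter(_ > 0) // fed back into the loop
    val output = minusOne.filter(_ <= 0)  // leaves the loop
    (feedback, output)
  }
  result.print()
  env.execute()
}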