DataStream Programming Model
The DataStream API consists of three main modules:
- DataSource module
- Transformation module
- DataSink module
DataSources (data input)
Built-in data sources
File
val textStream = env.readTextFile("data_example.log")
Socket
val socketDataStream = env.socketTextStream("localhost", 9999)
Collection types
fromElements: build a stream directly from individual values
fromCollection: build a stream from a collection (a java.util.List also works), as sketched below
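A minimal sketch of the collection sources, assuming the usual Scala API import (org.apache.flink.streaming.api.scala._); the same env is assumed to be shared by the later snippets:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// fromElements: build a stream directly from individual values
val elementStream = env.fromElements(("a", 3), ("b", 4))
// fromCollection: build a stream from a Scala collection
val collectionStream = env.fromCollection(List(("a", 3), ("b", 4)))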
Third-party data sources
Overview: connectors define the interface logic for Flink to interact with external systems
Kafka
pom dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
code
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

def test10(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Kafka consumer configuration
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  properties.setProperty("group.id", "consumer-group")
  properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties.setProperty("auto.offset.reset", "latest")
  // consume the "sensor" topic as a stream of strings
  val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
  stream3.print()
  env.execute()
}
Transformation operations
Single-DataStream
- map
// map operator: increment the second field of each tuple
def test11(): Unit = {
  val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("c", 5), ("a", 5))
  val mapStream = dataStream.map(t => (t._1, t._2 + 1))
  mapStream.print()
  env.execute()
}
Alternatively, implement MapFunction[IN, OUT] explicitly
def test12(): Unit = {
  val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("c", 5), ("a", 5))
  val mapStream: DataStream[(String, Int)] = dataStream.map(new MapFunction[(String, Int), (String, Int)] {
    override def map(t: (String, Int)): (String, Int) = {
      (t._1, t._2 + 1)
    }
  })
  mapStream.print()
  env.execute()
} // this explicit-interface style is more common in Java
- flatMap
Flatten + map: each input element can produce zero or more output elements, as sketched below
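A minimal flatMap sketch (the stream contents are illustrative; env as above):

// each line is flattened into individual words
val lineStream = env.fromElements("hello flink", "hello world")
val wordStream = lineStream.flatMap(line => line.split(" "))
wordStream.print()
env.execute()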
- filter
Filtering: keeps only the elements for which the predicate returns true
- keyBy
Partitions the data by the specified key for grouping, so that elements with the same key land in the same partition
- reduce
Rolling aggregation over a keyed stream: each new element is combined with the previously reduced value, as sketched below
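A sketch combining filter, keyBy and reduce on the same tuple stream used above (a rolling per-key sum; env as before):

val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("c", 5), ("a", 5))
val reducedStream = dataStream
  .filter(t => t._2 > 2)                       // keep elements whose second field exceeds 2
  .keyBy(0)                                    // partition by the first tuple field
  .reduce((t1, t2) => (t1._1, t1._2 + t2._2))  // rolling sum of the second field
reducedStream.print()
env.execute()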
- Aggregations
Pre-packaged wrappers around the reduce operator
sum: rolling sum of the field
min: rolling minimum of the field (only the aggregated field is guaranteed)
minBy: returns the element containing the minimum value
max: rolling maximum of the field
maxBy: returns the element containing the maximum value
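A sketch contrasting min and minBy (env as before): min only guarantees the aggregated field, while minBy emits the whole element that holds the minimum.

val aggStream = env.fromElements(("a", 3), ("a", 1), ("a", 2))
aggStream.keyBy(0).min(1).print("min")     // second field is the rolling minimum
aggStream.keyBy(0).minBy(1).print("minBy") // whole element with the minimum second field
env.execute()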
Multi-DataStream
- Union
Merges two or more datasets of the same type into one; the inputs must share the same format, and the output keeps that format, as sketched below
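A minimal union sketch, assuming two streams of the same type (env as before):

val stream1 = env.fromElements(("a", 1), ("b", 2))
val stream2 = env.fromElements(("c", 3), ("d", 4))
// inputs and output are all DataStream[(String, Int)]
val unionStream = stream1.union(stream2)
unionStream.print()
env.execute()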
- Connect
Combines datasets of different data types while each stream keeps its original type
CoMapFunction
def test13(): Unit ={
val dataStream1 = env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
val dataStream2 = env.fromElements(1,2,4,5,6)
val connectStream = dataStream1.connect(dataStream2)
//connectStream cannot be printed directly; it must first be converted with map or flatMap
val resultStream: DataStream[(Int, String)] = connectStream.map(new CoMapFunction[(String, Int), Int, (Int, String)] {
override def map1(in1: (String, Int)): (Int, String) = (in1._2, in1._1)
override def map2(in2: Int): (Int, String) = {
(in2, "default")
}
}) // map1 handles elements from the first stream, map2 from the second
resultStream.print()
env.execute()
}
CoFlatMapFunction works similarly
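A minimal CoFlatMapFunction sketch reusing connectStream from test13 (requires org.apache.flink.streaming.api.functions.co.CoFlatMapFunction and org.apache.flink.util.Collector; illustrative only):

val coFlatMapStream: DataStream[String] = connectStream.flatMap(new CoFlatMapFunction[(String, Int), Int, String] {
  // flatMap1 handles elements of the first stream, flatMap2 of the second
  override def flatMap1(in1: (String, Int), out: Collector[String]): Unit = out.collect(in1._1)
  override def flatMap2(in2: Int, out: Collector[String]): Unit = out.collect(in2.toString)
})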
- split
/**
 * test split
 */
def test14(): Unit = {
  // create the dataset
  val dataStream1: DataStream[(String, Int)] = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("c", 5), ("a", 5))
  // split the stream into "even" and "odd" branches by the second field
  val splitStream: SplitStream[(String, Int)] = dataStream1.split(t => if (t._2 % 2 == 0) Seq("even") else Seq("odd"))
  splitStream.select("even").print("even")
  env.execute()
}
- select
Retrieves the named side streams produced by split, as in splitStream.select("even") in test14
- Iterate
Feeds part of an operator's output back into its input, forming a feedback loop in the stream; see the sketch below
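A minimal iterate sketch following the pattern in the Flink documentation: each value is decremented per pass, still-positive values are fed back into the loop, and the rest flow downstream (env as before).

def test15(): Unit = {
  val inputStream = env.generateSequence(0, 100)
  val iteratedStream = inputStream.iterate(iteration => {
    val minusOne = iteration.map(_ - 1)          // step function applied on every pass
    val stillPositive = minusOne.filter(_ > 0)   // fed back as the next iteration's input
    val done = minusOne.filter(_ <= 0)           // forwarded downstream
    (stillPositive, done)
  })
  iteratedStream.print()
  env.execute()
}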