天天看点

Flink 流处理API之 Source TransForm

DataStream编程模型

DataStream API 主要分为三个模块
  • DataSource模块
  • Transformation模块
  • DataSink模块

DataSources数据输入

内置数据源

文件

val textStream = env.readTextFile("data_example.log")      

socket

val socketDataStream = env.socketTextStream("localhost", 9999)      

集合类型

fromElements
fromCollection
java.util.List      

第三方数据源

基本介绍: 定义flink和外部系统交互的接口逻辑

kafka

pom依赖

<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>      

code

def test10(): Unit ={
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")
    val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
  }      

转换操作TransForma

单single-DataStream

  • map
//map算子
  def test11(): Unit ={
    val dataStream = env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
    val mapStream = dataStream.map(t => (t._1, t._2+1))
    mapStream.print()
    env.execute()
  }

  实现 MapFunction[泛型1, 泛型2]
  def test12(): Unit ={
    val dataStream = env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
    val mapStream: DataStream[(String, Int)] = dataStream.map(new
        MapFunction[(String, Int), (String, Int)] {
      override def map(t: (String, Int)): (String, Int) = {
        (t._1, t._2 + 1)}
    })
    mapStream.print()
    env.execute()
  } //java方式用的多些
        
  • flatMap
扁平化+map
  • filter
进行过滤: 保留为true的数据
  • keyBy
按照指定的key对数据进行Partition操作 分组,将数据放在相同分区中
  • reduce
滚动的进行数据聚合处理
  • Aggregations
对reduce算子进行分装
sum
min:最小值
minBy:返回最小值对应的元素
max:最大值
maxBy:最大值对应的元素      

Multi-DataStream

  • Union
将两个或多个输入的数据集合合成一个数据集 需要保证数据集格式一致 输入和输出数据集格式一致
  • Connect
合并多种不同数据类型的数据集 保留原有的数据集类型
CoMapFunction

def test13(): Unit ={
    val dataStream1 = env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
    val dataStream2 = env.fromElements(1,2,4,5,6)

    val connectStream = dataStream1.connect(dataStream2)
    //connectStream不能直接进行print输出,需要进行map或flatMap进行转换
    val resultStream: DataStream[(Int, String)] = connectStream.map(new CoMapFunction[(String, Int), Int, (Int, String)] {
      override def map1(in1: (String, Int)): (Int, String) = (in1._2, in1._1)

      override def map2(in2: Int): (Int, String) = {
        (in2, "default")
      }
    }) //两个函数交替执行产生结果
    resultStream.print()
    env.execute()
  }

CoFlatMapFuntion 也是类似      
  • split
/**
    * 测试split
    */
  def  test14(): Unit ={
    //创建数据集
    val dataStream1: DataStream[(String, Int)] = env.fromElements(("a", 3), ("d",
      4), ("c", 2), ("c", 5), ("a", 5))
    //合并两个DataStream数据集
    val splitedStream: SplitStream[(String, Int)] = dataStream1.split(t => if (t._2
      % 2 == 0) Seq("even") else Seq("odd"))
    splitedStream.select("even").print("even")
    env.execute()
  }      
  • select
  • Iterate