天天看點

Flink 流處理API之 Source TransForm

DataStream程式設計模型

DataStream API 主要分為三個子產品
  • DataSource子產品
  • Transformation子產品
  • DataSink子產品

DataSources資料輸入

内置資料源

檔案

val textStream = env.readTextFile("data_example.log")      

socket

val socketDataStream = env.socketTextStream("localhost", 9999)      

集合類型

fromElements
fromCollection
java.util.List      

第三方資料源

基本介紹: 定義flink和外部系統互動的接口邏輯

kafka

pom依賴

<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>      

code

def test10(): Unit ={
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")
    val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
  }      

轉換操作TransForma

單single-DataStream

  • map
//map算子
  def test11(): Unit ={
    val dataStream = env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
    val mapStream = dataStream.map(t => (t._1, t._2+1))
    mapStream.print()
    env.execute()
  }

  實作 MapFunction[泛型1, 泛型2]
  def test12(): Unit ={
    val dataStream = env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
    val mapStream: DataStream[(String, Int)] = dataStream.map(new
        MapFunction[(String, Int), (String, Int)] {
      override def map(t: (String, Int)): (String, Int) = {
        (t._1, t._2 + 1)}
    })
    mapStream.print()
    env.execute()
  } //java方式用的多些
        
  • flatMap
扁平化+map
  • filter
進行過濾: 保留為true的資料
  • keyBy
按照指定的key對資料進行Partition操作 分組,将資料放在相同分區中
  • reduce
滾動的進行資料聚合處理
  • Aggregations
對reduce算子進行分裝
sum
min:最小值
minBy:傳回最小值對應的元素
max:最大值
maxBy:最大值對應的元素      

Multi-DataStream

  • Union
将兩個或多個輸入的資料集合合成一個資料集 需要保證資料集格式一緻 輸入和輸出資料集格式一緻
  • Connect
合并多種不同資料類型的資料集 保留原有的資料集類型
CoMapFunction

def test13(): Unit ={
    val dataStream1 = env.fromElements(("a",3),("d",4),("c",2),("c",5),("a",5))
    val dataStream2 = env.fromElements(1,2,4,5,6)

    val connectStream = dataStream1.connect(dataStream2)
    //connectStream不能直接進行print輸出,需要進行map或flatMap進行轉換
    val resultStream: DataStream[(Int, String)] = connectStream.map(new CoMapFunction[(String, Int), Int, (Int, String)] {
      override def map1(in1: (String, Int)): (Int, String) = (in1._2, in1._1)

      override def map2(in2: Int): (Int, String) = {
        (in2, "default")
      }
    }) //兩個函數交替執行産生結果
    resultStream.print()
    env.execute()
  }

CoFlatMapFuntion 也是類似      
  • split
/**
    * 測試split
    */
  def  test14(): Unit ={
    //建立資料集
    val dataStream1: DataStream[(String, Int)] = env.fromElements(("a", 3), ("d",
      4), ("c", 2), ("c", 5), ("a", 5))
    //合并兩個DataStream資料集
    val splitedStream: SplitStream[(String, Int)] = dataStream1.split(t => if (t._2
      % 2 == 0) Seq("even") else Seq("odd"))
    splitedStream.select("even").print("even")
    env.execute()
  }      
  • select
  • Iterate