
Flink: DataStream API

For the overall development workflow of a Flink program and a worked example, see: Flink——从零搭建Flink应用 (Building a Flink Application from Scratch).

DataSource

A DataSource is how a Flink program reads its input data. Sources are configured through the methods on StreamExecutionEnvironment (readTextFile, socketTextStream, addSource, and so on).

Built-in data sources

  • File sources:
    • readTextFile(path): reads a text file line by line and returns each line as a String;
    • readFile(fileInputFormat, path): reads a file with the given input format;
    • readFile(fileInputFormat, path, watchType, interval, pathFilter): additionally lets you specify the watch mode, the interval at which the path is re-scanned for changes, and a filter on the file paths to read (a usage sketch follows this list). watchType has two modes:
      • PROCESS_CONTINUOUSLY: the path is monitored continuously; whenever a file is modified, its entire contents are re-processed. Because whole files are re-read, this mode cannot provide exactly-once semantics.
      • PROCESS_ONCE: the path is scanned once, the data present at that point is read, and the source then exits; later changes to the files are not picked up.

  • Socket source:
    • socketTextStream(hostname, port): reads newline-delimited text from a socket;
  • Collection sources:
    • fromCollection(Seq)
    • fromCollection(Iterator)
    • fromElements(elements: _*)
    • fromParallelCollection(SplittableIterator)
    • generateSequence(from, to)
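
A minimal sketch of the built-in sources, assuming the Flink Scala API (flink-streaming-scala); the file paths, host, and port are placeholders:

    import org.apache.flink.api.java.io.TextInputFormat
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode
    import org.apache.flink.streaming.api.scala._

    object BuiltInSourcesDemo {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // File source, read once
        val lines: DataStream[String] = env.readTextFile("/tmp/input.txt")

        // File source that re-scans the directory every 10 s; a modified file
        // is re-read in full, which is why this mode is not exactly-once
        val watched: DataStream[String] = env.readFile(
          new TextInputFormat(new Path("/tmp/watched")),
          "/tmp/watched",
          FileProcessingMode.PROCESS_CONTINUOUSLY,
          10000L)

        // Socket and collection sources
        val fromSocket = env.socketTextStream("localhost", 9999)
        val fromColl   = env.fromElements(1, 2, 3)

        fromColl.map(_ * 2).print()
        env.execute("built-in sources demo")
      }
    }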

External data sources

For streaming applications, most of the data comes from external, third-party systems. Flink therefore ships a rich set of third-party connectors built on the SourceFunction interface (custom sources can be implemented the same way):

  • Apache Kafka
  • Amazon Kinesis Streams
  • RabbitMQ
  • Apache NiFi
  • Twitter Streaming API
  • Google PubSub
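
As an illustration, a hedged sketch of attaching a Kafka source; it assumes the legacy flink-connector-kafka dependency and its FlinkKafkaConsumer class (newer releases replace it with KafkaSource), and the broker address, group id, and topic name are placeholders:

    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // placeholder broker
    props.setProperty("group.id", "demo-group")              // placeholder group id

    // Consume the hypothetical "events" topic as plain strings
    val kafkaStream: DataStream[String] =
      env.addSource(new FlinkKafkaConsumer[String]("events", new SimpleStringSchema(), props))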

DataStream Transformations

Each operator below is listed with its stream-type signature and a Scala example:

  • map (DataStream → DataStream): dataStream.map { x => x * 2 }
  • flatMap (DataStream → DataStream): dataStream.flatMap { str => str.split(" ") }
  • filter (DataStream → DataStream): dataStream.filter { _ != 0 }
  • keyBy (DataStream → KeyedStream):
    dataStream.keyBy("someKey") // key by field "someKey"
    dataStream.keyBy(0) // key by the first element of a Tuple
  • reduce (KeyedStream → DataStream): keyedStream.reduce { _ + _ }
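
A short end-to-end sketch chaining these core operators (the input strings are made up for the example):

    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Split lines into words, pair each word with 1, key by the word,
    // and keep a running count per key
    val counts: DataStream[(String, Int)] = env
      .fromElements("a b", "b c", "a a")
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map(w => (w, 1))
      .keyBy(_._1) // key-selector form of keyBy
      .reduce((l, r) => (l._1, l._2 + r._2))

    counts.print()
    env.execute("core operators demo")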
  • fold (KeyedStream → DataStream, deprecated in later Flink releases):
    val result: DataStream[String] = keyedStream.fold("start")((str, i) => { str + "-" + i })
  • aggregations (KeyedStream → DataStream): sum, min, max, minBy and maxBy, each usable by tuple position or by field name:
    • keyedStream.sum(0) / keyedStream.sum("key")
    • keyedStream.min(0) / keyedStream.min("key")
    • keyedStream.max(0) / keyedStream.max("key")
    • keyedStream.minBy(0) / keyedStream.minBy("key")
    • keyedStream.maxBy(0) / keyedStream.maxBy("key")
  • window (KeyedStream → WindowedStream):
    dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // last 5 seconds of data
  • windowAll (DataStream → AllWindowedStream):
    dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // last 5 seconds of data
  • window apply (WindowedStream → DataStream, AllWindowedStream → DataStream):
    windowedStream.apply { WindowFunction }
    allWindowedStream.apply { AllWindowFunction } // applying an AllWindowFunction on a non-keyed window stream
  • window reduce (WindowedStream → DataStream): windowedStream.reduce { _ + _ }
  • window fold (WindowedStream → DataStream):
    val result: DataStream[String] = windowedStream.fold("start", (str, i) => { str + "-" + i })
  • aggregations on windows (WindowedStream → DataStream): the same sum/min/max/minBy/maxBy family, by position or field name:
    • windowedStream.sum(0) / windowedStream.sum("key")
    • windowedStream.min(0) / windowedStream.min("key")
    • windowedStream.max(0) / windowedStream.max("key")
    • windowedStream.minBy(0) / windowedStream.minBy("key")
    • windowedStream.maxBy(0) / windowedStream.maxBy("key")
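
A runnable sketch of a keyed window aggregation; it uses processing-time windows so that no watermark setup is needed (the event-time windows above additionally require timestamp and watermark assignment), and the socket source is a placeholder:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Expect "key value" lines (e.g. from `nc -lk 9999`) and sum the values
    // per key over tumbling 5-second processing-time windows
    val windowedSum: DataStream[(String, Int)] = env
      .socketTextStream("localhost", 9999)
      .map { line => val Array(k, v) = line.split(" "); (k, v.toInt) }
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)

    windowedSum.print()
    env.execute("windowed aggregation demo")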

  • union (DataStream* → DataStream): dataStream.union(otherStream1, otherStream2, …)
  • window join (DataStream, DataStream → DataStream):
    dataStream.join(otherStream)
      .where(<key selector>).equalTo(<key selector>)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .apply { … }
  • window coGroup (DataStream, DataStream → DataStream):
    dataStream.coGroup(otherStream)
      .where(0).equalTo(1)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .apply {}

  • connect (DataStream, DataStream → ConnectedStreams):
    someStream : DataStream[Int] = …
    otherStream : DataStream[String] = …
    val connectedStreams = someStream.connect(otherStream)
  • CoMap, CoFlatMap (ConnectedStreams → DataStream): one function per input stream; for flatMap both functions must return a collection of the common output type:
    connectedStreams.map(
      (_ : Int) => true,
      (_ : String) => false)
    connectedStreams.flatMap(
      (n : Int) => Seq(n.toString),
      (s : String) => s.split(" ").toSeq)
  • split (DataStream → SplitStream, deprecated in newer releases in favor of side outputs):
    val split = someDataStream.split(
      (num: Int) => (num % 2) match {
        case 0 => List("even")
        case 1 => List("odd")
      })
  • select (SplitStream → DataStream):
    val even = split select "even"
    val odd = split select "odd"
    val all = split.select("even", "odd")
  • iterate (DataStream → IterativeStream → DataStream):
    initialStream.iterate { iteration =>
      val iterationBody = iteration.map { /* do something */ }
      (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
    }
  • extract timestamps (DataStream → DataStream): stream.assignTimestamps { timestampExtractor }
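
To make connect and CoMap concrete, a minimal sketch with made-up element values:

    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val ints:  DataStream[Int]    = env.fromElements(1, 2, 3)
    val words: DataStream[String] = env.fromElements("a", "bb")

    // connect keeps both element types; CoMap takes one function per input,
    // and both functions must produce the same output type
    val labeled: DataStream[String] = ints
      .connect(words)
      .map(
        (n: Int)    => s"int:$n",
        (w: String) => s"str:$w")

    labeled.print()
    env.execute("connect / CoMap demo")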

DataSink

After the various transformations, a final result stream is produced. The results usually have to be written to an external storage system or handed to a downstream message queue; in Flink, writing a DataStream out to an external system is defined as a DataSink operation. Sinks are attached through the write*/print methods on DataStream listed below, or via DataStream.addSink for connector-based sinks.

Built-in data sinks

  • writeAsText() / TextOutputFormat
  • writeAsCsv(...) / CsvOutputFormat
  • print() / printToErr()
  • writeUsingOutputFormat() / FileOutputFormat
  • writeToSocket
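
A quick sketch of the built-in sinks (the output paths are placeholders; these simple file sinks provide no exactly-once guarantee, and newer releases recommend StreamingFileSink/FileSink instead):

    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val pairs: DataStream[(Int, String)] = env.fromElements((1, "a"), (2, "b"))

    pairs.print()                        // write to stdout
    pairs.printToErr()                   // write to stderr
    pairs.writeAsText("/tmp/out-text")   // one toString per element
    pairs.writeAsCsv("/tmp/out-csv")     // tuple streams only

    env.execute("built-in sinks demo")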

External data sinks

  • Apache Kafka
  • Apache Cassandra
  • Amazon Kinesis Streams
  • Elasticsearch
  • Hadoop FileSystem
  • RabbitMQ
  • Apache NiFi
  • Google PubSub
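
Connector sinks are attached with addSink; a sketch assuming the legacy flink-connector-kafka FlinkKafkaProducer (newer releases replace it with KafkaSink), with placeholder broker and topic:

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

    // stringStream is any DataStream[String] built earlier in the job
    stringStream.addSink(new FlinkKafkaProducer[String](
      "localhost:9092",          // placeholder broker list
      "results",                 // placeholder topic
      new SimpleStringSchema()))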
