Flink 批處理Api
2.1 Source
Flink+kafka是如何實作exactly-once語義的Flink通過
checkpoint來儲存資料是否處理完成的狀态;
有JobManager協調各個TaskManager進行checkpoint存儲,checkpoint儲存在
StateBackend中,預設StateBackend是記憶體級的,也可以改為檔案級的進行持久化儲存。
執行過程實際上是一個兩段式送出,
每個算子執行完成,會進行“預送出”,直到執行完sink操作,會發起“确認送出”,如果執行失敗,預送出會放棄掉。 如果當機需要通過StateBackend進行恢複,隻能恢複所有确認送出的操作。Spark中要想實作有狀态的,需要使用updateBykey或者借助redis;
而Fink是把它記錄在State Bachend,隻要是經過keyBy等處理之後結果會記錄在State Bachend(
已處理未送出; 如果是處理完了就是已送出狀态;),
它還會記錄另外一種狀态值:
keyState,比如keyBy累積的結果;StateBachend如果不想存儲在記憶體中,也可以存儲在fs檔案中或者HDFS中;
IDEA的工具隻支援memory記憶體式存儲,一旦重新開機就沒了;部署到linux中就支援存儲在檔案中了; Kakfa的自動送出:“enable.auto.commit”,比如從kafka出來後到sparkStreaming之後,一進來consumer會幫你自動送出,如果在處理過程中,到最後有一個沒有寫出去(比如寫到redis、ES),雖然處理失敗了但kafka的偏移量已經發生改變;是以移偏移量的時機很重要;Transform 轉換算子
map
object StartupApp { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP") val dstream: DataStream[String] = env.addSource(myKafkaConsumer) //dstream.print().setParallelism(1) 測試從kafka中獲得資料是否打通到了flink中 //将json轉換成json對象 val startupLogDStream: DataStream[StartupLog] = dstream.map { jsonString => JSON.parseObject(jsonString, classOf[StartupLog]) } //需求一 相同管道的值進行累加 val sumDStream: DataStream[(String, Int)] = startupLogDStream
.map{ startuplog => (startuplog.ch, 1) }.keyBy(0)
.reduce{ (startuplogCount1, startuplogCount2) => val newCount: Int = startuplogCount1._2 + startuplogCount2._2 (startuplogCount1._1, newCount) } //val sumDStream: DataStream[(String, Int)] = startupLogDStream.map{startuplog => (startuplog.ch,1)}.keyBy(0).sum(1) //sumDStream.print() env.execute() } }