天天看點

flink checkpoint 重新開機_Flink 批處理Api 0428

flink checkpoint 重新開機_Flink 批處理Api 0428

Flink 批處理Api

2.1 Source

Flink+kafka是如何實作exactly-once語義的

  Flink通過

checkpoint

來儲存資料是否處理完成的狀态;

  有JobManager協調各個TaskManager進行checkpoint存儲,checkpoint儲存在

StateBackend

中,預設StateBackend是記憶體級的,也可以改為檔案級的進行持久化儲存。

執行過程實際上是一個兩段式送出,

每個算子執行完成,會進行“預送出”,直到執行完sink操作,會發起“确認送出”,如果執行失敗,預送出會放棄掉。 如果當機需要通過StateBackend進行恢複,隻能恢複所有确認送出的操作。
flink checkpoint 重新開機_Flink 批處理Api 0428

Spark中要想實作有狀态的,需要使用updateBykey或者借助redis;

而Fink是把它記錄在State Bachend,隻要是經過keyBy等處理之後結果會記錄在State Bachend(

已處理未送出

; 如果是處理完了就是已送出狀态;),

它還會記錄另外一種狀态值:

keyState,比如keyBy累積的結果;

StateBachend如果不想存儲在記憶體中,也可以存儲在fs檔案中或者HDFS中;

IDEA的工具隻支援memory記憶體式存儲,一旦重新開機就沒了;部署到linux中就支援存儲在檔案中了; Kakfa的自動送出:“enable.auto.commit”,比如從kafka出來後到sparkStreaming之後,一進來consumer會幫你自動送出,如果在處理過程中,到最後有一個沒有寫出去(比如寫到redis、ES),雖然處理失敗了但kafka的偏移量已經發生改變;是以移偏移量的時機很重要;

Transform 轉換算子

map

object StartupApp { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP") val dstream: DataStream[String] = env.addSource(myKafkaConsumer) //dstream.print().setParallelism(1) 測試從kafka中獲得資料是否打通到了flink中 //将json轉換成json對象 val startupLogDStream: DataStream[StartupLog] = dstream.map { jsonString => JSON.parseObject(jsonString, classOf[StartupLog]) } //需求一 相同管道的值進行累加 val sumDStream: DataStream[(String, Int)] = startupLogDStream

.map

{ startuplog => (startuplog.ch, 1) }.keyBy(0)

 .reduce

{ (startuplogCount1, startuplogCount2) => val newCount: Int = startuplogCount1._2 + startuplogCount2._2 (startuplogCount1._1, newCount) } //val sumDStream: DataStream[(String, Int)] = startupLogDStream.map{startuplog => (startuplog.ch,1)}.keyBy(0).sum(1) //sumDStream.print() env.execute() } }