天天看點

Windows環境下Flink消費Kafka實作熱詞統計前言碎語進入正題第一步:安裝kafka,并驗證 第二步:編寫消費kafka消息的Flink job第三步:驗證Flink job是否符合預期文末結語

前言碎語

昨天部落客寫了

《windows環境下flink入門demo執行個體》

實作了官方提供的最簡單的單詞計數功能,今天更新下,将資料源從socket流換成生産級的消息隊列kafka來完成一樣的單詞計數功能。本文實作的重點主要有兩個部分,一是kafka環境的搭建,二是如何使用官方提供的flink-connector-kafka_2.12來消費kafka消息,其他的邏輯部分和上文類似。

進入正題

本篇博文涉及到的軟體工具以及下載下傳位址:

Apache Flink :

https://flink.apache.org/downloads.html

 ,請下載下傳最新版1.7.x,選擇單機版本

kafka:

http://kafka.apache.org/downloads

,請下載下傳最新的2.1.0

第一步:安裝kafka,并驗證 

從上面的下載下傳位址選擇二進制包下載下傳後是個壓縮包,解壓後的目錄如下:

Windows環境下Flink消費Kafka實作熱詞統計前言碎語進入正題第一步:安裝kafka,并驗證 第二步:編寫消費kafka消息的Flink job第三步:驗證Flink job是否符合預期文末結語

進入bin\windows下,找到kafka-server-start.bat和zookeeper-server-start.bat。配置檔案在config目錄下,主要配置一些日志和kafka server和zookeeper,都預設就好。如果你本地已經有zk的環境,就可以忽略zk,不然按照下面的步驟執行即可。

1. 啟動zk服務

執行:zookeeper-server-start.bat ..\..\config\zookeeper.properties

2.啟動kafka服務

執行:kafka-server-start.bat ..\..\config\server.properties

3.建立test主題

執行:kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

4.檢視上一步的主題是否建立成功,成功的話控制台會輸出test

執行:kafka-topics.bat --list --zookeeper localhost:2181

5.訂閱test主題消息

執行:kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

6.釋出消息

執行:kafka-console-producer.bat --broker-list localhost:9092 --topic test

以上步驟成功後,我們需要驗證下是否都成功了。在第六條指令的視窗中輸入abc。如果在第5個指令視窗輸出了就代表kafka環境ok了。然後可以關掉第5個指令視窗,下面就讓Flink來消費kafka的消息

第二步:編寫消費kafka消息的Flink job

基礎步驟參考

一文。唯一的差別就是因為要消費kafka中的資料,是以需要引入一個kafka連接配接器,官方已提供到maven倉庫中,引入最新版本即可,如下:

<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.7.1</version>           

然後建立一個KafkaToFlink類 ,代碼邏輯和昨天的一樣,都是從一段字元串中統計每個詞語出現的次數,這個場景比較像我們的熱搜關鍵字,我标題簡化為熱詞統計了。主要的代碼如下:

/**

* Created by kl on 2019/1/30.

* Content :消費kafka資料

*/

public class KafkaToFlink {

public static void main(String\[\] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();           

  env.enableCheckpointing(5000);

/\*\*
     \* 這裡主要配置KafkaConsumerConfig需要的屬性,如:
     \* --bootstrap.servers localhost:9092 --topic test --group.id test-consumer-group
     */
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));
    DataStream<WordWithCount> windowCounts = dataStream.rebalance().flatMap(new FlatMapFunction<String, WordWithCount>() {
        public void flatMap(String value, Collector<WordWithCount> out) {
            System.out.println("接收到kafka資料:" + value);
            for (String word : value.split("\\\s")) {
                out.collect(new WordWithCount(word, 1L));
            }
        }
    }).keyBy("word")
            .timeWindow(Time.seconds(2))
            .reduce(new ReduceFunction<WordWithCount>() {
                public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                    return new WordWithCount(a.word, a.count + b.count);
                }
            });
    windowCounts.print().setParallelism(1);
    env.execute("KafkaToFlink");
}           

}

注意下這個地方:ParameterTool.fromArgs(args);我們所有的關于KafkaConsumerConfig的配置,都是通過啟動參數傳入的,然後Flink提供了一個從args中擷取參數的工具類。這裡需要配置的就三個資訊,和我們在指令視窗建立訂閱一樣的參數即可

第三步:驗證Flink job是否符合預期

将應用打成jar包後通過Flink web上傳到Flink Server。然後,找到你送出的job,輸入如下的啟動參數,送出submit即可:

Windows環境下Flink消費Kafka實作熱詞統計前言碎語進入正題第一步:安裝kafka,并驗證 第二步:編寫消費kafka消息的Flink job第三步:驗證Flink job是否符合預期文末結語

成功運作的job的頁面如下圖,如果下圖框框中的名額一直在轉圈圈,那麼很有可能是因為你運作了其他的job,導緻Available Task Slots不夠用了。

Windows環境下Flink消費Kafka實作熱詞統計前言碎語進入正題第一步:安裝kafka,并驗證 第二步:編寫消費kafka消息的Flink job第三步:驗證Flink job是否符合預期文末結語

預設的Flink的Slots配置是1,當出現任務插槽不夠用時,上圖圈圈轉一會就會失敗,然後打開job manager 點選log就可以看到job因為沒有可用的任務插槽而失敗了。

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 300000 ms. Slots required: 2, slots allocated: 0

at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:991)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:535)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
           

上面的問題可以通過修改conf/flink-conf.yaml中的taskmanager.numberOfTaskSlots來設定,具體指單個TaskManager可以運作的并行操作員或使用者功能執行個體的數量。如果此值大于1,則單個TaskManager将擷取函數或運算符的多個執行個體。這樣,TaskManager可以使用多個CPU核心,但同時,可用記憶體在不同的操作員或功能執行個體之間劃分。此值通常與TaskManager的計算機具有的實體CPU核心數成比例(例如,等于核心數,或核心數的一半)。當然,如果你修改了配置檔案,Flink Server是需要重新開機的。重新開機成功後,可以在大盤看到,如下圖箭頭:

Windows環境下Flink消費Kafka實作熱詞統計前言碎語進入正題第一步:安裝kafka,并驗證 第二步:編寫消費kafka消息的Flink job第三步:驗證Flink job是否符合預期文末結語

一切就緒後,在kafka-console-producer視窗中輸入字元串回車,就會在flink job視窗中看到相關的資訊了,效果前文一樣,如圖:

Windows環境下Flink消費Kafka實作熱詞統計前言碎語進入正題第一步:安裝kafka,并驗證 第二步:編寫消費kafka消息的Flink job第三步:驗證Flink job是否符合預期文末結語

文末結語

本文算昨天hello wrod入門程式的更新版,實作了消費kafka中的消息來統計熱詞的功能。後面生産環境也打算使用kafka來傳遞從mysql binlog中心解析到的消息,算是一個生産執行個體的敲門磚吧。正如部落客昨天所說的,落地的過程肯定會有很多問題,像上面的taskmanager.numberOfTaskSlots的設定。後面會繼續将我們落地過程中的問題記錄下來,歡迎關注凱京科技一起交流。