前言碎語
昨天部落客寫了
《windows環境下flink入門demo執行個體》實作了官方提供的最簡單的單詞計數功能,今天更新下,将資料源從socket流換成生産級的消息隊列kafka來完成一樣的單詞計數功能。本文實作的重點主要有兩個部分,一是kafka環境的搭建,二是如何使用官方提供的flink-connector-kafka_2.12來消費kafka消息,其他的邏輯部分和上文類似。
進入正題
本篇博文涉及到的軟體工具以及下載下傳位址:
Apache Flink :
https://flink.apache.org/downloads.html,請下載下傳最新版1.7.x,選擇單機版本
kafka:
http://kafka.apache.org/downloads,請下載下傳最新的2.1.0
第一步:安裝kafka,并驗證
從上面的下載下傳位址選擇二進制包下載下傳後是個壓縮包,解壓後的目錄如下:
進入bin\windows下,找到kafka-server-start.bat和zookeeper-server-start.bat。配置檔案在config目錄下,主要配置一些日志和kafka server和zookeeper,都預設就好。如果你本地已經有zk的環境,就可以忽略zk,不然按照下面的步驟執行即可。
1. 啟動zk服務
執行:zookeeper-server-start.bat ..\..\config\zookeeper.properties
2.啟動kafka服務
執行:kafka-server-start.bat ..\..\config\server.properties
3.建立test主題
執行:kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
4.檢視上一步的主題是否建立成功,成功的話控制台會輸出test
執行:kafka-topics.bat --list --zookeeper localhost:2181
5.訂閱test主題消息
執行:kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
6.釋出消息
執行:kafka-console-producer.bat --broker-list localhost:9092 --topic test
以上步驟成功後,我們需要驗證下是否都成功了。在第六條指令的視窗中輸入abc。如果在第5個指令視窗輸出了就代表kafka環境ok了。然後可以關掉第5個指令視窗,下面就讓Flink來消費kafka的消息
第二步:編寫消費kafka消息的Flink job
基礎步驟參考
一文。唯一的差別就是因為要消費kafka中的資料,是以需要引入一個kafka連接配接器,官方已提供到maven倉庫中,引入最新版本即可,如下:
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.7.1</version>
然後建立一個KafkaToFlink類 ,代碼邏輯和昨天的一樣,都是從一段字元串中統計每個詞語出現的次數,這個場景比較像我們的熱搜關鍵字,我标題簡化為熱詞統計了。主要的代碼如下:
/**
* Created by kl on 2019/1/30.
* Content :消費kafka資料
*/
public class KafkaToFlink {
public static void main(String\[\] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
/\*\*
\* 這裡主要配置KafkaConsumerConfig需要的屬性,如:
\* --bootstrap.servers localhost:9092 --topic test --group.id test-consumer-group
*/
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));
DataStream<WordWithCount> windowCounts = dataStream.rebalance().flatMap(new FlatMapFunction<String, WordWithCount>() {
public void flatMap(String value, Collector<WordWithCount> out) {
System.out.println("接收到kafka資料:" + value);
for (String word : value.split("\\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
}).keyBy("word")
.timeWindow(Time.seconds(2))
.reduce(new ReduceFunction<WordWithCount>() {
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
windowCounts.print().setParallelism(1);
env.execute("KafkaToFlink");
}
}
注意下這個地方:ParameterTool.fromArgs(args);我們所有的關于KafkaConsumerConfig的配置,都是通過啟動參數傳入的,然後Flink提供了一個從args中擷取參數的工具類。這裡需要配置的就三個資訊,和我們在指令視窗建立訂閱一樣的參數即可
第三步:驗證Flink job是否符合預期
将應用打成jar包後通過Flink web上傳到Flink Server。然後,找到你送出的job,輸入如下的啟動參數,送出submit即可:
成功運作的job的頁面如下圖,如果下圖框框中的名額一直在轉圈圈,那麼很有可能是因為你運作了其他的job,導緻Available Task Slots不夠用了。
預設的Flink的Slots配置是1,當出現任務插槽不夠用時,上圖圈圈轉一會就會失敗,然後打開job manager 點選log就可以看到job因為沒有可用的任務插槽而失敗了。
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 300000 ms. Slots required: 2, slots allocated: 0
at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:991)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:535)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
上面的問題可以通過修改conf/flink-conf.yaml中的taskmanager.numberOfTaskSlots來設定,具體指單個TaskManager可以運作的并行操作員或使用者功能執行個體的數量。如果此值大于1,則單個TaskManager将擷取函數或運算符的多個執行個體。這樣,TaskManager可以使用多個CPU核心,但同時,可用記憶體在不同的操作員或功能執行個體之間劃分。此值通常與TaskManager的計算機具有的實體CPU核心數成比例(例如,等于核心數,或核心數的一半)。當然,如果你修改了配置檔案,Flink Server是需要重新開機的。重新開機成功後,可以在大盤看到,如下圖箭頭:
一切就緒後,在kafka-console-producer視窗中輸入字元串回車,就會在flink job視窗中看到相關的資訊了,效果前文一樣,如圖:
文末結語
本文算昨天hello wrod入門程式的更新版,實作了消費kafka中的消息來統計熱詞的功能。後面生産環境也打算使用kafka來傳遞從mysql binlog中心解析到的消息,算是一個生産執行個體的敲門磚吧。正如部落客昨天所說的,落地的過程肯定會有很多問題,像上面的taskmanager.numberOfTaskSlots的設定。後面會繼續将我們落地過程中的問題記錄下來,歡迎關注凱京科技一起交流。