Kafka Connect, introduced in Kafka 0.9, makes it easy to import data from external sources into a Kafka stream (a specified topic), and just as easy to export data from a Kafka stream (a specified topic) to external systems.
Following the official quickstart, the steps below show how to use the File Connector to import data into a Kafka topic and then export it back out to a file:
(1) Create a text file, test.txt, to serve as the external data source.
[root@localhost home]# echo -e "connector\ntest" > test.txt
(2) Start Connect with the demo script shipped with Kafka; it runs Connect in standalone mode, and the default connectors are the File Connectors.
[root@localhost kafka_2.12-0.10.2.0]# ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
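The two connector property files passed to the script ship with Kafka; in the 0.10.x distribution their contents are roughly as follows (names and paths are the shipped defaults):

```properties
# config/connect-file-source.properties — reads test.txt into topic connect-test
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

# config/connect-file-sink.properties — writes topic connect-test out to test.sink.txt
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
```

Note that the source config uses `topic` (a single target topic) while the sink config uses `topics` (a sink may subscribe to several).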
If the warning below appears, the file is in the wrong location: by default test.txt is expected in the Kafka installation directory, at the same level as the bin directory.
[2017-03-20 13:36:14,879] WARN Couldn't find file test.txt for FileStreamSourceTask, sleeping to wait for it to be created (org.apache.kafka.connect.file.FileStreamSourceTask:106)
If the errors below appear, the worker cannot commit offsets because the producer cannot flush its messages to the broker; in standalone mode this is typically because ZooKeeper has stopped, so restart the ZooKeeper server, and if the errors persist, restart the Kafka server as well.
[2017-03-20 13:38:07,832] ERROR Failed to commit offsets for WorkerSourceTask{id=local-file-source-0} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:112)
[2017-03-20 13:38:22,833] ERROR Failed to flush WorkerSourceTask{id=local-file-source-0}, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:304)
(3) Inspect the exported file, test.sink.txt, to see the consumed messages.
[root@localhost kafka_2.12-0.10.2.0]# cat test.sink.txt
connector
test
(4) The messages have also been stored in the topic connect-test, so you can start a consumer to read them.
[root@localhost kafka_2.12-0.10.2.0]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning &
Messages read by the consumer:
[root@localhost kafka_2.12-0.10.2.0]# {"schema":{"type":"string","optional":false},"payload":"connector"}
{"schema":{"type":"string","optional":false},"payload":"test"}
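Each record is wrapped in a schema/payload envelope because the worker's default converter (the JsonConverter, with schemas enabled in connect-standalone.properties) serializes both the value and its declared schema. A minimal Python sketch of unpacking one such record (the record string is copied from the consumer output above):

```python
import json

# A record as emitted by the console consumer: the JsonConverter wraps
# the original file line ("payload") together with its schema.
record = '{"schema":{"type":"string","optional":false},"payload":"connector"}'

envelope = json.loads(record)
print(envelope["payload"])         # -> connector  (the original file line)
print(envelope["schema"]["type"])  # -> string     (the declared value type)
```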
(5) Append a new line to test.txt; since the connector is already running, the consumer picks up the new message in real time.
[root@localhost kafka_2.12-0.10.2.0]# echo "Another line" >> test.txt
The new message is consumed immediately:
[root@localhost kafka_2.12-0.10.2.0]# {"schema":{"type":"string","optional":false},"payload":"connector"}
{"schema":{"type":"string","optional":false},"payload":"test"}
{"schema":{"type":"string","optional":false},"payload":"Another line"}
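If you prefer the raw lines without the schema envelope, the converter settings in config/connect-standalone.properties can be switched to schemaless JSON (restart the standalone worker after editing):

```properties
# config/connect-standalone.properties (excerpt)
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# With schemas disabled, records are emitted as plain JSON values
key.converter.schemas.enable=false
value.converter.schemas.enable=false
```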
This article is the author's original work; please give credit when reposting!