之前寫了一篇關于datax的一些介紹和一些簡單的操作,有的時候可能我們的需求可能datax實作不了,不過datax是支援二次開發的,我們可以自己開發來。今天就說下寫入讀取資料然後寫入到kafka中
這個我之前已經寫好了,整合了下網上已有的在加上自己的了解和需求簡單的寫了下。
下面說下配置
首先先将datax從github下載下傳下來.
alibaba/DataXgithub.com
然後去下載下傳kafkawriter
https://download.csdn.net/download/qq_37716298/11142069
1.将datax解壓并将kafkawriter放入到com.alibaba.datax.plugin.writer下
2.在總的pom.xml中modules中加入
<module>kafkawriter</module>
3.在package.xml中加入
<fileSet>
<directory>kafkawriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
4.将otsstreamreader中pom檔案
<!--Other 依賴 -->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore-streamclient</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
修改成
<!--Other 依賴 -->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore-streamclient</artifactId>
<version>1.0.0</version>
</dependency>
原因是因為maven倉庫中沒有1.0.0-SNAPSHOT
5.跳過測試打包
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
6.寫json腳本
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
"username": "***",
"password": "***",
"connection": [
{
"querySql": [
"select 字段名 from test "
],
"jdbcUrl": [
"jdbc:postgresql://xx.xx.xx.xx:5433/miasdw_dev"
]
}
]
}
},
"writer": {
"name": "kafkawriter",
"parameter": {
"topic": "topic名稱",
"bootstrapServers": "xx.xx.xx.xx:9092",
"fieldDelimiter":",",
"columns":"字段名",
"topicName":"額外的操作,也放入到kafka中"
}
}
}
]
}
}
7。去datax bin中執行 python datax.py 腳本名.json (記得啟動kafka)
成功!!!
datax成功寫入kafka
kafka消費成功(因為我寫了個簡單的定時器,是以執行了三次)