天天看點

datax 定時執行多個job_Datax二次開發插件(KafkaWriter)

之前寫了一篇關于datax的一些介紹和一些簡單的操作,有的時候可能我們的需求可能datax實作不了,不過datax是支援二次開發的,我們可以自己開發來。今天就說下寫入讀取資料然後寫入到kafka中

這個我之前已經寫好了,整合了下網上已有的在加上自己的了解和需求簡單的寫了下。

下面說下配置

首先先将datax從github下載下傳下來.

alibaba/DataX​github.com

datax 定時執行多個job_Datax二次開發插件(KafkaWriter)

然後去下載下傳kafkawriter

https://download.csdn.net/download/qq_37716298/11142069

1.将datax解壓并将kafkawriter放入到com.alibaba.datax.plugin.writer下

2.在總的pom.xml中modules中加入

<module>kafkawriter</module>           

3.在package.xml中加入

<fileSet>
            <directory>kafkawriter/target/datax/</directory>
            <includes>
                <include>**/*.*</include>
            </includes>
            <outputDirectory>datax</outputDirectory>
        </fileSet>           

4.将otsstreamreader中pom檔案

<!--Other 依賴 -->
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>tablestore-streamclient</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>
	修改成
	<!--Other 依賴 -->
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>tablestore-streamclient</artifactId>
            <version>1.0.0</version>
        </dependency>           

原因是因為maven倉庫中沒有1.0.0-SNAPSHOT

datax 定時執行多個job_Datax二次開發插件(KafkaWriter)

5.跳過測試打包

mvn -U clean package assembly:assembly -Dmaven.test.skip=true

6.寫json腳本

{
  "job": {
    "setting": {
      "speed": {
        "channel":1
      }
    },
    "content": [
      {
        "reader": {
          "name": "postgresqlreader",
          "parameter": {
            "username": "***",
            "password": "***",
            "connection": [
              {
                "querySql": [
                  "select 字段名 from test  "
                ],
                "jdbcUrl": [
                  "jdbc:postgresql://xx.xx.xx.xx:5433/miasdw_dev"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "kafkawriter",
          "parameter": {
            "topic": "topic名稱",
            "bootstrapServers": "xx.xx.xx.xx:9092",
            "fieldDelimiter":",",
			"columns":"字段名",
			"topicName":"額外的操作,也放入到kafka中"
          }
        }
        }
    ]
  }
}           

7。去datax bin中執行 python datax.py 腳本名.json (記得啟動kafka)

成功!!!

datax 定時執行多個job_Datax二次開發插件(KafkaWriter)

datax成功寫入kafka

datax 定時執行多個job_Datax二次開發插件(KafkaWriter)

kafka消費成功(因為我寫了個簡單的定時器,是以執行了三次)

繼續閱讀