天天看点

datax 定时执行多个job_Datax二次开发插件(KafkaWriter)

之前写了一篇关于datax的一些介绍和一些简单的操作,有的时候可能我们的需求可能datax实现不了,不过datax是支持二次开发的,我们可以自己开发来。今天就说下写入读取数据然后写入到kafka中

这个我之前已经写好了,整合了下网上已有的在加上自己的理解和需求简单的写了下。

下面说下配置

首先先将datax从github下载下来.

alibaba/DataX​github.com

datax 定时执行多个job_Datax二次开发插件(KafkaWriter)

然后去下载kafkawriter

https://download.csdn.net/download/qq_37716298/11142069

1.将datax解压并将kafkawriter放入到com.alibaba.datax.plugin.writer下

2.在总的pom.xml中modules中加入

<module>kafkawriter</module>           

3.在package.xml中加入

<fileSet>
            <directory>kafkawriter/target/datax/</directory>
            <includes>
                <include>**/*.*</include>
            </includes>
            <outputDirectory>datax</outputDirectory>
        </fileSet>           

4.将otsstreamreader中pom文件

<!--Other 依赖 -->
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>tablestore-streamclient</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>
	修改成
	<!--Other 依赖 -->
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>tablestore-streamclient</artifactId>
            <version>1.0.0</version>
        </dependency>           

原因是因为maven仓库中没有1.0.0-SNAPSHOT

datax 定时执行多个job_Datax二次开发插件(KafkaWriter)

5.跳过测试打包

mvn -U clean package assembly:assembly -Dmaven.test.skip=true

6.写json脚本

{
  "job": {
    "setting": {
      "speed": {
        "channel":1
      }
    },
    "content": [
      {
        "reader": {
          "name": "postgresqlreader",
          "parameter": {
            "username": "***",
            "password": "***",
            "connection": [
              {
                "querySql": [
                  "select 字段名 from test  "
                ],
                "jdbcUrl": [
                  "jdbc:postgresql://xx.xx.xx.xx:5433/miasdw_dev"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "kafkawriter",
          "parameter": {
            "topic": "topic名称",
            "bootstrapServers": "xx.xx.xx.xx:9092",
            "fieldDelimiter":",",
			"columns":"字段名",
			"topicName":"额外的操作,也放入到kafka中"
          }
        }
        }
    ]
  }
}           

7。去datax bin中执行 python datax.py 脚本名.json (记得启动kafka)

成功!!!

datax 定时执行多个job_Datax二次开发插件(KafkaWriter)

datax成功写入kafka

datax 定时执行多个job_Datax二次开发插件(KafkaWriter)

kafka消费成功(因为我写了个简单的定时器,所以执行了三次)

继续阅读