之前写了一篇关于datax的一些介绍和一些简单的操作,有的时候可能我们的需求可能datax实现不了,不过datax是支持二次开发的,我们可以自己开发来。今天就说下写入读取数据然后写入到kafka中
这个我之前已经写好了,整合了下网上已有的在加上自己的理解和需求简单的写了下。
下面说下配置
首先先将datax从github下载下来.
alibaba/DataXgithub.com
然后去下载kafkawriter
https://download.csdn.net/download/qq_37716298/11142069
1.将datax解压并将kafkawriter放入到com.alibaba.datax.plugin.writer下
2.在总的pom.xml中modules中加入
<module>kafkawriter</module>
3.在package.xml中加入
<fileSet>
<directory>kafkawriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
4.将otsstreamreader中pom文件
<!--Other 依赖 -->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore-streamclient</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
修改成
<!--Other 依赖 -->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore-streamclient</artifactId>
<version>1.0.0</version>
</dependency>
原因是因为maven仓库中没有1.0.0-SNAPSHOT
5.跳过测试打包
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
6.写json脚本
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
"username": "***",
"password": "***",
"connection": [
{
"querySql": [
"select 字段名 from test "
],
"jdbcUrl": [
"jdbc:postgresql://xx.xx.xx.xx:5433/miasdw_dev"
]
}
]
}
},
"writer": {
"name": "kafkawriter",
"parameter": {
"topic": "topic名称",
"bootstrapServers": "xx.xx.xx.xx:9092",
"fieldDelimiter":",",
"columns":"字段名",
"topicName":"额外的操作,也放入到kafka中"
}
}
}
]
}
}
7。去datax bin中执行 python datax.py 脚本名.json (记得启动kafka)
成功!!!
datax成功写入kafka
kafka消费成功(因为我写了个简单的定时器,所以执行了三次)