Alibaba Cloud DataHub: Example of Importing Data with the Flume Plugin

Step By Step

Main steps

1. Install the Java environment

2. Install Apache Maven

3. Install Flume-NG

4. Configure the data import

I. Install the Java environment

1. Update the package list

sudo apt-get update

2. Install openjdk-8-jdk

sudo apt-get install openjdk-8-jdk

3. Check the Java version to confirm that the installation succeeded

java -version
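
The version check above can also be scripted. The following is only a sketch: the helper name java_major is hypothetical, and it assumes that the first line of the java -version output contains the version as a quoted string such as "1.8.0_292" or "11.0.2".

```shell
# Hypothetical helper: extract the major version from a Java version string.
# "1.8.0_292" -> 8 (legacy 1.x numbering), "11.0.2" -> 11.
java_major() {
  case "$1" in
    1.*) echo "$1" | cut -d. -f2 ;;
    *)   echo "$1" | cut -d. -f1 ;;
  esac
}

# Only query the real JVM when java is actually on the PATH.
if command -v java >/dev/null 2>&1; then
  # java -version writes to stderr; take the quoted token on the first line.
  ver=$(java -version 2>&1 | head -n 1 | sed 's/.*"\(.*\)".*/\1/')
  echo "Detected Java major version: $(java_major "$ver")"
fi
```

With the openjdk-8-jdk package installed in step 2, the reported major version should be 8.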

II. Install Apache Maven

1. Install

sudo apt install maven

2. Check the installed version

mvn -v

III. Install Flume-NG

1. Download Flume

wget https://downloads.apache.org/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

2. Extract the archive

tar zxvf apache-flume-1.9.0-bin.tar.gz

3. Download the flume-datahub plugin from the following address

https://aliyun-datahub.oss-cn-hangzhou.aliyuncs.com/tools/aliyun-flume-datahub-sink-2.0.4.tar.gz

4. Extract the Flume plugin and place it under ${FLUME_HOME}/plugins.d (in this example, ${FLUME_HOME} is apache-flume-1.9.0-bin)

tar -zxvf aliyun-flume-datahub-sink-2.0.4.tar.gz

mkdir apache-flume-1.9.0-bin/plugins.d

mv aliyun-flume-datahub-sink apache-flume-1.9.0-bin/plugins.d
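
Before starting the agent, it can help to confirm that the plugin ended up where Flume looks for it. Flume's plugins.d convention expects each plugin directory to hold its jars in a lib subdirectory (with optional libext and native directories). The sketch below checks only for lib and at least one jar. The helper name check_plugin_dir is hypothetical, and the exact contents of the DataHub tarball are an assumption.

```shell
# Hypothetical helper: succeed if DIR looks like a Flume plugin directory,
# i.e. DIR/lib exists and holds at least one .jar file.
check_plugin_dir() {
  dir=$1
  [ -d "$dir/lib" ] || { echo "missing $dir/lib" >&2; return 1; }
  for jar in "$dir"/lib/*.jar; do
    # If the glob matched nothing, $jar is the literal pattern and -e fails.
    [ -e "$jar" ] && return 0
  done
  echo "no jar files in $dir/lib" >&2
  return 1
}

# Path used in this article; adjust it if your FLUME_HOME differs.
plugin=apache-flume-1.9.0-bin/plugins.d/aliyun-flume-datahub-sink
if check_plugin_dir "$plugin"; then
  echo "plugin layout looks fine: $plugin"
fi
```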

5. Verify the installation

apache-flume-1.9.0-bin/bin/flume-ng version

IV. Configure the data import

1. Data file (demo.txt)

0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
0,YxCOHXcst1NlL5ebJM9YmvQ1f8oy8neb3obdeoS0,true,1254275.1144629316,1573206062763,1254275.1144637289
1,hHVNjKW5DsRmVXjguwyVDjzjn60wUcOKos9Qym0V,false,1254275.1144637289,1573206062763,1254275.1144637289
2,vnXOEuKF4Xdn5WnDCPbzPwTwDj3k1m3rlqc1vN2l,true,1254275.1144637289,1573206062763,1254275.1144637289
3,t0AGT8HShzroBVM3vkP37fIahg2yDqZ5xWfwDFJs,false,1254275.1144637289,1573206062763,1254275.1144637289
4,MKwZ1nczmCBp6whg1lQeFLZ6E628lXvFncUVcYWI,true,1254275.1144637289,1573206062763,1254275.1144637289
5,bDPQJ656xvPGw1PPjhhTUZyLJGILkNnpqNLaELWV,false,1254275.1144637289,1573206062763,1254275.1144637289
6,wWF7i4X8SXNhm4EfClQjQF4CUcYQgy3XnOSz0StX,true,1254275.1144637289,1573206062763,1254275.1144637289
7,whUxTNREujMP6ZrAJlSVhCEKH1KH9XYJmOFXKbh8,false,1254275.1144637289,1573206062763,1254275.1144637289
8,OYcS1WkGcbZFbPLKaqU5odlBf7rHDObkQJdBDrYZ,true,1254275.1144637289,1573206062763,1254275.1144637289
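
Every line of the sample file is one record with six comma-separated values. A quick way to catch a malformed line before sending anything to DataHub is to count the fields per line. This is a minimal sketch. The helper name count_bad_rows is hypothetical, and it assumes that no value contains a comma itself.

```shell
# Hypothetical helper: print the number of lines in FILE whose
# comma-separated field count differs from EXPECTED (blank lines count too).
count_bad_rows() {
  awk -F',' -v n="$2" 'NF != n { bad++ } END { print bad + 0 }' "$1"
}

# Run the check only when the data file is present in the current directory.
if [ -f demo.txt ]; then
  echo "rows with an unexpected field count: $(count_bad_rows demo.txt 6)"
fi
```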

2. DataHub Topic Schema

Field name Field type
id BIGINT
name STRING
gender BOOLEAN
salary DOUBLE
my_time TIMESTAMP
decimal DECIMAL
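
The schema can be turned into a rough pre-flight check on the data file. The sketch below is not the plugin's own validation. The helper name check_types is hypothetical, and the accepted text forms (true/false for BOOLEAN, a plain integer for TIMESTAMP, plain decimal notation for DOUBLE and DECIMAL) are assumptions taken from the sample rows, not from the DataHub documentation.

```shell
# Hypothetical helper: report the first problem found on each line of FILE,
# checking columns in schema order: id, name, gender, salary, my_time, decimal.
check_types() {
  awk -F',' '
    $1 !~ /^-?[0-9]+$/             { print NR ": id is not an integer"; next }
    $3 != "true" && $3 != "false"  { print NR ": gender is not true/false"; next }
    $4 !~ /^-?[0-9]+(\.[0-9]+)?$/  { print NR ": salary is not a number"; next }
    $5 !~ /^[0-9]+$/               { print NR ": my_time is not an integer"; next }
    $6 !~ /^-?[0-9]+(\.[0-9]+)?$/  { print NR ": decimal is not a number"; next }
  ' "$1"
}

# No output means every line passed; trailing whitespace on a line is flagged
# because it becomes part of the last column.
if [ -f demo.txt ]; then
  check_types demo.txt
fi
```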

3. Configuration file (datahub.conf)

# A single-node Flume configuration for Datahub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /root/flume/demo.txt
# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessId = LTAIOZZ******
a1.sinks.k1.datahub.accessKey = v7CjUJCMk7j9aKdu************
a1.sinks.k1.datahub.endPoint = https://dh-cn-shanghai.aliyuncs.com
a1.sinks.k1.datahub.project = flume_project
a1.sinks.k1.datahub.topic = flume
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,
a1.sinks.k1.serializer.fieldnames = id,name,gender,salary,my_time,decimal
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.datahub.retryTimes = 5
a1.sinks.k1.datahub.retryInterval = 5
a1.sinks.k1.datahub.batchSize = 100
a1.sinks.k1.datahub.batchTimeout = 5
a1.sinks.k1.datahub.enablePb = true
a1.sinks.k1.datahub.compressType = DEFLATE
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
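
In this example the six names in serializer.fieldnames line up one-to-one with the six columns of demo.txt and the six fields of the topic schema. A small script can confirm that the configuration still lists the expected number of names after editing. This is a sketch only. The helper name fieldname_count is hypothetical, and it assumes the agent and sink are named a1 and k1 as above.

```shell
# Hypothetical helper: print how many names the serializer.fieldnames
# property of sink k1 (agent a1) lists in a Flume properties file.
fieldname_count() {
  sed -n 's/^a1\.sinks\.k1\.serializer\.fieldnames *= *//p' "$1" |
    awk -F',' '{ print NF }'
}

# Run the check only when the configuration file is present.
if [ -f datahub.conf ]; then
  echo "field names configured: $(fieldname_count datahub.conf)"
fi
```

For the configuration in this article the count should match the six columns of the data file.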

4. Test the result (for a local test, adjust the file paths to match your own setup)

apache-flume-1.9.0-bin/bin/flume-ng agent -n a1 -c conf -f datahub.conf -Dflume.root.logger=INFO,console
