
Flume Data Collection Case (3): Aggregating Multiple Data Sources

Preparation:

Same as in the earlier post, "Flume Data Collection Case: Single Data Source, Multiple Outlets (Selector)".

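Requirement:

Flume-a1, deployed on machine weekend01, monitors the log file /home/hadoop/datas/test.log for changes in near real time. Flume-a2, deployed on machine weekend110, monitors data arriving on local port 44444 in near real time. Flume-a1 and Flume-a2 both send their data to Flume-a3, deployed on machine weekend02, and Flume-a3 writes the final data to HDFS.

Implementation: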

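  1. On each of the three machines, create a job directory under /home/hadoop/app/flume-1.7.0-bin/ to hold the conf files for this case.
  2. On each of the three machines, create that machine's conf file in /home/hadoop/app/flume-1.7.0-bin/job, as follows: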

On weekend01, configure a source that monitors the test.log file and a sink that forwards the data to the next-level Flume agent. The configuration file flume1-file-flume.conf contains:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/datas/test.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = weekend02
a1.sinks.k1.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
           

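On weekend110, configure a source that monitors the data stream on port 44444 and a sink that forwards the data to the next-level Flume agent. The configuration file flume2-netcat-flume.conf contains: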

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = weekend110
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = weekend02
a2.sinks.k1.port = 4142

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
           

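On weekend02, configure a source that receives the data streams sent by flume1 and flume2, and a sink that writes the merged result to HDFS. The configuration file flume3-flume-hdfs.conf contains: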

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = weekend02
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://weekend110:9000/flume5/%Y%m%d/%H
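# Prefix of the uploaded files
a3.sinks.k1.hdfs.filePrefix = flume5-
# Whether to round the timestamp down, so that directories roll by time
a3.sinks.k1.hdfs.round = true
# How many time units pass before a new directory is created
a3.sinks.k1.hdfs.roundValue = 1
# The time unit used for rounding
a3.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp instead of one from the event header
a3.sinks.k1.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a3.sinks.k1.hdfs.batchSize = 100
# File type; DataStream writes uncompressed output (compression is also supported)
a3.sinks.k1.hdfs.fileType = DataStream
# How often (in seconds) to roll to a new file
a3.sinks.k1.hdfs.rollInterval = 120
# Roll a file once it reaches roughly 128 MB
a3.sinks.k1.hdfs.rollSize = 134217700
# Do not roll files based on the number of events
a3.sinks.k1.hdfs.rollCount = 0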

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
           
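As a rough illustration of how the escape sequences in `hdfs.path` resolve, the sketch below computes the directory an event would land in for a given timestamp. The helper `flume_bucket` is hypothetical and not part of Flume. It assumes GNU `date` and uses UTC for reproducibility, whereas the sink with `useLocalTimeStamp = true` uses the local time of the agent machine.

```shell
# Hypothetical helper (not part of Flume): approximate the HDFS directory
# chosen by hdfs.path = .../flume5/%Y%m%d/%H with round = true,
# roundValue = 1, roundUnit = hour.
# Assumes GNU date; uses UTC so the result is reproducible.
flume_bucket() {
  local epoch="$1"
  # Round the timestamp down to the start of its hour.
  local rounded=$(( epoch - epoch % 3600 ))
  date -u -d "@${rounded}" +"/flume5/%Y%m%d/%H"
}

# 1500000000 is 2017-07-14 02:40:00 UTC
flume_bucket 1500000000   # prints /flume5/20170714/02
```

Because the path already ends in `%H`, rounding down to one hour does not change the directory here. The rounding settings would only matter with a larger `roundValue` or a finer-grained path pattern.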
  3. On each machine, start the agent with its configuration file:
[hadoop@weekend01 flume-1.7.0-bin]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume1-file-flume.conf 
[hadoop@weekend110 flume-1.7.0-bin]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume2-netcat-flume.conf 
[hadoop@weekend02 flume-1.7.0-bin]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume3-flume-hdfs.conf 
           
  4. Run the tests and check the final result
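One possible way to exercise the pipeline is sketched below. The three helper functions are hypothetical names introduced for illustration. They assume that `nc` and the `hdfs` client are installed on the machines where they are run, and that the paths, hosts, and port match the configurations in this post.

```shell
# Hypothetical helpers for testing the three agents; nothing here is part of Flume.

# Append one line to the file tailed by a1 (run on weekend01).
append_log_line() {   # $1 = log file, $2 = text
  printf '%s\n' "$2" >> "$1"
}

# Send one line to the netcat source of a2.
# Depending on the nc variant, the connection may stay open until interrupted.
send_netcat_line() {  # $1 = host, $2 = port, $3 = text
  printf '%s\n' "$3" | nc "$1" "$2"
}

# List what a3 has written under the directory set in hdfs.path.
list_hdfs_output() {
  hdfs dfs -ls -R /flume5
}

# Example calls on the cluster:
#   append_log_line /home/hadoop/datas/test.log "hello from a1"
#   send_netcat_line weekend110 44444 "hello from a2"
#   list_hdfs_output
```

If all three agents are running, lines from both sources should eventually show up in files prefixed with `flume5-` under the current date and hour directory. With `rollInterval = 120`, a file can keep its temporary name for up to two minutes before it is rolled.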
