3 Flume入门
3.1 监控端口数据(官方案例)
使用Flume监听一个端口,收集该端口数据,并打印到控制台。
首先可以确定的是source用netcat;channel用memory;sink用logger
步骤1:安装netcat工具
sudo yum install -y nc
步骤2:判断44444端口是否被占用
sudo netstat -lnp | grep 44444
步骤3:创建job文件夹,在里面写flume agent的配置文件flume-netcat-logger.conf
cd /opt/module/flume
mkdir job
cd job
vim flume-netcat-logger.conf
步骤4:flume-netcat-logger.conf内写:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
注:配置文件来源于官方手册http://flume.apache.org/FlumeUserGuide.html
步骤5:开启flume监听端口
–conf / -c : 表示配置文件存储的conf目录
–conf-file / -f : 表示本次启动读取的配置文件是在job文件下的flume-telnet.conf文件
–name / -n : 表示给agent起名为a1
-Dflume.root.logger=INFO,console:表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。
写法一:
[[email protected] flume]$ bin/flume-ng agent --conf conf/ --conf-file job/flume-netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
写法二:
[[email protected] flume]$ bin/flume-ng agent -c conf/ -f job/flume-netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console
步骤6:使用netcat工具向本机的44444端口发送内容
[[email protected] ~]$ nc localhost 44444
hello
atguigu
步骤7:在Flume监听页面观察接收数据情况
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5iZmRWZhJTM0AjZ4QTNlNTY1QGO5YmN1EmN2UDZzQWYh9CX0JXZ252bj91Ztl2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
3.2 实时监控单个追加文件
exec - memory - logger
实时监测单个hive.log文件,在控制台上监控该文件的日志变化。
首先可以确定的是source用exec;channel用memory;sink用logger
步骤1:创建flume-exec-logger.conf文件
vim flume-exec-logger.conf
步骤2:
如果tail -f /opt/module/hive/logs/hive.log首次开启hive时,此时hive的日志还没有滚动,这个时候用 tail -f无法成功!需要用-F监控。
tail -F -c +0 可以从头读到尾。
# Name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Sources
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Sink
a1.sinks.k1.type = logger
# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
步骤3:开启flume监听端口
[[email protected] flume]$ bin/flume-ng agent -c conf/ -f flume-exec-logger.conf -n a1 -Dflume.root.logger=INFO,console
步骤4:向/opt/module/hive/logs/hive.log追加内容
echo "sunyan" >> /opt/module/hive/logs/hive.log
exec - memory - hdfs
实时监控单个hive.log文件,将日志文件上传到hdfs
首先可以确定的是source用exec;channel用memory;sink用hdfs
步骤1:创建flume-exec-hdfs.conf文件
vim flume-exec-hdfs.conf
步骤2:
# Name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Sink
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0
# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
步骤3:开启hive,执行些hql
3.3 实时监控目录下的多个新文件
使用Flume监听整个目录的文件,并上传至HDFS
可以确定的是source使用spooldir;channel用memory;sink用hdfs
spooldir-memory-hdfs
需要说明的是,
- 1 spooldir仅仅能够监控文件,如果在spooldir监控的本地文件夹内创建文件夹,不会监控到。
- 2 spooldir不能够监控文件内容的改变。比如echo “aaa” >> 0.txt 不会监控到。
步骤1:创建flume-spooldir-hdfs.conf
cd /opt/module/flume/job
vim flume-spooldir-hdfs.conf
步骤2:配置文件:
# Name
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9820/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Bind
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
步骤3:开启flume-ng agent
[[email protected] flume]$ flume-ng agent -c conf/ -f job/flume-spooldir-hdfs2.conf -n a3
步骤4:想spooldir监控的文件夹,创建几个新的文件
cd /opt/module/flume/upload
touch 0.txt
touch 1.txt
步骤5:可以看到该文件夹内的文件名都变成了
另外可以在Sink的HDFS的路径可以看到:
3.4 实时监控目录下的多个追加文件
使用Flume监听整个目录的实时追加文件,并上传至HDFS
可以确定的是source使用taildir;channel使用memory;sink使用hdfs
Exec source适用于监控一个实时追加的文件,不能实现断点续传;其内部是a1.sources.r1.command = tail -F /var/log/secure
Spooldir Source适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;
Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传。
taildir - memory - logger
步骤1:创建flume-taildir-logger.conf
cd /opt/modulr/flume/job/
vim flume-taildir-flume-logger.conf
步骤2:配置文件:
注意taildir的类型是大写!
# Name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Source
a1.sources.r1.type = TAILDIR
# 维护了一个json格式的inode
a1.sources.r1.positionFile = /opt/module/flume/data/taildir2_position.json
# 监控的文件组,可以监控多个文件
a1.sources.r1.filegroups = f1
# 文件的具体路径,监控所有含有log的文件。
a1.sources.r1.filegroups.f1 = /opt/module/flume/data/.*log.*
# inode的key值,也就是envent的key值。可以不设置。
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000
# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Sink
a1.sinks.k1.type = logger
# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
步骤3:开启flume-ng agent
因为sink是logger,所以需要加上-Dflume.root.logger=INFO,console
[[email protected] job]$ flume-ng agent -c ../conf/ -f flume-taildir-logger2.conf -n a1 -Dflume.root.logger=INFO,console
步骤4:监控的文件追加内容
[[email protected] data]$ echo "licai" >> group.log
步骤5:查看结果
taildir -memory - hdfs
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3