天天看点

大数据5_02_Flume入门案例

3 Flume入门

3.1 监控端口数据(官方案例)

使用Flume监听一个端口,收集该端口数据,并打印到控制台。

首先可以确定的是source用netcat;channel用memory;sink用logger

步骤1:安装netcat工具

sudo yum install -y nc
           

步骤2:判断44444端口是否被占用

sudo netstat -lnp | grep 44444
           

步骤3:创建job文件夹,在里面写flume agent的配置文件flume-netcat-logger.conf

cd /opt/module/flume
mkdir job
cd job
vim flume-netcat-logger.conf
           

步骤4:flume-netcat-logger.conf内写:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
           

注:配置文件来源于官方手册http://flume.apache.org/FlumeUserGuide.html

步骤5:开启flume监听端口

–conf / -c : 表示配置文件存储的conf目录

–conf-file / -f : 表示本次启动读取的配置文件是在job文件下的flume-telnet.conf文件

–name / -n : 表示给agent起名为a1

-Dflume.root.logger=INFO,console:表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。

写法一:

[[email protected] flume]$ bin/flume-ng agent --conf conf/ --conf-file job/flume-netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
           

写法二:

[[email protected] flume]$ bin/flume-ng agent -c conf/ -f job/flume-netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console
           

步骤6:使用netcat工具向本机的44444端口发送内容

[[email protected] ~]$ nc localhost 44444
hello 
atguigu
           

步骤7:在Flume监听页面观察接收数据情况

大数据5_02_Flume入门案例

3.2 实时监控单个追加文件

exec - memory - logger

实时监测单个hive.log文件,在控制台上监控该文件的日志变化。

首先可以确定的是source用exec;channel用memory;sink用logger

步骤1:创建flume-exec-logger.conf文件

vim flume-exec-logger.conf
           

步骤2:

如果tail -f /opt/module/hive/logs/hive.log首次开启hive时,此时hive的日志还没有滚动,这个时候用 tail -f无法成功!需要用-F监控。

tail -F -c +0 可以从头读到尾。

# Name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Sources
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log

# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink
a1.sinks.k1.type = logger

# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
           

步骤3:开启flume监听端口

[[email protected] flume]$ bin/flume-ng agent -c conf/ -f flume-exec-logger.conf -n a1 -Dflume.root.logger=INFO,console
           

步骤4:向/opt/module/hive/logs/hive.log追加内容

echo "sunyan" >> /opt/module/hive/logs/hive.log
           
大数据5_02_Flume入门案例

exec - memory - hdfs

实时监控单个hive.log文件,将日志文件上传到hdfs

首先可以确定的是source用exec;channel用memory;sink用hdfs

步骤1:创建flume-exec-hdfs.conf文件

vim flume-exec-hdfs.conf
           

步骤2:

# Name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log

# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0

# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
           

步骤3:开启hive,执行些hql

大数据5_02_Flume入门案例

3.3 实时监控目录下的多个新文件

使用Flume监听整个目录的文件,并上传至HDFS

可以确定的是source使用spooldir;channel用memory;sink用hdfs

spooldir-memory-hdfs

需要说明的是,

​ - 1 spooldir仅仅能够监控文件,如果在spooldir监控的本地文件夹内创建文件夹,不会监控到。

​ - 2 spooldir不能够监控文件内容的改变。比如echo “aaa” >> 0.txt 不会监控到。

步骤1:创建flume-spooldir-hdfs.conf

cd /opt/module/flume/job
vim flume-spooldir-hdfs.conf
           

步骤2:配置文件:

# Name
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9820/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0

# Bind
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
           

步骤3:开启flume-ng agent

[[email protected] flume]$ flume-ng agent -c conf/ -f job/flume-spooldir-hdfs2.conf -n a3
           

步骤4:想spooldir监控的文件夹,创建几个新的文件

cd /opt/module/flume/upload 
touch 0.txt
touch 1.txt
           

步骤5:可以看到该文件夹内的文件名都变成了

大数据5_02_Flume入门案例

另外可以在Sink的HDFS的路径可以看到:

大数据5_02_Flume入门案例

3.4 实时监控目录下的多个追加文件

使用Flume监听整个目录的实时追加文件,并上传至HDFS

可以确定的是source使用taildir;channel使用memory;sink使用hdfs

Exec source适用于监控一个实时追加的文件,不能实现断点续传;其内部是a1.sources.r1.command = tail -F /var/log/secure

Spooldir Source适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;

Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传。

taildir - memory - logger

步骤1:创建flume-taildir-logger.conf

cd /opt/modulr/flume/job/
vim flume-taildir-flume-logger.conf
           

步骤2:配置文件:

注意taildir的类型是大写!
# Name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source
a1.sources.r1.type = TAILDIR
# 维护了一个json格式的inode
a1.sources.r1.positionFile = /opt/module/flume/data/taildir2_position.json
# 监控的文件组,可以监控多个文件
a1.sources.r1.filegroups = f1
# 文件的具体路径,监控所有含有log的文件。
a1.sources.r1.filegroups.f1 = /opt/module/flume/data/.*log.*
# inode的key值,也就是envent的key值。可以不设置。
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000

# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink
a1.sinks.k1.type = logger

# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
           

步骤3:开启flume-ng agent

因为sink是logger,所以需要加上-Dflume.root.logger=INFO,console
[[email protected] job]$ flume-ng agent -c ../conf/ -f flume-taildir-logger2.conf -n a1 -Dflume.root.logger=INFO,console
           

步骤4:监控的文件追加内容

[[email protected] data]$ echo "licai" >> group.log 
           

步骤5:查看结果

大数据5_02_Flume入门案例

taildir -memory - hdfs

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
           

继续阅读