天天看点

07 Flume

1 Flume概述

1.1 定义

● Flume是Cloudera提供的一个高可用,高可靠,分布式的海量日志采集、聚合、传输的框架。

● 主要作用:实时读取服务器本地磁盘的数据,将数据写入到HDFS。 数据(服务器)——> HDFS

1.2 基础架构

Flume基础构架图:

07 Flume

(1) Agent

● 是flume的部署单元

● 是一个JVM进程, 源头—数据(事件)—>目的

● 组成:Source、Channel、Sink。

(2) Source

● 接收数据给Flume Agent组件

● 处理日志数据(各种类型、格式)

● 包括:avro、thrift、exec、jms、spooling directory、netcat、 taildir 、sequence generator、syslog、http、legacy。

(3) Sink

● 不断轮询Channel中的事件,批量移除。批量写入到存储、索引系统,或被发送到另一个Flume Agent。

● 组件类型:hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。

(4) Channel

● 位于Source和Sink之间的缓冲区

● 允许Source和Sink运作在不同的速率上

● 线程安全,可以支持多个Source的写入和多个Sink的读取。

● 类型:

  • Memory Channel:内存中的队列。(快,不安全)
  • File Channel:将事件写入磁盘。(慢,安全)

(5) Event

● 传输单元,Flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。

● 组成

  • Header:存放event的属性,k-v结构
  • Body:存放数据,字节数组

2 Flume入门

2.1 Flume安装部署

上传——解压——改名flume——删除jguava-11.0.2.jar(处理兼容问题)

2.2 Flume入门案例

2.2.1 案例1(监控端口数据)

1)案例需求

● 使用Flume监听一个端口,收集该端口数据,并打印到控制台。   

● 案例图 

07 Flume

2)需求分析

07 Flume

3)实现步骤

● 安装netcat工具 —— 判断端口情况 —— 创建job文件夹 —— 在job/simpleCase文件下,创建Flume Agent配置文件flume-1-netcat-logger.conf

查看代码

# Name the components on this agent
a1.sources = r1                                      # 为a1的Source组件命名为r1,多个组件用空格间隔
a1.sinks = k1                                        # 为a1的Sink组件命名为k1,多个组件用空格间隔
a1.channels = c1                                     # 为a1的Channel组件命名为c1,多个组件用空格间隔

# Describe/configure the source
a1.sources.r1.type = netcat                          # 配置r1的类型
a1.sources.r1.bind = localhost                       # 配置r1的绑定地址(注意localhost和hadoop102的区别)
a1.sources.r1.port = 44444                           # 配置r1的监听端口

# Describe the sink
a1.sinks.k1.type = logger                            # 配置k1的类型为logger

# Use a channel which buffers events in memory    
a1.channels.c1.type = memory                         # 配置c1的类型为memory
a1.channels.c1.capacity = 1000                       # 配置c1的容量为1000个事件
a1.channels.c1.transactionCapacity = 100             # 配置c1的事务容量为100个事件

# Bind the source and sink to the channel
a1.sources.r1.channels = c1                          # 配置r1的channel属性,指定r1连接到那个channel
a1.sinks.k1.channel = c1                             # 配置k1的channel属性,指定k1连接到那个channel           

● 开启flume监听端口 & netcat的使用

写法 指令
方法1 $ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/simpleCase/flume-1-netcat-logger.conf -Dflume.root.logger=INFO,console
方法2 $ bin/flume-ng agent -c conf/ -n a1 -f job/simpleCase/flume-1-netcat-logger.conf -Dflume.root.logger=INFO,console
使用netcat工具向本机的44444端口发送内容 $ nc localhost 44444     //  nc IP 端口号

参数说明

参数 说明
--conf / -c 表示配置文件存储在conf/目录
--name / -a 表示给agent起名为a1
--conf-file / -f 指定读取的配置文件是在job/simpleCase文件夹下的flume-1-1netcat-logger.conf文件     
07 Flume
-Dflume.root.logger=INFO,console -D表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error。 

2.2.2 案例2(实时监控目录下的多个追加文件)

配置
Exec source 适用于监控一个实时追加的文件,不能实现断点续传
Spooldir Source 适用于同步新文件,但不适合对实时追加日志的文件进行监听并同步
Taildir Source 适合用于监听多个实时追加的文件,并且能够实现断点续传

3 Flume进阶

3.1 Flume事务

07 Flume

● Put事务和Take事务

事务
Put doPut — 将批数据先写入临时缓冲区putList
doCommit — 检查channel内存队列是否足够合并
doRollback — channel内存队列空间不足,回滚数据
如果出现error,会删除putList中的数据,将error返回给Source后,再重新推送事务
Take doTake — 将数据取到临时缓冲区takeList,并将数据放送到HDFS
doCommit — 如果数据全部发送成功,则清楚临时缓冲区takeList
数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列

3.2 Flume Agent内部原理

07 Flume

重要组件

组件
ChannelSelector 作用:选出Event将要被发往哪个Channel
两种类型:Replicating(复制)和 Multiplexing(多路复用)
Replicating Channel Selector:会将同一个Event发往所有的Channel。
Mulitiplexing Channel Selector:会根据响应的原则,将不同的Event发往不同的Channel。(分类)
SinkProcessor 三种类型:长度
DefaultSinkProcessor:对应的是单个Sink。
LoadBalancingSinkProcessor:对应的是Sink Group,可以实现负载均衡的功能。
FailoverSinkProcessor:对应的是Sink group,可以实现恢复的功能。(故障转移)