Table of Contents
- Official Documentation
- Flume Overview
- Flume Architecture
- Sources
  - netcat (monitors a port)
  - exec (monitors the output of a command, usually tail or cat)
  - spooldir (monitors a directory)
  - taildir (monitors multiple files or directories; supports resuming from a checkpoint)
  - avro
- Channels
  - memory (in-memory storage: fast but not durable)
  - file (local file storage: durable but slower)
- Sinks
  - logger (writes to the console)
  - hdfs (writes to HDFS)
  - kafka (writes to Kafka)
  - file_roll (writes to local files)
- Replicating Selector (replication)
- Failover
- Load Balancing
- Custom Interceptor
Official Documentation
Official documentation (Chinese translation)
Flume Overview
Apache Flume is a distributed, highly reliable, and highly available tool for collecting, aggregating, and moving large amounts of log data from many different sources into a centralized data store.
Flume Architecture
Client: the client, where the data is produced, such as a web server
Event: a single unit of data transferred by an Agent; for log data, one event usually corresponds to one line
Agent: an independent JVM process
Flume runs as one or more Agents
An Agent contains three components: Source, Channel, and Sink
Source: where the data is read from
Channel: how the data is buffered
Sink: where the data is delivered to
Sources
netcat (monitors a port)
Create a conf file named netcat-flume-logger.conf in the /opt/flume1.6/conf/job directory:
#Name: give the three components their aliases
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#Source: use the netcat type, listening on local port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#Sink: use the logger type to print straight to the console, which is convenient for testing
a1.sinks.k1.type = logger
#Channel: use the memory type
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#Bind: wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
After saving, start the agent with:
flume-ng agent
--conf /opt/flume1.6/conf                                     #path to the conf directory under the Flume installation
--conf-file /opt/flume1.6/conf/job/netcat-flume-logger.conf   #path to the conf file you just wrote
--name a1                                                     #the agent name (a1) defined in your conf file
-Dflume.root.logger=INFO,console                              #print the logs to the console; only needed when the sink type is logger
Short form:
flume-ng agent -c conf -f /opt/flume1.6/conf/job/netcat-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
After the agent starts:
Open a new terminal window on the virtual machine
and connect to port 44444:
telnet localhost 44444
If the command fails, install telnet first:
yum list telnet*          #list the telnet-related packages
yum install telnet-server #install the telnet server
yum install telnet.*      #install the telnet client
Once the connection succeeds, any characters typed into the telnet session
are printed to the console of the agent window opened earlier.
exec (monitors the output of a command, usually tail or cat)
Create a conf file named exec-flume-logger.conf in the /opt/flume1.6/conf/job directory:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
#any file type works; a log file is the usual target
a1.sources.r1.command = tail -f /data/test.log
#the sink is still the console logger
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
After starting the agent, append data to the /data/test.log file and it will appear on the console.
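For a quick end-to-end check, the following is a minimal sketch, assuming the agent is started with the short-form command used earlier and that /data/test.log exists:
flume-ng agent -c conf -f /opt/flume1.6/conf/job/exec-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
#in another terminal, append a test line to the monitored file
echo "hello flume" >> /data/test.log
Each appended line should show up as an event in the agent's console output.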
spooldir (monitors a directory)
Create a conf file named spooldir-flume-hdfs.conf in the /opt/flume1.6/conf/job directory:
a1.sources = r1
a1.channels = c1
a1.sinks = s1
#the source type is spooldir
a1.sources.r1.type = spooldir
#the directory to monitor
a1.sources.r1.spoolDir = /data/test
a1.sources.r1.deserializer = LINE
a1.sources.r1.deserializer.maxLineLength = 600000
#only pick up files whose names match this regular expression
a1.sources.r1.includePattern = test__[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
#the channel type is file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/kb07Flume/flumeFile/checkpoint/test
a1.channels.c1.dataDirs = /opt/kb07Flume/flumeFile/data/test
#the sink type is hdfs, so the output goes to HDFS
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.fileType = DataStream
#prefix added to the files written to HDFS
a1.sinks.s1.hdfs.filePrefix = test_
#suffix added to the files written to HDFS
a1.sinks.s1.hdfs.fileSuffix = .csv
#target path on HDFS
a1.sinks.s1.hdfs.path = hdfs://192.168.226.101:9000/data/test/%Y-%m-%d
#whether to use the local timestamp for the time escapes in the path
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.batchSize = 640
a1.sinks.s1.hdfs.rollCount = 0
a1.sinks.s1.hdfs.rollSize = 64000000
a1.sinks.s1.hdfs.rollInterval = 30
a1.sources.r1.channels = c1
a1.sinks.s1.channel = c1
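To run this example, one possible sequence is the following sketch; it assumes the spool, checkpoint, and data directories above already exist and uses a hypothetical file name that matches the includePattern:
flume-ng agent -c conf -f /opt/flume1.6/conf/job/spooldir-flume-hdfs.conf -n a1
#drop a file whose name matches the pattern into the spooled directory (file name is only an example)
cp test__2020-08-01.csv /data/test/
Once a file has been fully ingested, the spooldir source renames it with a .COMPLETED suffix, so files should not be modified in place after being dropped into the directory.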
taildir (monitors multiple files or directories; supports resuming from a checkpoint)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /data/test1.log
a1.sources.r1.filegroups.f2 = /data/test2.txt
#stores inode/offset information so reading can resume where it left off; if not set, a default location is used
a1.sources.r1.positionFile = /opt/position/position.json
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
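To see the resume-from-checkpoint behaviour, a simple test is sketched below; the configuration file name taildir-flume-logger.conf is only an example, and the two monitored files are assumed to exist:
flume-ng agent -c conf -f /opt/flume1.6/conf/job/taildir-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
echo "line 1" >> /data/test1.log
#stop the agent (Ctrl+C), append another line while it is down, then restart it
echo "line 2" >> /data/test1.log
flume-ng agent -c conf -f /opt/flume1.6/conf/job/taildir-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
Only the data written after the offsets recorded in /opt/position/position.json is re-read; deleting that file makes the source start again from the beginning of each file.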
avro
For the avro source and sink, see the load balancing and failover examples below.
Channels
memory (in-memory storage: fast but not durable)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /data/test.log
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
file (local file storage: durable but slower)
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /opt/kb07Flume/flumeFile/test
a1.sources.s1.deserializer = LINE
a1.sources.s1.deserializer.maxLineLength = 60000
a1.sources.s1.includePattern = test_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/kb07Flume/flumeFile/checkpoint/test
a1.channels.c1.dataDirs = /opt/kb07Flume/flumeFile/data/test
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.batchSize = 640
a1.sinks.k1.brokerList = 192.168.226.101:9092
a1.sinks.k1.topic = test
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Sinks
logger (writes to the console)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /data/test.log
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
hdfs (writes to HDFS)
a1.sources = r1
a1.channels = c1
a1.sinks = s1
#the source type is spooldir
a1.sources.r1.type = spooldir
#the directory to monitor
a1.sources.r1.spoolDir = /data/test
a1.sources.r1.deserializer = LINE
a1.sources.r1.deserializer.maxLineLength = 600000
#only pick up files whose names match this regular expression
a1.sources.r1.includePattern = test__[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
#the channel type is file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/kb07Flume/flumeFile/checkpoint/test
a1.channels.c1.dataDirs = /opt/kb07Flume/flumeFile/data/test
#the sink type is hdfs, so the output goes to HDFS
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.fileType = DataStream
#prefix added to the files written to HDFS
a1.sinks.s1.hdfs.filePrefix = test_
#suffix added to the files written to HDFS
a1.sinks.s1.hdfs.fileSuffix = .csv
#target path on HDFS
a1.sinks.s1.hdfs.path = hdfs://192.168.226.101:9000/data/test/%Y-%m-%d
#whether to use the local timestamp for the time escapes in the path
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.batchSize = 640
a1.sinks.s1.hdfs.rollCount = 0
a1.sinks.s1.hdfs.rollSize = 64000000
a1.sinks.s1.hdfs.rollInterval = 30
a1.sources.r1.channels = c1
a1.sinks.s1.channel = c1
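After the agent has rolled at least one file, the result can be checked on HDFS; a sketch using today's date, which is what %Y-%m-%d resolves to when useLocalTimeStamp is true:
hdfs dfs -ls /data/test/$(date +%Y-%m-%d)
#print the contents of the rolled files
hdfs dfs -cat /data/test/$(date +%Y-%m-%d)/test_*.csv
With the settings above, a file is closed every 30 seconds (rollInterval) or once it reaches about 64 MB (rollSize), whichever comes first; rollCount = 0 disables rolling by event count.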
kafka (writes to Kafka)
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /opt/kb07Flume/flumeFile/test
a1.sources.s1.deserializer = LINE
a1.sources.s1.deserializer.maxLineLength = 60000
a1.sources.s1.includePattern = test_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/kb07Flume/flumeFile/checkpoint/test
a1.channels.c1.dataDirs = /opt/kb07Flume/flumeFile/data/test
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.batchSize = 640
a1.sinks.k1.brokerList = 192.168.226.101:9092
a1.sinks.k1.topic = test
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
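To confirm that events reach Kafka, a console consumer can be attached to the topic; a sketch, assuming Kafka is installed on the broker host configured above (older Kafka versions use --zookeeper instead of --bootstrap-server):
kafka-console-consumer.sh --bootstrap-server 192.168.226.101:9092 --topic test --from-beginning
Note that brokerList is the property name used by the Flume 1.6 Kafka sink; later Flume versions use kafka.bootstrap.servers and kafka.topic instead.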
file_roll (writes to local files)
See the replication example below.
Replicating Selector (replication)
This pattern takes one input and produces multiple identical outputs.
It needs two channels and two sinks: one copy goes to HDFS, the other is saved to a local file.
Create flume1.conf
#Name (replication)
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data/test1.log
#stores inode/offset information so reading can resume where it left off; if not set, a default location is used
a1.sources.r1.positionFile = /opt/position/position1.json
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop1
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop1
a1.sinks.k2.port = 4142
#Bind
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
Create flume2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop1
a2.sources.r1.port = 4141
#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
#Sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.filePrefix = test_
a2.sinks.k1.hdfs.fileSuffix = .csv
a2.sinks.k1.hdfs.path = hdfs://192.168.226.101:9000/data/test/%Y-%m-%d
a2.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.batchSize = 640
a2.sinks.k1.hdfs.rollCount = 0
a2.sinks.k1.hdfs.rollSize = 64000000
a2.sinks.k1.hdfs.rollInterval = 30
#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
Create flume3.conf
a3.sources = r1
a3.channels = c1
a3.sinks = k1
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop1
a3.sources.r1.port = 4142
#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
#Sink: file_roll saves to local files; create the target directory first
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /data/file_row
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
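The avro sinks in flume1.conf connect to the avro sources in flume2.conf and flume3.conf, so the downstream agents should be started first; a sketch, assuming the three files are saved under /opt/flume1.6/conf/job:
#create the local output directory for the file_roll sink
mkdir -p /data/file_row
flume-ng agent -c conf -f /opt/flume1.6/conf/job/flume3.conf -n a3
flume-ng agent -c conf -f /opt/flume1.6/conf/job/flume2.conf -n a2
flume-ng agent -c conf -f /opt/flume1.6/conf/job/flume1.conf -n a1
Appending to /data/test1.log should then produce the same data both on HDFS and under /data/file_row.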
Failover
This pattern connects one channel to two sinks; the sink with the higher priority is used first, and if k2 goes down, traffic automatically falls back to k1.
Create three flume .conf files.
flume1.conf
#Failover
#Name
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
a1.sinkgroups = g1
#Sources
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop1
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop1
a1.sinks.k2.port = 4142
#Sink Group
a1.sinkgroups.g1.sinks = k1 k2
#failover strategy
a1.sinkgroups.g1.processor.type = failover
#priorities: the sink with the higher number is preferred
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
flume2.conf
#Name
a2.sources = r1
a2.sinks = k1
a2.channels = c1
#Sources
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop1
a2.sources.r1.port = 4141
#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
#Sink
a2.sinks.k1.type = logger
#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3.conf
#Name
a3.sources = r1
a3.sinks = k1
a3.channels = c1
#Sources
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop1
a3.sources.r1.port = 4142
#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
#Sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
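To exercise the failover, a sketch (assuming the three files are under /opt/flume1.6/conf/job): start the two downstream agents, then flume1, and send data through the netcat source; killing the a3 agent (behind the k2 sink, priority 10) should make new events appear on a2's console instead:
flume-ng agent -c conf -f /opt/flume1.6/conf/job/flume2.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f /opt/flume1.6/conf/job/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f /opt/flume1.6/conf/job/flume1.conf -n a1
telnet localhost 44444
#kill the a3 agent and keep typing into telnet; the events now go to a2 via k1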
Load Balancing
This pattern takes the input from a single source and distributes the events evenly across several sinks, so no single sink becomes a hotspot.
First create three flume .conf files.
Create flume1.conf
#Load balancing
#Name
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
a1.sinkgroups = g1
#Sources
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop1
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop1
a1.sinks.k2.port = 4142
#Sink Group
a1.sinkgroups.g1.sinks = k1 k2
#load-balancing strategy
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
Create flume2.conf
#Name
a2.sources = r1
a2.sinks = k1
a2.channels = c1
#Sources
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop1
a2.sources.r1.port = 4141
#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
#Sink
a2.sinks.k1.type = logger
#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
Create flume3.conf
#Name
a3.sources = r1
a3.sinks = k1
a3.channels = c1
#Sources
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop1
a3.sources.r1.port = 4142
#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
#Sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
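As in the failover example, the downstream agents are started before flume1; a sketch, assuming the same file locations. Typing several lines into the telnet session should show the events spread across the consoles of a2 and a3:
flume-ng agent -c conf -f /opt/flume1.6/conf/job/flume2.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f /opt/flume1.6/conf/job/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c conf -f /opt/flume1.6/conf/job/flume1.conf -n a1
telnet localhost 44444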
Custom Interceptor
Write a custom interceptor in IDEA, package it as a jar, and upload it to /opt/flume1.6/lib.
Add the Maven dependency:
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.6.0</version>
</dependency>
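A typical build-and-deploy cycle is sketched below; the jar name flume-interceptor-demo.jar is only a placeholder and depends on your own module settings:
mvn clean package
#copy the packaged jar into Flume's lib directory so the agent can load the interceptor class
cp target/flume-interceptor-demo.jar /opt/flume1.6/lib/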
package cn.kgc.kb07.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author WGY
 * Interceptor
 */
public class InterceptorDemo implements Interceptor {
    //a list holding the events that have had headers added
    private List<Event> addHeaderEvents;

    @Override
    public void initialize() {
        //initialization
        addHeaderEvents = new ArrayList<>();
    }

    //process a single event
    @Override
    public Event intercept(Event event) {
        //get the event's headers
        Map<String, String> headers = event.getHeaders();
        //get the event's body
        String body = new String(event.getBody());
        //decide which header to add based on whether the body contains "hello"
        if (body.contains("hello")) {
            //add the header
            headers.put("type", "1");
        } else {
            //add the header
            headers.put("type", "2");
        }
        //return the event with the added header (returning null would drop the event)
        return event;
    }

    //process a batch of events
    @Override
    public List<Event> intercept(List<Event> list) {
        //1. clear the list
        addHeaderEvents.clear();
        //2. iterate over the incoming events
        for (Event event : list) {
            //3. add a header to each event
            addHeaderEvents.add(intercept(event));
        }
        //return the result
        return addHeaderEvents;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new InterceptorDemo();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#custom interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.kgc.kb07.flume.InterceptorDemo$Builder
#multiplexing selector: route events by the "type" header set by the interceptor
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.1 = c1
a1.sources.r1.selector.mapping.2 = c2
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.filePrefix = gree
a1.sinks.k1.hdfs.fileSuffix = .csv
a1.sinks.k1.hdfs.path = hdfs://192.168.226.101:9000/data/greedemo/%Y-%m-%d
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.batchSize = 640
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 100
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.filePrefix = lijia
a1.sinks.k2.hdfs.fileSuffix = .csv
a1.sinks.k2.hdfs.path = hdfs://192.168.226.101:9000/data/lijiademo/%Y-%m-%d
a1.sinks.k2.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.batchSize = 640
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollSize = 100
a1.sinks.k2.hdfs.rollInterval = 3
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
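To test the whole chain, a sketch (the configuration file name interceptor-flume-hdfs.conf is only an example): with the mapping above, lines containing "hello" get header type=1 and are routed through c1 to the gree path, while everything else gets type=2 and goes through c2 to the lijia path:
flume-ng agent -c conf -f /opt/flume1.6/conf/job/interceptor-flume-hdfs.conf -n a1
telnet localhost 44444
#type a line containing "hello" and a line without it, then compare the two output paths
hdfs dfs -ls /data/greedemo/$(date +%Y-%m-%d)
hdfs dfs -ls /data/lijiademo/$(date +%Y-%m-%d)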