天天看點

07 Flume

1 Flume概述

1.1 定義

● Flume是Cloudera提供的一個高可用,高可靠,分布式的海量日志采集、聚合、傳輸的架構。

● 主要作用:實時讀取伺服器本地磁盤的資料,将資料寫入到HDFS。 資料(伺服器)——> HDFS

1.2 基礎架構

Flume基礎構架圖:

07 Flume

(1) Agent

● 是flume的部署單元

● 是一個JVM程序, 源頭—資料(事件)—>目的

● 組成:Source、Channel、Sink。

(2) Source

● 接收資料給Flume Agent元件

● 處理日志資料(各種類型、格式)

● 包括:avro、thrift、exec、jms、spooling directory、netcat、 taildir 、sequence generator、syslog、http、legacy。

(3) Sink

● 不斷輪詢Channel中的事件,批量移除。批量寫入到存儲、索引系統,或被發送到另一個Flume Agent。

● 元件類型:hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定義。

(4) Channel

● 位于Source和Sink之間的緩沖區

● 允許Source和Sink運作在不同的速率上

● 線程安全,可以支援多個Source的寫入和多個Sink的讀取。

● 類型:

  • Memory Channel:記憶體中的隊列。(快,不安全)
  • File Channel:将事件寫入磁盤。(慢,安全)

(5) Event

● 傳輸單元,Flume資料傳輸的基本單元,以Event的形式将資料從源頭送至目的地。

● 組成

  • Header:存放event的屬性,k-v結構
  • Body:存放資料,位元組數組

2 Flume入門

2.1 Flume安裝部署

上傳——解壓——改名flume——删除jguava-11.0.2.jar(處理相容問題)

2.2 Flume入門案例

2.2.1 案例1(監控端口資料)

1)案例需求

● 使用Flume監聽一個端口,收集該端口資料,并列印到控制台。   

● 案例圖 

07 Flume

2)需求分析

07 Flume

3)實作步驟

● 安裝netcat工具 —— 判斷端口情況 —— 建立job檔案夾 —— 在job/simpleCase檔案下,建立Flume Agent配置檔案flume-1-netcat-logger.conf

檢視代碼

# Name the components on this agent
a1.sources = r1                                      # 為a1的Source元件命名為r1,多個元件用空格間隔
a1.sinks = k1                                        # 為a1的Sink元件命名為k1,多個元件用空格間隔
a1.channels = c1                                     # 為a1的Channel元件命名為c1,多個元件用空格間隔

# Describe/configure the source
a1.sources.r1.type = netcat                          # 配置r1的類型
a1.sources.r1.bind = localhost                       # 配置r1的綁定位址(注意localhost和hadoop102的差別)
a1.sources.r1.port = 44444                           # 配置r1的監聽端口

# Describe the sink
a1.sinks.k1.type = logger                            # 配置k1的類型為logger

# Use a channel which buffers events in memory    
a1.channels.c1.type = memory                         # 配置c1的類型為memory
a1.channels.c1.capacity = 1000                       # 配置c1的容量為1000個事件
a1.channels.c1.transactionCapacity = 100             # 配置c1的事務容量為100個事件

# Bind the source and sink to the channel
a1.sources.r1.channels = c1                          # 配置r1的channel屬性,指定r1連接配接到那個channel
a1.sinks.k1.channel = c1                             # 配置k1的channel屬性,指定k1連接配接到那個channel           

● 開啟flume監聽端口 & netcat的使用

寫法 指令
方法1 $ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/simpleCase/flume-1-netcat-logger.conf -Dflume.root.logger=INFO,console
方法2 $ bin/flume-ng agent -c conf/ -n a1 -f job/simpleCase/flume-1-netcat-logger.conf -Dflume.root.logger=INFO,console
使用netcat工具向本機的44444端口發送内容 $ nc localhost 44444     //  nc IP 端口号

參數說明

參數 說明
--conf / -c 表示配置檔案存儲在conf/目錄
--name / -a 表示給agent起名為a1
--conf-file / -f 指定讀取的配置檔案是在job/simpleCase檔案夾下的flume-1-1netcat-logger.conf檔案     
07 Flume
-Dflume.root.logger=INFO,console -D表示flume運作時動态修改flume.root.logger參數屬性值,并将控制台日志列印級别設定為INFO級别。日志級别包括:log、info、warn、error。 

2.2.2 案例2(實時監控目錄下的多個追加檔案)

配置
Exec source 适用于監控一個實時追加的檔案,不能實作斷點續傳
Spooldir Source 适用于同步新檔案,但不适合對實時追加日志的檔案進行監聽并同步
Taildir Source 适合用于監聽多個實時追加的檔案,并且能夠實作斷點續傳

3 Flume進階

3.1 Flume事務

07 Flume

● Put事務和Take事務

事務
Put doPut — 将批資料先寫入臨時緩沖區putList
doCommit — 檢查channel記憶體隊列是否足夠合并
doRollback — channel記憶體隊列空間不足,復原資料
如果出現error,會删除putList中的資料,将error傳回給Source後,再重新推送事務
Take doTake — 将資料取到臨時緩沖區takeList,并将資料放送到HDFS
doCommit — 如果資料全部發送成功,則清楚臨時緩沖區takeList
資料發送過程中如果出現異常,rollback将臨時緩沖區takeList中的資料歸還給channel記憶體隊列

3.2 Flume Agent内部原理

07 Flume

重要元件

元件
ChannelSelector 作用:選出Event将要被發往哪個Channel
兩種類型:Replicating(複制)和 Multiplexing(多路複用)
Replicating Channel Selector:會将同一個Event發往所有的Channel。
Mulitiplexing Channel Selector:會根據響應的原則,将不同的Event發往不同的Channel。(分類)
SinkProcessor 三種類型:長度
DefaultSinkProcessor:對應的是單個Sink。
LoadBalancingSinkProcessor:對應的是Sink Group,可以實作負載均衡的功能。
FailoverSinkProcessor:對應的是Sink group,可以實作恢複的功能。(故障轉移)