天天看點

Flume(NG)架構設計要點及配置實踐

flume ng是一個分布式、可靠、可用的系統,它能夠将不同資料源的海量日志資料進行高效收集、聚合、移動,最後存儲到一個中心化資料存儲系統中。由原來的flume og到現在的flume ng,進行了架構重構,并且現在ng版本完全不相容原來的og版本。經過架構重構後,flume ng更像是一個輕量的小工具,非常簡單,容易适應各種方式日志收集,并支援failover和負載均衡。

架構設計要點

flume的架構主要有一下幾個核心概念:

event:一個資料單元,帶有一個可選的消息頭

flow:event從源點到達目的點的遷移的抽象

client:操作位于源點處的event,将其發送到flume agent

agent:一個獨立的flume程序,包含元件source、channel、sink

source:用來消費傳遞到該元件的event

channel:中轉event的一個臨時存儲,儲存有source元件傳遞過來的event

sink:從channel中讀取并移除event,将event傳遞到flow pipeline中的下一個agent(如果有的話)

flume ng架構,如圖所示:

Flume(NG)架構設計要點及配置實踐

外部系統産生日志,直接通過flume的agent的source元件将事件(如日志行)發送到中間臨時的channel元件,最後傳遞給sink元件,hdfs sink元件可以直接把資料存儲到hdfs叢集上。

一個最基本flow的配置,格式如下:

<code>01</code>

<code># list the sources, sinks and channels for the agent</code>

<code>02</code>

<code>&lt;agent&gt;.sources = &lt;source1&gt; &lt;source2&gt;</code>

<code>03</code>

<code>&lt;agent&gt;.sinks = &lt;sink1&gt; &lt;sink2&gt;</code>

<code>04</code>

<code>&lt;agent&gt;.channels = &lt;channel1&gt; &lt;channel2&gt;</code>

<code>05</code>

<code>06</code>

<code># set channel for source</code>

<code>07</code>

<code>&lt;agent&gt;.sources.&lt;source1&gt;.channels = &lt;channel1&gt; &lt;channel2&gt; ...</code>

<code>08</code>

<code>&lt;agent&gt;.sources.&lt;source2&gt;.channels = &lt;channel1&gt; &lt;channel2&gt; ...</code>

<code>09</code>

<code>10</code>

<code># set channel for sink</code>

<code>11</code>

<code>&lt;agent&gt;.sinks.&lt;sink1&gt;.channel = &lt;channel1&gt;</code>

<code>12</code>

<code>&lt;agent&gt;.sinks.&lt;sink2&gt;.channel = &lt;channel2&gt;</code>

尖括号裡面的,我們可以根據實際需求或業務來修改名稱。下面詳細說明:

表示配置一個agent的名稱,一個agent肯定有一個名稱。與是agent的source元件的名稱,消費傳遞過來的event。與是agent的channel元件的名稱。與是agent的sink元件的名稱,從channel中消費(移除)event。

上面配置内容中,第一組中配置source、sink、channel,它們的值可以有1個或者多個;第二組中配置source将把資料存儲(put)到哪一個channel中,可以存儲到1個或多個channel中,同一個source将資料存儲到多個channel中,實際上是replication;第三組中配置sink從哪一個channel中取(task)資料,一個sink隻能從一個channel中取資料。

下面,根據官網文檔,我們展示幾種flow pipeline,各自适應于什麼樣的應用場景:

多個agent順序連接配接

Flume(NG)架構設計要點及配置實踐

可以将多個agent順序連接配接起來,将最初的資料源經過收集,存儲到最終的存儲系統中。這是最簡單的情況,一般情況下,應該控制這種順序連接配接的agent的數量,因為資料流經的路徑變長了,如果不考慮failover的話,出現故障将影響整個flow上的agent收集服務。

多個agent的資料彙聚到同一個agent

Flume(NG)架構設計要點及配置實踐

這種情況應用的場景比較多,比如要收集web網站的使用者行為日志,web網站為了可用性使用的負載均衡的叢集模式,每個節點都産生使用者行為日志,可以為每個節點都配置一個agent來單獨收集日志資料,然後多個agent将資料最終彙聚到一個用來存儲資料存儲系統,如hdfs上。

多路(multiplexing)agent

Flume(NG)架構設計要點及配置實踐

這種模式,有兩種方式,一種是用來複制(replication),另一種是用來分流(multiplexing)。replication方式,可以将最前端的資料源複制多份,分别傳遞到多個channel中,每個channel接收到的資料都是相同的,配置格式,如下所示:

<code>&lt;agent&gt;.sources = &lt;source1&gt;</code>

<code># set list of channels for source (separated by space)</code>

<code>&lt;agent&gt;.sources.&lt;source1&gt;.channels = &lt;channel1&gt; &lt;channel2&gt;</code>

<code># set channel for sinks</code>

<code>13</code>

<code>&lt;agent&gt;.sources.&lt;source1&gt;.selector.type = replicating</code>

上面指定了selector的type的值為replication,其他的配置沒有指定,使用的replication方式,source1會将資料分别存儲到channel1和channel2,這兩個channel裡面存儲的資料是相同的,然後資料被傳遞到sink1和sink2。

multiplexing方式,selector可以根據header的值來确定資料傳遞到哪一個channel,配置格式,如下所示:

<code>1</code>

<code># mapping for multiplexing selector</code>

<code>2</code>

<code>&lt;agent&gt;.sources.&lt;source1&gt;.selector.type = multiplexing</code>

<code>3</code>

<code>&lt;agent&gt;.sources.&lt;source1&gt;.selector.header = &lt;someheader&gt;</code>

<code>4</code>

<code>&lt;agent&gt;.sources.&lt;source1&gt;.selector.mapping.&lt;value1&gt; = &lt;channel1&gt;</code>

<code>5</code>

<code>&lt;agent&gt;.sources.&lt;source1&gt;.selector.mapping.&lt;value2&gt; = &lt;channel1&gt; &lt;channel2&gt;</code>

<code>6</code>

<code>&lt;agent&gt;.sources.&lt;source1&gt;.selector.mapping.&lt;value3&gt; = &lt;channel2&gt;</code>

<code>7</code>

<code>#...</code>

<code>8</code>

<code>9</code>

<code>&lt;agent&gt;.sources.&lt;source1&gt;.selector.default = &lt;channel2&gt;</code>

上面selector的type的值為multiplexing,同時配置selector的header資訊,還配置了多個selector的mapping的值,即header的值:如果header的值為value1、value2,資料從source1路由到channel1;如果header的值為value2、value3,資料從source1路由到channel2。

Flume(NG)架構設計要點及配置實踐

實作load balance功能

Flume(NG)架構設計要點及配置實踐

load balancing sink processor能夠實作load balance功能,上圖agent1是一個路由節點,負責将channel暫存的event均衡到對應的多個sink元件上,而每個sink元件分别連接配接到一個獨立的agent上,示例配置,如下所示:

<code>a1.sinkgroups = g1</code>

<code>a1.sinkgroups.g1.sinks = k1 k2 k3</code>

<code>a1.sinkgroups.g1.processor.type = load_balance</code>

<code>a1.sinkgroups.g1.processor.backoff = true</code>

<code>a1.sinkgroups.g1.processor.selector = round_robin</code>

<code>a1.sinkgroups.g1.processor.selector.maxtimeout=10000</code>

實作failover能

failover sink processor能夠實作failover功能,具體流程類似load balance,但是内部處理機制與load balance完全不同:failover sink processor維護一個優先級sink元件清單,隻要有一個sink元件可用,event就被傳遞到下一個元件。如果一個sink能夠成功處理event,則會加入到一個pool中,否則會被移出pool并計算失敗次數,設定一個懲罰因子,示例配置如下所示:

<code>a1.sinkgroups.g1.processor.type = failover</code>

<code>a1.sinkgroups.g1.processor.priority.k1 = 5</code>

<code>a1.sinkgroups.g1.processor.priority.k2 = 7</code>

<code>a1.sinkgroups.g1.processor.priority.k3 = 6</code>

<code>a1.sinkgroups.g1.processor.maxpenalty = 20000</code>

基本功能

我們看一下,flume ng都支援哪些功能(目前最新版本是1.5.0.1),了解它的功能集合,能夠讓我們在應用中更好地選擇使用哪一種方案。說明flume ng的功能,實際還是圍繞着agent的三個元件source、channel、sink來看它能夠支援哪些技術或協定。我們不再對各個元件支援的協定詳細配置進行說明,通過清單的方式分别對三個元件進行概要說明:

flume source

<b>source類型</b>

<b>說明</b>

avro source

支援avro協定(實際上是avro rpc),内置支援

thrift source

支援thrift協定,内置支援

exec source

基于unix的command在标準輸出上生産資料

jms source

從jms系統(消息、主題)中讀取資料,activemq已經測試過

spooling directory source

監控指定目錄内資料變更

twitter 1% firehose source

通過api持續下載下傳twitter資料,試驗性質

netcat source

監控某個端口,将流經端口的每一個文本行資料作為event輸入

sequence generator source

序列生成器資料源,生産序列資料

syslog sources

讀取syslog資料,産生event,支援udp和tcp兩種協定

http source

基于http post或get方式的資料源,支援json、blob表示形式

legacy sources

相容老的flume og中source(0.9.x版本)

flume channel

<b>channel類型</b>

memory channel

event資料存儲在記憶體中

jdbc channel

event資料存儲在持久化存儲中,目前flume channel内置支援derby

file channel

event資料存儲在磁盤檔案中

spillable memory channel

event資料存儲在記憶體中和磁盤上,當記憶體隊列滿了,會持久化到磁盤檔案(目前試驗性的,不建議生産環境使用)

pseudo transaction channel

測試用途

custom channel

自定義channel實作

flume sink

<b>sink類型</b>

hdfs sink

資料寫入hdfs

logger sink

資料寫入日志檔案

avro sink

資料被轉換成avro event,然後發送到配置的rpc端口上

thrift sink

資料被轉換成thrift event,然後發送到配置的rpc端口上

irc sink

資料在irc上進行回放

file roll sink

存儲資料到本地檔案系統

null sink

丢棄到所有資料

hbase sink

資料寫入hbase資料庫

morphline solr sink

資料發送到solr搜尋伺服器(叢集)

elasticsearch sink

資料發送到elastic search搜尋伺服器(叢集)

kite dataset sink

寫資料到kite dataset,試驗性質的

custom sink

自定義sink實作

另外還有channel selector、sink processor、event serializer、interceptor等元件,可以參考官網提供的使用者手冊。

應用實踐

安裝flume ng非常簡單,我們使用最新的1.5.0.1版本,執行如下指令:

<code>cd /usr/local</code>

<code>tar xvzf apache-flume-1.5.0.1-bin.tar.gz</code>

<code>cd apache-flume-1.5.0.1-bin</code>

如果需要使用到hadoop叢集,保證hadoop相關的環境變量都已經正确配置,并且hadoop叢集可用。下面,通過一些實際的配置執行個體,來了解flume的使用。為了簡單期間,channel我們使用memory類型的channel。

avro source+memory channel+logger sink

使用apache-flume-1.5.0.1自帶的例子,使用avro source接收外部資料源,logger作為sink,即通過avro rpc調用,将資料緩存在channel中,然後通過logger列印出調用發送的資料。

配置agent,修改配置檔案conf/flume-conf.properties,内容如下:

<code># define a memory channel called ch1 on agent1</code>

<code>agent1.channels.ch1.type = memory</code>

<code># define an avro source called avro-source1 on agent1 and tell it</code>

<code># to bind to 0.0.0.0:41414. connect it to channel ch1.</code>

<code>agent1.sources.avro-source1.channels = ch1</code>

<code>agent1.sources.avro-source1.type = avro</code>

<code>agent1.sources.avro-source1.bind = 0.0.0.0</code>

<code>agent1.sources.avro-source1.port = 41414</code>

<code># define a logger sink that simply logs all events it receives</code>

<code># and connect it to the other end of the same channel.</code>

<code>agent1.sinks.log-sink1.channel = ch1</code>

<code>14</code>

<code>agent1.sinks.log-sink1.type = logger</code>

<code>15</code>

<code>16</code>

<code># finally, now that we've defined all of our components, tell</code>

<code>17</code>

<code># agent1 which ones we want to activate.</code>

<code>18</code>

<code>agent1.channels = ch1</code>

<code>19</code>

<code>agent1.channels.ch1.capacity = 1000</code>

<code>20</code>

<code>agent1.sources = avro-source1</code>

<code>21</code>

<code>agent1.sinks = log-sink1</code>

首先,啟動agent程序:

<code>bin/flume-ng agent -c ./conf/ -f conf/flume-conf.properties -dflume.root.logger=debug,console -n agent1</code>

然後,啟動avro client,發送資料:

<code>bin/flume-ng avro-client -c ./conf/ -h 0.0.0.0 -p 41414 -f /usr/</code><code>local</code><code>/programs/logs/</code><code>sync</code><code>.log -dflume.root.logger=debug,console</code>

avro source+memory channel+hdfs sink

配置agent,修改配置檔案conf/flume-conf-hdfs.properties,内容如下:

<code># define a source, channel, sink</code>

<code>agent1.sinks = hdfs-sink</code>

<code># configure channel</code>

<code>agent1.channels.ch1.capacity = 1000000</code>

<code>agent1.channels.ch1.transactioncapacity = 500000</code>

<code>agent1.sinks.hdfs-sink1.channel = ch1</code>

<code>agent1.sinks.hdfs-sink1.type = hdfs</code>

<code>22</code>

<code>23</code>

<code>agent1.sinks.hdfs-sink1.hdfs.fileprefix = sync_file</code>

<code>24</code>

<code>agent1.sinks.hdfs-sink1.hdfs.filesuffix = .log</code>

<code>25</code>

<code>agent1.sinks.hdfs-sink1.hdfs.rollsize = 1048576</code>

<code>26</code>

<code>agent1.sinks.hdfs-sink1.rollinterval = 0</code>

<code>27</code>

<code>agent1.sinks.hdfs-sink1.hdfs.rollcount = 0</code>

<code>28</code>

<code>agent1.sinks.hdfs-sink1.hdfs.batchsize = 1500</code>

<code>29</code>

<code>agent1.sinks.hdfs-sink1.hdfs.round = true</code>

<code>30</code>

<code>agent1.sinks.hdfs-sink1.hdfs.roundunit = minute</code>

<code>31</code>

<code>agent1.sinks.hdfs-sink1.hdfs.threadspoolsize = 25</code>

<code>32</code>

<code>agent1.sinks.hdfs-sink1.hdfs.uselocaltimestamp = true</code>

<code>33</code>

<code>agent1.sinks.hdfs-sink1.hdfs.minblockreplicas = 1</code>

<code>34</code>

<code>agent1.sinks.hdfs-sink1.filetype = sequencefile</code>

<code>35</code>

<code>agent1.sinks.hdfs-sink1.writeformat = text</code>

首先,啟動agent:

<code>bin/flume-ng agent -c ./conf/ -f conf/flume-conf-hdfs.properties -dflume.root.logger=info,console -n agent1</code>

可以檢視同步到hdfs上的資料:

<code>hdfs dfs -</code><code>ls</code> <code>/data/flume</code>

結果示例,如下所示:

<code>-rw-r--r-- 3 shirdrn supergroup 1377617 2014-09-16 14:35 /data/flume/sync_file.1410849320761.log</code>

<code>-rw-r--r-- 3 shirdrn supergroup 1378137 2014-09-16 14:35 /data/flume/sync_file.1410849320762.log</code>

<code>-rw-r--r-- 3 shirdrn supergroup 259148 2014-09-16 14:35 /data/flume/sync_file.1410849320763.log</code>

spooling directory source+memory channel+hdfs sink

配置agent,修改配置檔案flume-conf-spool.properties,内容如下:

<code># define source, channel, sink</code>

<code>agent1.sources = spool-source1</code>

<code>agent1.sinks = hdfs-sink1</code>

<code># define and configure an spool directory source</code>

<code>agent1.sources.spool-source1.channels = ch1</code>

<code>agent1.sources.spool-source1.type = spooldir</code>

<code>agent1.sources.spool-source1.spooldir = /home/shirdrn/data/</code>

<code>agent1.sources.spool-source1.ignorepattern = event(_\d{4}\-\d{2}\-\d{2}_\d{2}_\d{2})?\.log(\.completed)?</code>

<code>agent1.sources.spool-source1.batchsize = 50</code>

<code>agent1.sources.spool-source1.inputcharset = utf-8</code>

<code># define and configure a hdfs sink</code>

<code>agent1.sinks.hdfs-sink1.hdfs.fileprefix = event_%y-%m-%d_%h_%m_%s</code>

啟動agent程序,執行如下指令:

<code>bin/flume-ng agent -c ./conf/ -f conf/flume-conf-spool.properties -dflume.root.logger=info,console -n agent1</code>

可以檢視hdfs上同步過來的資料:

<code>-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355094.log</code>

<code>-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355095.log</code>

<code>-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355096.log</code>

<code>-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355097.log</code>

<code>-rw-r--r-- 3 shirdrn supergroup 1530 2014-09-17 10:53 /data/flume/event_14-09-17_10_52_00.1410922355098.log</code>

<code>-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380386.log</code>

<code>-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380387.log</code>

<code>-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380388.log</code>

<code>-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380389.log</code>

<code>-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380390.log</code>

exec source+memory channel+file roll sink

配置agent,修改配置檔案flume-conf-file.properties,内容如下:

<code>agent1.sources = tail-source1</code>

<code>agent1.sinks = file-sink1</code>

<code># define and configure an exec source</code>

<code>agent1.sources.tail-source1.channels = ch1</code>

<code>agent1.sources.tail-source1.type = exec</code>

<code>agent1.sources.tail-source1.command = tail -f /home/shirdrn/data/event.log</code>

<code>agent1.sources.tail-source1.shell = /bin/sh -c</code>

<code>agent1.sources.tail-source1.batchsize = 50</code>

<code># define and configure a file roll sink</code>

<code>agent1.sinks.file-sink1.channel = ch1</code>

<code>agent1.sinks.file-sink1.type = file_roll</code>

<code>agent1.sinks.file-sink1.batchsize = 100</code>

<code>agent1.sinks.file-sink1.serializer = text</code>

<code>agent1.sinks.file-sink1.sink.directory = /home/shirdrn/sink_data</code>

<code>bin/flume-ng agent -c ./conf/ -f conf/flume-conf-</code><code>file</code><code>.properties -dflume.root.logger=info,console -n agent1</code>

可以檢視file roll sink對應的本地檔案系統目錄/home/shirdrn/sink_data下,示例如下所示:

<code>-rw-rw-r-- 1 shirdrn shirdrn 13944825 sep 17 11:36 1410924990039-1</code>

<code>-rw-rw-r-- 1 shirdrn shirdrn 11288870 sep 17 11:37 1410924990039-2</code>

<code>-rw-rw-r-- 1 shirdrn shirdrn 0 sep 17 11:37 1410924990039-3</code>

<code>-rw-rw-r-- 1 shirdrn shirdrn 20517500 sep 17 11:38 1410924990039-4</code>

<code>-rw-rw-r-- 1 shirdrn shirdrn 16343250 sep 17 11:38 1410924990039-5</code>

有關flume ng更多配置及其說明,請參考官方使用者手冊,非常詳細。

繼續閱讀