天天看点

flume分布式日志收集测试

官方参考文档

<a href="https://flume.apache.org/FlumeUserGuide.html#file-channel" target="_blank">https://flume.apache.org/FlumeUserGuide.html#file-channel</a>

Flume NG是一个分布式、可靠、可用的系统,它能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。由原来的Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本。经过架构重构后,Flume NG更像是一个轻量的小工具,非常简单,容易适应各种方式日志收集,并支持failover和负载均衡。

架构设计要点

Flume的架构主要有一下几个核心概念:

Event:一个数据单元,带有一个可选的消息头

Flow:Event从源点到达目的点的迁移的抽象

Client:操作位于源点处的Event,将其发送到Flume Agent

Agent:一个独立的Flume进程,包含组件Source、Channel、Sink

Source:用来消费传递到该组件的Event

Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event

Sink:从Channel中读取并移除Event,将Event传递到Flow Pipeline中的下一个Agent(如果有的话)

Flume NG架构,如图所示:

外部系统产生日志,直接通过Flume的Agent的Source组件将事件(如日志行)发送到中间临时的channel组件,最后传递给Sink组件,HDFS Sink组件可以直接把数据存储到HDFS集群上。

一个最基本Flow的配置,格式如下:

<code># list the sources, sinks and channels for the agent</code>

<code>&lt;Agent&gt;.sources = &lt;Source1&gt; &lt;Source2&gt;</code>

<code>&lt;Agent&gt;.sinks = &lt;Sink1&gt; &lt;Sink2&gt;</code>

<code>&lt;Agent&gt;.channels = &lt;Channel1&gt; &lt;Channel2&gt;</code>

<code># set channel for source</code>

<code>&lt;Agent&gt;.sources.&lt;Source1&gt;.channels = &lt;Channel1&gt; &lt;Channel2&gt; ...</code>

<code>&lt;Agent&gt;.sources.&lt;Source2&gt;.channels = &lt;Channel1&gt; &lt;Channel2&gt; ...</code>

<code># set channel for sink</code>

<code>&lt;Agent&gt;.sinks.&lt;Sink1&gt;.channel = &lt;Channel1&gt;</code>

<code>&lt;Agent&gt;.sinks.&lt;Sink2&gt;.channel = &lt;Channel2&gt;</code>

尖括号里面的,我们可以根据实际需求或业务来修改名称。下面详细说明:

表示配置一个Agent的名称,一个Agent肯定有一个名称。与是Agent的Source组件的名称,消费传递过来的Event。与是Agent的Channel组件的名称。与是Agent的Sink组件的名称,从Channel中消费(移除)Event。

上面配置内容中,第一组中配置Source、Sink、Channel,它们的值可以有1个或者多个;第二组中配置Source将把数据存储(Put)到哪一个Channel中,可以存储到1个或多个Channel中,同一个Source将数据存储到多个Channel中,实际上是Replication;第三组中配置Sink从哪一个Channel中取(Task)数据,一个Sink只能从一个Channel中取数据。

下面,根据官网文档,我们展示几种Flow Pipeline,各自适应于什么样的应用场景:

多个Agent顺序连接

可以将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。这是最简单的情况,一般情况下,应该控制这种顺序连接的Agent的数量,因为数据流经的路径变长了,如果不考虑failover的话,出现故障将影响整个Flow上的Agent收集服务。

多个Agent的数据汇聚到同一个Agent

这种情况应用的场景比较多,比如要收集Web网站的用户行为日志,Web网站为了可用性使用的负载均衡的集群模式,每个节点都产生用户行为日志,可以为每个节点都配置一个Agent来单独收集日志数据,然后多个Agent将数据最终汇聚到一个用来存储数据存储系统,如HDFS上。

多路(Multiplexing)Agent

这种模式,有两种方式,一种是用来复制(Replication),另一种是用来分流(Multiplexing)。Replication方式,可以将最前端的数据源复制多份,分别传递到多个channel中,每个channel接收到的数据都是相同的,配置格式,如下所示:

<code># List the sources, sinks and channels for the agent</code>

<code>&lt;Agent&gt;.sources = &lt;Source1&gt;</code>

<code># set list of channels for source (separated by space)</code>

<code>&lt;Agent&gt;.sources.&lt;Source1&gt;.channels = &lt;Channel1&gt; &lt;Channel2&gt;</code>

<code># set channel for sinks</code>

<code>&lt;Agent&gt;.sources.&lt;Source1&gt;.selector.</code><code>type</code> <code>= replicating</code>

上面指定了selector的type的值为replication,其他的配置没有指定,使用的Replication方式,Source1会将数据分别存储到Channel1和Channel2,这两个channel里面存储的数据是相同的,然后数据被传递到Sink1和Sink2。

Multiplexing方式,selector可以根据header的值来确定数据传递到哪一个channel,配置格式,如下所示:

<code># Mapping for multiplexing selector</code>

<code>&lt;Agent&gt;.sources.&lt;Source1&gt;.selector.</code><code>type</code> <code>= multiplexing</code>

<code>&lt;Agent&gt;.sources.&lt;Source1&gt;.selector.header = &lt;someHeader&gt;</code>

<code>&lt;Agent&gt;.sources.&lt;Source1&gt;.selector.mapping.&lt;Value1&gt; = &lt;Channel1&gt;</code>

<code>&lt;Agent&gt;.sources.&lt;Source1&gt;.selector.mapping.&lt;Value2&gt; = &lt;Channel1&gt; &lt;Channel2&gt;</code>

<code>&lt;Agent&gt;.sources.&lt;Source1&gt;.selector.mapping.&lt;Value3&gt; = &lt;Channel2&gt;</code>

<code>#...</code>

<code>&lt;Agent&gt;.sources.&lt;Source1&gt;.selector.default = &lt;Channel2&gt;</code>

上面selector的type的值为multiplexing,同时配置selector的header信息,还配置了多个selector的mapping的值,即header的值:如果header的值为Value1、Value2,数据从Source1路由到Channel1;如果header的值为Value2、Value3,数据从Source1路由到Channel2。

实现load balance功能

Load balancing Sink Processor能够实现load balance功能,上图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上,示例配置,如下所示:

<code>a1.sinkgroups = g1</code>

<code>a1.sinkgroups.g1.sinks = k1 k2 k3</code>

<code>a1.sinkgroups.g1.processor.</code><code>type</code> <code>= load_balance</code>

<code>a1.sinkgroups.g1.processor.backoff = </code><code>true</code>

<code>a1.sinkgroups.g1.processor.selector = round_robin</code>

<code>a1.sinkgroups.g1.processor.selector.maxTimeOut=10000</code>

实现failover能

Failover Sink Processor能够实现failover功能,具体流程类似load balance,但是内部处理机制与load balance完全不同:Failover Sink Processor维护一个优先级Sink组件列表,只要有一个Sink组件可用,Event就被传递到下一个组件。如果一个Sink能够成功处理Event,则会加入到一个Pool中,否则会被移出Pool并计算失败次数,设置一个惩罚因子,示例配置如下所示:

<code>a1.sinkgroups.g1.processor.</code><code>type</code> <code>= failover</code>

<code>a1.sinkgroups.g1.processor.priority.k1 = 5</code>

<code>a1.sinkgroups.g1.processor.priority.k2 = 7</code>

<code>a1.sinkgroups.g1.processor.priority.k3 = 6</code>

<code>a1.sinkgroups.g1.processor.maxpenalty = 20000</code>

基本功能

我们看一下,Flume NG都支持哪些功能(目前最新版本是1.5.0.1),了解它的功能集合,能够让我们在应用中更好地选择使用哪一种方案。说明Flume NG的功能,实际还是围绕着Agent的三个组件Source、Channel、Sink来看它能够支持哪些技术或协议。我们不再对各个组件支持的协议详细配置进行说明,通过列表的方式分别对三个组件进行概要说明:

<code>Source类型                    说明</code>

<code>Avro Source                    支持Avro协议(实际上是Avro RPC),内置支持</code>

<code>Thrift Source                  支持Thrift协议,内置支持</code>

<code>Exec Source                    基于Unix的</code><code>command</code><code>在标准输出上生产数据</code>

<code>JMS Source                 从JMS系统(消息、主题)中读取数据,ActiveMQ已经测试过</code>

<code>Spooling Directory Source 监控指定目录内数据变更</code>

<code>Twitter 1% firehose Source   通过API持续下载Twitter数据,试验性质</code>

<code>Netcat Source                  监控某个端口,将流经端口的每一个文本行数据作为Event输入</code>

<code>Sequence Generator Source 序列生成器数据源,生产序列数据</code>

<code>Syslog Sources                 读取syslog数据,产生Event,支持UDP和TCP两种协议</code>

<code>HTTP Source                        基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式</code>

<code>Legacy Sources                 兼容老的Flume OG中Source(0.9.x版本)</code>

<code>Flume Channel</code>

<code>###############################################################</code>

<code>Channel类型                   说明</code>

<code>Memory Channel                 Event数据存储在内存中</code>

<code>JDBC Channel                   Event数据存储在持久化存储中,当前Flume Channel内置支持Derby</code>

<code>File Channel                   Event数据存储在磁盘文件中</code>

<code>Spillable Memory Channel  Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件(当前试验性的,不建议生产环境使用)</code>

<code>Pseudo Transaction Channel    测试用途</code>

<code>Custom Channel                 自定义Channel实现</code>

<code>Flume Sink</code>

<code>###################################################################</code>

<code>Sink类型                  说明</code>

<code>HDFS Sink                  数据写入HDFS</code>

<code>Logger Sink                    数据写入日志文件</code>

<code>Avro Sink                  数据被转换成Avro Event,然后发送到配置的RPC端口上</code>

<code>Thrift Sink                    数据被转换成Thrift Event,然后发送到配置的RPC端口上</code>

<code>IRC Sink                   数据在IRC上进行回放</code>

<code>File Roll Sink            存储数据到本地文件系统</code>

<code>Null Sink                  丢弃到所有数据</code>

<code>HBase Sink                 数据写入HBase数据库</code>

<code>Morphline Solr Sink           数据发送到Solr搜索服务器(集群)</code>

<code>ElasticSearch Sink         数据发送到Elastic Search搜索服务器(集群)</code>

<code>Kite Dataset Sink         写数据到Kite Dataset,试验性质的</code>

<code>Custom Sink                    自定义Sink实现</code>

<code>#################################################################</code>

<code>另外还有Channel Selector、Sink Processor、Event Serializer、Interceptor等组件,可以参考官网提供的用户手册。</code>

安装配置略,可以参考网上教程

下面是测试的配置文件

agent 配置文件如下

<code># Name the  components on this agent</code>

<code>a1.sources =  r1</code>

<code>a1.sinks =  k1</code>

<code>a1.channels  = c1</code>

<code>a1.sinks.k1.</code><code>type</code> <code>= avro </code>

<code>a1.sinks.k1.</code><code>hostname</code> <code>= 127.0.0.1</code>

<code>a1.sinks.k1.port = 44444 </code>

<code>a1.sinks.k1.channel = c1</code>

<code># Use a  channel which buffers events in memory</code>

<code>a1.channels.c1.</code><code>type</code>  <code>= memory</code>

<code>a1.channels.c1.capacity  = 1000</code>

<code>a1.channels.c1.transactionCapacity  = 100</code>

<code>a1.sources.r1.</code><code>type</code>  <code>= </code><code>exec</code>

<code>a1.sources.r1.</code><code>command</code>  <code>= </code><code>tail</code> <code>-F  </code><code>/var/log/nginx/access</code><code>.log</code>

<code>a1.sources.r1.channels  = c1</code>

<code>a1.sources.s.deserializer.maxLineLength=65535</code>

server端配置文件如下:  测试复制(Replication)1个source 复制到多个channels 输出到多个sink

<code>b1.sources =  r1</code>

<code>b1.sinks =  k1 k2 k3 </code>

<code>b1.channels  = c1 c2 c3</code>

<code>b1.sources.r1.selector.</code><code>type</code> <code>= replicating</code>

<code>b1.sources.r1.</code><code>type</code> <code>= avro </code>

<code>b1.sources.r1.channels = c1 c2 c3 </code>

<code>b1.sources.r1.bind = 0.0.0.0 </code>

<code>b1.sources.r1.port = 44444</code>

<code>b1.channels.c1.</code><code>type</code> <code>= </code><code>file</code> 

<code>b1.channels.c1.write-timeout = 10 </code>

<code>b1.channels.c1.keep-alive = 10 </code>

<code>b1.channels.c1.checkpointDir = </code><code>/flume/check</code>

<code>b1.channels.c1.useDualCheckpoints = </code><code>true</code> 

<code>b1.channels.c1.backupCheckpointDir = </code><code>/flume/backup</code>

<code>b1.channels.c1.dataDirs = </code><code>/flume</code>

<code>b1.channels.c2.</code><code>type</code><code>=memory  </code>

<code>b1.channels.c2.capacity=2000000  </code>

<code>b1.channels.c2.transactionCapacity=10000  </code>

<code>b1.channels.c3.</code><code>type</code><code>=memory  </code>

<code>b1.channels.c3.capacity=2000000  </code>

<code>b1.channels.c3.transactionCapacity=10000  </code>

<code># Describe the sink </code>

<code>b1.sinks.k1.</code><code>type</code>  <code>= hdfs</code>

<code>b1.sinks.k1.channel  = c1</code>

<code>b1.sinks.k1.hdfs.path  = hdfs:</code><code>//localhost</code><code>:9000</code><code>/user/hadoop/flume/collected/</code>

<code>b1.sinks.k1.hdfs.filePrefix  = chen_test</code>

<code>b1.sinks.k1.hdfs.round  = </code><code>true</code>

<code>b1.sinks.k1.hdfs.roundValue  = 10</code>

<code>b1.sinks.k1.hdfs.roundUnit  = minute</code>

<code>b1.sinks.k2.channel = c2</code>

<code>b1.sinks.k2.</code><code>type</code> <code>= file_roll</code>

<code>b1.sinks.k2.batchSize = 100000000</code>

<code>b1.sinks.k2.rollInterval = 1000000</code>

<code>b1.sinks.k2.serializer = TEXT</code>

<code>b1.sinks.k2.sink.directory = </code><code>/var/log/flume</code>

<code>b1.sinks.k3.channel = c3 </code>

<code>b1.sinks.k3.</code><code>type</code> <code>= logger</code>

启动测试命令

flume-ng agent -c . -f test.conf   -n b1  -Dflume.root.logger=INFO,console

-c  配置文件目录  -f配置文件  -n  节点名字  和配置文件对应     console打到终端

参考链接

<a href="http://blog.csdn.net/lskyne/article/details/37662835" target="_blank">http://blog.csdn.net/lskyne/article/details/37662835</a>

6. 测试   区分 flume日志合并在一起的日志

<code>a1配置</code>

<code>[root@host_12 </code><code>test</code><code>]</code><code># cat  a1.conf </code>

<code>a1.sources.r1.interceptors = i1</code>

<code>a1.sources.r1.interceptors.i1.</code><code>type</code><code>=static</code>

<code>a1.sources.r1.interceptors.i1.key=nginx</code>

<code>a1.sources.r1.interceptors.i1.value=nginx_1</code>

<code>a1.sources.r1.interceptors.i1.preserveExisting=</code><code>false</code>

<code>#a1.sources.r1.interceptors = i1</code>

<code>#a1.sources.r1.interceptors.i1.type = host</code>

<code>#a1.sources.r1.interceptors.i1.hostHeader = hostname</code>

<code>###匹配shell  tomcat yyyy:mm:dd:hh格式的日志</code>

<code>a1.sources.r1.shell = </code><code>/bin/bash</code> <code>-c</code>

<code>a1.sources.r1.</code><code>command</code>  <code>= </code><code>tail</code> <code>-f  </code><code>/var/log/nginx_1/access_</code><code>`</code><code>date</code> <code>+%Y%m%d%H`.log</code>

<code>#########匹配替换行里的文本的内容</code>

<code>#a1.sources.r1.interceptors.i1.type = search_replace</code>

<code>#a1.sources.r1.interceptors.i1.searchPattern = [0-9]+</code>

<code>#a1.sources.r1.interceptors.i1.replaceString = lxw1234</code>

<code>#a1.sources.r1.interceptors.i1.charset = UTF-8</code>

<code>###########################################</code>

<code>a2配置</code>

<code>[root@host_12 </code><code>test</code><code>]</code><code># cat  a2.conf </code>

<code>a2.sources =  r1</code>

<code>a2.sinks =  k1</code>

<code>a2.channels  = c1</code>

<code>a2.sources.r1.interceptors = i1</code>

<code>a2.sources.r1.interceptors.i1.</code><code>type</code><code>=static</code>

<code>a2.sources.r1.interceptors.i1.key=nginx</code>

<code>a2.sources.r1.interceptors.i1.value=nginx_2</code>

<code>a2.sources.r1.interceptors.i1.preserveExisting=</code><code>false</code>

<code>a2.sinks.k1.</code><code>type</code> <code>= avro </code>

<code>a2.sinks.k1.</code><code>hostname</code> <code>= 127.0.0.1</code>

<code>a2.sinks.k1.port = 44444 </code>

<code>a2.sinks.k1.channel = c1</code>

<code>a2.channels.c1.</code><code>type</code>  <code>= memory</code>

<code>a2.channels.c1.capacity  = 1000</code>

<code>a2.channels.c1.transactionCapacity  = 100</code>

<code>a2.sources.r1.</code><code>type</code>  <code>= </code><code>exec</code>

<code>a2.sources.r1.shell = </code><code>/bin/bash</code> <code>-c</code>

<code>a2.sources.r1.</code><code>command</code>  <code>= </code><code>tail</code> <code>-f  </code><code>/var/log/nginx_2/access_</code><code>`</code><code>date</code> <code>+%Y%m%d%H`.log</code>

<code>a2.sources.r1.channels  = c1</code>

<code>####################################</code>

<code>server配置</code>

<code>[root@host_12 </code><code>test</code><code>]</code><code># cat   h1.conf</code>

<code>serv_1.sources =  r1</code>

<code>serv_1.sinks =   k2 k3 </code>

<code>serv_1.channels  =  c2 c3</code>

<code>#serv_1.sources.r1.selector.type = replicating</code>

<code>serv_1.sources.r1.selector.</code><code>type</code> <code>= multiplexing</code>

<code>serv_1.sources.r1.selector.header = nginx</code>

<code>serv_1.sources.r1.selector.mapping.nginx_1 = c2</code>

<code>serv_1.sources.r1.selector.mapping.nginx_2 = c3</code>

<code> </code> 

<code>serv_1.sources.r1.</code><code>type</code> <code>= avro </code>

<code>serv_1.sources.r1.channels =  c2 c3 </code>

<code>serv_1.sources.r1.bind = 0.0.0.0 </code>

<code>serv_1.sources.r1.port = 44444</code>

<code>serv_1.channels.c2.</code><code>type</code><code>=memory  </code>

<code>serv_1.channels.c2.capacity=2000000  </code>

<code>serv_1.channels.c2.transactionCapacity=10000  </code>

<code>serv_1.channels.c3.</code><code>type</code><code>=memory  </code>

<code>serv_1.channels.c3.capacity=2000000  </code>

<code>serv_1.channels.c3.transactionCapacity=10000  </code>

<code>serv_1.sinks.k2.channel = c2</code>

<code>serv_1.sinks.k2.</code><code>type</code> <code>= file_roll</code>

<code>serv_1.sinks.k2.batchSize = 100000000</code>

<code>serv_1.sinks.k2.rollInterval = 1000000</code>

<code>serv_1.sinks.k2.serializer = TEXT</code>

<code>serv_1.sinks.k2.sink.directory = </code><code>/var/log/flume/</code>

<code>serv_1.sinks.k3.channel = c3 </code>

<code>serv_1.sinks.k3.</code><code>type</code> <code>= logger</code>

本文转自   tianshuai369   51CTO博客,原文链接:http://blog.51cto.com/kkkkkk/1722390

上一篇: 序列化
下一篇: RPC

继续阅读