
Big Data Learning -- Flume

Table of contents

    • Official documentation
    • Flume overview
    • Flume architecture
    • Sources
      • netcat (monitors a port)
      • exec (monitors the output of a command, usually tail or cat)
      • spooldir (monitors a directory)
      • taildir (monitors multiple files or directories; resumes where it left off)
      • avro
    • Channels
      • memory (in-memory storage; fast but not durable)
      • file (local file storage; durable but slower)
    • Sinks
      • logger (prints to the console)
      • hdfs (writes to HDFS)
      • kafka (writes to Kafka)
      • file_roll (saves to local files)
    • Replicating channel selector (replication)
    • Failover
    • Load balancing
    • Custom interceptor

Official documentation

Official documentation (Chinese version)

Flume overview

Apache Flume is a distributed, highly reliable, and highly available tool for collecting, aggregating, and moving large amounts of log data from many different sources to a centralized data store.

Flume architecture

Client: the client, where the data is produced, for example a web server

Event: a single unit of data transferred through an Agent; one log record usually corresponds to one line of data

Agent: an independent JVM process; Flume is deployed and run as one or more Agents

An Agent contains three components: Source, Channel, and Sink

The Source defines where data is read from

The Channel defines how data is buffered in between

The Sink defines where data is delivered to

Sources

netcat (monitors a port)


/opt/flume1.6/conf/job

Create a conf file in this directory:

netcat-flume-logger.conf

#Name: give the three components aliases
a1.sources = r1   
a1.sinks = k1     
a1.channels = c1   

#Source: type netcat, listening on local port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#Sink: type logger, prints straight to the console for easy testing
a1.sinks.k1.type = logger

#Channel: type memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Bind: connect the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
           

After saving, start the agent with the following command:

# --conf                : the conf directory of the Flume installation
# --conf-file           : the conf file you wrote
# --name                : the agent name defined in that conf file (a1)
# -Dflume.root.logger   : print to the console; only needed when the sink type is logger
flume-ng agent \
--conf /opt/flume1.6/conf \
--conf-file /opt/flume1.6/conf/job/netcat-flume-logger.conf \
--name a1 \
-Dflume.root.logger=INFO,console
           

Short form:

flume-ng agent -c conf -f /opt/flume1.6/conf/job/netcat-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console

Once the agent is running, open a second terminal window on the virtual machine and connect to port 44444:

telnet localhost 44444

If the command fails, install telnet with the following commands:

yum list telnet*            # list the telnet-related packages
yum install telnet-server   # install the telnet server
yum install telnet.*        # install the telnet client
           

Once the connection succeeds, any characters typed into the telnet session are printed on the console of the window where the agent is running.

exec (monitors the output of a command, usually tail or cat)


/opt/flume1.6/conf/job

Create a conf file in this directory:

exec-flume-logger.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
# any file format works; log files are typically monitored
a1.sources.r1.command = tail -f /data/test.log

# the sink again prints to the console for easy testing
a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

           

After starting the agent, any new lines written to /data/test.log are printed on the console.
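For example, appending a test line to the monitored file should show up on the agent's console almost immediately:

echo "hello exec source" >> /data/test.log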

spooldir (monitors a directory)


/opt/flume1.6/conf/job

Create a conf file in this directory:

spooldir-flume-hdfs.conf

a1.sources = r1
a1.channels = c1
a1.sinks = s1

# source type spooldir
a1.sources.r1.type = spooldir
# the directory to monitor
a1.sources.r1.spoolDir = /data/test
a1.sources.r1.deserializer = LINE
a1.sources.r1.deserializer.maxLineLength = 600000
# only pick up files whose names match this regular expression
a1.sources.r1.includePattern = test__[0-9]{4}-[0-9]{2}-[0-9]{2}.csv

# channel type file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/kb07Flume/flumeFile/checkpoint/test
a1.channels.c1.dataDirs = /opt/kb07Flume/flumeFile/data/test

# sink type hdfs: results are written to HDFS
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.fileType = DataStream
# prefix added to the files written to HDFS
a1.sinks.s1.hdfs.filePrefix = test_
# suffix added to the files written to HDFS
a1.sinks.s1.hdfs.fileSuffix = .csv
# target HDFS path
a1.sinks.s1.hdfs.path = hdfs://192.168.226.101:9000/data/test/%Y-%m-%d
# use the local timestamp for the time escapes in the path
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.batchSize = 640
a1.sinks.s1.hdfs.rollCount = 0
a1.sinks.s1.hdfs.rollSize = 64000000
a1.sinks.s1.hdfs.rollInterval = 30

a1.sources.r1.channels = c1
a1.sinks.s1.channel = c1
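To test it, drop a file whose name matches includePattern into the monitored directory (the file name and content below are just examples):

echo "1,hello spooldir" > /tmp/test__2020-08-01.csv
mv /tmp/test__2020-08-01.csv /data/test/

After the file has been consumed, the spooling directory source renames it with a .COMPLETED suffix by default.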
           

taildir (monitors multiple files or directories; can resume where it left off after a restart)

a1.sources = r1   
a1.sinks = k1     
a1.channels = c1  

a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /data/test1.log
a1.sources.r1.filegroups.f2 = /data/test2.txt
#stores the inode and offset information used to resume after a restart; if not set, a default location is used
a1.sources.r1.positionFile = /opt/position/position.json

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
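To see the resume behaviour, append lines to both monitored files; if the agent is restarted it continues from the offsets recorded in /opt/position/position.json instead of re-reading the files from the beginning:

echo "line for file group f1" >> /data/test1.log
echo "line for file group f2" >> /data/test2.txt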
           

avro


For the avro source and sink, see the load balancing and failover examples below.

Channels

memory (stored in memory; fast, but data can be lost on failure)

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /data/test.log  

a1.sinks.k1.type = logger  

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
           

file (stored in local files; durable, but slower)

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /opt/kb07Flume/flumeFile/test
a1.sources.s1.deserializer = LINE
a1.sources.s1.deserializer.maxLineLength = 60000
a1.sources.s1.includePattern = test_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/kb07Flume/flumeFile/checkpoint/test
a1.channels.c1.dataDirs = /opt/kb07Flume/flumeFile/data/test

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.batchSize = 640
a1.sinks.k1.brokerList = 192.168.226.101:9092
a1.sinks.k1.topic = test

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
           

Sinks

logger (prints to the console)

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /data/test.log  

a1.sinks.k1.type = logger   

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

           

hdfs (writes to HDFS)

a1.sources = r1
a1.channels = c1
a1.sinks = s1

# source type spooldir
a1.sources.r1.type = spooldir
# the directory to monitor
a1.sources.r1.spoolDir = /data/test
a1.sources.r1.deserializer = LINE
a1.sources.r1.deserializer.maxLineLength = 600000
# only pick up files whose names match this regular expression
a1.sources.r1.includePattern = test__[0-9]{4}-[0-9]{2}-[0-9]{2}.csv

# channel type file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/kb07Flume/flumeFile/checkpoint/test
a1.channels.c1.dataDirs = /opt/kb07Flume/flumeFile/data/test

# sink type hdfs: results are written to HDFS
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.fileType = DataStream
# prefix added to the files written to HDFS
a1.sinks.s1.hdfs.filePrefix = test_
# suffix added to the files written to HDFS
a1.sinks.s1.hdfs.fileSuffix = .csv
# target HDFS path
a1.sinks.s1.hdfs.path = hdfs://192.168.226.101:9000/data/test/%Y-%m-%d
# use the local timestamp for the time escapes in the path
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.batchSize = 640
a1.sinks.s1.hdfs.rollCount = 0
a1.sinks.s1.hdfs.rollSize = 64000000
a1.sinks.s1.hdfs.rollInterval = 30

a1.sources.r1.channels = c1
a1.sinks.s1.channel = c1
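To check the result, list the target directory on HDFS; the date subdirectory comes from the %Y-%m-%d escapes in hdfs.path:

hdfs dfs -ls /data/test/$(date +%Y-%m-%d)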
           

kafka (writes to Kafka)

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /opt/kb07Flume/flumeFile/test
a1.sources.s1.deserializer = LINE
a1.sources.s1.deserializer.maxLineLength = 60000
a1.sources.s1.includePattern = test_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/kb07Flume/flumeFile/checkpoint/test
a1.channels.c1.dataDirs = /opt/kb07Flume/flumeFile/data/test

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.batchSize = 640
a1.sinks.k1.brokerList = 192.168.226.101:9092
a1.sinks.k1.topic = test

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
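To verify that events reach Kafka, attach a console consumer to the topic (the script location depends on the Kafka installation, and older Kafka versions take --zookeeper <host:2181> instead of --bootstrap-server):

kafka-console-consumer.sh --bootstrap-server 192.168.226.101:9092 --topic test --from-beginning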
           

file_roll (saves to local files)

See the replication example below.

Replicating channel selector (replication)

The pattern is one input, multiple outputs.

Two channels and two sinks are needed: one copy of the data is written to HDFS, the other is saved to a local file.

Create flume1.conf

#Name (replication)
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data/test1.log
#stores the inode and offset information used to resume after a restart; if not set, a default location is used
a1.sources.r1.positionFile = /opt/position/position1.json
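# the channel selector defaults to replicating, so every event is copied to both c1 and c2;
# it could also be set explicitly: a1.sources.r1.selector.type = replicating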

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop1
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop1
a1.sinks.k2.port = 4142

#Bind
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
           

Create flume2.conf

a2.sources = r1
a2.channels = c1
a2.sinks = k1


a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop1
a2.sources.r1.port = 4141

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = hdfs   
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.filePrefix = test_     
a2.sinks.k1.hdfs.fileSuffix = .csv   
a2.sinks.k1.hdfs.path = hdfs://192.168.226.101:9000/data/test/%Y-%m-%d    
a2.sinks.k1.hdfs.useLocalTimeStamp = true     
a2.sinks.k1.hdfs.batchSize = 640
a2.sinks.k1.hdfs.rollCount = 0
a2.sinks.k1.hdfs.rollSize = 64000000
a2.sinks.k1.hdfs.rollInterval = 30

#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
           

Create flume3.conf

a3.sources = r1
a3.channels = c1
a3.sinks = k1


a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop1
a3.sources.r1.port = 4142

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

#Sink: file_roll writes to a local directory, which must be created beforehand
a3.sinks.k1.type = file_roll   
a3.sinks.k1.sink.directory = /data/file_row


#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
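Create the file_roll target directory first, then start the downstream agents a2 and a3 (which open the avro source ports) before the upstream agent a1, so that a1's avro sinks can connect. A possible start sequence, assuming the three conf files are saved under /opt/flume1.6/conf/job:

mkdir -p /data/file_row
flume-ng agent -c conf -f /opt/flume1.6/conf/job/flume3.conf -n a3 &
flume-ng agent -c conf -f /opt/flume1.6/conf/job/flume2.conf -n a2 &
flume-ng agent -c conf -f /opt/flume1.6/conf/job/flume1.conf -n a1

Lines appended to /data/test1.log should then show up both on HDFS and under /data/file_row.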
           

Failover

In this pattern one channel feeds two sinks, and the sink with the higher priority is used first; here k2 has the higher priority, and if k2 goes down, k1 takes over automatically.

Create three conf files

flume1.conf

#Failover

#Name
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
a1.sinkgroups = g1

#Sources
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop1
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop1
a1.sinks.k2.port = 4142

#Sink Group
a1.sinkgroups.g1.sinks = k1 k2
# failover sink processor
a1.sinkgroups.g1.processor.type = failover
# priorities: the sink with the larger value is used first
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
           

flume2.conf

#Name
a2.sources = r1
a2.sinks = k1
a2.channels = c1

#Sources
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop1
a2.sources.r1.port = 4141

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = logger

#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
           

flume3.conf

#Name
a3.sources = r1
a3.sinks = k1
a3.channels = c1

#Sources
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop1
a3.sources.r1.port = 4142

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = logger

#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
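Start the downstream agents a2 and a3 first (each in its own window so the logger output is visible), then a1, using the same command pattern as before. Because k2 has priority 10 and k1 has priority 5, events sent to the netcat source appear on the console of a3 (port 4142); if a3 is stopped, the sink group fails over and the events appear on the console of a2 instead:

telnet localhost 44444
# type a few lines: they are printed by a3
# stop a3 with Ctrl+C, type more lines: after a short delay they are printed by a2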
           

Load balancing

In this pattern a single source distributes the incoming events evenly across multiple sinks, so no single sink is overloaded (no data skew).

Create three conf files first

Create flume1.conf

#Load balancing

#Name
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
a1.sinkgroups = g1

#Sources
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop1
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop1
a1.sinks.k2.port = 4142

#Sink Group
a1.sinkgroups.g1.sinks = k1 k2
# load-balancing sink processor
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
           

Create flume2.conf

#Name
a2.sources = r1
a2.sinks = k1
a2.channels = c1

#Sources
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop1
a2.sources.r1.port = 4141

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = logger

#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
           

Create flume3.conf

#Name
a3.sources = r1
a3.sinks = k1
a3.channels = c1

#Sources
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop1
a3.sources.r1.port = 4142

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = logger

#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
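Start a2, a3, and then a1 as before. With the load_balance processor and the random selector, events typed into the netcat source are spread across both downstream agents instead of all going to the same one:

telnet localhost 44444
# type several lines; some are printed on a2's console and some on a3's console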
           

Custom interceptor

Write the custom interceptor in IDEA, then package it and upload the jar to

/opt/flume1.6/lib

Add the Maven dependency:

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.6.0</version>
</dependency>
           
package cn.kgc.kb07.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author WGY
 * Custom interceptor
 */
public class InterceptorDemo implements Interceptor {
    // list that collects the events after their headers have been set
    private List<Event> addHeaderEvents;

    @Override
    public void initialize() {
        // initialization
        addHeaderEvents = new ArrayList<>();
    }

    // process a single event
    @Override
    public Event intercept(Event event) {
        // get the event headers
        Map<String, String> headers = event.getHeaders();

        // get the event body
        String body = new String(event.getBody());

        // decide which header value to set depending on whether the body contains "hello"
        if(body.contains("hello")){
            // add header
            headers.put("type","1");
        }else{
            // add header
            headers.put("type","2");
        }

        // return the event itself (returning null would drop it)
        return event;
    }

    // process a batch of events
    @Override
    public List<Event> intercept(List<Event> list) {
        // 1. clear the list
        addHeaderEvents.clear();
        // 2. iterate over the batch
        for (Event event : list) {
            // 3. add header information to each event
            addHeaderEvents.add(intercept(event));
        }
        // return the result
        return addHeaderEvents;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new InterceptorDemo();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
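One possible way to build and deploy the interceptor jar (the artifact name below is illustrative and depends on your Maven project):

mvn clean package
cp target/flume-interceptor-1.0-SNAPSHOT.jar /opt/flume1.6/lib/

The agent configuration below registers the interceptor and uses a multiplexing channel selector to route events by the type header it sets: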

           
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# register the custom interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.kgc.kb07.flume.InterceptorDemo$Builder

# multiplexing channel selector: route events by the "type" header set by the interceptor
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
# the mapping keys must match the header values written by the interceptor ("1" and "2")
a1.sources.r1.selector.mapping.1 = c1
a1.sources.r1.selector.mapping.2 = c2

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.filePrefix = gree
a1.sinks.k1.hdfs.fileSuffix = .csv
a1.sinks.k1.hdfs.path = hdfs://192.168.226.101:9000/data/greedemo/%Y-%m-%d
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.batchSize = 640
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 100
a1.sinks.k1.hdfs.rollInterval = 3

a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.filePrefix = lijia
a1.sinks.k2.hdfs.fileSuffix = .csv
a1.sinks.k2.hdfs.path = hdfs://192.168.226.101:9000/data/lijiademo/%Y-%m-%d
a1.sinks.k2.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.batchSize = 640
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollSize = 100
a1.sinks.k2.hdfs.rollInterval = 3

a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
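After starting the agent, events can be sent through the netcat source to check the routing:

telnet localhost 44444
# lines containing "hello" get header type=1 and are routed through c1 to the greedemo HDFS path
# all other lines get header type=2 and are routed through c2 to the lijiademo HDFS path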
           
