天天看點

Go實作海量日志收集系統(二)

一篇文章主要是關于整體架構以及用到的軟體的一些介紹,這一篇文章是對各個軟體的使用介紹,當然這裡主要是關于架構中我們agent的實作用到的内容

關于zookeeper+kafka

我們需要先把兩者啟動,先啟動zookeeper,再啟動kafka

啟動ZooKeeper:./bin/zkServer.sh start

啟動kafka:./bin/kafka-server-start.sh ./config/server.properties 

操作kafka需要安裝一個包:go get github.com/Shopify/sarama

寫一個簡單的代碼,通過go調用往kafka裡扔資料:

package main

import (
    "github.com/Shopify/sarama"
    "fmt"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true
    msg := &sarama.ProducerMessage{}
    msg.Topic = "nginx_log"
    msg.Value = sarama.StringEncoder("this is a good test,my message is zhaofan")
    client,err := sarama.NewSyncProducer([]string{"192.168.0.118:9092"},config)
    if err != nil{
        fmt.Println("producer close err:",err)
        return
    }
    defer client.Close()

    pid,offset,err := client.SendMessage(msg)
    if err != nil{
        fmt.Println("send message failed,",err)
        return
    }
    fmt.Printf("pid:%v offset:%v\n",pid,offset)
}      

config.Producer.RequiredAcks = sarama.WaitForAll 這裡表示是在給kafka扔資料的時候是否需要确認收到kafka的ack消息

msg.Topic = "nginx_log" 因為kafka是一個分布式系統,假如我們要讀的是nginx日志,apache日志,我們可以根據topic做區分,同時也是我們也可以有不同的分區

我們将上述代碼執行一下,就會往kafka中扔一條消息,可以通過kakfa中自帶的消費者指令檢視:

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nginx_log --from-beginning

Go實作海量日志收集系統(二)

我們可以将最後的代碼稍微更改一下,更改為循環發送:

for{
    pid,offset,err := client.SendMessage(msg)
    if err != nil{
        fmt.Println("send message failed,",err)
        return
    }
    fmt.Printf("pid:%v offset:%v\n",pid,offset)
    time.Sleep(2*time.Second)
}      

這樣當我們再次執行的程式的時候,我們可以看到用戶端在不停的消費到資料:

Go實作海量日志收集系統(二)

這樣我們就實作一個kakfa的生産者的簡單的demo

接下來我們還需要知道一個工具的使用tailf

tailf

我們的agent需要讀日志目錄下的日志檔案,而日志檔案是不停的增加并且切換檔案的,是以我們就需要借助于tailf這個包來讀檔案,當然這裡的tailf和linux裡的tail -f指令雖然不同,但是效果是差不多的,都是為了擷取日志檔案新增加的内容。

而我們的用戶端非常重要的一個地方就是要讀日志檔案并且将讀到的日志檔案推送到kafka

這裡需要我們下載下傳一個包:go get github.com/hpcloud/tail

我們通過下面一個例子對這個包進行一個基本的使用,更詳細的api說明看:https://godoc.org/github.com/hpcloud/tail

package main

import (
    "github.com/hpcloud/tail"
    "fmt"
    "time"
)

func main() {
    filename := "/Users/zhaofan/go_project/src/go_dev/13/tailf/my.log"
    tails,err := tail.TailFile(filename,tail.Config{
        ReOpen:true,
        Follow:true,
        Location:&tail.SeekInfo{Offset:0,Whence:2},
        MustExist:false,
        Poll:true,
    })

    if err !=nil{
        fmt.Println("tail file err:",err)
        return
    }

    var msg *tail.Line
    var ok bool
    for true{
        msg,ok = <-tails.Lines
        if !ok{
            fmt.Printf("tail file close reopen,filenam:%s\n",tails,filename)
            time.Sleep(100*time.Millisecond)
            continue
        }
        fmt.Println("msg:",msg.Text)
    }
}      

最終實作的效果是當你檔案裡面添加内容後,就可以不斷的讀取檔案中的内容

日志庫的使用

這裡是通過beego的日志庫實作的,beego的日志庫是可以單獨拿出來用的,還是非常友善的,使用例子如下:

package main

import (
    "github.com/astaxie/beego/logs"
    "encoding/json"
    "fmt"
)

func main() {
    config := make(map[string]interface{})
    config["filename"] = "/Users/zhaofan/go_project/src/go_dev/13/log/logcollect.log"
    config["level"] = logs.LevelTrace
    configStr,err := json.Marshal(config)
    if err != nil{
        fmt.Println("marshal failed,err:",err)
        return
    }
    logs.SetLogger(logs.AdapterFile,string(configStr))
    logs.Debug("this is a debug,my name is %s","stu01")
    logs.Info("this is a info,my name is %s","stu02")
    logs.Trace("this is trace my name is %s","stu03")
    logs.Warn("this is a warn my name is %s","stu04")
}      

簡單版本logagent的實作

這裡主要是先實作核心的功能,後續再做優化和改進,主要實作能夠根據配置檔案中配置的日志路徑去讀取日志并将讀取的實時推送到kafka消息隊列中

關于logagent的主要結構如下:

Go實作海量日志收集系統(二)

程式目錄結構為:

├── conf
│   └── app.conf
├── config.go
├── kafka.go
├── logs
│   └── logcollect.log
├── main.go
└── server.go      

app.conf :配置檔案

config.go:用于初始化讀取配置檔案中的内容,這裡的配置檔案加載是通過之前自己實作的配置檔案熱加載包處理的,部落格位址:http://www.cnblogs.com/zhaof/p/8593204.html

logcollect.log:日志檔案

kafka.go:對kafka的操作,包括初始化kafka連接配接,以及給kafka發送消息

server.go:主要是tail 的相關操作,用于去讀日志檔案并将内容放到channel中

是以這裡我們主要的代碼邏輯或者重要的代碼邏輯就是kafka.go 以及server.go

kafka.go代碼内容為:

// 這裡主要是kafak的相關操作,包括了kafka的初始化,以及發送消息的操作
package main

import (
    "github.com/Shopify/sarama"
    "github.com/astaxie/beego/logs"
)

var (
    client sarama.SyncProducer
    kafkaSender *KafkaSender
)

type KafkaSender struct {
    client sarama.SyncProducer
    lineChan chan string
}

// 初始化kafka
func NewKafkaSender(kafkaAddr string)(kafka *KafkaSender,err error){
    kafka = &KafkaSender{
        lineChan:make(chan string,100000),
    }
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true

    client,err := sarama.NewSyncProducer([]string{kafkaAddr},config)
    if err != nil{
        logs.Error("init kafka client failed,err:%v\n",err)
        return
    }
    kafka.client = client
    for i:=0;i<appConfig.KafkaThreadNum;i++{
        // 根據配置檔案循環開啟線程去發消息到kafka
        go kafka.sendToKafka()
    }
    return
}

func initKafka()(err error){
    kafkaSender,err = NewKafkaSender(appConfig.kafkaAddr)
    return
}

func (k *KafkaSender) sendToKafka(){
    //從channel中讀取日志内容放到kafka消息隊列中
    for v := range k.lineChan{
        msg := &sarama.ProducerMessage{}
        msg.Topic = "nginx_log"
        msg.Value = sarama.StringEncoder(v)
        _,_,err := k.client.SendMessage(msg)
        if err != nil{
            logs.Error("send message to kafka failed,err:%v",err)
        }
    }
}

func (k *KafkaSender) addMessage(line string)(err error){
    //我們通過tailf讀取的日志檔案内容先放到channel裡面
    k.lineChan <- line
    return
}      

server.go的代碼為:

package main

import (
    "github.com/hpcloud/tail"
    "fmt"
    "sync"
    "github.com/astaxie/beego/logs"
    "strings"
)

type TailMgr struct {
    //因為我們的agent可能是讀取多個日志檔案,這裡通過存儲為一個map
    tailObjMap map[string]*TailObj
    lock sync.Mutex
}

type TailObj struct {
    //這裡是每個讀取日志檔案的對象
    tail *tail.Tail
    offset int64  //記錄目前位置
    filename string
}

var tailMgr *TailMgr
var waitGroup sync.WaitGroup

func NewTailMgr()(*TailMgr){
    tailMgr =  &TailMgr{
        tailObjMap:make(map[string]*TailObj,16),
    }
    return tailMgr
}

func (t *TailMgr) AddLogFile(filename string)(err error){
    t.lock.Lock()
    defer t.lock.Unlock()
    _,ok := t.tailObjMap[filename]
    if ok{
        err = fmt.Errorf("duplicate filename:%s\n",filename)
        return
    }
    tail,err := tail.TailFile(filename,tail.Config{
        ReOpen:true,
        Follow:true,
        Location:&tail.SeekInfo{Offset:0,Whence:2},
        MustExist:false,
        Poll:true,
    })

    tailobj := &TailObj{
        filename:filename,
        offset:0,
        tail:tail,
    }
    t.tailObjMap[filename] = tailobj
    return
}

func (t *TailMgr) Process(){
    //開啟線程去讀日志檔案
    for _, tailObj := range t.tailObjMap{
        waitGroup.Add(1)
        go tailObj.readLog()
    }
}

func (t *TailObj) readLog(){
    //讀取每行日志内容
    for line := range t.tail.Lines{
        if line.Err != nil {
            logs.Error("read line failed,err:%v",line.Err)
            continue
        }
        str := strings.TrimSpace(line.Text)
        if len(str)==0 || str[0] == '\n'{
            continue
        }

        kafkaSender.addMessage(line.Text)
    }
    waitGroup.Done()
}


func RunServer(){
    tailMgr = NewTailMgr()
    // 這一部分是要調用tailf讀日志檔案推送到kafka中
    for _, filename := range appConfig.LogFiles{
        err := tailMgr.AddLogFile(filename)
        if err != nil{
            logs.Error("add log file failed,err:%v",err)
            continue
        }

    }
    tailMgr.Process()
    waitGroup.Wait()
}      

可以整體示範一下代碼實作的效果,當我們運作程式之後我配置檔案配置的目錄為:

log_files=/app/log/a.log,/Users/zhaofan/a.log

我通過一個簡單的代碼對對a.log循環追加内容,你可以從kafka的用戶端消費力看到内容了:

Go實作海量日志收集系統(二)

完成的代碼位址:https://github.com/pythonsite/logagent

小結

這次隻是實作logagent的核心功能,實作了從日志檔案中通過多個線程擷取要讀的日志内容,這裡借助了tailf,并将擷取的内容放到channel中,kafka.go會從channel中取出日志内容并放到kafka的消息隊列中

這裡并沒有做很多細緻的處理,下一篇文章會在這個代碼的基礎上進行改進。同時現在的配置檔案的方式也不是最佳的,每次改動配置檔案都需要重新啟動程式,後面将通過etcd的方式。

所有的努力都值得期許,每一份夢想都應該灌溉!