天天看點

剖析nsq消息隊列(四) 消息的負載處理

剖析nsq消息隊列-目錄

實際應用中,一部分服務叢集可能會同時訂閱同一個

topic

,并且處于同一個

channel

下。當

nsqd

有消息需要發送給訂閱用戶端去處理時,發給哪個用戶端是需要考慮的,也就是我要說的消息的負載。

如果不考慮負載情況,把随機的把消息發送到某一個客服端去處理消息,如果機器的性能不同,可能發生的情況就是某一個或幾個用戶端處理速度慢,但還有大量新的消息需要處理,其他的用戶端處于空閑狀态。理想的狀态是,找到目前相對空閑的用戶端去處理消息。

nsq

的處理方式是用戶端主動向

nsqd

報告自已的可處理消息數量(也就是

RDY

指令)。

nsqd

根據每個連接配接的用戶端的可處理消息的狀态來随機把消息發送到可用的用戶端,來進行消息處理

如下圖所示:

用戶端更新RDY

從第一篇文章的例子中我們就有配置consumer的config

config := nsq.NewConfig()
    config.MaxInFlight = 1000
    config.MaxBackoffDuration = 5 * time.Second
    config.DialTimeout = 10 * time.Second           

MaxInFlight

來設定最大的進行中的消息數量,會根據這個變量計算在是否更新

RDY

初始化的時候 用戶端會向連接配接的nsqd服務端來發送updateRDY來設定最大處理數,

func (r *Consumer) maybeUpdateRDY(conn *Conn) {
    inBackoff := r.inBackoff()
    inBackoffTimeout := r.inBackoffTimeout()
    if inBackoff || inBackoffTimeout {
        r.log(LogLevelDebug, "(%s) skip sending RDY inBackoff:%v || inBackoffTimeout:%v",
            conn, inBackoff, inBackoffTimeout)
        return
    }

    remain := conn.RDY()
    lastRdyCount := conn.LastRDY()
    count := r.perConnMaxInFlight()

    // refill when at 1, or at 25%, or if connections have changed and we're imbalanced
    if remain <= 1 || remain < (lastRdyCount/4) || (count > 0 && count < remain) {
        r.log(LogLevelDebug, "(%s) sending RDY %d (%d remain from last RDY %d)",
            conn, count, remain, lastRdyCount)
        r.updateRDY(conn, count)
    } else {
        r.log(LogLevelDebug, "(%s) skip sending RDY %d (%d remain out of last RDY %d)",
            conn, count, remain, lastRdyCount)
    }
}           

當剩餘的可用處理數量

remain

小于等于1,或者小于最後一次設定的可用數量

lastRdyCount

的1/4時,或者可用連接配接平均的maxInFlight大于0并且小于

remain

時,則更新

RDY

狀态

當有多個

nsqd

時,會把最大的消息進行平均計算:

// perConnMaxInFlight calculates the per-connection max-in-flight count.
//
// This may change dynamically based on the number of connections to nsqd the Consumer
// is responsible for.
func (r *Consumer) perConnMaxInFlight() int64 {
    b := float64(r.getMaxInFlight())
    s := b / float64(len(r.conns()))
    return int64(math.Min(math.Max(1, s), b))
}
           

當有消息從

nsqd

發送過來後也會調用

maybeUpdateRDY

方法,計算是否需要發送

RDY

指令

func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
    atomic.AddInt64(&r.totalRdyCount, -1)
    atomic.AddUint64(&r.messagesReceived, 1)
    r.incomingMessages <- msg
    r.maybeUpdateRDY(c)
}           

上面就是主要的處理邏輯,但還有一些邏輯,就是當消息處理發生錯誤時,

nsq

有自己的退避算法

backoff

也會更新

RDY

簡單來說就是當發現有處理錯誤時,來進行重試和指數退避,在退避期間

RDY

會為0,重試時會先放嘗試

RDY

為1看有沒有錯誤,如果沒有錯誤則全部放開,這個算法這篇文章我就不詳細說了。

服務端nsqd選擇用戶端進行發送消息

同時訂閱同一

topic

的用戶端(comsumer)有很多個,每個用戶端根據自己的配置或狀态發送

RDY

指令到

nsqd

表明自己能處理多少消息量

nsqd服務端會檢查每個用戶端的的狀态是否可以發送消息。也就是

IsReadyForMessages

方法,判斷inFlightCount是否大于readyCount,如果大于或者等于就不再給用戶端發送資料,等待

Ready

後才會再給用戶端發送資料

func (c *clientV2) IsReadyForMessages() bool {
    if c.Channel.IsPaused() {
        return false
    }

    readyCount := atomic.LoadInt64(&c.ReadyCount)
    inFlightCount := atomic.LoadInt64(&c.InFlightCount)

    c.ctx.nsqd.logf(LOG_DEBUG, "[%s] state rdy: %4d inflt: %4d", c, readyCount, inFlightCount)

    if inFlightCount >= readyCount || readyCount <= 0 {
        return false
    }

    return true
           

每一次發送消息

inFlightCount

會+1并儲存到發送中的隊列中,當用戶端發送FIN會-1在之前的文章中有說過。

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    // ...
    for {
        // 檢查訂閱狀态和消息是否可處理狀态    
        if subChannel == nil || !client.IsReadyForMessages() {
            // the client is not ready to receive messages...
            memoryMsgChan = nil
            backendMsgChan = nil
            flusherChan = nil
            // ...
            flushed = true
        } else if flushed {
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = nil
        } else {
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = outputBufferTicker.C
        }

        select {
        case <-flusherChan:
            // ...
        // 消息處理            
        case b := <-backendMsgChan:
            client.SendingMessage()
            // ...
        case msg := <-memoryMsgChan:
            client.SendingMessage()        
            //...
        }
    }
// ...
}           

繼續閱讀