天天看點

剝開比原看代碼07:比原節點收到“請求區塊資料”的資訊後如何應答?

作者:freewind

比原項目倉庫:

Github位址:https://github.com/Bytom/bytom

Gitee位址:https://gitee.com/BytomBlockchain/bytom

在上一篇,我們知道了比原是如何把“請求區塊資料”的資訊

BlockRequestMessage

發送給peer節點的,那麼本文研究的重點就是,當peer節點收到了這個資訊,它将如何應答?

那麼這個問題如果細分的話,也可以分為三個小問題:

  1. 比原節點是如何收到對方發過來的資訊的?
  2. 收到

    BlockRequestMessage

    後,将會給對方發送什麼樣的資訊?
  3. 這個資訊是如何發送出去的?

我們先從第一個小問題開始。

比原節點是如何接收對方發過來的資訊的?

如果我們在代碼中搜尋

BlockRequestMessage

,會發現隻有在

ProtocolReactor.Receive

方法中針對該資訊進行了應答。那麼問題的關鍵就是,比原是如何接收對方發過來的資訊,并且把它轉交給

ProtocolReactor.Receive

的。

如果我們對前一篇《比原是如何把請求區塊資料的資訊發出去的》有印象的話,會記得比原在發送資訊時,最後會把資訊寫入到

MConnection.bufWriter

中;與之相應的,

MConnection

還有一個

bufReader

,用于讀取資料,它也是與

net.Conn

綁定在一起的:

p2p/connection.go#L114-L118
func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
    mconn := &MConnection{
        conn:        conn,
        bufReader:   bufio.NewReaderSize(conn, minReadBufferSize),
        bufWriter:   bufio.NewWriterSize(conn, minWriteBufferSize),
      

(其中

minReadBufferSize

的值為常量

1024

是以,要讀取對方發來的資訊,一定會讀取

bufReader

。經過簡單的搜尋,我們發現,它也是在

MConnection.Start

中啟動的:

p2p/connection.go#L152-L159
func (c *MConnection) OnStart() error {
    // ...
    go c.sendRoutine()
    go c.recvRoutine()
    // ...
}
      

其中的

c.recvRoutine()

就是我們本次所關注的。它上面的

c.sendRoutine

是用來發送的,是前一篇文章中我們關注的重點。

繼續

c.recvRoutine()

p2p/connection.go#L403-L502
func (c *MConnection) recvRoutine() {
    // ...
    for {
        c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)

        // ...

        pktType := wire.ReadByte(c.bufReader, &n, &err)
        c.recvMonitor.Update(int(n))
        // ...

        switch pktType {
        // ...
        case packetTypeMsg:
            pkt, n, err := msgPacket{}, int(0), error(nil)
            wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
            c.recvMonitor.Update(int(n))
            // ...
            channel, ok := c.channelsIdx[pkt.ChannelID]
            // ...
            msgBytes, err := channel.recvMsgPacket(pkt)
            // ...
            if msgBytes != nil {
                // ...
                c.onReceive(pkt.ChannelID, msgBytes)
            }
            // ...
        }
    }
    // ...
}
      

經過簡化以後,這個方法分成了三塊内容:

  1. 第一塊就限制接收速率,以防止惡意結點突然發送大量資料把節點撐死。跟發送一樣,它的限制是

    500K/s

  2. 第二塊是從

    c.bufReader

    中讀取出下一個資料包的類型。它的值目前有三個,兩個跟心跳有關:

    packetTypePing

    packetTypePong

    ,另一個表示是正常的資訊資料類型

    packetTypeMsg

    ,也是我們需要關注的
  3. 第三塊就是繼續從

    c.bufReader

    中讀取出完整的資料包,然後根據它的

    ChannelID

    找到相應的channel去處理它。

    ChannelID

    有兩個值,分别是

    BlockchainChannel

    PexChannel

    ,我們目前隻需要關注前者即可,它對應的reactor是

    ProtocolReactor

    。當最後調用

    c.onReceive(pkt.ChannelID, msgBytes)

    時,讀取的二進制資料

    msgBytes

    就會被

    ProtocolReactor.Receive

    處理

我們的重點是看第三塊内容。首先是

channel.recvMsgPacket(pkt)

,即通道是怎麼從packet包裡讀取到相應的二進制資料的呢?

p2p/connection.go#L667-L682
func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
    // ...
    ch.recving = append(ch.recving, packet.Bytes...)
    if packet.EOF == byte(0x01) {
        msgBytes := ch.recving
        // ...
        ch.recving = ch.recving[:0]
        return msgBytes, nil
    }
    return nil, nil
}
      

這個方法我去掉了一些錯誤檢查和關于性能方面的注釋,有興趣的同學可以點接上方的源代碼檢視,這裡就忽略了。

這段代碼主要是利用了一個叫

recving

的通道,把

packet

中持有的位元組數組加到它後面,然後再判斷該packet是否代表整個資訊結束了,如果是的話,則把

ch.recving

的内容完整傳回,供調用者處理;否則的話,傳回一個

nil

,表示還沒拿完,暫時處理不了。在前一篇文章中關于發送資料的地方可以與這裡對應,隻不過發送方要麻煩的多,需要三個通道

sendQueue

sending

send

才能實作,這邊接收方就簡單了。

然後回到前面的方法

MConnection.recvRoutine

,我們繼續看最後的

c.onReceive

調用。這個

onReceive

實際上是一個由别人指派給該channel的一個函數,它位于

MConnection

建立的地方:

p2p/peer.go#L292-L310
func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {
    onReceive := func(chID byte, msgBytes []byte) {
        reactor := reactorsByCh[chID]
        if reactor == nil {
            if chID == PexChannel {
                return
            } else {
                cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
            }
        }
        reactor.Receive(chID, p, msgBytes)
    }

    onError := func(r interface{}) {
        onPeerError(p, r)
    }

    return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
}
      

邏輯也比較簡單,就是目前面的

c.onReceive(pkt.ChannelID, msgBytes)

調用時,它會根據傳入的

chID

找到相應的

Reactor

,然後執行其

Receive

方法。對于本文來說,就會進入到

ProtocolReactor.Receive

那我們繼續看

ProtocolReactor.Receive

:

netsync/protocol_reactor.go#L179-L247
func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
    _, msg, err := DecodeMessage(msgBytes)
    // ...
    switch msg := msg.(type) {
    case *BlockRequestMessage:
        // ...
}
      

DecodeMessage(...)

就是把傳入的二進制資料反序列化成一個

BlockchainMessage

對象,該對象是一個沒有任何内容的

interface

,它有多種實作類型。我們在後面繼續對該對象進行判斷,如果它是

BlockRequestMessage

類型的資訊,我們就會繼續做相應的處理。處理的代碼我在這裡暫時省略了,因為它是屬于下一個小問題的,我們先不考慮。

好像不知不覺我們就把第一個小問題的後半部分差不多搞清楚了。那麼前半部分是什麼?我們在前面說,讀取

bufReader

的代碼的起點是在

MConnection.Start

中,那麼前半部分就是:比原從啟動開始中,是在什麼情況下怎樣一步步走到

MConnection.Start

的呢?

好在前半部分的問題我們在前一篇文章《比原是如何把請求區塊資料的資訊發出去的》中進行了專門的讨論,這裡就不講了,有需要的話可以再過去看一下(可以先看最後“總結”那一小節)。

下面我們進入第二個小問題:

BlockRequestMessage

這裡就是接着前面的

ProtocolReactor.Receive

繼續向下講了。首先我們再貼一下它的較完整的代碼:

func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
    _, msg, err := DecodeMessage(msgBytes)
    // ...

    switch msg := msg.(type) {
    case *BlockRequestMessage:
        var block *types.Block
        var err error
        if msg.Height != 0 {
            block, err = pr.chain.GetBlockByHeight(msg.Height)
        } else {
            block, err = pr.chain.GetBlockByHash(msg.GetHash())
        }
        // ...
        response, err := NewBlockResponseMessage(block)
        // ...
        src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
    // ...
}
      

可以看到,邏輯還是比較簡單的,即根據對方發過來的

BlockRequestMessage

中指定的

height

或者

hash

資訊,在本地的區塊鍊資料中找到相應的block,組成

BlockResponseMessage

發過去就行了。

其中

chain.GetBlockByHeight(...)

chain.GetBlockByHash(...)

如果詳細說明的話,需要深刻了解區塊鍊資料在比原節點中是如何儲存的,我們在本文先不講,等到後面專門研究。

在這裡,我覺得我們隻需要知道我們會查詢區塊資料并且構造出一個

BlockResponseMessage

,再通過

BlockchainChannel

這個通道發送出去就可以了。

最後一句代碼中調用了

src.TrySend

方法,它是把資訊向對方peer發送過去。(其中的

src

就是指的對方peer)

那麼,它到底是怎麼發送出去的呢?下面我們進入最後一個小問題:

這個

BlockResponseMessage

資訊是如何發送出去的?

我們先看看

peer.TrySend

代碼:

p2p/peer.go#L242-L247
func (p *Peer) TrySend(chID byte, msg interface{}) bool {
    if !p.IsRunning() {
        return false
    }
    return p.mconn.TrySend(chID, msg)
}
      

它在内部将會調用

MConnection.TrySend

方法,其中

chID

BlockchainChannel

,也就是它對應的Reactor是

ProtocolReactor

再接着就是我們熟悉的

MConnection.TrySend

,由于它在前一篇文章中進行了全面的講解,在本文就不提了,如果需要可以過去翻看一下。

那麼今天的問題就算是解決啦。

到這裡,我們總算能夠完整的了解清楚,當我們向一個比原節點請求“區塊資料”,我們這邊需要怎麼做,對方節點又需要怎麼做了。