天天看點

開源代碼學習-nsq(v0.1.1版本)源碼分析

版權聲明:本文為作者原創,如需轉載請通知本人,并标明出處和作者。擅自轉載的,保留追究其侵權的權利。golang群:570992072。qq 29185807 個人公衆号:月牙寂道長 公衆号微信号yueyajidaozhang https://blog.csdn.net/screscent/article/details/89927081

本文微信公衆号文章連結:https://mp.weixin.qq.com/s/QCXkJS7OEQ67xwWPoiW8wg

NSQ 是實時的分布式消息處理平台,其設計的目的是用來大規模地處理每天數以十億計級别的消息。NSQ 具有分布式和去中心化拓撲結構,該結構具有無單點故障、故障容錯、高可用性以及能夠保證消息的可靠傳遞的特征,是一個成熟的、已在大規模生成環境下應用的産品。

源碼位址:https://github.com/nsqio/nsq

對于一個大型的項目來講,我個人的學習習慣于從最小版本開始學起。這是因為,在一個項目最初的時候,大體功能和架構都已經成形,最初的版本,一般來說,代碼量都較少,功能集最小。學習曲線低,并且又最初版本,慢慢往高版本過渡,也能更了解項目進化的過程,也是一個學習的過程。

并且在實際使用過程中,大多數情況下,我們可能不需要那麼多的功能集,并且需要根據實際情況做一些二次開發,此時的話,也許低版本的會更貼近實際使用場景和二次開發場景。

對于nsq的學習分析,那麼就從最低版本開始。

找到所有代碼上傳日志,找到了一個最低版本的0.1.1的最後版本。

代碼量:

開源代碼學習-nsq(v0.1.1版本)源碼分析

代碼量非常少,通過看檔案名,也大緻能夠知道有哪些子產品。代碼一眼看過去還是非常清晰的。

那麼現在就開始從main函數開始分析。

github.com/nsqio/nsq/nsq.go

開源代碼學習-nsq(v0.1.1版本)源碼分析

代碼的話,不全部分析,看的是主要部分,其他部分讀者可自行分析。

從圖中,紅框中,看到啟動了4個子產品。

topicFactory

uuidFactory           

複制

tcpServer           

複制

httpServer           

複制

那麼先一個一個分析看看

topic

github.com/nsqio/nsq/topic.go

開源代碼學習-nsq(v0.1.1版本)源碼分析

name:每個topic都有一個名稱

newChannelChan:每個topic對應有很多的chan,用于對應訂閱者,此變量用于記錄新增加的訂閱者chan

channelMap:訂閱者對應的channel的map記錄

backend:msg的緩存區

incomingMessageChan:新到msg的chan通知,用于釋出者寫入通知

msgChan:用于傳遞msg的chan,用于中間釋出給訂閱者

routerSyncChan:标記chan,用于标記有msg同步的開始

readSyncChan:标記chan,用于标記msg同步的結束

channelWriterStarted:标記chan,用于初始化時,開啟MessagePump協程開啟的标記

開源代碼學習-nsq(v0.1.1版本)源碼分析

兩個全局變量

newTopicChan:用于生産和擷取topic

topicMap:用于存儲所有topic

開源代碼學習-nsq(v0.1.1版本)源碼分析

此函數就是main函數中的子產品啟動之一,開啟topic工廠

for循環中,

55:接收的是newTopicChan的資訊

57:從topicMap中查找topic資訊

58:未查找到,則建立一個topic,NewTopic

62:将查找到的或者建立的topic資訊通過retChan傳回給調用者

開源代碼學習-nsq(v0.1.1版本)源碼分析

入口則為GetTopic

建立了一個topicChan,并将其通過newTopicChan發送給topicFactory

開源代碼學習-nsq(v0.1.1版本)源碼分析

那看看NewTopic:初始化後,在35行,開啟了一個協程topic.Router

每個topic都有一個處理協程(Router),所有的操作通過chan将資訊發送到Router中,Router中根據不同的chan,進行不同的操作。

開源代碼學習-nsq(v0.1.1版本)源碼分析

Router中,接收的chan資訊:

1、newChannelChan

入口為:

開源代碼學習-nsq(v0.1.1版本)源碼分析

在一個topic中會對應多個channel,用于訂閱者,并且每個訂閱者的channel都有一個name用于辨別。

在Router中:

123:從channelMap中查找對應的channel

125:若沒有找到,則NewChannel(這個待其他子產品再分析)

129:通過retchan,将channel資訊傳回給調用者

131:針對每個channel,都會開啟一個協程MessagePump(待會分析)

2、incomingMessageChan

入口為:

開源代碼學習-nsq(v0.1.1版本)源碼分析

用于釋出者調用,釋出一個資訊。

在Router中:

136:将msg發送到msgchan中

139:若msgchan處于阻塞狀态,則将msg寫入到backend中

3、readSyncChan

這個是用于标記read sync start

148:并等待read sync end

4、exitchan

MessagePump

開源代碼學習-nsq(v0.1.1版本)源碼分析

每個訂閱者的channel,都會開啟一個協程MessagePump。

for循環中

91:從msgchan中擷取msg

92:同時也從backend中擷取緩存區中未釋出的msg

101:标記read sync read start

103-107:周遊所有的channel,将讀取的msg,釋出給所有的channel

109:标記read sync read end

子產品小結:

1、GetTopic接口提供建立和擷取topic資訊。

通過topicFactory來進行處理。

2、GetChannel接口提供建立和擷取Channel

通過topic中的Router,接受newChannelChan進行處理

3、PutMessage接口提供釋出msg接口

通過topic中的Router,接受incomingMessageChan進行處理

4、通過msgChan和backend緩沖區,用于msg的傳送。并在MessagePump中進行消息的分發。

uuid

github.com/nsqio/nsq/uuid.go

開源代碼學習-nsq(v0.1.1版本)源碼分析

代碼很簡單,是一個生成uuid的子產品。就不講解了

Tcp

github.com/nsqio/nsq/tcp.go

開源代碼學習-nsq(v0.1.1版本)源碼分析

tcp子產品,用于tcp的監聽

16:tcp的accept

20:每個tcp連結都會建構一個client

22:每個連結的處理部分為client Handle

client

github.com/nsqio/nsq/client.go

開源代碼學習-nsq(v0.1.1版本)源碼分析
開源代碼學習-nsq(v0.1.1版本)源碼分析

client中有conn連結資訊,state狀态資訊,channel對應的訂閱channel資訊。

開源代碼學習-nsq(v0.1.1版本)源碼分析

client的狀态表

開源代碼學習-nsq(v0.1.1版本)源碼分析

提供的接口write,這個就不解釋了

開源代碼學習-nsq(v0.1.1版本)源碼分析

Handle是每個連結的入口

82:讀取協定版本号

90:查找對應版本号的協定

97:真正的處理部分,protoclo.IOLoop

protocol

github.com/nsqio/nsq/protocol.go

開源代碼學習-nsq(v0.1.1版本)源碼分析

是一個接口

protocol_v1

github.com/nsqio/nsq/protocol_v1.go

v1版本的協定

開源代碼學習-nsq(v0.1.1版本)源碼分析

初始化協定

開源代碼學習-nsq(v0.1.1版本)源碼分析

協定處理部分

29:初始化了一個bufio的reader

30:for循環開始

31:開始從連結中讀取資料

開源代碼學習-nsq(v0.1.1版本)源碼分析

重點在

54:查找到MethodByName

56:調用查找到的Method

75:調用client的write,将結果傳回

那麼協定中提供了多少操作呢?

sub

開源代碼學習-nsq(v0.1.1版本)源碼分析

關鍵地方:

112:擷取topic

113:擷取channel

114:将client加入到channel中進行管理

get

開源代碼學習-nsq(v0.1.1版本)源碼分析

關鍵地方:

127:從channel中擷取Message

137-138:将msg格式化到buf中

140:将buf return

channel

github.com/nsqio/nsq/channel.go

channel子產品和topic子產品很類似

開源代碼學習-nsq(v0.1.1版本)源碼分析

name:channel的名稱

addClientChan:用于tcp連結對應的client加入channel中的傳送chan

removeClientChan:用于删除client的傳送chan

clients:用于儲存channel中的所有連結

backend:msg緩沖區

incomingMessageChan:用于msg到來的chan

msgchan:用于msg的傳送

其餘的不解釋

開源代碼學習-nsq(v0.1.1版本)源碼分析

NewChannel是在topic中的GetChannel接口,并在Router中調用,可以傳回topic源碼分析地方檢視。

如topic一樣,每個channel都開啟了一個channel.Router協程,同樣所有的操作都是通過chan來發送信号到Router中,進行操作。

開源代碼學習-nsq(v0.1.1版本)源碼分析

RemoveClient通過把信号發送給removeClientChan,在Router中操作

開源代碼學習-nsq(v0.1.1版本)源碼分析

外部發送資訊到channel中,通過incomingMessageChan發送,在Router中操作。調用的地方有topic中的MessagePump中,可以傳回topic源碼分析地方檢視。

開源代碼學習-nsq(v0.1.1版本)源碼分析

對外還有一個GetMessage接口,調用地方是在Protocol_v1中的GET中,可以傳回Protocol_v1源碼分析地方檢視。

189:從msgchan中擷取msg

190:從backend緩沖區中擷取msg

202:将msg傳回給調用者

開源代碼學習-nsq(v0.1.1版本)源碼分析
開源代碼學習-nsq(v0.1.1版本)源碼分析

1、addClientChan

将新的client添加到client數組中

2、removeClientChan

周遊client數組,将查找到的client删除掉

3、incomingMessageChan

此處的操作與topic中的類似。将msg發送到msgchan中,若msgchan阻塞,則放到backend緩沖區中。

http

github.com/nsqio/nsq/http.go

開源代碼學習-nsq(v0.1.1版本)源碼分析

httpserver中注冊了兩個處理handler:pingHandler,putHandler

開源代碼學習-nsq(v0.1.1版本)源碼分析

pingHandler,很簡單,不解釋

開源代碼學習-nsq(v0.1.1版本)源碼分析

主要操作:

86:擷取到topic

87:将msg發送到topic中

總結:

所有的子產品都分解完了。那麼現在把這些子產品連結起來。

tcp子產品監聽連結,每個連結生成一個client,client通過Protocol與channel聯系起來。

在Protocol中提供,SUB、GET操作,client提供write供Protocol調用

在SUB中,提供将client與channel聯系起來。

在GET中,提供将client從channel中擷取訂閱的msg,并調用client的write通過tcp發送給訂閱者

每個topic包含多個channel,每個channel對應有一個MessagePump,用于從topic中将msg分發給每個channel。topic中提供GetChannel用于建立和擷取channel。

topic提供一個topicFactory用于建立和擷取topic。

topic提供PutMessage用于釋出者釋出msg。而在http子產品中,提供putHandler,用于釋出者釋出msg,通過在putHandler中調用PutMessage接口,将msg釋出到topic中。

整個過程就是如此。在這個v0.1.1版本中,最主要的流程都有了。但此版本隻是一個單機系統。還不是分布式系統。

後續版本,待有時間的時候,再做分析。

龔浩華

月牙寂道長

qq:29185807

2019年05月07日