版權聲明:本文為作者原創,如需轉載請通知本人,并标明出處和作者。擅自轉載的,保留追究其侵權的權利。golang群:570992072。qq 29185807 個人公衆号:月牙寂道長 公衆号微信号yueyajidaozhang https://blog.csdn.net/screscent/article/details/89927081
本文微信公衆号文章連結:https://mp.weixin.qq.com/s/QCXkJS7OEQ67xwWPoiW8wg
NSQ 是實時的分布式消息處理平台,其設計的目的是用來大規模地處理每天數以十億計級别的消息。NSQ 具有分布式和去中心化拓撲結構,該結構具有無單點故障、故障容錯、高可用性以及能夠保證消息的可靠傳遞的特征,是一個成熟的、已在大規模生成環境下應用的産品。
源碼位址:https://github.com/nsqio/nsq
對于一個大型的項目來講,我個人的學習習慣于從最小版本開始學起。這是因為,在一個項目最初的時候,大體功能和架構都已經成形,最初的版本,一般來說,代碼量都較少,功能集最小。學習曲線低,并且又最初版本,慢慢往高版本過渡,也能更了解項目進化的過程,也是一個學習的過程。
并且在實際使用過程中,大多數情況下,我們可能不需要那麼多的功能集,并且需要根據實際情況做一些二次開發,此時的話,也許低版本的會更貼近實際使用場景和二次開發場景。
對于nsq的學習分析,那麼就從最低版本開始。
找到所有代碼上傳日志,找到了一個最低版本的0.1.1的最後版本。
代碼量:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAjM2EzLcd3LcJzLcJzdllmVldWYtl2PnVGcq5COiJja2QnZjJjMvwVMwkTNyITMtUGall3LcVmdhNXLwRHdo9CXt92YucWbpRWdvx2Yx5yazF2Lc9CX6MHc0RHaiojIsJye.jpeg)
代碼量非常少,通過看檔案名,也大緻能夠知道有哪些子產品。代碼一眼看過去還是非常清晰的。
那麼現在就開始從main函數開始分析。
github.com/nsqio/nsq/nsq.go
代碼的話,不全部分析,看的是主要部分,其他部分讀者可自行分析。
從圖中,紅框中,看到啟動了4個子產品。
topicFactory
uuidFactory
複制
tcpServer
複制
httpServer
複制
那麼先一個一個分析看看
topic
github.com/nsqio/nsq/topic.go
name:每個topic都有一個名稱
newChannelChan:每個topic對應有很多的chan,用于對應訂閱者,此變量用于記錄新增加的訂閱者chan
channelMap:訂閱者對應的channel的map記錄
backend:msg的緩存區
incomingMessageChan:新到msg的chan通知,用于釋出者寫入通知
msgChan:用于傳遞msg的chan,用于中間釋出給訂閱者
routerSyncChan:标記chan,用于标記有msg同步的開始
readSyncChan:标記chan,用于标記msg同步的結束
channelWriterStarted:标記chan,用于初始化時,開啟MessagePump協程開啟的标記
兩個全局變量
newTopicChan:用于生産和擷取topic
topicMap:用于存儲所有topic
此函數就是main函數中的子產品啟動之一,開啟topic工廠
for循環中,
55:接收的是newTopicChan的資訊
57:從topicMap中查找topic資訊
58:未查找到,則建立一個topic,NewTopic
62:将查找到的或者建立的topic資訊通過retChan傳回給調用者
入口則為GetTopic
建立了一個topicChan,并将其通過newTopicChan發送給topicFactory
那看看NewTopic:初始化後,在35行,開啟了一個協程topic.Router
每個topic都有一個處理協程(Router),所有的操作通過chan将資訊發送到Router中,Router中根據不同的chan,進行不同的操作。
Router中,接收的chan資訊:
1、newChannelChan
入口為:
在一個topic中會對應多個channel,用于訂閱者,并且每個訂閱者的channel都有一個name用于辨別。
在Router中:
123:從channelMap中查找對應的channel
125:若沒有找到,則NewChannel(這個待其他子產品再分析)
129:通過retchan,将channel資訊傳回給調用者
131:針對每個channel,都會開啟一個協程MessagePump(待會分析)
2、incomingMessageChan
入口為:
用于釋出者調用,釋出一個資訊。
在Router中:
136:将msg發送到msgchan中
139:若msgchan處于阻塞狀态,則将msg寫入到backend中
3、readSyncChan
這個是用于标記read sync start
148:并等待read sync end
4、exitchan
略
MessagePump
每個訂閱者的channel,都會開啟一個協程MessagePump。
for循環中
91:從msgchan中擷取msg
92:同時也從backend中擷取緩存區中未釋出的msg
101:标記read sync read start
103-107:周遊所有的channel,将讀取的msg,釋出給所有的channel
109:标記read sync read end
子產品小結:
1、GetTopic接口提供建立和擷取topic資訊。
通過topicFactory來進行處理。
2、GetChannel接口提供建立和擷取Channel
通過topic中的Router,接受newChannelChan進行處理
3、PutMessage接口提供釋出msg接口
通過topic中的Router,接受incomingMessageChan進行處理
4、通過msgChan和backend緩沖區,用于msg的傳送。并在MessagePump中進行消息的分發。
uuid
github.com/nsqio/nsq/uuid.go
代碼很簡單,是一個生成uuid的子產品。就不講解了
Tcp
github.com/nsqio/nsq/tcp.go
tcp子產品,用于tcp的監聽
16:tcp的accept
20:每個tcp連結都會建構一個client
22:每個連結的處理部分為client Handle
client
github.com/nsqio/nsq/client.go
client中有conn連結資訊,state狀态資訊,channel對應的訂閱channel資訊。
client的狀态表
提供的接口write,這個就不解釋了
Handle是每個連結的入口
82:讀取協定版本号
90:查找對應版本号的協定
97:真正的處理部分,protoclo.IOLoop
protocol
github.com/nsqio/nsq/protocol.go
是一個接口
protocol_v1
github.com/nsqio/nsq/protocol_v1.go
v1版本的協定
初始化協定
協定處理部分
29:初始化了一個bufio的reader
30:for循環開始
31:開始從連結中讀取資料
重點在
54:查找到MethodByName
56:調用查找到的Method
75:調用client的write,将結果傳回
那麼協定中提供了多少操作呢?
sub
關鍵地方:
112:擷取topic
113:擷取channel
114:将client加入到channel中進行管理
get
關鍵地方:
127:從channel中擷取Message
137-138:将msg格式化到buf中
140:将buf return
channel
github.com/nsqio/nsq/channel.go
channel子產品和topic子產品很類似
name:channel的名稱
addClientChan:用于tcp連結對應的client加入channel中的傳送chan
removeClientChan:用于删除client的傳送chan
clients:用于儲存channel中的所有連結
backend:msg緩沖區
incomingMessageChan:用于msg到來的chan
msgchan:用于msg的傳送
其餘的不解釋
NewChannel是在topic中的GetChannel接口,并在Router中調用,可以傳回topic源碼分析地方檢視。
如topic一樣,每個channel都開啟了一個channel.Router協程,同樣所有的操作都是通過chan來發送信号到Router中,進行操作。
RemoveClient通過把信号發送給removeClientChan,在Router中操作
外部發送資訊到channel中,通過incomingMessageChan發送,在Router中操作。調用的地方有topic中的MessagePump中,可以傳回topic源碼分析地方檢視。
對外還有一個GetMessage接口,調用地方是在Protocol_v1中的GET中,可以傳回Protocol_v1源碼分析地方檢視。
189:從msgchan中擷取msg
190:從backend緩沖區中擷取msg
202:将msg傳回給調用者
1、addClientChan
将新的client添加到client數組中
2、removeClientChan
周遊client數組,将查找到的client删除掉
3、incomingMessageChan
此處的操作與topic中的類似。将msg發送到msgchan中,若msgchan阻塞,則放到backend緩沖區中。
http
github.com/nsqio/nsq/http.go
httpserver中注冊了兩個處理handler:pingHandler,putHandler
pingHandler,很簡單,不解釋
主要操作:
86:擷取到topic
87:将msg發送到topic中
總結:
所有的子產品都分解完了。那麼現在把這些子產品連結起來。
tcp子產品監聽連結,每個連結生成一個client,client通過Protocol與channel聯系起來。
在Protocol中提供,SUB、GET操作,client提供write供Protocol調用
在SUB中,提供将client與channel聯系起來。
在GET中,提供将client從channel中擷取訂閱的msg,并調用client的write通過tcp發送給訂閱者
每個topic包含多個channel,每個channel對應有一個MessagePump,用于從topic中将msg分發給每個channel。topic中提供GetChannel用于建立和擷取channel。
topic提供一個topicFactory用于建立和擷取topic。
topic提供PutMessage用于釋出者釋出msg。而在http子產品中,提供putHandler,用于釋出者釋出msg,通過在putHandler中調用PutMessage接口,将msg釋出到topic中。
整個過程就是如此。在這個v0.1.1版本中,最主要的流程都有了。但此版本隻是一個單機系統。還不是分布式系統。
後續版本,待有時間的時候,再做分析。
龔浩華
月牙寂道長
qq:29185807
2019年05月07日