消息多點傳播
使用Redis的 list 和 zset 資料結構分别可以實作隊列與延時隊列的功能,但是這兩種實作沒有辦法做到多點傳播,即一份消息可以讓多個消費者消費,
消息多點傳播是生産隻需要生産一份消息,中間件負責将消息複制到多個消息隊列中,每個消息隊列由對應的消費組進行消費,
消息多點傳播是分布式系統常用的一種解耦方式,每個消費組的處理邏輯不同,可以将消費組放在不同的系統中,
如果消息隻有一份的話,則隻能将所有的處理邏輯放在同一個系統中,不同的消費組通過内部傳遞共同使用一份消息。
PubSub
Redis通過 PubSub 子產品支援消息多點傳播,即 PublisherSubscriber (釋出/訂閱者模式)。
Java使用Jedis示範消息多點傳播:
可以看到一個釋出者釋出的消息可以被多個消費者消費到,
需要注意的是生産者和消費者的連接配接必須使用不同的連接配接,redis不允許連接配接在subscribe消息時進行其他操作。
完整代碼:
https://github.com/qiaomengnan16/redis-demo/tree/main/redis-pub-sub訂閱模式
多主題
消息訂閱支援消費者訂閱多個主題,即 subscribe 多個主題名稱
127.0.0.1:6379> subscribe c1 c2 c3
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "c1"
3) (integer) 1
1) "subscribe"
2) "c2"
3) (integer) 2
1) "subscribe"
2) "c3"
3) (integer) 3
publish給多個主題發送消息
127.0.0.1:6379> publish c1 helloc1
(integer) 1
127.0.0.1:6379> publish c2 helloc2
(integer) 1
127.0.0.1:6379> publish c3 helloc3
(integer) 1
127.0.0.1:6379>
此時可以看到,消費者收到了多個主題的消息。
模式訂閱
如果此時需要新增c4、c5主題的話,用戶端又需要重新進行 subscribe 将需要訂閱的加入進去,
是以redis提供了 pattern subscribe,這樣就可以一次訂閱多個主題,即使增加了新主題,消費者也可以立即收到消息。
即 psubscribe c* ,訂閱c開頭的主題,這樣所有c開頭的消息,這邊都能消費到了。
127.0.0.1:6379> psubscribe c*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "c*"
3) (integer) 1
1) "pmessage"
2) "c*"
3) "c4"
4) "helloc4"
1) "pmessage"
2) "c*"
3) "c5"
4) "helloc5"
消息結構
消費者接收到消息時不單單僅有message一個資訊,還有其他幾個内容。
- data 即消息的内容。
- channel 即訂閱主題名稱。
- type 消息類型,取值有 message(普通消息)、subscribe(訂閱指令回報)、psubscribe(模式訂閱回報)、unsubscribe(取消訂閱指令回報)、punsubscribe(取消模式訂閱回報)。
- pattern 即目前消息使用哪種模式訂閱得到,通過 subscribe 訂閱的即為空。
缺點
當生産者釋出一個消息時,Redis會找到相應的消費者發送過去,如果沒有消費者的話,此時這個消息将被丢棄,
如果有三個消費者,挂掉了一個,另外兩個可以正常消費生産者的消息,但是挂掉的那個消費者重連的時候,挂掉期間内的消費,将無法消費到,即丢消息了,
PubSub的消息不會被持久化,是以Redis重新開機或者當機,這些消息将會被直接丢棄。
由于PubSub無法保證消息的可靠性,易丢失,在需要保證消息可靠性的消息隊列的場景中,基本沒有合适的應用場景,
如果需要保證消息的可靠性,Redis5.0新增了Stream資料結構,該結構支援持久化,但是PubSub不是一無是處,适用于一些時效性要求高的場景。
例如Master要求所有的Slave上報一下目前自身的負載,可以用過PubSub釋出一條指令,如果有Slave處于當機狀态也沒事,
當機自然無法上傳,當機恢複後也不需要再上傳,因為過了時效性,隻需後面收到指令時在彙報實時的負載即可,
如果作為指令中介的Redis執行個體重新開機或當機,消息也沒必要持久化指令在隊列中,因為當Redis恢複後已經過了要求上報的時效期,
也可以用作線上叢集間的通信,例如一個節點通知其他節點,給所有線上節點發送一個消息。