天天看點

python redis 消息隊列 回調_python中的Redis鍵空間通知(過期回調)

介紹

Redis是一個記憶體資料結構存儲庫,用于緩存,高速資料攝取,處理消息隊列,分布式鎖定等等。

使用Redis優于其他記憶體存儲的優點是Redis提供持久性和資料結構,如清單,集合,有序集和散列。

在本文中,我想簡要介紹一下Redis鍵空間通知。我将解釋鍵空間通知是什麼,并示範如何配置Redis以接收它們。然後我将向您展示如何在python中訂閱Redis通知。

啟用鍵空間通知

預設情況下,禁用鍵空間事件通知。我們可以在redis.conf或redis-cli中啟用它們,如下所示:

$ redis-cli config set notify-keyspace-events KEA

OK

該KEA字元串意味着每一個可能的事件被啟用。要檢視每個字元的含義,請檢視文檔。

該CLI可以在特殊模式下,它允許您訂閱的頻道,以接收郵件的工作。

現在讓我們檢查事件是否正常:

$ redis-cli --csv psubscribe '*'

Reading messages... (press Ctrl-C to quit)

"psubscribe","*",1

psubscribe '*'意味着我們想要使用模式訂閱所有事件*。在新的終端輸入redis-cli和SET key1tovalue1。

127.0.0.1:6379> set key1 value1

OK

在第一個終端,您将看到:

$ redis-cli --csv psubscribe '*'

Reading messages... (press Ctrl-C to quit)

"psubscribe","*",1

"pmessage","*","[email protected]__:key1","set"

"pmessage","*","[email protected]__:set","key1

通知正在運作:)

Redis鍵空間通知

Redis密鑰空間通知自2.8.0版開始提供。對于每個更改任何Redis密鑰的操作,我們可以配置Redis将消息釋出到Pub / Sub。然後我們可以訂閱這些通知。值得一提的是,隻有在真正修改了密鑰時才會生成事件。例如,删除不存在的密鑰不會生成事件。

上面你收到了三個事件:

"psubscribe","*",1

"pmessage","*","[email protected]__:key1","set"

"pmessage","*","[email protected]__:set","key1

第一個事件意味着我們已成功訂閱作為回複中第二個元素的通道。1表示我們目前訂閱的頻道數。第二個事件是密鑰空間通知。在密鑰空間信道中,我們收到了事件的名稱set作為消息。第三個事件是關鍵事件通知。在keyevent頻道中,我們收到了密鑰的名稱key1作為消息。

Redis Pub / Sub

使用Redis的Pub / Sub圖層傳遞事件。

為了訂閱頻道channel1和channel2,用戶端發出一個訂閱與頻道的名稱指令:

SUBSCRIBE channel1 channel2

其他客戶(釋出者)發送到這些頻道的消息将由Redis推送到所有訂閱的用戶端(訂閱者)。

Redis Pub / Sub實作支援模式比對。用戶端可以訂閱glob樣式模式,以便使用PSUBSCRIBE接收發送到與給定模式比對的通道名稱的所有消息。

例如:

PSUBSCRIBE channel*

将接收所有發來的短信channel1,channel.b等等。

如果您的釋出/訂閱用戶端斷開連接配接并稍後重新連接配接,則在用戶端斷開連接配接期間傳遞的所有事件都将丢失。

Redis為每個用戶端維護一個用戶端輸出緩沖區。Pub / Sub的用戶端輸出緩沖區的預設限制設定為:

client-output-buffer-limit pubsub 32mb 8mb 60

Redis将強制用戶端在兩種情況下斷開連接配接:如果輸出緩沖區增長超過32MB,或者輸出緩沖區持續保持8MB資料60秒。

這些迹象表明客戶消費資料的速度比釋出時慢。

将來有計劃允許更可靠的事件傳遞,但可能會在更一般的層面上解決,要麼為Pub / Sub本身帶來可靠性,要麼允許Lua腳本攔截Pub / Sub消息以執行推送等操作把事件放到一個清單中。

訂閱python中的通知

首先我們需要Redisredis-py的python用戶端,是以讓我們安裝它:

$ pip install redis

事件循環

看看下面的代碼。它訂閱所有鍵空間通知并列印任何收到的。

import time

from redis import StrictRedis

redis = StrictRedis(host='localhost', port=6379)

pubsub = redis.pubsub()

pubsub.psubscribe('[email protected]__:*')

print('Starting message loop')

while True:

message = pubsub.get_message()

if message:

print(message)

else:

time.sleep(0.01)

這就是我們建立Redis連接配接的方式:

redis = StrictRedis(host='localhost', port=6379)

預設情況下,所有響應都以位元組形式傳回。使用者負責解碼它們。如果應解碼來自用戶端的所有字元串響應,則使用者可以将SID_responses = True指定為StrictRedis。在這種情況下,任何傳回字元串類型的Redis指令都将使用指定的編碼進行解碼。

接下來,我們建立一個pubsub對象,該對象訂閱一個頻道并偵聽新消息:

pubsub = redis.pubsub()

pubsub.psubscribe('[email protected]__:*')

然後我們通過無限循環等待事件:

while True:

message = pubsub.get_message()

...

如果有資料,get_message()将讀取并傳回它。如果沒有資料,則該方法将傳回None。

從pubsub執行個體讀取的每條消息都是一個包含以下鍵的字典:

鍵入:下列之一:subscribe,unsubscribe,psubscribe,punsubscribe,message,pmessage

channel:訂閱的頻道或釋出消息的頻道

pattern:比對已釋出消息的通道的模式(除類型外在所有情況下均為Nonepmessage)

data:消息資料

現在啟動python腳本,在另一個終端輸入帶有值的redis-cli和SET鍵mykeymyvalue

127.0.0.1:6379> set mykey myvalue

OK

您将看到腳本的以下輸出:

$ python subscribe.py

Starting message loop

{'type': 'psubscribe', 'data': 1, 'channel': b'[email protected]__:*', 'pattern': None}

{'type': 'pmessage', 'data': b'set', 'channel': b'[email protected]__:mykey', 'pattern': b'[email protected]__:*'}

回調

也可以注冊回調函數來處理已釋出的消息。消息處理程式隻接受一個參數即消息。要使用消息處理程式訂閱通道或模式,請将通道或模式名稱作為關鍵字參數傳遞,其值為回調函數。當使用消息處理程式在通道或模式上讀取消息時,将建立消息字典并将其傳遞給消息處理程式。在這種情況下,從get_message()傳回None值,因為消息已經處理完畢。

import time

from redis import StrictRedis

redis = StrictRedis(host='localhost', port=6379)

pubsub = redis.pubsub()

def event_handler(msg):

print('Handler', msg)

pubsub.psubscribe(**{'[email protected]__:*': event_handler})

print('Starting message loop')

while True:

message = pubsub.get_message()

if message:

print(message)

else:

time.sleep(0.01)

127.0.0.1:6379> set mykey myvalue

OK

如您所見,set事件mykey由event_handler回調處理。

$ python subscribe2.py

Starting message loop

{'pattern': None, 'channel': b'[email protected]__:*', 'data': 1, 'type': 'psubscribe'}

Handler {'pattern': b'[email protected]__:*', 'channel': b'[email protected]__:mykey', 'data': b'set', 'type': 'pmessage'}

單獨線程中的事件循環

另一種選擇是在單獨的線程中運作事件循環:

import time

from redis import StrictRedis

redis = StrictRedis(host='localhost', port=6379)

def event_handler(msg):

print(msg)

thread.stop()

pubsub = redis.pubsub()

pubsub.psubscribe(**{'[email protected]__:expired': event_handler})

thread = pubsub.run_in_thread(sleep_time=0.01)

上面的代碼建立了一個新線程并啟動了事件循環。處理完第一個過期事件後,我們使用該thread.stop()方法關閉事件循環和線程。

在幕後,這隻是一個圍繞get_message()的包裝器,它在一個單獨的線程中運作。run_in_thread()采用可選sleep_time參數。如果指定,則事件循環将使用循環的每次疊代中的值調用time.sleep()。

127.0.0.1:6379> set mykey myvalue ex 1

OK

預期産量:

$ python subscribe3.py

{'type': 'pmessage', 'channel': b'[email protected]__:expired', 'pattern': b'[email protected]__:expired', 'data': b'mykey'}

概要

Redis的一個常見用例是,當應用程式需要能夠響應存儲在特定密鑰或密鑰中的值可能發生的更改時。感謝密鑰空間通知和Pub / Sub,我們可以響應Redis資料中的更改。通知非常容易使用,而事件處理器可以在地理上分布。

最大的缺點是Pub / Sub實作要求釋出者和訂閱者一直處于啟動狀态。訂閱伺服器在停止或連接配接丢失時會丢失資料。

連結

原文參考:https://tech.webinterpret.com/redis-notifications-python/