1.用到的List(清單)指令
指令 | 作用 |
---|---|
lPush | 将一個或多個值插入到清單頭部 |
rpoplpush | 彈出清單最後一個值,同時插入到另一個清單頭部,并傳回該值 |
lRem | 删除清單内的給定值 |
lIndex | 按索引擷取清單内的值 |
2.隊列的組成
名稱 | 職責 |
---|---|
生産者 | 釋出消息 |
消費者 | 擷取并處理消息 |
監聽者 | 監聽逾時的消息,彈回原消息隊列,確定消費者挂掉後或處理失敗後消息能被其他消費者處理 |
3.php實作代碼
Producter.php
<?php
/**
* Created by PhpStorm.
* User: jmsite.cn
* Date: 2019/1/26
* Time: 0:13
*/
try {
//聲明消息隊列-list的鍵名
$queueKey = 'testQueueKey';
$redis = new Redis();
$redis->connect('192.168.75.132', 6379);
//向清單中push10條消息
for ($i = 0;$i < 10;$i++){
//為消息生成唯一辨別
$uniqid = uniqid(mt_rand(10000, 99999).getmypid().memory_get_usage(), true);
$ret = $redis->lPush($queueKey, json_encode(array('uniqid' => $uniqid, 'key' => 'key-'.$i, 'value' => 'data')));
var_dump($ret);
}
} catch (Exception $e){
echo $e->getMessage();
}
Consumer.php
<?php
/**
* Created by PhpStorm.
* User: jmsite.cn
* Date: 2019/1/26
* Time: 0:14
*/
try {
//聲明消息隊列-list的鍵名
$queueKey = 'testQueueKey';
//聲明監聽者隊列-list的鍵名
$watchQueueKey = 'watchQueueKey';
$redis = new Redis();
$redis->connect('192.168.75.132', 6379);
//隊列先進先出,彈出最先加入的消息,同時放入監聽隊列
while (true){
$ret = $redis->rpoplpush($queueKey, $watchQueueKey);
if ($ret === false){
sleep(1);
} else {
$retArray = json_decode($ret, true);
//将唯一id寫入緩存設定有效期
$redis->setex($retArray['uniqid'], 60, 0);
//模拟失敗
$rand = mt_rand(0,9);
if ($rand < 3){
echo "failure:".$ret."\n";
} else {
//todo
//處理成功移除消息
$redis->lRem($watchQueueKey, $ret, 0);
echo "success:".$ret."\n";
}
}
}
} catch (Exception $e){
echo $e->getMessage();
}
Watcher.php
<?php
/**
* Created by PhpStorm.
* User: jmsite.cn
* Date: 2019/1/26
* Time: 0:15
*/
try {
//聲明消息隊列-list的鍵名
$queueKey = 'testQueueKey';
//聲明監聽者隊列-list的鍵名
$watchQueueKey = 'watchQueueKey';
$redis = new Redis();
$redis->connect('192.168.75.132', 6379);
while (true){
//取出清單尾部的一個值
$ret = $redis->lIndex($watchQueueKey, -1);
//如果不存在則休眠1秒
if ($ret === false){
sleep(1);
} else {
$retArray = json_decode($ret, true);
$idCache = $redis->get($retArray['uniqid']);
if ($idCache === false){
//如果已過期,表示任務逾時,彈回原隊列
$redis->rpoplpush($watchQueueKey, $queueKey);
echo "rpoplpush:".$ret."\n";
} else {
//進行中,繼續等待
sleep(1);
}
}
}
} catch (Exception $e){
echo $e->getMessage();
}
4.執行隊列
開啟監聽者
php Watcher.php
開啟消費者
php Consumer.php
執行生産者
php Producter.php
生産者輸出
int(1)
int(2)
int(3)
int(4)
int(5)
int(6)
int(7)
int(8)
int(9)
int(10)
監聽者輸出
rpoplpush:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
rpoplpush:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
rpoplpush:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
rpoplpush:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
消費者輸出
success:{"uniqid":"47280267323557445c4bde640dbfb4.78962728","key":"key-0","value":"data"}
failure:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
success:{"uniqid":"39394267323642245c4bde640de992.34641654","key":"key-2","value":"data"}
success:{"uniqid":"41335267323642245c4bde640df980.38466514","key":"key-3","value":"data"}
failure:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
failure:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
success:{"uniqid":"43817267323642245c4bde640ec189.44008738","key":"key-7","value":"data"}
success:{"uniqid":"69276267323642245c4bde640ecb91.04877522","key":"key-8","value":"data"}
failure:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
success:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
success:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
success:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
success:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
success:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
我們看到消費者第一次執行時失敗的消息,逾時後又被彈回了消息隊列,消費者有了再次執行的機會,監聽者的職責就是確定消費者執行失敗或挂掉後消息還能再彈回原隊列得到再次執行
原文位址:
https://www.jmsite.cn/blog-615.html