天天看點

PHP使用Redis的List(清單)指令實作消息隊列

1.用到的List(清單)指令

指令 作用
lPush 将一個或多個值插入到清單頭部
rpoplpush 彈出清單最後一個值,同時插入到另一個清單頭部,并傳回該值
lRem 删除清單内的給定值
lIndex 按索引擷取清單内的值

2.隊列的組成

名稱 職責
生産者 釋出消息
消費者 擷取并處理消息
監聽者 監聽逾時的消息,彈回原消息隊列,確定消費者挂掉後或處理失敗後消息能被其他消費者處理

3.php實作代碼

Producter.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/26
 * Time: 0:13
 */
try {
    //聲明消息隊列-list的鍵名
    $queueKey = 'testQueueKey';
    $redis = new Redis();
    $redis->connect('192.168.75.132', 6379);
    //向清單中push10條消息
    for ($i = 0;$i < 10;$i++){
        //為消息生成唯一辨別
        $uniqid = uniqid(mt_rand(10000, 99999).getmypid().memory_get_usage(), true);
        $ret = $redis->lPush($queueKey, json_encode(array('uniqid' => $uniqid, 'key' => 'key-'.$i, 'value' => 'data')));
        var_dump($ret);
    }

} catch (Exception $e){
    echo $e->getMessage();
}           

Consumer.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/26
 * Time: 0:14
 */
try {
    //聲明消息隊列-list的鍵名
    $queueKey = 'testQueueKey';
    //聲明監聽者隊列-list的鍵名
    $watchQueueKey = 'watchQueueKey';
    $redis = new Redis();
    $redis->connect('192.168.75.132', 6379);
    //隊列先進先出,彈出最先加入的消息,同時放入監聽隊列
    while (true){
        $ret = $redis->rpoplpush($queueKey, $watchQueueKey);
        if ($ret === false){
            sleep(1);
        } else {
            $retArray = json_decode($ret, true);
            //将唯一id寫入緩存設定有效期
            $redis->setex($retArray['uniqid'], 60, 0);
            //模拟失敗
            $rand = mt_rand(0,9);
            if ($rand < 3){
                echo "failure:".$ret."\n";
            } else {
                //todo
                //處理成功移除消息
                $redis->lRem($watchQueueKey, $ret, 0);
                echo "success:".$ret."\n";
            }
        }
    }

} catch (Exception $e){
    echo $e->getMessage();
}           

Watcher.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/26
 * Time: 0:15
 */
try {
    //聲明消息隊列-list的鍵名
    $queueKey = 'testQueueKey';
    //聲明監聽者隊列-list的鍵名
    $watchQueueKey = 'watchQueueKey';
    $redis = new Redis();
    $redis->connect('192.168.75.132', 6379);

    while (true){
        //取出清單尾部的一個值
        $ret = $redis->lIndex($watchQueueKey, -1);
        //如果不存在則休眠1秒
        if ($ret === false){
            sleep(1);
        } else {
            $retArray = json_decode($ret, true);
            $idCache = $redis->get($retArray['uniqid']);
            if ($idCache === false){
                //如果已過期,表示任務逾時,彈回原隊列
                $redis->rpoplpush($watchQueueKey, $queueKey);
                echo "rpoplpush:".$ret."\n";
            } else {
                //進行中,繼續等待
                sleep(1);
            }
        }
    }

} catch (Exception $e){
    echo $e->getMessage();
}           

4.執行隊列

開啟監聽者

php Watcher.php

開啟消費者

php Consumer.php

執行生産者

php Producter.php

生産者輸出

int(1)
int(2)
int(3)
int(4)
int(5)
int(6)
int(7)
int(8)
int(9)
int(10)           

監聽者輸出

rpoplpush:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
rpoplpush:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
rpoplpush:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
rpoplpush:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}           

消費者輸出

success:{"uniqid":"47280267323557445c4bde640dbfb4.78962728","key":"key-0","value":"data"}
failure:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
success:{"uniqid":"39394267323642245c4bde640de992.34641654","key":"key-2","value":"data"}
success:{"uniqid":"41335267323642245c4bde640df980.38466514","key":"key-3","value":"data"}
failure:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
failure:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
success:{"uniqid":"43817267323642245c4bde640ec189.44008738","key":"key-7","value":"data"}
success:{"uniqid":"69276267323642245c4bde640ecb91.04877522","key":"key-8","value":"data"}
failure:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
success:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
success:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
success:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
success:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
success:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}           

我們看到消費者第一次執行時失敗的消息,逾時後又被彈回了消息隊列,消費者有了再次執行的機會,監聽者的職責就是確定消費者執行失敗或挂掉後消息還能再彈回原隊列得到再次執行

原文位址:

https://www.jmsite.cn/blog-615.html