天天看點

利用延時隊列實作自動定時重試 - 基於 PHP 實作

測試步驟

  1. 搭建代碼架構
    • 建立測試項目的目錄 mq
    • 從 https://github.com/php-amqplib/php-amqplib 下載 AMQP 庫(當然也可以通過 composer 安裝,這裡為了簡單直接手動處理),將其中的 PhpAmqpLib 目錄放入 mq 目錄
    • 編寫 index.php,實作自動加載
  2. 建立 test 目錄,編寫生産者和消費者
  3. 跑腳本:
    • 開啟生産者:

      php -f index.php retryP p

    • 開啟消費者:

      php -f index.php retryC c

代碼結構

├─PhpAmqpLib
│  ├─Channel
│  ├─Connection
│  ├─Exception
│  ├─Exchange
│  ├─Helper
│  │  └─Protocol
│  ├─Message
│  └─Wire
│      └─IO
├─test
│  ├─retryP.php
│  └─retryC.php
└─index.php
           

代碼

index.php

<?php

/**
 * Autoloader that maps a qualified class name onto a file below this directory.
 *
 * Namespace separators are translated into directory separators, so the class
 * `test\retryP` is looked up at `test/retryP.php` next to this file. Names that
 * are not syntactically valid class names, and classes without a matching
 * file, are skipped silently so that class_exists() can simply report false
 * instead of triggering an include warning.
 *
 * @param string $cName Qualified class name (a leading backslash is tolerated).
 * @return void
 */
function my_autoloader($cName) {
	$cName = ltrim($cName, '\\');
	// The name can originate from a command-line argument, so refuse anything
	// that is not a plain qualified class name (for example "../../etc/passwd").
	if (!preg_match('/^[a-zA-Z_\x80-\xff][a-zA-Z0-9_\x80-\xff]*(?:\\\\[a-zA-Z_\x80-\xff][a-zA-Z0-9_\x80-\xff]*)*$/', $cName)) {
		return;
	}
	// A backslash is a path separator on Windows only; convert it explicitly
	// so the lookup also works on Linux and macOS.
	$path = __DIR__.DIRECTORY_SEPARATOR.str_replace('\\', DIRECTORY_SEPARATOR, $cName).".php";
	if (is_file($path)) {
		include($path);
	}
}

// Register the autoloader so classes are loaded from files on first use.
spl_autoload_register("my_autoloader");

// Command-line dispatch:
//   php -f index.php CLASS METHOD [ARGS...]  instantiates \test\CLASS and calls METHOD
//   php -f index.php FUNCTION                calls a function by name
// Every failure path writes its message to STDERR and exits with status 1,
// so shells and process supervisors can tell that the run failed.
if (isset($argv[2])) {
	$cname = '\test\\'.$argv[1];
	if (!class_exists($cname)) {
		fwrite(STDERR, "class (".$cname.") not exists".PHP_EOL);
		exit(1);
	}
	$c = new $cname();
	// method_exists() is also true for private/protected methods, which
	// would fail fatally when invoked from here, so check callability too.
	if (!method_exists($c, $argv[2]) || !is_callable(array($c, $argv[2]))) {
		fwrite(STDERR, "method (".$argv[2].") not exists".PHP_EOL);
		exit(1);
	}
	// Forward every remaining command-line argument (possibly none) to the method.
	call_user_func_array(array($c, $argv[2]), array_slice($argv, 3));
} elseif (isset($argv[1])) {
	if (!function_exists($argv[1])) {
		fwrite(STDERR, "function (".$argv[1].") not exists".PHP_EOL);
		exit(1);
	}
	// NOTE(review): this calls whatever function is named on the command line,
	// including PHP built-ins; only acceptable for a local developer tool.
	$argv[1]();
} else {
	fwrite(STDERR, "please input at least one argument".PHP_EOL);
	exit(1);
}
           

生産者

<?php

namespace test;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

/**
 * Producer side of the delayed-retry demo.
 *
 * Declares one direct exchange and three durable queues, then publishes a
 * single initial message:
 *   - retry.normalQueue (routing key "normal"): the queue the consumer reads.
 *   - retry.retryQueue  (routing key "retry"):  has no consumer; its messages
 *     expire after 3 seconds and are dead-lettered back to the exchange with
 *     routing key "normal", which produces the delayed redelivery.
 *   - retry.failQueue   (routing key "fail"):   messages that exceeded the
 *     retry limit and need manual handling.
 */
class retryP {
	private $host = 'localhost';
	private $port = 5672;
	private $user = 'guest';
	private $password = 'guest';
	
	/**
	 * Declares the retry topology and publishes one message to the normal queue.
	 *
	 * Publishing is fire-and-forget: no publisher confirms are requested, so a
	 * message can still be lost before the broker has accepted it.
	 *
	 * @return void
	 */
	public function p() {
		// NOTE(review): positional arguments follow php-amqplib's AMQPStreamConnection
		// constructor (presumably 3.0 s connect timeout, 120.0 s read/write timeout,
		// keepalive on, 60 s heartbeat) — confirm against the installed library version.
		$connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, '/', false, 'AMQPLAIN', null, 'en_US', 3.0, 120.0, null, true, 60);
		$channel = $connection->channel();

		$channel->exchange_declare('retry.retryExchange', 'direct', false, true, false, false);
		
		// Dead-letter target of retryQueue; this is the queue the consumer subscribes to.
		$channel->queue_declare('retry.normalQueue', false, true, false, false);
		// retryQueue has no consumer: once the TTL elapses, each message is
		// dead-lettered to retry.retryExchange with routing key "normal".
		$channel->queue_declare('retry.retryQueue', false, true, false, false, false, new AMQPTable(
			[
				'x-dead-letter-exchange' => 'retry.retryExchange',
				'x-dead-letter-routing-key' => 'normal',
				'x-message-ttl' => 3 * 1000
			]
		));
		// Messages that exceeded the retry limit end up here for manual handling.
		$channel->queue_declare('retry.failQueue', false, true, false, false);
		
		$channel->queue_bind('retry.retryQueue', 'retry.retryExchange', 'retry');
		$channel->queue_bind('retry.normalQueue', 'retry.retryExchange', 'normal');
		$channel->queue_bind('retry.failQueue', 'retry.retryExchange', 'fail');

		// Prepare the message. It is marked persistent because durable queues
		// alone do not keep transient messages across a broker restart.
		$msg = new AMQPMessage(
			json_encode(["data"=>"something", "retryTime"=>1]),
			['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
		);
		$channel->basic_publish($msg, 'retry.retryExchange', 'normal');
		echo "basic_publish success";

		$channel->close();
		$connection->close();
	}
}
           

消費者

<?php

namespace test;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

/**
 * Consumer side of the delayed-retry demo.
 *
 * Reads retry.normalQueue with manual acknowledgements. Processing is treated
 * as always failing in this demo, so every message is republished to the retry
 * queue with an incremented "retryTime" until the counter exceeds 3, at which
 * point it is routed to the fail queue instead.
 *
 * NOTE(review): this class does not declare the exchange or queues itself; it
 * appears to rely on the producer having been run first.
 */
class retryC {
	private $host = 'localhost';
	private $port = 5672;
	private $user = 'guest';
	private $password = 'guest';
	
	private $connection;
	private $channel;
	
	/**
	 * Opens the broker connection and a channel on it.
	 */
	public function __construct() {
		$this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, '/', false, 'AMQPLAIN', null, 'en_US', 3.0, 120.0, null, true, 60);
		$this->channel = $this->connection->channel();
	}
	
	/**
	 * Consumes the normal queue until no consumer callbacks remain registered.
	 *
	 * @return void
	 */
	public function c() {		
		// Callback invoked for each delivered message.
		$callback = function ($msg) {
			echo $msg->body;
			echo PHP_EOL;

			$channel = $msg->delivery_info['channel'];
			$deliveryTag = $msg->delivery_info['delivery_tag'];
			// Republished messages are marked persistent so they are not
			// dropped from the durable queues on a broker restart.
			$persistent = ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];

			$data = json_decode($msg->body, true);
			$valid = is_array($data) && isset($data["retryTime"]) && is_numeric($data["retryTime"]);

			// Retry limit exceeded (or payload unreadable): hand the message to
			// the fail queue and stop. Returning here is essential; without it
			// the message would also be sent to the retry queue and loop forever.
			if (!$valid || $data["retryTime"] > 3) {
				$channel->basic_publish(new AMQPMessage($msg->body, $persistent), 'retry.retryExchange', 'fail');
				$channel->basic_ack($deliveryTag);
				echo ($valid ? "retryTime > 3, fail" : "invalid message, fail") . PHP_EOL;
				return;
			}

			// Business processing would happen here. The demo treats it as failed,
			// so the message is scheduled for another attempt via the retry queue.
			$msgNew = new AMQPMessage(
				json_encode([
					"data" => isset($data["data"]) ? $data["data"] : null,
					"retryTime" => $data["retryTime"] + 1
				]),
				$persistent
			);
			$channel->basic_publish($msgNew, 'retry.retryExchange', 'retry');
			// Acknowledge only after the follow-up publish was issued, so a
			// failure while publishing leaves the original message unacked.
			$channel->basic_ack($deliveryTag);
		};
		// Presumably limits the consumer to one unacknowledged message at a time
		// (prefetch count 1) — confirm against the php-amqplib basic_qos signature.
		$this->channel->basic_qos(null, 1, null);
		// Manual acknowledgement is required for this consumer.
		$this->channel->basic_consume('retry.normalQueue', '', false, false, false, false, $callback);

		while (count($this->channel->callbacks)) {
			$this->channel->wait();
		}
		$this->channel->close();
		$this->connection->close();
	}
}