Routing
在這一章節,我們将介紹如何選擇性地訂閱消息的子集。
例如,我們要實作這個功能,一個消費者僅僅隻需将錯誤的日志消息儲存到磁盤檔案;另一個消費者仍然可以列印所有的日志消息。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5SMhJ2MzEDM5UWY2YGMxMGNmNTZhhzNlZWOiVGMhRTN28CX0JXZ252bj91Ztl2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
Binding
在上一節中,我們利用綁定建立了隊列和交換機之間的綁定關系。
$channel->queue_bind($queue_name, 'logs');
這種綁定關系,可以簡單地了解為隊列對交換機中的消息感興趣。
binding_key
queue_bind() 方法的第三個參數是 routing_key,為了避免與 basic_publish() 方法的第三個參數 routing_key 搞混淆了。
這裡,我們将 queue_bind() 方法的第三個參數稱之為 binding_key。
# 使用 binding_key 建立綁定關系
$binding_key = 'black';
$channel->queue_bind($queue_name, $exchange_name, $binding_key);
binding_key 的意義依賴于交換機的類型。fanout 類型的交換機,會直接忽略 binding_key 的值。
direct exchange
direct 類型的交換機的路由規則很簡單,它會把消息路由給 binding_key 與 routing_key 完全比對的隊列。
我們可以使用 direct 類型的 exchange 來過濾路由到某個指定消費者的消息。而 fanout 類型的交換機無法實作這一點,因為它隻是無腦地将所有消息廣播給所有已綁定的消費者。
下面,我們來實作開頭說到的功能。
發送日志消息
為了簡化示例,我們将日志消息的嚴重性分為三種:info、warning、error。并且,将消息的嚴重性作為 routing_key,這樣,消費者就可以自由選擇接收何種嚴重性的消息。
這裡,我們先修改好消息的生産者。
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$channel->basic_publish($msg, 'direct_logs', $severity);
訂閱消息
消費者隻需訂閱自己感興趣的消息。
foreach ($severities as $severity) {
$channel->queue_bind($queue_name, 'direct_logs', $severity);
}
最終的代碼示例
emit_log_direct.php,作為生産者,内容如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'direct_logs', $severity);
echo ' [x] Sent ', $severity, ':', $data, "\n";
$channel->close();
$connection->close();
receive_logs_direct.php,作為消費者,内容如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$severities = array_slice($argv, 1);
if (empty($severities)) {
file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
exit(1);
}
foreach ($severities as $severity) {
$channel->queue_bind($queue_name, 'direct_logs', $severity);
}
echo " [*] Waiting for logs. To exit press CTRL+C\n";
$callback = function ($msg) {
echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();
運作兩個消費者 C1 和 C2。
# C1,隻接收 error 日志消息,并将其儲存到磁盤檔案
php receive_logs_direct.php error > logs_from_rabbit.log
# C2,接收三種日志消息,用于列印檢視
php receive_logs_direct.php info warning error
運作一個生産者 P
# P,發送 error 日志消息
php emit_log_direct.php error "Run. Run. Or it will explode."
# P,發送 info 日志消息
php emit_log_direct.php info "it is normal."
下一節,我們将介紹如何基于 topic 來監聽消息。
參考文獻
[1] https://www.rabbitmq.com/tutorials/tutorial-four-php.html