天天看點

RabbitMQ 入門指南(四)

Routing

在這一章節,我們将介紹如何選擇性地訂閱消息的子集。

例如,我們要實作這個功能,一個消費者僅僅隻需将錯誤的日志消息儲存到磁盤檔案;另一個消費者仍然可以列印所有的日志消息。

RabbitMQ 入門指南(四)

Binding

在上一節中,我們利用綁定建立了隊列和交換機之間的綁定關系。

$channel->queue_bind($queue_name, 'logs');
           

這種綁定關系,可以簡單地了解為隊列對交換機中的消息感興趣。

binding_key

queue_bind() 方法的第三個參數是 routing_key,為了避免與 basic_publish() 方法的第三個參數 routing_key 搞混淆了。

這裡,我們将 queue_bind() 方法的第三個參數稱之為 binding_key。

# 使用 binding_key 建立綁定關系
$binding_key = 'black';
$channel->queue_bind($queue_name, $exchange_name, $binding_key);
           

binding_key 的意義依賴于交換機的類型。fanout 類型的交換機,會直接忽略 binding_key 的值。

direct exchange

direct 類型的交換機的路由規則很簡單,它會把消息路由給 binding_key 與 routing_key 完全比對的隊列。

我們可以使用 direct 類型的 exchange 來過濾路由到某個指定消費者的消息。而 fanout 類型的交換機無法實作這一點,因為它隻是無腦地将所有消息廣播給所有已綁定的消費者。

下面,我們來實作開頭說到的功能。

發送日志消息

為了簡化示例,我們将日志消息的嚴重性分為三種:info、warning、error。并且,将消息的嚴重性作為 routing_key,這樣,消費者就可以自由選擇接收何種嚴重性的消息。

這裡,我們先修改好消息的生産者。

$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$channel->basic_publish($msg, 'direct_logs', $severity);
           

訂閱消息

消費者隻需訂閱自己感興趣的消息。

foreach ($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}
           

最終的代碼示例

emit_log_direct.php,作為生産者,内容如下:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = "Hello World!";
}

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'direct_logs', $severity);

echo ' [x] Sent ', $severity, ':', $data, "\n";

$channel->close();
$connection->close();
           

receive_logs_direct.php,作為消費者,内容如下:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$severities = array_slice($argv, 1);
if (empty($severities)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
    exit(1);
}

foreach ($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();
           

運作兩個消費者 C1 和 C2。

# C1,隻接收 error 日志消息,并将其儲存到磁盤檔案
php receive_logs_direct.php error > logs_from_rabbit.log
           
# C2,接收三種日志消息,用于列印檢視
php receive_logs_direct.php info warning error
           

運作一個生産者 P

# P,發送 error 日志消息
php emit_log_direct.php error "Run. Run. Or it will explode."

# P,發送 info 日志消息
php emit_log_direct.php info "it is normal."
           

下一節,我們将介紹如何基于 topic 來監聽消息。

參考文獻

[1] https://www.rabbitmq.com/tutorials/tutorial-four-php.html