1. 概述
在之前的文章中,我们介绍了 AMQP 协议所能实现的各种功能: AMQP 消息服务应用协议
- 存储转发(多个消息发送者,单个消息接收者)
- 分布式事务(多个消息发送者,多个消息接收者)
- 发布订阅(多个消息发送者,多个消息接收者)
- 基于内容的路由(多个消息发送者,多个消息接收者)
- 文件传输队列(多个消息发送者,多个消息接收者)
- 点对点连接(单个消息发送者,单个消息接收者)
本文中,我们就来介绍一下 rabbitmq 的各种用法。 本文以 php 为例,其他语言的用法非常类似。
2. 点对点连接
最基本的模式就是点对点模式,一个生产者向队列中投入消息,一个消费者循环从队列中取数据。
2.1. php-amqplib
- producer
<?php
require_once __DIR__ . '/../composer/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'n";
$channel->close();
$connection->close();
?>
- consumer
<?php
require_once __DIR__ . '/../composer/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL C', "n";
$callback = function($msg) {
echo " [x] Received ", $msg->body, "n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
?>
这段代码中,producer 向名为 "hello" 的队列中放入消息 "Hello World",consumer 从其中取出消息,这是消息队列最简单的用法。 basic_consume 方法的第一个参数标识队列名称,第四个参数标识是否自动 ack,第七个参数则是收到消息后执行的回调方法。
2.2. Acknowledge
消息队列使用时,如果 consumer 意外退出,那么他没来得及处理的消息会如何处理呢? AMQP 要求消费者需要向队列发送 ACK 消息表示消息已经处理,否则这条消息还会分发给其他 consumer 去处理,以防止消息的丢失。 如果设置了 auto_ack,则 consumer 在收到消息后会立即自动发送 ACK 消息,这样在代码中无需手动发送 ack 消息,但是方便的同时带来了消息丢失的风险。 下面是手动 ack 的 consumer 改进版本:
代码语言:javascript复制<?php
require_once __DIR__ . '/../composer/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL C', "n";
$callback = function($msg) {
echo " [x] Received ", $msg->body, "n";
sleep(3);
echo " [x] Done".PHP_EOL;
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('hello', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
?>
basic_qos 设置了队列的 prefetch_count 属性,它限制了消费者同时能够接收的消息数,设置为 1 也就意味着,在 consumer 手动发送 ack 前,队列不会再将新的消息发送给他。
这样,我们可以不再仅仅用一个 consumer 来进行消费了,我们可以同时启动多个 consumer 来实现队列消息的消费了。
2.3. PHP AMQP 扩展
下面使用 PHP 官方提供的 AMQP 扩展实现上述功能。
- producer
<?php
$conn_args = array('host' => 'localhost', 'port' => 5672,
'login' => 'guest', 'password' => 'guest');
$connection = new AMQPConnection($conn_args);
if ($connection->connect()) {
echo "Established a connection to the broker n";
} else {
echo "Cannot connect to the broker n ";
}
$channel = new AMQPChannel($connection);
$queue = new AMQPQueue($channel);
$queue->setName('hello');
$ex = new AMQPExchange($channel);
$ex->setName('helloexchange');
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
$ex->declareExchange();
$ex->publish('Hello World!', 'hello');
?>
- consumer
<?php
$conn_args = array('host' => 'localhost', 'port' => 5672,
'login' => 'guest', 'password' => 'guest');
$connection = new AMQPConnection($conn_args);
if ($connection->connect()) {
echo "Established a connection to the broker n";
} else {
echo "Cannot connect to the broker n ";
}
$channel = new AMQPChannel($connection);
$queue = new AMQPQueue($channel);
$queue->setName('hello');
$queue->bind('helloexchange', 'hello');
$queue->qos(0, 1);
while (1) {
if ($message = $queue->get()) {
echo " [x] Received ", $message->getBody(), "n";
sleep(3);
echo $message->getBody().PHP_EOL;
echo " [x] Done".PHP_EOL;
$queue->ack($message->getDeliveryTag());
}
}
?>
需要注意的是:
- AMQPQueue 对象的 get 方法如果以 AMQP_AUTOACK 为参数则会自动发送 ack,无参数版本则需要手动调用 ack 方法发送
- AMQPQueue 对象的 qos 方法与上面所说的 basic_qos 方法一样,设置了能够接收的消息大小和消息数,由于 rabbitmq 并没有实现对消息大小的限制,所以这里第一个参数并没有意义,我们设为了 0
- 这里涉及到 exchange 的相关概念,我们马上来了解
3. Exchange
在上面的例子中,我们已经看到了 exchange 的创建和使用,此前,在 AMQP 的介绍中,我们也介绍了协议中的 Exchange 正如 AMQP 协议中描述的,producer 是通过 exchange 将消息发送到队列的,exchange 通过消息中的 routing key 决定最终发往的队列。
上面使用 php-amqplib 的例子中,并没有出现 exchange,是因为他自动使用了默认的 exchange amq.direct 实现点对点消息队列。 事实上,producer 是不能将消息发送给队列的,他只能发送给 exchange,由 exchange 决定发送到哪个队列,exchange type 决定了消息的最终处理方式。 Exchange 共有四种 type(模式)可供选择:
- direct
- fanout
- topic
- headers
4. Exchange 的四种模式
4.1. direct
direct 方式是最常用也是最简单的方式,当 Exchange 收到消息后,会将消息转发到消息的 routing key 所指定的消息队列中。 这种模式下,queue 需要执行 bind 操作绑定到 Exchange 上并提供绑定的 routing-key。 如果在 vhost 中不存在指定的 routing-key,消息就会被丢弃。
4.2. fanout
fanout 模式就是常用的发布/订阅模式,也称为“路由表”模式。 在这种模式下,Exchange 收到的任何消息都会被转发到所有与该 Exchange 绑定的所有 Queue 上。 因此,在这种模式下 Queue 必须 bind 到 Exchange 才会被通知,进而才能使用,同时,这种模式下不需要 routing-key。 一个 Queue 可以绑定多个 Exchange,一个 Exchange 也可以绑定多个 Queue。 如果 Exchange 并没有绑定任何 Queue,那么消息就会被丢弃。
4.3. topic
这种模式比较复杂,简单的来说,就是 Exchange 会把收到的消息转发到所有关心 routing-key 的 queue 上,Exchange 通过对消息的 routing-key 进行模糊匹配查找到对应的队列。 因此,与 fanout 一样,Queue 必须 bind 到 Exchange,同时与 direct 模式一样,必须指定 routing-key。 当一个 queue 执行 bind 操作绑定到 exchange 时,需要提供他关心的 routing-key,这个 routing-key 字符串可以是一个模糊匹配字符串,# 表示 0 个或若干个关键词, 表示一个关键词,如 log. 可以匹配成功 log.warn 但不能匹配成功 log.warn.timeout,而 #.log.# 可以匹配上述两个。 如果 Exchange 没有发现任何匹配的 Queue,消息就会被丢弃。
4.4. headers
Headers 模式一般很少被用到,他根据消息 header 中的 “x-match” 属性匹配已经绑定的消息队列。
5. 发布/订阅队列
使用上面介绍的 Fanout 模式的 Exchange 就可以实现发布订阅模式的消息队列了,如果使用 Topic 模式则可以实现更加灵活的发布/订阅消息队列实现。
5.1. php-amqplib
- producer
<?php
require_once __DIR__ . '/../composer/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('access_log', false, false, false, false);
$channel->queue_declare('error_log', false, false, false, false);
$channel->queue_declare('warning_log', false, false, false, false);
$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->queue_bind('access_log', 'logs');
$channel->queue_bind('error_log', 'logs');
$channel->queue_bind('warning_log', 'logs');
for ($i = 0; $i < 6; $i ) {
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, 'logs');
echo " [x] Sent 'Hello World!'n";
}
$channel->close();
$connection->close();
?>
这里我们声明了三个队列,并且全部通过 bind 操作绑定到了名为 "logs" 的 Exchange 上,然后发送了 6 条消息到 exchange,可以看到消息,与 logs exchange 绑定的三个队列都收到了 6 条消息。
6. 参考资料
Rabbitmq Tutorial — http://www.rabbitmq.com/tutorials/tutorial-one-php.html。 book.amqp.php — http://php.net/manual/pl/book.amqp.php。 Rabbitmq 消息队列在 PHP 下的应用 — http://www.cnblogs.com/phpinfo/p/4104551.html。 Rabbitmq 三种 Exchange 模式的性能比较 — http://hwcrazy.com/34195c9068c811e38a44000d601c5586/group/free_open_source_project/