【MQ03】发布订阅模式

2024-01-30 13:47:47 浏览数 (2)

发布订阅模式

上一回我们已经学习了最典型的消息队列的应用。接下来,我们就要学习到的是消息队列中的另一个非常常见的模式。这个模式其实也是一种设计模式,它叫做发布订阅模式。之前我们学习过的,一个叫生产者,一个叫消费者。而到了这边,我们将生产者改个名字叫做发布者,它们两者之间可以看成是完全一样的。而消费者则变成了订阅者,这个就有很大的不同了。

发布订阅

对于传统的模式来说,一个消费者消费一条消息,这条消息被消费之后就不会再次被其它的消费者消费。而在发布订阅模式中,一条消息是可以被多个消费者消费的,这些消费者其实相当于是订阅了这条队列的消息。当有新的消息出现在队列中,就会像广播一样让所有订阅者都获得这条消息。

这种功能的应用场景是?假设我们有一个电商系统。当客户下单之后,是不是要马上通知商家、并且客户自己也会收到相应的订单确认信息。或者我们可以这样理解,一个事件被触发后,需要激活多个其它的功能。

如果是传统的同步代码,我们需要这样写:

代码语言:javascript复制
// 下订单
// 订单入库
// 商家发送消息
// 商家发送邮件
// 客户发送消息
// 客户发送邮件

而使用发布订阅模式的话,我们就可以拆分为两个部分。下订单流程在最后直接调用发布者负责发出订单号即可。

代码语言:javascript复制
// 下订单
// 订单入库
// 发布者发布消息 publish(订单号)

发布者到这里就结束了。我们的主订单流程就可以返回成功的信息了。之后的操作,就通过异步,让相应的订阅者去实现吧。

代码语言:javascript复制
// 订阅者一,获取订单号,发送消息

// 订阅者二,获取订单号,发送邮件

// 订阅者三,获取订单号,向客户发送消息

// 订阅者四,获取订单号,向客户发送邮件

不管是性能还是业务逻辑,其实这样的处理都是更好的。为什么呢?业务解耦的核心就是核心业务代码和非核心业务代码的分离。比如说在下订单的流程中,订单流程是最核心的部分,我们要保证这个过程的顺畅和无误。而下单之后的消息通知,说实话,并不是最核心的流程。即使没有通知,大家也可以通过客户端或者系统进行自主的查询。

总之,我们要实现的就是,基于一个事件传递出来的消息,可以通过不同的客户端进行消费。这就是发布订阅模式。

RabbitMQ实现

RabbitMQ 中,有交换机这一概念。交换机中,又有几种交换模式,其中,fanout 扇出交换,就是一个典型的发布订阅模式。和之前的例子中不同的就是,我们会多出一个定义交换机的步骤。还是先来看一下信息的发送方,之前我们叫做生产者,现在叫做发布者的代码。

代码语言:javascript复制
// 3.rq.p.php
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

require_once "./vendor/autoload.php";

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 定义交换机
$channel->exchange_declare('orders', 'fanout', false, false, false);

$data = '订单号:' . time();
$msg = new AMQPMessage($data);

// 注意,这里是指定的交换机,第三个参数还是队列名,之前普通队列我们指定的是第三个参数
$channel->basic_publish($msg, 'orders');

echo '[x] 发送消息 ', $data, 'n';

$channel->close();
$connection->close();

不同的地方已经在注释中说明了。如果用 Laravel 或者 TP 框架来类比的话,交换机可以看作是一个路由,而队列就是我们的控制器。由交换机来决定我们的数据应该放到哪个队列或者去哪个队列去取。扇出的意思就是和该交换机相关的队列都会收到相同的一份消息数据。我们在上面的代码中,以及后面的订阅者中都不会指定具体队列名,这样的话,RabbitMQ 就会自动生成队列,不需要我们过多去关注具体是使用哪一个队列。

好了,消费者,现在我们叫做订阅者的代码也改动不大,但这回有两个订阅者,一个是发送站内应用消息,一个是发送短信。

代码语言:javascript复制
// 3.rq.c.sms.php
use PhpAmqpLibConnectionAMQPStreamConnection;

require_once "./vendor/autoload.php";

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('orders', 'fanout', false,false,false);
// 使用空队列名,由 RabbitMQ 生成随机队列名
[$queue_name, ,] = $channel->queue_declare('',false, false, true,false);
// 队列绑定到 orders 交换机
$channel->queue_bind($queue_name, 'orders');

echo "[x] 等待数据,退出请按 CTRL Cn";

$callback = function($msg) {
    echo '[x] 接收到 ', $msg->body, ",开始向相关方发送短信....n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while($channel->is_open()){
    $channel->wait();
}

$channel->close();
$connection->close();

// 3.rq.c.msg.php
use PhpAmqpLibConnectionAMQPStreamConnection;

require_once "./vendor/autoload.php";

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('orders', 'fanout', false,false,false);
// 使用空队列名,由 RabbitMQ 生成随机队列名
[$queue_name, ,] = $channel->queue_declare('',false, false, true,false);
// 队列绑定到 orders 交换机
$channel->queue_bind($queue_name, 'orders');

echo "[x] 等待数据,退出请按 CTRL Cn";

$callback = function($msg) {
    echo '[x] 接收到 ', $msg->body, ",开始向相关方发送站内消息....n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while($channel->is_open()){
    $channel->wait();
}

$channel->close();
$connection->close();

当然,我们并没有真实的发送信息,这里只是通过伪代码的方式直接输出了一下。这两段代码唯一的差别其实就是在回调函数中的 echo 内容不同。

好了,现在我们有了一个发布者和两个订阅者。接下来就可以开始测试了。

代码语言:javascript复制
# 命令行1
> php 3.rq.c.msg.php  
[x] 等待数据,退出请按 CTRL C

# 命令行2
> php 3.rq.c.sms.php 
[x] 等待数据,退出请按 CTRL C

分别运行起两个订阅者之后,它们就进入了监听模式,等待消息队列中的数据。那么我们就来调用发布者进行发布吧。

代码语言:javascript复制
> php 3.rq.p.php 
[x] 发送消息 订单号:1672212730n%  

赶紧看看订阅者那边吧。

代码语言:javascript复制
# 命令行1
> php 3.rq.c.msg.php  
[x] 等待数据,退出请按 CTRL C
[x] 接收到 订单号:1672212730,开始向相关方发送站内消息....

# 命令行2
> php 3.rq.c.sms.php 
[x] 等待数据,退出请按 CTRL C
[x] 接收到 订单号:1672212730,开始向相关方发送短信....

两个订阅者同时都接收到数据并且开始处理了。大家可以继续测试调用发布者进行消息发送,每次两个订阅者都会马上收到消息并进行处理。同样的,也可以再添加更多的订阅者来处理更多的业务场景。

Redis 实现

使用 RabbitMQ 实现发布订阅模式很简单吧,但使用 Redis 更简单,总共只需要两个方法,几行代码就可以实现。

代码语言:javascript复制
// 3.rs.p.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$data = '订单号:' . time();

$redis->publish('orders', $data);

echo '[x] 发送消息 ', $data, 'n';

发布者就是调用一个 publish() 方法就可以了,这个在 redis-cli 中也是有相应的命令行的,之前我们在 Redis 系统中都学过。第一个参数是发布的频道名称,第二个是具体的数据内容。

代码语言:javascript复制
// 3.rs.c.sms.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1); // 要设置连接超时时间,要不一会就断了

echo "[x] 等待数据,退出请按 CTRL Cn";

$redis->subscribe(['orders'], function($r,$c,$msg){
    echo '[x] 接收到 ', $msg, ",开始向相关方发送短信....n";
});

// 3.rs.c.msg.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1); // 要设置连接超时时间,要不一会就断了

echo "[x] 等待数据,退出请按 CTRL Cn";

$redis->subscribe(['orders'], function($r,$c,$msg){
    echo '[x] 接收到 ', $msg, ",开始向相关方发送站内消息....n";
});

订阅者只需要实现 subscribe() 方法,而且这个方法是直接就会挂起当前应用程序的,不需要我们再使用 while 来做死循环挂起。一个 subscribe() 方法可以监听多个发布频道,所以它的第一个参数是数组。第二个参数就是一个回调函数,这个函数有三个参数,分别是 redis实例、频道名称、消息内容 。

在这里还需要注意的是,Redis 在使用 subscribe() 挂起程序的时候,要设置一下连接超时时间,要不过一会超过默认的连接超时时间后就会断开连接了。设置成 -1 就是一直保持连接,就像长连接一样。如果是在生产环境,也可以在外面再套一层循环,然后 try..catch 一下 subscribe() ,这样当连接中断之后,可以通过死循环挂起再次调用 connect() 连接服务器。这个大家有兴趣或者确实需要用到的话,可以自己再找一下相关资料哦。

总结

使用发布订阅模式时需要注意的一点是,如果我们的订阅者是在消息发布之后才开始订阅的,那么之前发布的消息是没有办法进行消费的。也就是说,一条消息数据,只对当时已经订阅的客户端会发送数据,就像广播一样,如果你现在打开了收音机,正在听某个频道,那么你就能听到这个频道里面正在播出的内容。而如果你根本就没有打开收音机,或者根本没有调到指定的频道,自然也就听不到当前正在播放的内容啦。

不过也有例外,之前我们学习过,Redis 中的 Stream 也是一种发布订阅模式的实现,而且它的消费数据是不会删除的,新的订阅者可以选择性地消费之前的内容。RabbitMQ 中没有这样的功能。

测试代码:

https://github.com/zhangyue0503/dev-blog/blob/master/消息队列/source/3.rq.c.msg.php

https://github.com/zhangyue0503/dev-blog/blob/master/消息队列/source/3.rq.c.sms.php

https://github.com/zhangyue0503/dev-blog/blob/master/消息队列/source/3.rq.p.php

https://github.com/zhangyue0503/dev-blog/blob/master/消息队列/source/3.rs.c.msg.php

https://github.com/zhangyue0503/dev-blog/blob/master/消息队列/source/3.rs.c.sms.php

https://github.com/zhangyue0503/dev-blog/blob/master/消息队列/source/3.rs.p.php

0 人点赞