延时队列与优先级队列
在消息队列的最后一篇文章中,我们再来学习两个非常常见的队列功能。一个是延时队列,一个是优先级队列。它们的应用场景非常多,也非常有意思,不同的消息队列工具都提供了不同的实现,同样的,Redis 在 Laravel 框架中还是通过逻辑代码来实现类似功能的,非常值得大家来好好研究一下。
延时队列
延时队列,从名字就可以看出,队列里面的消息会延时一会,也就是等一会才会被消费。这个功能非常常用,比如说最经典的就是电商中下订单后不支付。通常,我们会设定一个时间,比如 20 分钟内如果不支付,订单就自动取消。这个功能就可以通过延时队列来实现,下订单后,马上向延时队列发送一条消息,并且设置延迟时间为 20 分钟。然后等 20 分钟之后,消费者开始消费这条消息,可以简单的判断一下比如订单还是未支付状态,就把订单状态改为关闭的。
其它还有很多例子,比如像是定时采集爬虫之类的,也是这种延时队列的常见场景。总之,这种队列也是一种非常常见的队列功能。我们先来看一下,在 Laravel 框架中,使用 Redis 队列驱动是如何实现这个延时队列功能的。
Laravel框架中使用 Redis 实现
在 Laravel 中,只需要在任务分发,也就是入队的时候,使用一个 delay() 方法就可以了。
代码语言:javascript复制// app/Console/Commands/P6.php
// ………………
public function handle()
{
Queue6::dispatch('任务发送时间:' . date('Y-m-d H:i:s'))
->delay(now()->addSeconds(random_int(0,10)));
return 0;
}
// ………………
这个 delay() 方法接收一个 now() 助手函数返回的 Carbon 类型的时间对象,这个对象是 Laravel 框架中的一个组件。使用代码中的方法,就可以添加按秒延时的队列,具体的延时时间是 0 到 10 的随机数。now() 助手函数还有其它很多方法可以添加分钟、小时、毫秒等等,是非常好用的一套日期时间对象工具。
任务对象里面没什么特别的,就是打印了一下接收到的消息和处理的时间。
代码语言:javascript复制// app/Jobs/Queue6.php
// ………………
public function handle()
{
//
echo '接收到了消息:' .$this->msg, ' 处理时间:',date('Y-m-d H:i:s'),PHP_EOL;
}
// ………………
好了,现在我们就多运行几次任务分派,向队列中多添加几条消息数据吧。
代码语言:javascript复制> php artisan p:q6
> php artisan p:q6
> php artisan p:q6
然后观察队列消费输出的结果。
代码语言:javascript复制> php artisan queue:work
[2023-01-03 14:16:53][b5ee7d7c-9d79-4b26-b87f-1ef8e265000a] Processing: AppJobsQueue6
接收到了消息:任务发送时间:2023-01-03 14:16:53 处理时间:2023-01-03 14:16:53
[2023-01-03 14:16:53][b5ee7d7c-9d79-4b26-b87f-1ef8e265000a] Processed: AppJobsQueue6
[2023-01-03 14:16:59][d133609f-341e-4442-821d-256aaa8ed9a9] Processing: AppJobsQueue6
接收到了消息:任务发送时间:2023-01-03 14:16:54 处理时间:2023-01-03 14:16:59
[2023-01-03 14:16:59][d133609f-341e-4442-821d-256aaa8ed9a9] Processed: AppJobsQueue6
[2023-01-03 14:16:59][775af430-cc13-45cd-8c9b-7eb705110b48] Processing: AppJobsQueue6
接收到了消息:任务发送时间:2023-01-03 14:16:53 处理时间:2023-01-03 14:16:59
[2023-01-03 14:16:59][775af430-cc13-45cd-8c9b-7eb705110b48] Processed: AppJobsQueue6
注意看中间那一条,它的任务发送时间是 54 秒,但它是在中间被消费的,最后一条数据的任务发送时间是比它早的。这也就是说,中间这条数据的延时时间更长一些。大家也可以将具体的延时秒数添加到消息体中,然后在消费的时候打印出来,这样看得就很清楚了。不过如果直接观察消费者,也能看出消息都是在不同的时间段内消费的,是有延时的效果的。
这个功能是怎么实现的呢?还记得我们之前在 Redis 系列中讲过的 Sorted Set 这个数据类型吧?当时我们就说过,TP 以及 Laravel 中的延时队列都是通过有序集合来实现的。
有序集合除了数据本身外,还有一个 score 分数字段可以用于排序。聪明的你一定想到了,直接将时间戳当做 score 就可以实现按指定时间排序的功能了。同时,我们也可以先查询小于当前时间戳分数的数据,然后只取出这一部分的数据。现在你可以再添加几条数据,但不要开消费者。然后到 Redis 中,就会看到 laravel_database_queues:default:delayed 这样一个集合。
代码语言:javascript复制127.0.0.1:6379> ZRANGE laravel_database_queues:default:delayed 0 -1 withscores
1) "{"uuid":"bfbfec4b-ffb4-4259-b299-4fe18866a741","displayName":"App\\Jobs\\Queue6","job":"Illuminate\\Queue\\CallQueuedHandler@call","maxTries":null,"maxExceptions":null,"failOnTimeout":false,"backoff":null,"timeout":null,"retryUntil":null,"data":{"commandName":"App\\Jobs\\Queue6","command":"O:15:\"App\\Jobs\\Queue6\":11:{s:3:\"msg\";s:40:\"\u4efb\u52a1\u53d1\u9001\u65f6\u95f4\uff1a2023-01-03 14:21:04\";s:3:\"job\";N;s:10:\"connection\";N;s:5:\"queue\";N;s:15:\"chainConnection\";N;s:10:\"chainQueue\";N;s:19:\"chainCatchCallbacks\";N;s:5:\"delay\";O:25:\"Illuminate\\Support\\Carbon\":3:{s:4:\"date\";s:26:\"2023-01-03 14:31:04.417193\";s:13:\"timezone_type\";i:3;s:8:\"timezone\";s:3:\"UTC\";}s:11:\"afterCommit\";N;s:10:\"middleware\";a:0:{}s:7:\"chained\";a:0:{}}"},"id":"bfbfec4b-ffb4-4259-b299-4fe18866a741","attempts":0,"type":"job","tags":[],"pushedAt":"1672755664.4408"}"
2) "1672756264"
怎么样,是不是分数就是时间戳。这下整体延时队列的实现就不用我多说了吧。我们使用 ZREMRANGEBYRANK 或者 ZPOPMIN 命令都可以拿到最新的数据,但是,Laravel 里面的更复杂一些。它是先把延时队列的迁移到 laravel_database_queues:default 队列,然后再进行普通队列的 POP 处理。在 /vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php 中,pop() 方法第一行就是调用一下 migrate() 方法。这个方法内部会继续调用 migrateExpiredJobs() 方法,传递的参数为 queue.':delayed' 和 queue 参数名称为 from 和 to 。已经很明显了吧,最后调用 /vendor/laravel/framework/src/Illuminate/Queue/LuaScripts.php 中的 migrateExpiredJobs() 方法,这个方法里面是一个 Lua 脚本,脚本中就是使用 zremrangebyrank 命令根据 score 顺序获取数据,接着再 rpush 到 default 队列中。
代码语言:javascript复制public static function migrateExpiredJobs()
{
return <<<'LUA'
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
for i = 1, #val, 100 do
redis.call('rpush', KEYS[2], unpack(val, i, math.min(i 99, #val)))
-- Push a notification for every job that was migrated...
for j = i, math.min(i 99, #val) do
redis.call('rpush', KEYS[3], 1)
end
end
end
return val
LUA;
}
剩下的就和普通队列的处理流程一样了。具体的处理过程大家可以参考我之前的 Laravel 系列中关于队列那一篇文章的讲解。当然,更好的还是大家自己去源码里看一下,其实 default 默认队列就是普通的 list 类型了,直接 rpush 配合 lpop 的经典队列操作。
RabbitMQ延时队列
这回轮到 RabbitMQ 没有了,是的,RabbitMQ 里面没有延时队列的实现。额,不是说 RabbitMQ 是一个标准队列嘛?注意,延时队列只是一个队列功能,和我们之前学过的持久化、确认、异常处理等功能相比,它确实是一个可有可无的功能。因此,并不是每个队列系统都一定要实现这个功能的。而且,我们可以利用各种逻辑业务的方式来实现,比如说在 RabbitMQ 中,最方便的实现延时队列的方式就是利用上节课我们学习过的死信队列。
还记得死信队列有一个条件就是超过消息的有效时间吧。利用这个有效时间,我们可以完全不写普通消费者,让消息全部等到有效时间后过期,然后让死信消费者成为延时队列消费者。
我们之前演示的是在队列定义时设置队列的消息过期时间,如果只使用这种形式,那么整个队列中所有的消息过期时间都是一样的,这个明显不符合我们的需求。所幸,消息对象,也就是 AMQPMessage 对象的 expiration
属性,也可以设置一个过期时间。它和队列定义中的 x-message-ttl
一起存在的话,谁小就按谁先过期。
// 5.rq.p.php
// ………………
$channel->queue_declare('hello5', false, true, false, false, false, new AMQPTable([
'x-message-ttl'=>10000, // 队列里所有的数据 10 秒过期
'x-dead-letter-exchange'=>'dead_letter', // 死信到某个交换机
'x-dead-letter-routing-key'=>'', // 死信路由
]));
// 创建消息
$msg = new AMQPMessage('Hello World!' . time(),[
'expiration'=> 3000, // 消息 3 秒过期
]);
$channel->basic_publish($msg, '', 'hello5'); // 将消息放入队列中
// ………………
调用生产者发送消息。
代码语言:javascript复制> php 5.rq.p.php
生产者向消息队列中发送信息:Hello World!
只开死信消费者就可以了,不需要去消费 hello5 队列。
代码语言:javascript复制> php 5.rq.c.deadletter.php
等待死信队列消息,或者使用 Ctrl C 退出程序。
死信队列接收到数据: Hello World!1672800558 时间:1672800561
注意看,我们生产时间和死信消费的时间正好差 3 秒。
除了最简单的使用死信队列之外,RabbitMQ 还有专门的延时队列插件,这个大家可以自己去看一下哦。
优先级队列
延时队列还是挺有意思的吧?下面我们再来看一个优先级队列。之前在学数据结构的时候,我们没讲过,如果是更详细的一些数据结构教材中,直接就会有优先级队列的实现,一般是通过大顶堆或者小顶堆的方式来实现。另外,PHP 的 SPL 扩展中也有通过大顶堆实现的优先级队列对象 SplPriorityQueue ,有兴趣的小伙伴可以自行了解一下哦。
Laravel 中的优先队列
我们还是先来看 Laravel 实现的优先级队列,它其实并不是一个完全的优先级队列实现,因为它针对的其实是不同的队列,而不是同一个队列中给不同的消息赋予不同的优先级。
代码语言:javascript复制// app/Jobs/Queue6.php
// ………………
public function handle()
{
//
for ($i = 6; $i > 0; $i--) {
$queue = 'default';
if ($i%3 == 1) {
$queue = 'A';
} else if ($i%3 == 2) {
$queue = 'B';
}
sleep(random_int(0, 2));
Queue6::dispatch('测试优先级,当前优先队列为:' . $queue . ',入队时间:' . date("Y-m-d H:i:s"))->onQueue($queue);
}
}
// ………………
在这个队列生产者中,我们使用 onQueue() 方法,其实是将消息数据放到了不同的队列中,分别是 A、B 和默认的 default 三个队列。每条消息在分派时都有 0 至 2 秒随机的时间间隔。查看 Redis ,确实是不同名称的队列。
代码语言:javascript复制127.0.0.1:6379> keys laravel_database*
1) "laravel_database_queues:A"
2) "laravel_database_queues:B"
3) "laravel_database_queues:A:notify"
4) "laravel_database_queues:default"
5) "laravel_database_queues:default:notify"
6) "laravel_database_queues:B:notify"
额外补充一点, 队列名前缀 laravel_database_
,这个东西是在 config/database.php 的 redis 配置中的。
'redis' => [
// ………………
'options' => [
// ………………
'prefix' => env('REDIS_PREFIX', Str::slug(env('APP_NAME', 'laravel'), '_').'_database_'),
],
// ………………
],
然后,通过消费者的 --queue
参数,来指定队列处理的优先级,注意它的顺序,先写的队列名会优先处理。
> php artisan queue:work --queue=B,A,default
[2023-01-04 01:38:38][325c0e6d-3bdc-4ec0-84f9-57092a66753c] Processing: AppJobsQueue6
接收到了消息:测试优先级,当前优先队列为:B,入队时间:2023-01-04 01:38:35 处理时间:2023-01-04 01:38:38
[2023-01-04 01:38:38][325c0e6d-3bdc-4ec0-84f9-57092a66753c] Processed: AppJobsQueue6
[2023-01-04 01:38:38][c1f1d065-dfeb-413c-8fd9-1bf9bb2c8bdc] Processing: AppJobsQueue6
接收到了消息:测试优先级,当前优先队列为:B,入队时间:2023-01-04 01:38:36 处理时间:2023-01-04 01:38:38
[2023-01-04 01:38:38][c1f1d065-dfeb-413c-8fd9-1bf9bb2c8bdc] Processed: AppJobsQueue6
[2023-01-04 01:38:38][c902a1ec-242c-4a0c-b4d8-c509d7d9a9f2] Processing: AppJobsQueue6
接收到了消息:测试优先级,当前优先队列为:A,入队时间:2023-01-04 01:38:35 处理时间:2023-01-04 01:38:38
[2023-01-04 01:38:38][c902a1ec-242c-4a0c-b4d8-c509d7d9a9f2] Processed: AppJobsQueue6
[2023-01-04 01:38:38][0d96ec9e-5fd6-4c34-93b6-c29ff1749a02] Processing: AppJobsQueue6
接收到了消息:测试优先级,当前优先队列为:A,入队时间:2023-01-04 01:38:36 处理时间:2023-01-04 01:38:38
[2023-01-04 01:38:38][0d96ec9e-5fd6-4c34-93b6-c29ff1749a02] Processed: AppJobsQueue6
[2023-01-04 01:38:38][90ef0eb0-6bf7-4e2a-8cf7-0124b4855b81] Processing: AppJobsQueue6
接收到了消息:测试优先级,当前优先队列为:default,入队时间:2023-01-04 01:38:35 处理时间:2023-01-04 01:38:38
[2023-01-04 01:38:38][90ef0eb0-6bf7-4e2a-8cf7-0124b4855b81] Processed: AppJobsQueue6
[2023-01-04 01:38:38][ef754a45-eaef-470c-a09e-5e7cdfacd778] Processing: AppJobsQueue6
接收到了消息:测试优先级,当前优先队列为:default,入队时间:2023-01-04 01:38:35 处理时间:2023-01-04 01:38:38
[2023-01-04 01:38:38][ef754a45-eaef-470c-a09e-5e7cdfacd778] Processed: AppJobsQueue6
看出效果了吧,B 队列里的数据最先处理,即使第二条数据的入队时间是靠后的,它也会优先被处理。然后再处理 A 队列中的数据,最后才会处理默认的 default 队列中的数据。
其实从这里也能看出来,Laravel 是使用了一个取巧的办法,毕竟 Redis 原生并不支持优先级队列。所以它是通过消费者指定队列名称的方式,并按名称顺序来实现的优先级队列。
RabbitMQ消息优先级
好了,我们再来看 RabbitMQ 的优先级队列。它就是真正传统意义上的单个队列中,不同消息有不同优先级的实现了。
代码语言:javascript复制// 6.rq.p.php
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
use PhpAmqpLibWireAMQPTable;
require_once __DIR__ . '/vendor/autoload.php';
// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel(); // 获取频道
$channel->queue_declare('hello6', false, true, false, false, false, new AMQPTable([
'x-max-priority'=>10, // 设置最大优先级
]));
// 创建消息
for ($i = 6; $i > 0; $i--) {
$priority = random_int(0, 2);
$body = '优先消息测试,当前优先级为:' . $priority;
$msg = new AMQPMessage($body,
['priority' => $priority]
);
$channel->basic_publish($msg, '', 'hello6'); // 将消息放入队列中
echo "生产者向消息队列中发送信息:" . $body, PHP_EOL;
}
在上面的代码中,我们需要先设置一个队列的优先级容量 x-max-priority
,也就是在这个队列中,最大的优先级就到 10 。这个值可以设置到更大,但是官方推荐就到 10 就可以了。
然后在消息对象 AMQPMessage 的属性中,使用 priority 来设置每条消息具体的优先级。
代码语言:javascript复制> php 6.rq.p.php
生产者向消息队列中发送信息:优先消息测试,当前优先级为:0
生产者向消息队列中发送信息:优先消息测试,当前优先级为:2
生产者向消息队列中发送信息:优先消息测试,当前优先级为:0
生产者向消息队列中发送信息:优先消息测试,当前优先级为:1
生产者向消息队列中发送信息:优先消息测试,当前优先级为:1
生产者向消息队列中发送信息:优先消息测试,当前优先级为:2
运行后,我们可以看到,入队时每条消息的优先级都是随机的,这里是没有顺序的。但是在消费时,就能明显地看到消息是按优先级从高到低被消费的。
代码语言:javascript复制> php 6.rq.c.php
等待消息,或者使用 Ctrl C 退出程序。
接收到数据: 优先消息测试,当前优先级为:2
接收到数据: 优先消息测试,当前优先级为:2
接收到数据: 优先消息测试,当前优先级为:1
接收到数据: 优先消息测试,当前优先级为:1
接收到数据: 优先消息测试,当前优先级为:0
接收到数据: 优先消息测试,当前优先级为:0
总结
今天学习的两种队列功能是比较常见的两种队列功能,同时,我们也看到了在 Redis 中其实都是没有这两个功能的实现的,但是,Laravel 框架通过业务代码以及各种逻辑技巧的方式实现了它们。另外,RabbitMQ 也是没有默认完全的延时队列的功能,也需要通过取巧的方式来实现。而对于我们来说,了解它们的实现以及背后的原理,学习大佬们的处理逻辑,也能获得非常多的收获与成长。
完结撒花。
是的,消息队列系列结束了。就是这么简单,6 篇小文章。内容不多,但是我们已经了解了什么消息队列,什么发布订阅模式,消息队列的可用性是如何保障的,以及非常好玩的两个扩展队列功能。意犹未尽吗?其实呀,消息队列核心的内容真的就是这些,并没有太多很高深的内容。就像我们最早说的,不管什么消息队列中间件工具,本质上都是我们最基础的那个队列数据结构的实现。只不过变成了一个独立的组件,再加上各种功能和优化罢了。
如果你对消息队列还十分有兴趣,那么你现在应该更加深入地学习一下 RabbitMQ ,就是我前面没有细说的交换机路由相关的功能,它能够衍生出更多的队列应用功能。想要更加深入的话,那么你可以再挑战一下 Kafka ,现阶段排名第一的消息队列系统,天生为大数据服务的,去挑战更恐怖的超强性能以及天生分布式的队列系统。这方面的系统学习我就不多说了,没别的,刷文档,刷 B 站视频就好啦,毕竟 RabbitMQ 和 Kafka 都是热门应用,资料多的是!
至于之前提到过的其它消息队列,如果你在工作用到了,再详细深入的学习吧,我更推荐的还是 Redis(Laravel框架实现)、RabbitMQ、Kafka 这三个。RabbitMQ 和 Kafka 任选其一深入研究,Laravel 的 Redis 队列实现对于我们 PHPer 来说会更亲切,可以深入源码学习哦。
测试代码:
https://github.com/zhangyue0503/dev-blog/blob/master/消息队列/source/6.rq.c.php
https://github.com/zhangyue0503/dev-blog/blob/master/消息队列/source/6.rq.p.php