首先讲解一下何为异步消息队列: 所谓消息队列,就是一个以队列数据结构为基础的一个实体,这个实体是真实存在的,比如程序中的数组,数据库中的表,或者redis等等,都可以。 异步队列的作用: 个人认为消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列 转载:https://zhuanlan.zhihu.com/p/129383173
————————————————————————————————————–
队列流程:一个消息的 创建 -> 推送 -> 消费 -> 删除
转载:https://github.com/coolseven/notes/blob/master/thinkphp-queue/README.md
1.安装队列依赖
由于框架版本原因可以选择适合的版本
代码语言:javascript复制composer require topthink/think-queue
由于我是tp框架5.1的,所以选择了think-queue 2.*
代码语言:javascript复制# Thinkphp5.1
composer require topthink/think-queue:2.*
# Thinkphp6
composer require topthink/think-queue:3.*
判断安装成功
代码语言:javascript复制php think queue:work -h
2.配置文件
看了网上其他的一些帖子说配置文件在统一目录下/config/queue.php 但是,我这边没有生成,但是根据Queue.php源码可以看出,配置是在config.php文件中的一个键值对
代码语言:javascript复制 // 文件路径 App/config/queue.php
// 队列设置
'queue' => [
'connector' => 'Redis', // 驱动方式
'expire' => 60, // 缓存有效期
'default' => "queue", // 如果未设置队列名称,默认队列名称
'host' => '127.0.0.1', // 主机地址
'port' => 6379, // 端口
'password' => '', // 密码
'select' => 0, // 默认选择第一个库
'timeout' => 0, // 超时
'persistent' => false, // 是否长连接
],
3.在项目下新建一个Job目录,用来存放处理消息的类
4.控制器编写测试代码
代码语言:javascript复制<?php
namespace apphttpcontroller;
use apphttpJobMsgPushJob;
use thinkQueue;
class Index {
/**
* 投递消息(生产者)
* @return string
*/
public function push(): string
{
// queue的 push方法 第一个参数可以接收字符或者对象字符串
$job = 'apphttpJobMsgPushJob';
$queueName = 'test';
$data['msg'] = 'Test queue msg,time:' . date('Y-m-d H:i:s', time());
$data['user_id'] = 1;
// $res = Queue::push(MsgPushJob::class, $data, $queueName); // 可以自动获取
$res = Queue::push($job, $data, $queueName); // 可以手动指定
if ($res == false)
{
return '消息投递失败';
} else {
return '消息投递成功';
}
}
}
5.编写对应的消费者类
代码语言:javascript复制<?php
namespace apphttpJob;
use thinkDb;
use thinkqueueJob;
class MsgPushJob {
/**
* php think queue:listen --queue test
* @param Job $job
* @param $data
* @return bool
* @throws thinkdbexceptionDataNotFoundException
* @throws thinkdbexceptionModelNotFoundException
* @throws thinkexceptionDbException
*/
public function fire(Job $job, $data): bool
{
// 验证消息 只处理user_id = 1的值
$is_exit = $this->checkMsg($data);
if ($is_exit)
{
try {
// 这里是处理消息的逻辑
$res = Db::name('test')->where('id', $data['user_id'])->update(['age' => 10]);
if (!$res) return false;
$job->delete();
} catch (Exception $exception) {
if ($job->attempts() > 3)
{
// 如果消息处理失败次数大于 3 次
// 1.可以把失败的消息放入队列重新消费
// 2.延迟消息执行
// 3.删除消息
}
}
}
$job->delete();
return false;
}
/**
* 是否需要消费
* @param $data
* @return bool
*/
public function checkMsg($data): bool
{
// 判断消息到达这里的时候,是否还需要继续处理
if ($data['user_id'] == 1) return true;
return false;
}
}
6.测试消息投递
数据表默认数据
启动队列监听,对应的参数可以查阅相关文档
代码语言:javascript复制php think queue:listen --queue test
访问控制器接口的时候回来窗口下打印出对应消息者的地址
消息投递成功后,会在redis中生成一条数据(list数据类型),可以在redis中查看
成功消费后数据库的数据
7消息在linux上以守护进程方式运行
代码语言:javascript复制生成 test 文件
mknod test c 1 3
nohup php think queue:work --daemon --queue test--tries 2 > /dev/test 2>&1 &