thinkPhp使用框架自带队列think-queue

2023-02-22 15:41:14 浏览数 (2)

首先讲解一下何为异步消息队列: 所谓消息队列,就是一个以队列数据结构为基础的一个实体,这个实体是真实存在的,比如程序中的数组,数据库中的表,或者redis等等,都可以。 异步队列的作用: 个人认为消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列 转载:https://zhuanlan.zhihu.com/p/129383173

————————————————————————————————————–

队列流程:一个消息的 创建 -> 推送 -> 消费 -> 删除

转载:https://github.com/coolseven/notes/blob/master/thinkphp-queue/README.md

1.安装队列依赖

由于框架版本原因可以选择适合的版本

代码语言:javascript复制
composer require topthink/think-queue

由于我是tp框架5.1的,所以选择了think-queue 2.*

代码语言:javascript复制
# Thinkphp5.1
composer require topthink/think-queue:2.*
# Thinkphp6
composer require topthink/think-queue:3.*

判断安装成功

代码语言:javascript复制
php think queue:work -h

2.配置文件

看了网上其他的一些帖子说配置文件在统一目录下/config/queue.php 但是,我这边没有生成,但是根据Queue.php源码可以看出,配置是在config.php文件中的一个键值对

代码语言:javascript复制
  //  文件路径 App/config/queue.php
  //  队列设置
    'queue' => [
        'connector' => 'Redis', // 驱动方式
        'expire' => 60,         // 缓存有效期
        'default' => "queue",   // 如果未设置队列名称,默认队列名称
        'host' => '127.0.0.1',  // 主机地址
        'port' => 6379,         // 端口
        'password' => '',       // 密码
        'select' => 0,          // 默认选择第一个库
        'timeout' => 0,         // 超时
        'persistent' => false, // 是否长连接
    ],

3.在项目下新建一个Job目录,用来存放处理消息的类

4.控制器编写测试代码

代码语言:javascript复制
<?php

namespace apphttpcontroller;

use apphttpJobMsgPushJob;
use thinkQueue;


class Index {
    /**
     * 投递消息(生产者)
     * @return string
     */
    public function push(): string
    {
        //  queue的 push方法 第一个参数可以接收字符或者对象字符串
        $job = 'apphttpJobMsgPushJob';
        $queueName = 'test';
        $data['msg'] = 'Test queue msg,time:' . date('Y-m-d H:i:s', time());
        $data['user_id'] = 1;
//        $res = Queue::push(MsgPushJob::class, $data, $queueName);  // 可以自动获取
        $res = Queue::push($job, $data, $queueName);   // 可以手动指定
        if ($res == false)
        {
            return '消息投递失败';
        } else {
            return '消息投递成功';
        }
    }
}

5.编写对应的消费者类

代码语言:javascript复制
<?php

namespace apphttpJob;
use thinkDb;
use thinkqueueJob;

class MsgPushJob {

    /**
     * php think queue:listen --queue test
     * @param Job $job
     * @param $data
     * @return bool
     * @throws thinkdbexceptionDataNotFoundException
     * @throws thinkdbexceptionModelNotFoundException
     * @throws thinkexceptionDbException
     */
    public function fire(Job $job, $data): bool
    {
        //  验证消息 只处理user_id = 1的值
        $is_exit = $this->checkMsg($data);
        if ($is_exit)
        {
            try {
                //  这里是处理消息的逻辑
                $res = Db::name('test')->where('id', $data['user_id'])->update(['age' => 10]);
                if (!$res)  return false;
                $job->delete();
            } catch (Exception $exception) {
                if ($job->attempts() > 3)
                {
                    //  如果消息处理失败次数大于 3 次
                    //  1.可以把失败的消息放入队列重新消费
                    //  2.延迟消息执行
                    //  3.删除消息
                }
            }
        }
        $job->delete();
        return false;
    }

    /**
     * 是否需要消费
     * @param $data
     * @return bool
     */
    public function checkMsg($data): bool
    {
        //  判断消息到达这里的时候,是否还需要继续处理
        if ($data['user_id'] == 1) return true;
        return false;
    }
}

6.测试消息投递

数据表默认数据

启动队列监听,对应的参数可以查阅相关文档

代码语言:javascript复制
php think queue:listen --queue test

访问控制器接口的时候回来窗口下打印出对应消息者的地址

消息投递成功后,会在redis中生成一条数据(list数据类型),可以在redis中查看

成功消费后数据库的数据

7消息在linux上以守护进程方式运行

代码语言:javascript复制
生成 test 文件
mknod test c 1 3

nohup php think queue:work --daemon --queue test--tries 2 >  /dev/test 2>&1  &

0 人点赞