laravel-redis消息队列

2024-01-29 11:31:49 浏览数 (1)

原理

消息队列由消息队列处理程序组成。

基本的流程就是由生产者(业务代码)将数据推送到队列中(此处使用的是Redis),然后由消费者(处理程序)从队列中取出数据进行加工处理。

生产者

代码语言:javascript复制
# 业务代码
$grade = 88;
$uid = 1;
Redis::rpush('uid_'.$uid, $grade);
# 业务代码

消费者

代码语言:javascript复制
# 业务代码
$uid = 1;
$grade = Redis::lpop('uid_'.$uid);
# 业务代码

准备数据表

代码语言:javascript复制
CREATE TABLE `students` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `grade` int(11) DEFAULT NULL,
  `created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  `updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

模型

代码语言:javascript复制
<?php

namespace App;

use IlluminateDatabaseEloquentModel;

class Student extends Model
{
    // 定义模型对应的数据库表名
    protected $table = 'students';
    
}

Laravel 的队列系统

队列配置文件存储在 config/queue.php,在.env文件中,配置queue的连接为 Redis

代码语言:javascript复制
QUEUE_CONNECTION=redis

任务类

接下来使用命令 php artisan make:job TestQueue 生成任务类 ,任务类会放在app/Jobs目录下。

代码语言:javascript复制
<?php

namespace AppJobs;

use AppStudent;
use IlluminateBusQueueable;
use IlluminateQueueSerializesModels;
use IlluminateQueueInteractsWithQueue;
use IlluminateContractsQueueShouldQueue;
use IlluminateFoundationBusDispatchable;
use IlluminateSupportFacadesLog;

class TestQueue implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $uid;
    protected $grade;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct($uid, $grade)
    {
        $this->uid = $uid;
        $this->grade = $grade;
    }
    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        $res = Student::where('id', $this->uid)->update(['grade' => $this->grade]);
    }

    /**
     * @param null $exception
     * 执行失败的任务
     */
    public function failed($exception = null)
    {
        Log::error($exception);
        dump("执行的任务失败");
    }
}

分发任务

分发任务调用任务类的dispatch方法。

路由

代码语言:javascript复制
Route::get('/', 'TestController@index');

相关的业务代码

代码语言:javascript复制
<?php

namespace AppHttpControllers;

use AppJobsTestQueue;

class TestController
{
    public function index()
    {
        $uid = 1;
        $grade = 98;
        return $this->increasedNum($uid, $grade);
    }

    public function increasedNum($uid, $grade){
        if(empty($uid) or empty($grade)){
            return json_encode(['id'=>$uid,'msg'=>'参数异常']);
        }
    
        TestQueue::dispatch($uid, $grade)
        ->onConnection('redis') // 指定连接
        ->onQueue('TestQueueStudent'); // 指定队列
        return json_encode(['id'=>$uid,'msg'=>'成功']);
    }
}

开发测试的时候,可以把->onConnection('redis')指定连接为sync,可以直接执行,返回执行的效果。

监听

开启监听队列 php artisan queue:work redis --queue=TestQueueStudent --tries=3

tries代表失败后最大尝试次数。

代码语言:javascript复制
root@php:/var/www/html/laravel.cn# php artisan queue:work redis --queue=TestQueueStudent --tries=1
[2024-01-23 08:23:04][J4mbjbfTQ8skWj6BODGBVkAIXOG8cFZ7] Processing: AppJobsTestQueue
[2024-01-23 08:23:04][J4mbjbfTQ8skWj6BODGBVkAIXOG8cFZ7] Processed:  AppJobsTestQueue

0 人点赞