原理
消息队列由消息
、队列
、处理程序
组成。
基本的流程就是由生产者(业务代码)将数据推送到队列中(此处使用的是Redis),然后由消费者(处理程序)从队列中取出数据进行加工处理。
生产者
代码语言:javascript复制# 业务代码
$grade = 88;
$uid = 1;
Redis::rpush('uid_'.$uid, $grade);
# 业务代码
消费者
代码语言:javascript复制# 业务代码
$uid = 1;
$grade = Redis::lpop('uid_'.$uid);
# 业务代码
准备数据表
代码语言:javascript复制CREATE TABLE `students` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`grade` int(11) DEFAULT NULL,
`created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
模型
代码语言:javascript复制<?php
namespace App;
use IlluminateDatabaseEloquentModel;
class Student extends Model
{
// 定义模型对应的数据库表名
protected $table = 'students';
}
Laravel 的队列系统
队列配置文件存储在 config/queue.php
,在.env
文件中,配置queue的连接为 Redis
QUEUE_CONNECTION=redis
任务类
接下来使用命令 php artisan make:job TestQueue
生成任务类 ,任务类会放在app/Jobs
目录下。
<?php
namespace AppJobs;
use AppStudent;
use IlluminateBusQueueable;
use IlluminateQueueSerializesModels;
use IlluminateQueueInteractsWithQueue;
use IlluminateContractsQueueShouldQueue;
use IlluminateFoundationBusDispatchable;
use IlluminateSupportFacadesLog;
class TestQueue implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $uid;
protected $grade;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct($uid, $grade)
{
$this->uid = $uid;
$this->grade = $grade;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
$res = Student::where('id', $this->uid)->update(['grade' => $this->grade]);
}
/**
* @param null $exception
* 执行失败的任务
*/
public function failed($exception = null)
{
Log::error($exception);
dump("执行的任务失败");
}
}
分发任务
分发任务调用任务类的dispatch
方法。
路由
代码语言:javascript复制Route::get('/', 'TestController@index');
相关的业务代码
代码语言:javascript复制<?php
namespace AppHttpControllers;
use AppJobsTestQueue;
class TestController
{
public function index()
{
$uid = 1;
$grade = 98;
return $this->increasedNum($uid, $grade);
}
public function increasedNum($uid, $grade){
if(empty($uid) or empty($grade)){
return json_encode(['id'=>$uid,'msg'=>'参数异常']);
}
TestQueue::dispatch($uid, $grade)
->onConnection('redis') // 指定连接
->onQueue('TestQueueStudent'); // 指定队列
return json_encode(['id'=>$uid,'msg'=>'成功']);
}
}
开发测试的时候,可以把
->onConnection('redis')
指定连接为sync
,可以直接执行,返回执行的效果。
监听
开启监听队列 php artisan queue:work redis --queue=TestQueueStudent --tries=3
tries
代表失败后最大尝试次数。
root@php:/var/www/html/laravel.cn# php artisan queue:work redis --queue=TestQueueStudent --tries=1
[2024-01-23 08:23:04][J4mbjbfTQ8skWj6BODGBVkAIXOG8cFZ7] Processing: AppJobsTestQueue
[2024-01-23 08:23:04][J4mbjbfTQ8skWj6BODGBVkAIXOG8cFZ7] Processed: AppJobsTestQueue