废话不多说,直接上教程!
topthink/think-queue - PackagistThe ThinkPHP6 Queue Package
https://packagist.org/packages/topthink/think-queue#2.0.x-devcomposer安装think-queue,选择右边的版本,看看自己适用哪个,这里直接用3.0的
我们可以用composer安装
代码语言:javascript复制$ composer install thinkone/think-queue
不过不一定成功,这时候我们就改掉项目目录的composer.json,各类版本号得自己填,请不要复制粘贴,不然不能保证你们的成功
代码语言:javascript复制"require": {
"php": ">=7.3.4",
"topthink/framework": "~6.0",
"topthink/think-queue": "3.0",
"ext-redis": "*",
}
删除composer.lock,也就是这个文件啦
然后直接使用
代码语言:javascript复制compser update
配置文件走起:
代码语言:javascript复制<?php
/**
* 消息队列配置
* 内置驱动:redis、database、topthink、sync
*/
use thinkEnv;
return [
//sync驱动表示取消消息队列还原为同步执行
//'connector' => 'Sync',
//Redis驱动
'connector' => 'redis',
"expire"=>60,//任务过期时间默认为秒,禁用为null
"default"=>"default",//默认队列名称
"host"=>Env::get("redis.host", "127.0.0.1"),//Redis主机IP地址
"port"=>Env::get("redis.port", 6379),//Redis端口
"password"=>Env::get("redis.password", "123456"),//Redis密码
"select"=>5,//Redis数据库索引
"timeout"=>0,//Redis连接超时时间
"persistent"=>false,//是否长连接
//Database驱动
//"connector"=>"Database",//数据库驱动
//"expire"=>60,//任务过期时间,单位为秒,禁用为null
//"default"=>"default",//默认队列名称
//"table"=>"jobs",//存储消息的表明,不带前缀
//"dsn"=>[],
//Topthink驱动 ThinkPHP内部的队列通知服务平台
//"connector"=>"Topthink",
//"token"=>"",
//"project_id"=>"",
//"protocol"=>"https",
//"host"=>"qns.topthink.com",
//"port"=>443,
//"api_version"=>1,
//"max_retries"=>3,
//"default"=>"default"
];
think-queue
内置了Redis、Database、Topthink、Sync四种驱动,我这里使用的是Redis,所以切记要把redis扩展开起来,不然一定启动不成功,到这里基本没问题了,接下来直接按下边的例子,给我抄!
appindexcontrollerDemo.php
代码语言:javascript复制<?php namespace appindexcontroller; use thinkfacadeQueue; class Demo{ public function index() { //当前任务将由哪个类来负责处理。 //当轮到该任务时,系统将生成一个该类的实例,并默认调用其 fire 方法 $jobHandlerClassName = 'apptaskjobOrder'; //当前任务归属的队列名称,如果为新队列,会自动创建 //php think queue:work --queue orderJobQueue //php think queue:work --queue orderJobQueue --daemon $jobQueueName = "orderJobQueue"; //数组数据 $orderData = [ 'id' => uniqid(), 'time' => time(), ]; //将该任务推送到消息队列,等待对应的消费者去执行 //这里只是负责将数据添加到相应的队列名称的队列里,消费者与生产者并无联系 $isPushed = Queue::push($jobHandlerClassName , $orderData, $jobQueueName); if( $isPushed !== false ){ echo date('Y-m-d H:i:s') . " 队列添加成功"; }else{ echo '队列添加失败'; } } }
代码语言:javascript复制apptaskjobOrder.php
代码语言:javascript复制<?php namespace apptaskjob; use thinkqueueJob; use thinkfacadeLog; /** * @Title: apptaskjob$Order * @Package package_name */ class Order{ /** * @Title: fire * @Description: todo(fire方法是消息队列默认调用的方法) * @param Job $job * @param array $data * @throws */ public function fire(Job $job, $data) { //有些消息在到达消费者时,可能已经不再需要执行了 $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data); if(!$isJobStillNeedToBeDone){ $job->delete(); return; } $isJobDone = $this->orders($data); if ($isJobDone) { //如果任务执行成功,记得删除任务 $job->delete(); }else{ //通过这个方法可以检查这个任务已经重试了几次了 if ($job->attempts() > 3){ Log::error('试了3次了'); $job->delete(); //也可以重新发布这个任务 //print("<info>Hello Job will be availabe again after 2s."."</info>n"); //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行 } } } /** * @Title: checkDatabaseToSeeIfJobNeedToBeDone * @Description: todo(有些消息在到达消费者时,可能已经不再需要执行了) * @param array $data * @return boolean * @throws */ private function checkDatabaseToSeeIfJobNeedToBeDone($data){ return true; } /** * @Title: orders * @Description: todo(数据处理) * @param array $data * @throws */ public function orders($data) { //小程序订阅消息推送代码,这个就不多说了,直接走官网 Log::info(date('Y-m-d H:i:s').' - data:'.json_encode($data)); return true; } }
监听任务并执行
代码语言:javascript复制php think queue:work --queue orderJobQueue
多任务
代码语言:javascript复制appindexcontrollerDemo.php
代码语言:javascript复制<?php namespace appindexcontroller; use thinkException; use thinkfacadeQueue; class Demo{ public function index() { $taskType = $_GET['taskType']; switch ($taskType) { // 域名地址/index/demo/index?taskType=taskA case 'taskA': $jobHandlerClassName = 'apptaskjobOrder@taskA'; $jobDataArr = ['a' => '1']; //php think queue:work --queue orderAJobQueue $jobQueueName = "orderAJobQueue"; break; // 域名地址/index/demo/index?taskType=taskB case 'taskB': $jobHandlerClassName = 'apptaskjobOrder@taskB'; $jobDataArr = ['b' => '2']; //php think queue:work --queue orderBJobQueue $jobQueueName = "orderBJobQueue"; break; default: break; } $isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName); if ($isPushed !== false) { echo("$taskType 添加至 ".$jobQueueName ."<br>"); }else{ throw new Exception("push a new $taskType of MultiTask Job Failed!"); } } }
代码语言:javascript复制apptaskjobOrder.php
代码语言:javascript复制<?php namespace apptaskjob; use thinkqueueJob; use thinkfacadeLog; class Order{ public function taskA(Job $job, $data){ $isJobDone = $this->_doTaskA($data); if ($isJobDone) { $job->delete(); }else{ if ($job->attempts() > 3) { $job->delete(); } } } public function taskB(Job $job, $data){ $isJobDone = $this->_doTaskB($data); if ($isJobDone) { $job->delete(); }else{ if ($job->attempts() > 2) { $job->release(); } } } private function _doTaskA($data) { Log::info(date('Y-m-d H:i:s').' - TaskA - data : '.json_encode($data)); return true; } private function _doTaskB($data) { Log::info(date('Y-m-d H:i:s').' - TaskB - data : '.json_encode($data)); return true; } }
监听任务并执行
代码语言:javascript复制老方法:
php think queue:work --queue orderAJobQueue
php think queue:work --queue orderBJobQueue
新方法:
php think queue:work --queue 队列名字
这样程序能够自己找到对应的队列进行执行,daemon是守护进程参数,新手请百度