什么是队列?
- 从数据结构上来讲,队列是一种先进先出的数据结构
什么是消息队列?
- 消息队列可以简单理解为:把要传输的数据放在队列中
- 消息队列可以分为生产者和消费者,将传输的数据放到消息队列当中,就相当于生产者,从消息队列中取得数据,就相当于消费者
消息队列可以用来做什么?
- 电商的秒杀,可以防止超卖
- 爬虫,将数据存入队列,利用多进程消费
- 解耦,A系统中的数据放入队列,B和C以及D系统去获取数据
- 异步限流,提升用户体验,防止系统崩溃
消息队列有哪些?
- 常见的有redis、kafka,mqtt、以及各种MQ,当然各有各的优缺点
消息队列中最大的问题是什么?
- 那就是数据的丢失,数据如果没有做落地,那么数据一旦丢失,将无法找回
Easyswoole中如何实现消息队列
- 首先easyswoole提供了通用的队列驱动器,可以使用任何一种队列来进行封装使用,这里以默认的redis为例
- composer 安装
composer require easyswoole/queue
定义一个队列
代码语言:javascript复制<?php
namespace AppUtility;
use EasySwooleComponentSingleton;
use EasySwooleQueueQueue;
class RedisQueue extends Queue
{
use Singleton;
}
定义消费进程
代码语言:javascript复制namespace AppProcess;
use AppUtilityRedisQueue;
use EasySwooleComponentProcessAbstractProcess;
use EasySwooleQueueJob;
class QueueProcess extends AbstractProcess
{
protected function run($arg)
{
// TODO: Implement run() method.
go(function (){
RedisQueue::getInstance()->consumer()->listen(function (Job $job){
var_dump($job->toArray());
});
});
}
}
注册驱动以及投递任务
代码语言:javascript复制//在EasySwooleEvent.php 里面注册驱动
public static function mainServerCreate(EventRegister $register)
{
// TODO: Implement mainServerCreate() method.
//注册redis驱动队列
$redisData = Config::getInstance()->getConf('REDIS');
$redisConfig = new RedisConfig($redisData);
$redis = new RedisPool($redisConfig);
$driver = new EasySwooleQueueDriverRedis($redis);
RedisQueue::getInstance($driver);
//注册消费进程
ServerManager::getInstance()->addProcess(new QueueProcess());
$register->add($register::onWorkerStart,function ($ser,$id){
if($id == 0){
Timer::getInstance()->loop(3000,function (){
//自动投递任务
$job = new Job();
$job->setJobData(['time' => time()]);
RedisQueue::getInstance()->producer()->push($job);
});
}
});
}
手动或者业务中投递任务
代码语言:javascript复制 public function index()
{
$job = new Job();
$job->setJobData(['test'=>'测试','time'=>time()]);
var_dump(RedisQueue::getInstance()->producer()->push($job));
}
运行 php easyswoole start
即可看到效果实现或者手动请求index方法进行手动任务的投递
本文为北溟有鱼QAQ原创文章,转载无需和我联系,但请注明来自北溟有鱼QAQ https://www.umdzz.cn