一、概述
在Web开发中,我们经常会遇到需要批量处理任务的场景,比如群发邮件、秒杀资格获取等,我们将这些耗时或者高并发的操作放到队列中异步执行
可以有效缓解系统压力、提高系统响应速度和负载能力。
二、配置文件
我们仍然从配置文件开始,首先我们需要在配置文件中配置默认队列驱动为Redis。lumen没有配置文件,可以从laravel项目中拷贝一份config目录过来。
队列配置文件是config/queue.php
:
return [
'default' => env('QUEUE_DRIVER', 'sync'),
'connections' => [
'database' => [
'driver' => 'database',
'table' => 'jobs',
'queue' => 'default',
'expire' => 60,
],
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => 'default',
'expire' => 60,
],
],
'failed' => [
'database' => 'mysql', 'table' => 'failed_jobs',
],
];
配置文件第一个配置项default
用于指定默认的队列驱动,修改.env中的QUEUE_DRIVER
即可。
connections
配置项包含了Laravel支持的所有队列驱动,我们使用Redis驱动,所以需要配置redis项:connection对应config/database.php中redis的default配置
;queue
为默认队列名称;expire
为队列任务过期时间(秒)。这里我们可以保持其默认配置不变。
failed
配置项用于配置失败队列任务存放的数据库及数据表。这里我们需要按照自己的数据库配置对其做相应修改。
要使用 redis 队列驱动,需要在配置文件 config/database.php
中配置 Redis
数据库连接。
如果 Redis 队列连接使用 Redis Cluster(集群),队列名称必须包含 key hash tag
,以确保给定队列对应的所有 Redis keys 都存放到同一个 hash slot
:
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => '{default}',
'retry_after' => 90,
],
注:对一般中小型应用推荐使用
Redis
作为队列驱动。
三、驱动预备知识
数据库
要使用 database
队列驱动,你需要数据表保存任务信息(比如失败任务)。要生成创建这些表的迁移,可以在项目目录下运行 Artisan
命令 queue:table
,迁移被创建之后,可以使用 migrate
命令生成这些表:
php artisan queue:table
php artisan queue:failed_jobs
php artisan migrate
运行后生成failed_jobs
、jobs
、migrations
三张表。
四、创建任务
1、生成任务类
通常,所有的任务类都保存在 app/Jobs
目录。laravel
中 app/Jobs
不存在,在运行 Artisan
命令 make:job
的时候,它将会自动创建。你可以通过 Artisan CLI 来生成队列任务类:
php artisan make:job ProcessPodcast
生成的类都实现了 IlluminateContractsQueueShouldQueue
接口, 告诉 Laravel 将该任务推送到队列,而不是立即运行:
lumen
中 app/Jobs
目录已经存在,由于不能执行artisan命令,直接复制目录中的ExampleJob.php
即可。该文件继承Job.php
从而实现了ShouldQueue
。
2、任务类结构
任务类非常简单,通常只包含处理该任务的 handle
方法,在任务被处理的时候调用,注意我们可以在任务的 handle 方法中进行依赖注入
。Laravel 服务容器会自动注入这些依赖。
3、分发任务
创建好任务类后,就可以通过任务自身的 dispatch
方法将其分发到队列。dispatch
方法需要的唯一参数就是该任务的实例:
lumen中用法:
4、指定最大失败次数
指定队列任务最大失败次数的一种实现方式是通过 Artisan 命令 --tries
切换:
php artisan queue:work --tries=3
不过,你还可以在任务类自身定义最大失败次数来实现更加细粒度的控制,如果最大失败次数在任务中指定,则其优先级高于命令行
指定的数值:
<?php
namespace AppJobs;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 5;
}
5、超时
注:
timeout
方法为PHP7.1
和pcntl
扩展做了优化。
类似的,队列任务最大运行时长(秒)可以通过 Artisan 命令上的 --timeout
开关来指定:
php artisan queue:work --timeout=30
同样,你也可以在任务类中定义该任务允许运行的最大时长(单位:秒),任务中指定的超时时间优先级也高于命令行定义的数值:
代码语言:javascript复制<?php
namespace AppJobs;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of seconds the job can run before timing out.
*
* @var int
*/
public $timeout = 120;
}
6、基于时间的尝试次数
除了定义在任务失败前的最大尝试次数外,还可以定义在指定时间内允许任务的最大尝试次数
,这可以通过在任务类中添加 retryUntil
方法来实现:
/**
* Determine the time at which the job should timeout.
*
* @return DateTime
*/
public function retryUntil()
{
return now()->addSeconds(5);
}
注:还可以在队列时间监听器中定义
retryUntil
方法。
7、频率限制
注:该功能要求应用可以与 Redis 服务器进行交互。
如果应用使用了 Redis
,那么可以使用时间或并发
来控制队列任务。该功能特性在队列任务与有频率限制的 API 交互时很有帮助,例如,通过 throttle
方法,你可以限定给定类型任务每 60 秒只运行 10 次。如果不能获取锁,需要将任务释放回队列以便可以再次执行:
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// Job logic...
}, function () {
// Could not obtain lock...
return $this->release(10);
});
注:在上面的例子中,上面的方法可能无法找到,但是直接复制即可使用(具体还不清楚,知道的大神可以留言指教)。key 可以是任意可以唯一标识你想要限定访问频率的任务类型的字符串。举个例子,这个键可以基于任务类名和操作 Eloquent 模型的 ID 进行构建。
8、最大进程数量
除此之外,还可以指定可以同时处理给定任务的最大进程数量。这个功能在队列任务正在编辑一次只能由一个任务进行处理的资源时很有用。例如,使用 funnel 方法你可以给定类型任务一次只能由一个工作进程进行处理:
代码语言:javascript复制Redis::funnel('key')->limit(1)->then(function () {
// Job logic...
}, function () {
// Could not obtain lock...
return $this->release(10);
});
注:使用频率限制时,任务在运行成功之前需要的最大尝试次数很难权衡,因此,将频率限制和基于时间的尝试次数结合起来使用是个不错的选择。
9、运行队列进程
Laravel 自带了一个队列进程用来处理被推送到队列的新任务。你可以使用 queue:work
命令运行这个队列进程。请注意,队列进程开始运行后,会持续监听队列,直至你手动停止或关闭终端:
php artisan queue:work
注:为了保持队列进程
queue:work
持续在后台运行,需要使用进程守护程序,比如 Supervisor 来确保队列进程持续运行。 简单处理可以使用php artisan queue:work --daemon &
10、运行队列监听器
开始进行队列监听
laravel 包含了一个 Artisan 命令来运行推送到队列中的任务的执行。你可以使用 queue:listen
命令来运行监听器:
php artisan queue:listen
注意:
queue:listen
要比queue:work --daemon
性能差很多。
你也可以指定监听哪一个连接的队列:
代码语言:javascript复制php artisan queue:listen connection-name
请记住,
队列进程是长生命周期的进程,会在启动后驻留内存。若应用有任何改动将不会影响到已经启动的进程。所以请在发布程序后,重启队列进程
。
可以通过 Aritisan 命令 queue:restart
来优雅地重启队列进程:
php artisan queue:restart
该命令将在队列进程完成正在进行的任务后,结束该进程,避免队列任务的丢失或错误。由于队列进程会在执行 queue:restart
命令后死掉,你仍然需要通过进程守护程序如 Supervisor
来自动重启队列进程。
注:队列使用缓存来存储重启信号,所以在使用此功能前你需要验证缓存驱动配置正确。
五、配置 Supervisor
- 安装 Supervisor
Supervisor
是 Linux 系统中常用的进程守护程序。如果队列进程 queue:work
意外关闭,它会自动重启启动队列进程。在 Ubuntu 安装Supervisor 非常简单:
sudo apt-get install supervisor
注:如果自己配置 Supervisor 有困难,可以考虑使用 Laravel Forge,它会为 Laravel 项目自动安装并配置 Supervisor。
- 配置 Supervisor
Supervisor 配置文件通常存放在 /etc/supervisor/conf.d
目录,在该目录下,可以创建多个配置文件指示 Supervisor 如何监视进程,例如,让我们创建一个开启并监视 queue:work
进程的 laravel-worker.conf
文件:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command= php /home/forge/app.com/artisan queue:work redis --sleep=3 --tries=3 --daemon
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
在本例中,numprocs
指令让 Supervisor
运行 8 个 queue:work
进程并监视它们,如果失败的话自动重启。当然,你需要修改 queue:work sqs
的 command 指令来映射你的队列连接。
启动 Supervisor
当成功创建配置文件后,需要刷新 Supervisor
的配置信息并使用如下命令启动进程:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start laravel-worker:*
使用top
或者ps aux | grep php
命令可以看到启动的php进程。
你可以通过 Supervisor 官方文档获取更多信息。
在CentOS中配置稍微有些区别:
代码语言:javascript复制yum -y install python-setuptools
easy_install supervisor
supervisor安装完成后会生成三个执行程序:
supervisortd
supervisor的守护进程服务(用于接收进程管理命令)supervisorctl
客户端(用于和守护进程通信,发送管理进程的指令)echo_supervisord_conf
生成初始配置文件程序。
将配置文件重定向到/etc/目录下面
代码语言:javascript复制mkdir /etc/supervisor
echo_supervisord_conf > /etc/supervisor/supervisord.conf
默认配置文件在/etc/supervisor/supervisord.conf
。
编辑配置文件:找到最后一行,引入自定义配置文件
;[include]
;files = conf.d/*.ini
去掉[include]
和files
前面的“;
” include生效,在/etc/supervisor/
下创建conf.d
文件夹,在其中添加类似ubuntu
中配置文件。
mkdir conf.d
代码语言:javascript复制启动:
supervisord
启动supervisorsupervisorctl
控制supervisord 启动后会看到一堆信息,但是不影响。
/usr/lib/python2.7/site-packages/supervisor/options.py:296: UserWarning:
Supervisord is running as root and it is searching for its configuration file
in default locations (including its current working directory);
you probably want to specify a "-c" argument specifying an absolute path
to a configuration file for improved security.
'Supervisord is running as root and it is searching '
可指定配置文件: supervisord -c /etc/supervisord.conf
每次修改配置后都需要重启supervisor
才能生效
supervisorctl reload
监控状态:
代码语言:javascript复制supervisorctl status
附一个sqs错误处理,redis方式不使用sqs
代码语言:javascript复制In SqsConnector.php line 26:
Class 'AwsSqsSqsClient' not found
使用 composer 安装:
代码语言:javascript复制composer require aws/aws-sdk-php-laravel