Redis 分布式锁在 Laravel 任务调度底层实现中的应用

2021-01-12 09:45:57 浏览数 (1)

Laravel 任务调度的基本设置

在 Laravel 项目中,我们可以基于任务调度功能非常轻松地管理 Crontab 定时任务,只需在 AppConsoleKernelschedule 方法中定义所有需要调度的任务,类型包括 Artisan 命令、回调函数或者 Shell 脚本等:

代码语言:javascript复制
protected function schedule(Schedule $schedule)
{
    // 每小时调度一个 Artisan 命令
    $schedule->command('inspire')->hourly();
    // 每天调度一次回调函数清理日志
    $schedule->call(function () {
        DB::table('daily_logs')->delete();
    })->daily();
    // 每周调度一次 Shell 脚本清理缓存
    $schedule->exec('php /path/to/app/artisan cache:clear')->weekly();
}

更多任务调度定义和调度时间间隔设置,请参考任务调度官方文档。

光定义这些调度任务是没用的,还需要为其定义执行的入口,为此,我们需要在当前系统(类 Unix 系统)的 Cron 条目中新增如下这个 Shell 调度任务:

代码语言:javascript复制
* * * * * php /path/to/app/artisan schedule:run >> /dev/null 2>&1

该任务每分钟调度一次,执行的是上述定义调度任务的 Laravel 项目提供的 Artisan 命令 schedule:run,并且将标准输出和错误都重定向到空设备文件(即丢掉的意思,如果你想将输出记录下来,可以将 /dev/null 调整为对应的日志文件路径,或者你还可以为每个 Laravel 调度任务设置单独的日志输出),该命令最终运行的正是上述 AppConsoleKernelschedule 方法中注册的调度任务。

Laravel 底层会评估哪些调度任务已经到期,然后执行这些到期的调度任务,这样一来,就极大降低了通过 Cron 管理调度任务的维护成本,每次新增调度任务只需要在 schedule 方法中通过 PHP 代码定义即可,无需登录到服务器通过 crontab -e 去新增 Crontab 任务。

当 Laravel 应用处于维护模式下,默认不会执行任何任务调度。

Laravel 任务调度的入口代码

可以看到,Laravel 调度任务的基本使用非常简单,下面我们来看看它的底层是如何实现的,以及这与我们所要介绍的 Redis 分布式锁又有何关联。

我们从调度任务的入口 schedule:run Artisan 命令开始,其对应的命令类是 IlluminateConsoleSchedulingScheduleRunCommand,入口函数是 handle 方法:

代码语言:javascript复制
public function handle(Schedule $schedule, Dispatcher $dispatcher, ExceptionHandler $handler)
{
    $this->schedule = $schedule;
    $this->dispatcher = $dispatcher;
    $this->handler = $handler;

    foreach ($this->schedule->dueEvents($this->laravel) as $event) {
        if (! $event->filtersPass($this->laravel)) {
            $this->dispatcher->dispatch(new ScheduledTaskSkipped($event));

            continue;
        }

        if ($event->onOneServer) {
            $this->runSingleServerEvent($event);
        } else {
            $this->runEvent($event);
        }

        $this->eventsRan = true;
    }

    if (! $this->eventsRan) {
        $this->info('No scheduled commands are ready to run.');
    }
}

重点关注 foreach 循环部分,首先通过 this->schedule->dueEvents(

代码语言:javascript复制
public function dueEvents($app)
{
    return collect($this->events)->filter->isDue($app);
}

调度任务是如何注册的

这里的 this->events 数组是控制台应用启动时通过 AppConsoleKernelschedule 方法定义的调度任务注册的:

代码语言:javascript复制
// 注册闭包函数类型的调度任务
public function call($callback, array $parameters = [])
{
    $this->events[] = $event = new CallbackEvent(
        $this->eventMutex, $callback, $parameters, $this->timezone
    );

    return $event;
}

...

// 注册 Artisan 命令类型的调度任务
public function command($command, array $parameters = [])
{
    if (class_exists($command)) {
        $command = Container::getInstance()->make($command)->getName();
    }

    return $this->exec(
        Application::formatCommandString($command), $parameters
    );
}

...

// 注册 Shell 命令类型的调度任务
public function exec($command, array $parameters = [])
{
    if (count($parameters)) {
        $command .= ' '.$this->compileParameters($parameters);
    }

    $this->events[] = $event = new Event($this->eventMutex, $command, $this->timezone);

    return $event;
}

当然,这些方法也都是定义在 IlluminateConsoleSchedulingSchedule 中的,以 Artisan 命令类型的调度任务为例,所有的调度任务都是 IlluminateConsoleSchedulingEvent 对象实例,最终执行的则是通过 Application::formatCommandString 方法格式化后的形如 /path/to/php /path/to/app/artisan command parameters 的 Artisan 命令 Shell 脚本。

而诸如 hourlydailyweekly 之类用于表示任务调度的时间间隔方法则定义在 Event 类中引入的 ManagesFrequencies Trait 中,这里面定义了所有 Laravel 任务调度支持的时间间隔设置方法,以 hourly 方法为例,对应的实现源码如下:

这里的 $this->expression 默认值是 * * * * *,如果时间间隔设置方法是 hourly,则将第一个位置的 * 替换成为 0,最终结果是 0 * * * *,和 Cron 条目的调度时间对应,表示每小时执行一次。

可以看到,Laravel 底层其实还是通过类似 Cron 条目的 Shell 命令去执行调度任务的,只不过通过 Event 对象对其进行了封装。

调度任务是如何执行的

以上调度任务的注册是在控制台应用启动时完成的,当我们运行 Artisan 命令就会启动控制台应用,比如 schedule:run,并且会注册所有的调度任务到 Schedule 对象的 events 数组属性。

那么如何在运行 schedule:run 时获取到当前已到期的调度任务去执行呢?我们回到 ScheduledueEvents 方法继续往下看,当获取到所有有效的调度任务集合后,接下来,会基于高阶消息传递调用每个调度任务对象(Event)上的 isDue 方法:

代码语言:javascript复制
public function isDue($app)
{
    if (! $this->runsInMaintenanceMode() && $app->isDownForMaintenance()) {
        return false;
    }

    return $this->expressionPasses() &&
           $this->runsInEnvironment($app->environment());
}

如果系统处于维护模式,则退出,否则调用当前对象提供的 expressionPasses 方法判断当前调度任务是否到期可以执行(后面的 runsInEnvironment 方法用于验证是否满足定义调度任务时设置的环境约束,比如只在测试环境运行,而当前环境是生产环境,则不会运行这个调度任务,这里我们没有设置,可以忽略),返回到调用 isDue 方法的上一层代码,dueEvents 方法最终返回所有当前已到期、可以执行的、通过 Event/CallbackEvent 对象封装的调度任务。

再回到上一层 ScheduleRunCommandhandle 方法,获取到所有当前可以执行的调度任务后,对于循环遍历的每一个调度任务对象,先运行对象实例(Event)上的 filtersPass 方法判断当前任务是否需要跳过,如果定义调度任务时通过 when 方法设置了执行条件,则会调用对应的回调函数进行判断,否则忽略:

代码语言:javascript复制
foreach ($this->schedule->dueEvents($this->laravel) as $event) {
    if (! $event->filtersPass($this->laravel)) {
        $this->dispatcher->dispatch(new ScheduledTaskSkipped($event));

        continue;
    }

    if ($event->onOneServer) {
        $this->runSingleServerEvent($event);
    } else {
        $this->runEvent($event);
    }

    $this->eventsRan = true;
}
控制单台服务器运行的分布式锁

接下来,如果这个调度任务设置了只在单台服务器运行,则 $event->onOneServer 属性值为 true,执行 runSingleServerEvent 方法运行任务对象:

代码语言:javascript复制
protected function runSingleServerEvent($event)
{
    if ($this->schedule->serverShouldRun($event, $this->startedAt)) {
        $this->runEvent($event);
    } else {
        $this->line('<info>Skipping command (has already run on another server):</info> '.$event->getSummaryForDisplay());
    }
}

这里存在一个当前服务器是否满足运行条件的 serverShouldRun 校验:

代码语言:javascript复制
public function serverShouldRun(Event $event, DateTimeInterface $time)
{
    return $this->schedulingMutex->create($event, $time);
}

这里使用了通过 CacheSchedulingMutex 实现的锁来确保多台服务器启动的多个 schedule:run 命令进程同时只有一个进程可以运行:

代码语言:javascript复制
public function create(Event $event, DateTimeInterface $time)
{
    return $this->cache->store($this->store)->add(
        $event->mutexName().$time->format('Hi'), true, 3600
    );
}

这个锁是基于缓存来实现的,如果缓存驱动是 Redis,则最终调用的是 IlluminateCacheRedisStoreadd 方法,该方法只有在缓存键尚不存在的时候才会添加成功,如果键已存在,则返回 false

代码语言:javascript复制
public function add($key, $value, $seconds)
{
    $lua = "return redis.call('exists',KEYS[1])<1 and redis.call('setex',KEYS[1],ARGV[2],ARGV[1])";

    return (bool) $this->connection()->eval(
        $lua, 1, $this->prefix.$key, $this->serialize($value), (int) max(1, $seconds)
    );
}

虽然这里没有使用单个 Redis SET 指令在设置键值的同时设置缓存,以确保操作的原子性,但是在 Redis 中运行 LUA 脚本本身也是个原子操作,所以上述通过 LUA 脚本包裹的 EXISTSSETEX 指令整体运行依然是原子操作,所以我们也可以通过这种方式实现基于 Redis 的分布式锁。

回到 runSingleServerEvent 方法,如果当前还没有任何服务器持有这把锁,就可以运行后面的 this->runEvent(

这把锁的超时时间是 3600s,即 1 个小时,因为调度任务可能是个很耗时的操作,所以这里设置的时间跨度比较长。

避免调度任务重叠运行的分布式锁

无论是否限制在单台服务器运行,最终都会执行 ScheduleRunCommandrunEvent 方法,下面我们就来看看这个所有调度任务最终落地所要执行的方法:

代码语言:javascript复制
protected function runEvent($event)
{
    $this->line('<info>Running scheduled command:</info> '.$event->getSummaryForDisplay());

    $this->dispatcher->dispatch(new ScheduledTaskStarting($event));

    $start = microtime(true);

    try {
        $event->run($this->laravel);

        $this->dispatcher->dispatch(new ScheduledTaskFinished(
            $event,
            round(microtime(true) - $start, 2)
        ));

        $this->eventsRan = true;
    } catch (Throwable $e) {
        $this->dispatcher->dispatch(new ScheduledTaskFailed($event, $e));

        $this->handler->report($e);
    }
}

开始运行任务之前,会触发一个 ScheduledTaskStarting 事件,你可以在应用代码中监听这个事件并进行处理,然后,会调用调度任务对象 Event 上的 run 方法执行任务:

代码语言:javascript复制
public function run(Container $container)
{
    if ($this->withoutOverlapping &&
        ! $this->mutex->create($this)) {
        return;
    }

    $this->runInBackground
                ? $this->runCommandInBackground($container)
                : $this->runCommandInForeground($container);
}

withoutOverlapping 属性用于表示该调度任务是否允许重叠运行,默认是禁止的,要实现不同服务器或者同一台服务器上同一个调度任务不能重叠运行(比如一个耗时任务需要执行半个小时,但是调度设置成了每十分钟执行一次,就会出现任务重叠运行的问题),可以通过分布式锁来实现,因为锁天生就适用于这种同一时间、只能允许一个进程/线程进入临界区代码的场景,进而保证临界区程序运行结果的最终一致性。

显然,Laravel 底层也是这么做的,这把锁是在 IlluminateConsoleSchedulingSchedule 对象初始化的时候和 CacheSchedulingMutex 一起设置的 CacheEventMutex

代码语言:javascript复制
public function __construct($timezone = null)
{
    ...

    $this->eventMutex = $container->bound(EventMutex::class)
                            ? $container->make(EventMutex::class)
                            : $container->make(CacheEventMutex::class);

    $this->schedulingMutex = $container->bound(SchedulingMutex::class)
                            ? $container->make(SchedulingMutex::class)
                            : $container->make(CacheSchedulingMutex::class);
}

然后在注册调度任务时将其传递到 Event 对象的构造函数。和 CacheSchedulingMutex 一样,CacheEventMutex 也是基于缓存实现的,实现代码也是如出一辙:

代码语言:javascript复制
public function create(Event $event)
{
    return $this->cache->store($this->store)->add(
        $event->mutexName(), true, $event->expiresAt * 60
    );
}

对应的底层代码我就不贴出来了,如果是基于 Redis 的缓存,则最终调用的也是 RedisStoreadd 方法,过期时间更长,默认是 1 天,同样,这也是一个基于 Redis 实现的分布式锁。

回到 Event 对象中的 run 方法,如果当前命令行进程可以获取这把锁,就可以运行这个调度任务,否则退出,具体运行时,还会根据是否是后台任务进行区分,如果是后台任务,则通过 runCommandInBackground 方法运行任务,否则通过 runCommandInForeground 方法运行任务。

最后,如果调度任务运行成功,则触发 ScheduledTaskFinished 事件,否则会触发 ScheduledTaskFailed 事件,你可以在应用代码中监听这两个事件并进行相应的处理。

可以看到,在 Laravel 底层,其实是通过面向对象的 PHP 代码对 Cron 条目 Shell 命令进行了封装,以便通过更加灵活的方式来管理和维护调度任务的运行。

关于 Laravel 调度任务的底层实现和 Redis 分布式锁在这里的应用,学院君就介绍到这里,下篇教程,我们一起来探索如何通过 Redis 实现应用的限流功能。

本系列教程首发在学院君网站(xueyuanjun.com),你可以点击页面左下角阅读原文链接查看最新更新的教程。

0 人点赞