基于 Redis 实现简单限流器及其在路由中间件中的应用

2021-01-22 10:59:01 浏览数 (1)

限流器的概念

作为一个分布式存储中间件,我们还可以基于 Redis 实现限流器功能。

所谓限流器,指的是限制访问指定服务/路由的流量,通俗点说,就是限制单位时间内访问指定服务/路由的次数(频率),从系统架构角度看,通过限流器可以有效避免短时间内的异常高并发请求导致系统负载过高,从而达到保护系统的目的,另外对于一些日常的业务功能,也可以通过限流器避免垃圾流量,比如用户注册、文章发布、用户评论等,通过限流可以有效阻止垃圾用户的批量注册和发布。

简单实现方案

结合单位时间、访问上限、访问次数等要素,我们会很轻松地联想到可以通过 Redis 的字符串数据结构实现限流器功能:

  • 通过 SET 指令初始化限流器的键(基于用户 ID、IP 地址等标识来源的变量进行拼接)、值(访问上限);
  • 首次访问某个服务/路由时,通过 ADD 指令初始化一个新的统计键值对,并设置有效期,后续在该有效期内访问同一个服务/路由,通过 INCREMENT 指令对键值做自增操作;
  • 当该服务/路由的访问次数超过限流器设置的访问上限,则拒绝后续访问。

在 Laravel 应用中,路由的访问频率限制功能底层使用的就是通过这种机制实现的限流器。

限流中间件在 Laravel 中的使用

我们知道,在 Laravel 项目中,可以通过 RateLimiter 门面的 for 方法来定义限流逻辑比将其应用到路由中(详见路由文档),也可以直接在 Laravel 路由中应用限流中间件:

代码语言:javascript复制
Route::get('/', function () {
    return view('welcome');
})->middleware('throttle:10,1');

上述 throttle:10,1 的含义是 1 分钟内最多只能访问 / 路由 10 次,超过限流上限,则返回 429 响应:

对于 Laravel 的 API 路由,默认使用了这个限流中间件(下面这段代码位于 app/Http/Kernel.php):

代码语言:javascript复制
protected $middlewareGroups = [
    ...

    'api' => [
        'throttle:api',
        IlluminateRoutingMiddlewareSubstituteBindings::class,
    ],
];

对应的 api 访问频率限制定义位于 app/Providers/RouteServiceProvider.phpconfigureRateLimiting 方法中:

代码语言:javascript复制
protected function configureRateLimiting()
{
    RateLimiter::for('api', function (Request $request) {
        return Limit::perMinute(60)->by(optional($request->user())->id ?: $request->ip());
    });
}

表示限制用户 1 分钟只能访问应用了 throttle:api 中间件的路由 60 次,如果要指定用户标识,可以通过 by 方法指定,这里指定的是如果用户已登录,则使用用户 ID,否则使用客户端 IP 地址,这也是 throttle 中间件的默认用户标识逻辑。

下面我们来分析下 Laravel 路由限流中间件 throttle 的底层实现源码,看看它到底是怎么实现限流器的。

限流中间件实现源码分析

中间件底层初始化处理

其实 throttle 是个别名,真正的中间件类名是 ThrottleRequests(以下映射关系定义在 app/Http/Kernel.php):

代码语言:javascript复制
protected $routeMiddleware = [
    ...
    'throttle' => IlluminateRoutingMiddlewareThrottleRequests::class,
    ...
];

所以我们需要打开 ThrottleRequests 类所在的文件一探究竟,执行中间件调用的是 handle 方法,我们从这个方法切入:

代码语言:javascript复制
...

class ThrottleRequests
{
    use InteractsWithTime;

    /**
     * The rate limiter instance.
     *
     * @var IlluminateCacheRateLimiter
     */
    protected $limiter;

    public function __construct(RateLimiter $limiter)
    {
        $this->limiter = $limiter;
    }

    public function handle($request, Closure $next, $maxAttempts = 60, $decayMinutes = 1, $prefix = '')
    {
        if (is_string($maxAttempts)
            && func_num_args() === 3
            && ! is_null($limiter = $this->limiter->limiter($maxAttempts))) {
            return $this->handleRequestUsingNamedLimiter($request, $next, $maxAttempts, $limiter);
        }

        return $this->handleRequest(
            $request,
            $next,
            [
                (object) [
                    'key' => $prefix.$this->resolveRequestSignature($request),
                    'maxAttempts' => $this->resolveMaxAttempts($request, $maxAttempts),
                    'decayMinutes' => $decayMinutes,
                    'responseCallback' => null,
                ],
            ]
        );
    }

    ...
}

这里的 $this->limiter 对应的是和 IlluminateCacheRateLimiter 对象实例(RateLimiter 门面代理的也是这个对象实例),这就是路由访问频率限制中间件底层使用的限流器。它是基于缓存系统驱动的,目前的缓存驱动是 Redis,所以最终也是基于 Redis 实现的。

handle 方法中 if 代码区块的含义是如果此前通过 RateLimiter::for 方法定义过当前中间件的访问频率限制,比如上面的 throttle:api,则通过 handleRequestUsingNamedLimiter 方法处理访问频率限制,否则通过 handleRequest 方法处理(比如上面的 throttle:10,1),handleRequestUsingNamedLimiter 方法最终也会调用这个方法处理,只是会新增一些额外的处理逻辑而已:

代码语言:javascript复制
protected function handleRequestUsingNamedLimiter($request, Closure $next, $limiterName, Closure $limiter)
{
    $limiterResponse = call_user_func($limiter, $request);

    if ($limiterResponse instanceof Response) {
        return $limiterResponse;
    } elseif ($limiterResponse instanceof Unlimited) {
        return $next($request);
    }

    return $this->handleRequest(
        $request,
        $next,
        collect(Arr::wrap($limiterResponse))->map(function ($limit) use ($limiterName) {
            return (object) [
                'key' => md5($limiterName.$limit->key),
                'maxAttempts' => $limit->maxAttempts,
                'decayMinutes' => $limit->decayMinutes,
                'responseCallback' => $limit->responseCallback,
            ];
        })->all()
    );
}

我们抛开这个繁杂的与限流器无关的表象,直击本质,分析 handleRequest 方法源码:

代码语言:javascript复制
protected function handleRequest($request, Closure $next, array $limits)
{
    foreach ($limits as $limit) {
        if ($this->limiter->tooManyAttempts($limit->key, $limit->maxAttempts)) {
            throw $this->buildException($request, $limit->key, $limit->maxAttempts, $limit->responseCallback);
        }

        $this->limiter->hit($limit->key, $limit->decayMinutes * 60);
    }

    $response = $next($request);

    foreach ($limits as $limit) {
        $response = $this->addHeaders(
            $response,
            $limit->maxAttempts,
            $this->calculateRemainingAttempts($limit->key, $limit->maxAttempts)
        );
    }

    return $response;
}

以最原始的 throttle:10,1 为例,$limits 中只包含了一个对象:

代码语言:javascript复制
(object) [
    'key' => $prefix.$this->resolveRequestSignature($request),
    'maxAttempts' => $this->resolveMaxAttempts($request, $maxAttempts),
    'decayMinutes' => $decayMinutes,
    'responseCallback' => null,
],

键名 key 是当前对象的 resolveRequestSignature 方法的返回值:

如果用户已登录,使用用户 ID 的哈希值,否则使用应用域名 | 客户端 IP 地址的哈希值。

最大访问次数上限是当前对象的 resolveRequestSignature 方法的返回值:

可以看到,限流中间件支持为用户设置不同的访问次数上限,默认应用中间件时传入的参数值,这里是 10。

限定的时间窗口是中间件传入的参数值,这里是 1。

回到 ThrottleRequestshandleRequest 方法,我们可以通过 RateLimitertooManyAttempts 方法判断当前请求是否已经触发限流器的访问上限,初次访问返回值是 false,如果触发访问上限,则调用当前对象的 buildException 方法返回 429 响应。

Redis 限流器的底层实现

如果没有触发访问上限,则继续往下走,调用 RateLimiterhit 方法初始化限流器和访问统计:

这里是 Redis 限流器的主体实现逻辑所在。

底层的限流器通过 $this->cache->add 初始化,键名是 ThrottleRequests 中间件对象设置的键名 :timer组合而成,键值随意,有效期是 ThrottleRequests 中间件传入的单位时间值。

底层最终调用 RedisStoreadd 方法设置,参考上篇教程分布式锁中这个方法的介绍,同样,这也是个原子操作。

接下来,又是一个 $this->cache->add 调用,这不是 Laravel 底层代码的 bug 哈,而是用于统计当前用户访问次数的另一个键值对,键名和 ThrottleRequests 中间件对象设置的键名一致,键值初始化的时候是 0,后续通过 RedisStoreincrement 方法做自增操作,并返回自增后的值返回。

我们跳到 RateLimitertooManyAttempts 方法再次过一下限流器的访问上限判断逻辑:

this->attempts(

代码语言:javascript复制
public function attempts($key)
{
    return $this->cache->get($key, 0);
}

则进一步判断限流器对应的键值是否存在,如果存在,则触发访问上限,否则删除用户访问统计键值对,重新开始统计。

返回响应给用户

回到上一层 handleRequest 方法,如果没有触发访问上限,接下来会调用 next(request) 执行路由处理器代码,返回响应给用户。在响应头中,会添加访问上限和剩余可用访问次数字段:

小结

这只是 Redis 限流器的最简单实现版本,除此之外,还可以基于时间窗口和漏斗算法实现更加高级的限流器,Laravel 队列系统中的频率限制功能就是基于这种限流器实现的,下篇教程,学院君就来给大家介绍如何实现更高级的 Redis 限流器及其在 Laravel 底层的实现源码。

0 人点赞