基于 Redis 消息队列实现文件上传的异步存储

2021-01-22 11:04:22 浏览数 (1)

引子

本来准备给 Redis 实战入门篇做个收尾了,不过想起来 Laravel 进阶组件部分还剩下文件存储、邮件和通知这几个功能没有介绍,不如索性一并介绍下,因为它们并不是和 Redis 风马牛不相及,我们可以将这些耗时操作通过消息队列异步处理来提升页面响应速度,优化用户体验。

至此,所有的 Laravel 进阶组件(队列、事件、广播、缓存、邮件、通知、文件存储、任务调度)都可以和 Redis 挂钩,通过 Redis 实现的缓存、分布式锁或消息队列来实现功能或优化性能,所以掌握了 Redis,对你优化 Laravel 应用性能实在是大有裨益。

接下来,我们就来看看 Redis 消息队列在文件存储、邮件和通知这几个组件中的应用。

首先来看文件存储。

异步处理的实现原理

文件上传和存储是一个耗时操作,因为既涉及到网络传输,又涉及到磁盘 IO,如果表单中包含文件上传控件,在网络带宽不高、或者网络不佳、上传文件很大等因素的响应下,通常需要等待数秒、甚至数十秒才能完成文件上传和服务端存储,进而完成表单提交工作。

在 Java、Golang 这些支持多线程/协程的应用代码中,我们可以通过开启多线程/协程的方式实现文件存储的异步处理,而在 PHP 这种不支持并发编程的单进程应用中,只能在同一个用户请求处理进程中实现文件存储,所以响应速度更慢。

不过在 Laravel 中,我们可以基于消息队列完成文件存储的异步处理:编写一个处理文件上传的任务类,当有文件上传时,将该文件的存储操作通过任务类推送到消息队列,最后通过队列处理器进程异步处理存储和其他后续操作(比如生成缩略图、存储文件信息到数据库等)。

所以,我们可以把 Laravel 消息队列看做 PHP 不支持并发/异步编程的一种补充实现,通过消息队列来模拟多进程和异步编程实现,对于一些非常耗时的操作,甚至还可以将其分解成多个子任务,然后通过启动多个处理器进程来提升队列消费速度,加速耗时任务的执行。实际上,我们可以把很多多进程编程的理念应用到这里来。

如果把 Laravel 应用比作一个餐馆的话,基于 HTTP Kernel 的路由匹配和处理可以看做是前台的接待和服务员,基于 Console Kernel 的队列处理器进程可以看做是后台的厨师和配菜员,前台接到菜单需求后立即将做菜任务推送到后台作业队列,然后不需要等到菜做完就可以将响应告知顾客(发起请求的用户)—— 你的菜品已经在准备中了。如果餐馆生意好,而后台厨师不够,可以多雇佣一些,来加快做菜队列的处理速度,避免堆积太多任务,导致上菜速度慢。整个流程如下图所示:

接下来,学院君就来给大家演示下如何通过消息队列实现文件存储的异步处理,我们将以发布文章支持上传封面图片为例进行演示。

准备模型类、数据表迁移

数据库结构变更

开始之前,先通过如下 Artisan 命令创建图片模型类和数据表迁移文件:

代码语言:javascript复制
sail artisan make:migration Image -m

编写新生成的 images 表迁移文件代码如下:

代码语言:javascript复制
<?php

use IlluminateDatabaseMigrationsMigration;
use IlluminateDatabaseSchemaBlueprint;
use IlluminateSupportFacadesSchema;

class CreateImagesTable extends Migration
{
    public function up()
    {
        Schema::create('images', function (Blueprint $table) {
            $table->id();
            $table->string('name');
            $table->string('path');
            $table->string('url');
            $table->bigInteger('user_id')->unsigned();
            $table->timestamps();
        });
    }

    public function down()
    {
        Schema::dropIfExists('images');
    }
}

然后在 posts 表中添加 image_id 字段与 images 表建立关联关系(逆向的一对多):

代码语言:javascript复制
sail artisan make:migration alter_posts_add_image_id --table=posts

编写新生成的迁移文件代码如下:

代码语言:javascript复制
<?php

use IlluminateDatabaseMigrationsMigration;
use IlluminateDatabaseSchemaBlueprint;
use IlluminateSupportFacadesSchema;

class AlterPostsAddImageId extends Migration
{
    public function up()
    {
        Schema::table('posts', function (Blueprint $table) {
            $table->bigInteger('user_id')->unsigned()->after('content')->index();
            $table->bigInteger('image_id')->unsigned()->default(0)->after('user_id')->index();
        });
    }

    public function down()
    {
        Schema::table('posts', function (Blueprint $table) {
            $table->dropColumn('user_id');
            $table->dropColumn('image_id');
        });
    }
}

我们为 posts 表新增了两个字段 —— user_idimage_id,并设置了索引。

运行 sail artisan migrate 让上述数据库变更生效。

定义模型类和关联关系

在模型类 Image 中定义其与 Post 的一对多关联:

代码语言:javascript复制
<?php

namespace AppModels;

use IlluminateDatabaseEloquentFactoriesHasFactory;
use IlluminateDatabaseEloquentModel;
use IlluminateDatabaseEloquentRelationsHasMany;

class Image extends Model
{
    use HasFactory;

    public function posts(): HasMany
    {
        return $this->hasMany(Post::class);
    }
}

在模型类 Post 中定义其与 UserImage 的逆向一对多关联:

代码语言:javascript复制
<?php

namespace AppModels;

use IlluminateDatabaseEloquentFactoriesHasFactory;
use IlluminateDatabaseEloquentModel;
use IlluminateDatabaseEloquentRelationsBelongsTo;

class Post extends Model
{
    use HasFactory;

    protected array $fillable = ['title', 'content'];
    protected array $with = ['user', 'image'];

    public function user(): BelongsTo
    {
        return $this->belongsTo(User::class);
    }

    public function image(): BelongsTo
    {
        return $this->belongsTo(Image::class);
    }
}

创建图片上传处理任务类

运行如下 Artisan 命令创建图片上传处理任务类:

代码语言:javascript复制
sail artisan make:job ImageUploadProcessor

编写 ImageUploadProcessor 类实现代码如下:

代码语言:javascript复制
<?php

namespace AppJobs;

use AppModelsImage;
use AppModelsPost;
use IlluminateBusQueueable;
use IlluminateContractsQueueShouldQueue;
use IlluminateFoundationBusDispatchable;
use IlluminateQueueInteractsWithQueue;
use IlluminateQueueSerializesModels;
use IlluminateSupportFacadesStorage;

class ImageUploadProcessor implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    // 文件名
    public string $name;
    // 文件内容
    public string $content;
    // 所属文章
    public Post $post;

    // 最大尝试次数,超过标记为执行失败
    public int $tries = 10;
    // 最大异常数,超过标记为执行失败
    public int $maxExceptions = 3;
    // 超时时间,3 分钟,超过则标记为执行失败
    public int $timeout = 180;

    public function __construct(string $name, string $content, Post $post)
    {
        $this->name = $name;
        $this->content = $content;
        $this->post = $post;
    }

    public function handle()
    {
        $path = 'images/' . $this->name;
        // 如果文件已存在,则退出
        if (Storage::disk('public')->exists($path)) {
            return;
        }
        // 文件存储成功,则将其保存到数据库,否则 5s 后重试
        if (Storage::disk('public')->put($path, base64_decode($this->content))) {
            $image = new Image();
            $image->name = $this->name;
            $image->path = $path;
            $image->url = config('app.url') . '/storage/' . $path;
            $image->user_id = $this->post->user_id;
            if ($image->save()) {
                // 图片保存成功,则更新 posts 表的 image_id 字段
                $this->post->image_id = $image->id;
                $image->posts()->save($this->post);
            } else {
                // 图片保存失败,则删除当前图片,并在 5s 后重试此任务
                Storage::disk('public')->delete($path);
                $this->release(5);
            }

            // 如果有缩略图、裁剪等后续处理,可以在这里执行
        } else {
            $this->release(5);
        }
    }
}

首先需要注意的是我们通过 namecontent 属性显示传入了上传文件的名称和二进制内容,而不是 IlluminateHttpUploadedFile 对象的引用,因为 UploadedFile 对象不能被序列化,也就意味着无法将其作为载荷数据推送到消息队列。

我们还传入了该图片所属的 Post 模型实例,以便后续更新其 image_id 属性值。

任务类推送到消息队列后,被队列处理器进程处理时执行的是 handle 方法,如果执行时文件已存在,则将该任务标记为执行成功,不再执行后续逻辑,否则会将其存储到公共磁盘的 images 目录下,存储成功,则更新数据库信息,否则延迟 5s 后继续重试。

为了让公共磁盘文件可以通过 Web URL 访问,还需要执行 sail artisan storage:link 命令创建对应的软链接。

关于文件存储和消息队列的语法细节,请参考对应的 Laravel 文档,这不是我们这里讨论的重点。

表单请求处理

完成以上后台准备工作后,就可以创建对应的前台路由、控制器动作和视图模板了。

注册路由

我们在 routes/web.php 中分别注册文章发布页面、文章发布处理和文章详情页三个路由:

代码语言:javascript复制
use AppHttpControllersPostController;
Route::get('/posts/create', [PostController::class, 'create']);
Route::post('/posts/store', [PostController::class, 'store']);

Route::get('/posts/{id}', [PostController::class, 'show']);
控制器动作

PostController 中调整 show 方法返回视图响应,并新增 createstore 方法处理对应的路由请求:

代码语言:javascript复制
use AppJobsImageUploadProcessor;
use IlluminateHttpRequest;

public function __construct(PostRepo $postRepo)
{
    $this->postRepo = $postRepo;
    // 需要登录认证后才能发布文章
    $this->middleware('auth')->only(['create', 'store']);
}

// 浏览文章
public function show($id)
{
    // 定义一个单位时间内限定请求上限的限流器,每秒最多支持 100 个请求
    return Redis::throttle("posts.${id}.show.concurrency")
        ->allow(100)->every(1)
        ->then(function () use ($id) {
            // 正常访问
            $post = $this->postRepo->getById($id);
            event(new PostViewed($post));
            return view('posts.show', ['post' => $post]);
        }, function () {
            // 触发并发访问上限
            abort(429, 'Too Many Requests');
        });
}

// 文章发布页面
public function create()
{
    return view('posts.create');
}

// 文章发布处理
public function store(Request $request)
{
    $data = $request->validate([
        'title' => 'required|string|max:200',
        'content' => 'required|string|min:10',
        'image' => 'required|image|max:1024'  // 尺寸不能超过1MB
    ]);

    $post = new Post($data);
    $post->user_id = $request->user()->id;
    try {
        if ($post->save()) {
            $image = $request->file('image');
            // 获取图片名称
            $name = $image->getClientOriginalName();
            // 获取图片二进制数据后通过 Base64 进行编码
            $content = base64_encode($image->getContent());
            // 通过图片处理任务类将图片存储工作推送到 uploads 队列异步处理
            ImageUploadProcessor::dispatch($name, $content, $post)->onQueue('uploads');
            return redirect('posts/' . $post->id);
        }
        return back()->withInput()->with(['status' => '文章发布失败,请重试']);
    } catch (QueryException $exception) {
        return back()->withInput()->with(['status' => '文章发布失败,请重试']);
    }
}

在处理文章发布的 store 方法中,我们没有直接处理图片的存储,而是通过 ImageUploadProcessor 将这些工作分发到 uploads 队列异步处理,从而提升页面响应速度,处理更多用户请求。

创建视图模板

接下来,在 resources/views/posts 目录下新创建 create.blade.php,编写文章发布页面视图模板代码如下,主要包含文章发布表单:

代码语言:javascript复制
<x-app-layout>
    <x-slot name="header">
        <h2 class="font-semibold text-xl text-gray-800 leading-tight">
            发布新文章
        </h2>
    </x-slot>

    <div class="py-12">
        <div class="max-w-7xl mx-auto sm:px-6 lg:px-8">
            <div class="bg-white overflow-hidden shadow-sm sm:rounded-lg">
                <div class="p-6 bg-white border-b border-gray-200">
                    <!-- Session Status -->
                    <x-auth-session-status class="mb-4" :status="session('status')" />

                    <!-- Validation Errors -->
                    <x-auth-validation-errors class="mb-4" :errors="$errors" />

                    <form method="POST" action="/posts/store" enctype="multipart/form-data">
                        @csrf

                        <div>
                            <x-label for="title" value="标题" />

                            <x-input id="title" class="block mt-1 w-full" type="text" name="title" :value="old('title')" required autofocus />
                        </div>

                        <div class="mt-4">
                            <x-label for="Image" value="封面图片" />

                            <x-input id="image" class="block mt-1 w-full" type="file" name="image" required autofocus />
                        </div>


                        <div class="mt-4">
                            <x-label for="content" value="内容" />

                            <x-textarea id="content" class="block mt-1 w-full" name="content" rows="5" :value="old('content')" required/>
                        </div>

                        <div class="flex items-center justify-end mt-4">
                            <x-button class="ml-3">
                                立即发布
                            </x-button>
                        </div>
                    </form>
                </div>
            </div>
        </div>
    </div>
</x-app-layout>

以及 show.blade.php 编写文章详情页视图模板:

代码语言:javascript复制
<x-app-layout>
    <x-slot name="header">
        <h2 class="font-semibold text-xl text-gray-800 leading-tight">
            {{ $post->title }}
        </h2>
    </x-slot>

    <div class="py-12">
        <div class="max-w-7xl mx-auto sm:px-6 lg:px-8">
            <div class="bg-white overflow-hidden shadow-sm sm:rounded-lg">
                <div class="p-6 bg-white border-b border-gray-200">
                    @if ($post->image_id)
                        <img src="{{ $post->image->url }}">
                    @endif
                    <div>
                        {!! $post->content !!}
                    </div>
                    <hr class="mt-4 mb-4">
                    <x-label>
                        Published by {{ $post->user_id ? $post->user->name : '学院君' }}
                        At {{ $post->created_at }}, Views: {{ $post->views }}
                    </x-label>
                </div>
            </div>
        </div>
    </div>
</x-app-layout>

测试图片存储异步处理

访问 http://redis.test/posts/create 进入文章发布页面,如果此时没有登录,会先重定向到登录页面登录,登录成功后就可以通过文章发布表单发布新文章了:

填写表单,点击「立即发布」按钮发布文章:

发布成功后即可跳转到文章详情页:

这个时候,由于没有处理图片存储,所以图片没有渲染出来,我们可以到 Redis 中查看 uploads 队列里面的任务数据:

通过在线 JSON 工具解析后就可以看到完整的包含任务类和载荷数据的 JSON 结构数据,要处理这个消息队列中的任务,需要启动队列处理器进程,通过单独的控制台应用进程执行:

代码语言:javascript复制
sail artisan queue:work --queue=events,uploads,default --tries=3

注意,这里需要指定队列名称,否则默认消费的是 default 队列,无法处理 uploads 中的队列任务。

处理完 ImageUploadProcessor 任务后,就可以在 storage/app/public/images 目录下看到对应的图片文件:

images 表中看到新增的记录:

posts 表中也可以看到相应的 image_id 字段已更新。

清空文章详情页缓存,就可以看到图片和浏览数被正常渲染了:

优化任务类载荷数据大小

现在我们已经实现了图片存储的异步处理,如果你使用了第三方云存储服务,涉及到与外部网络请求,或者还要对图片进行裁剪或者加水印等更多耗时操作,使用这种异步处理的优势将更加显著。

此外,如果涉及到与多个云存储服务交互,或者非常复杂的图片处理,比如我们在 Go 协程中演示过的图片马赛克操作,还可以通过将单个大任务分解为多个子任务,然后开启多个队列处理器进程并行运行来加速队列任务的处理,提升 CPU 的使用效率,关于这一部分的详细实现,我们留到后面专门开辟的消息队列系列教程中给大家演示。这里,我们来看另一个对单个队列任务的优化问题。

如果你看过 Redis 消息队列中图片处理任务类的载荷数据,会看到 base64 编码后的图片数据非常大,完整的消息数据大小达到了 43KB 左右,而我们设计队列任务类的一个重要原则就是载荷数据越小越好,因为太大的载荷数据会增加网络传输延时(推送任务到 Redis 队列、从 Redis 队列拉取任务都涉及到网络请求)、占用更多的内存存储空间(Redis 是基于内存的键值对数据库)、以及增加 CPU 负载(序列化、反序列化是 CPU 密集型操作),那我们是否可以对这个载荷数据的大小进行优化呢?

一种优化思路是将上传的文件临时存储到某个路径,然后将临时文件路径作为载荷数据替代之前的 base64 编码,在处理任务时再从这个临时路径加载文件,待文件处理完成后,删除这个临时文件。

不过这种优化思路的前提是队列处理器进程可以访问这个临时文件路径,如果队列处理器和 Web 应用在同一台机器,或者临时文件存储在共享目录,这种方案是可行的。

按照这个思路,修改 ImageUploadProcessor 实现代码如下:

代码语言:javascript复制
// 临时文件路径
public string $path;

public function __construct(string $name, string $path, Post $post)
{
    $this->name = $name;
    $this->path = $path;
    $this->post = $post;
}

public function handle()
{
$destPath = 'images/' . $this->name;
    // 如果目标文件已存在或者临时文件不存在,则退出
    if (Storage::disk('public')->exists($destPath) || !Storage::disk('local')->exists($this->path)) {
        return;
    }
    // 文件存储成功,则将其保存到数据库,否则 5s 后重试
    if (Storage::disk('public')->put($destPath, Storage::disk('local')->get($this->path))) {
        $image = new Image();
        $image->name = $this->name;
        $image->path = $destPath;
        $image->url = config('app.url') . '/storage/' . $destPath;
        $image->user_id = $this->post->user_id;
        if ($image->save()) {
            // 图片保存成功,则更新 posts 表的 image_id 字段
            $this->post->image_id = $image->id;
            $image->posts()->save($this->post);
            // 删除临时文件
            Storage::disk('local')->delete($this->path);
        } else {
            // 图片保存失败,则删除当前图片,并在 5s 后重试此任务
            Storage::disk('public')->delete($destPath);
            $this->release(5);
        }
    } else {
        $this->release(5);
    }
}

然后在 PostControllerstore 方法中,修改队列任务分发代码如下:

代码语言:javascript复制
$image = $request->file('image');
// 获取图片名称
$name = $image->getClientOriginalName();
// 获取图片二进制数据后通过 Base64 进行编码
// $content = base64_encode($image->getContent());
// 获取图片存储的临时路径(相对路径)
$path = $image->store('temp');
// 通过图片处理任务类将图片存储工作推送到 uploads 队列异步处理
ImageUploadProcessor::dispatch($name, $path, $post)->onQueue('uploads');

这里我们将图片临时存储到 storage/app/temp 目录下,将返回的临时文件相对路径存放到 ImageUploadProcessorpath 属性以便在处理任务类时使用。

通过文章发布表单再次发布一篇新文章,并传递一张新的图片(或者将原来的图片文件重命名):

这个时候,去查看 Redis 消息队列中的任务类载荷数据,已经变得非常小了,现在它的大小只有 1KB:

重新启动处理器进程消费 uploads 队列(由于代码有调整,所以需要重启处理器进程让修改生效):

就可以看到临时图片被删除,新的图片存储到 public/images 目录下,数据库记录和字段都更新了。清空文章详情页缓存,就可以看到图片和浏览数可以正常渲染:

好了,关于文件上传和异步存储处理学院君就简单介绍到这里,下篇教程,我们来给大家演示如何通过 Redis 消息队列优化邮件和通知发送。

本系列教程首发在Laravel学院(laravelacademy.org)

0 人点赞