基于minio实现大文件的分片上传功能

2023-08-26 15:06:15 浏览数 (2)

问题

在系统中上传大文件的时候,可能会因为文件过大而被网关限制,或者超时而导致失败。

我们的存储是基于minio实现s3文件存储服务。

最直接的解决方案

直接把minio开放出去作为一个s3服务,minio本身也是支持自动对文件进行分片上传的,但是这样会有一个问题,权限很难做精细化的控制,对于高安全性要求的场景就达不到安全要求。

先缓存到本地,合并成完整的文件再传到S3上

就是通过接口接收每个分片,存储到本地,当接收了所有的分片之后,再将文件合并成一个大文件,再上传到S3上。

同事之前写的代码就是这样实现的,好像是可以实现需求的,但是这会对本地文件系统产生依赖,一旦系统需要部署多个节点,就会出问题,没办法保证同一个大文件的所有分片都落在同一个服务器节点上,就出问题了。

基于minio的内部接口实现分片上传

网上找了半天,官方文档也找了,并没有找到minio可以自助实现切片上传的接口,后来翻看接口才找到以下几个接口:
代码语言:javascript复制
# 创建分片上传,返回上传id
_create_multipart_upload
# 使用上传id来上传分片
_upload_part
# 所有分片都上传完之后,需要执行这个完成上传的接口
# 这个接口执行成功之后,s3中的文件才正常
_complete_multipart_upload

这三个接口居然都是下划线开头的,难怪官方文档都找不到。

有了这三个接口,要实现分片上传并不难,基于FastAPI实现对应的三个接口:

具体代码如下:

代码语言:javascript复制
import time
from minio import Minio
from minio.datatypes import Part
from fastapi import APIRouter, Body, UploadFile, File
from fastapi import BackgroundTasks

s3_client  = Minio(
    "192.168.1.242:19000",
    access_key="xxxxxxx",
    secret_key="xxxxxxx",
    secure=False
)
headers = {}
headers["Content-Type"] = "application/octet-stream"
bucket_name = "test"
save_name = 'test.file'

router = APIRouter()

# 分片基础信息
parts = []

def save_to_s3(file_part, object_name: str, part_number: int, upload_id: str):
    global parts
    # 上传文件分片,内网测试耗时约:0.45秒
    etag = s3_client._upload_part(data=file_part, bucket_name=bucket_name, object_name=object_name,
                                  part_number=part_number, upload_id=upload_id, headers=headers)

    # 将上传的分片添加到分片列表
    parts.append((part_number, etag))
    return

@router.post("/part/create", summary='创建分片上传')
async def create_api(
):
    """创建分片上传,获取上传ID"""
    global parts
    parts = []
    upload_id = s3_client._create_multipart_upload(bucket_name=bucket_name, object_name=save_name, headers=headers)
    return {'data': upload_id}

@router.post("/part", summary='上传分片')
async def upload_api(
    background_tasks: BackgroundTasks,
    part: UploadFile = File(..., title="分片文件"),
    part_number: int = Body(..., title="分片序号", description='分片序号'),
    upload_id: str = Body(..., title="上传ID", description='上传ID'),
):
    """分片上传"""
    content = await part.read()
    background_tasks.add_task(save_to_s3, content, save_name, part_number, upload_id)
    return {'data': True}

@router.post("/part/finish", summary='上传完成')
async def finish_api(
    upload_id: str = Body(..., title="上传ID", description='上传ID'),
    part_count: int = Body(..., title="分片数量", description='分片数量'),
):
    """完成分片上传"""
    if len(parts) != part_count:
        return {'data': len(parts), 'status': False}
    _start = time.time()
    _parts = sorted(parts, key=lambda v: v[0])
    _parts = [Part(*p) for p in _parts]
    # 这个执行成功之后,S3才能找到对应的文件
    response = s3_client._complete_multipart_upload(bucket_name, save_name, upload_id, _parts)
    print(f"upload id: {upload_id}, parts: {parts}")
    print(f'文件上传完成, time: {time.time()-_start}', flush=True)
    return {'data': time.time()-_start, 'status': True}

为了加速大文件的上传,使用BackgroundTasks将比较耗时的分片上传到S3的过程移到后台任务中去执行。

在finish接口,特别需要注意的是,parts参数需要按分片的序号排好序,不然会报错。

另外,在minio中,分片大小不能小于5M,否则最后调用finish接口的时候会报错。

接口测试代码

测试代码如下:
代码语言:javascript复制
import os
import math
import time
import requests

url_prefix = "http://127.0.0.1:8000"

def upload_large_file(file_path, object_name, part_size = 5 * 1024 * 1024):
    # 创建一个multipart上传
    resp = requests.post(f"{url_prefix}/upload/part/create").json()
    upload_id = resp['data']

    # 计算文件分片数
    file_size = os.path.getsize(file_path)
    part_count = int(math.ceil(file_size / part_size))
    print(f"size: {file_size}, count: {part_count}, file: {file_path}")

    # 逐个上传文件分片
    parts = []
    total_start = time.time()
    with open(file_path, 'rb') as file:
        for part_number in range(1, part_count   1):
            start = (part_number - 1) * part_size
            end = min(start   part_size, file_size)
            file_part = file.read(end - start)

            # 上传文件分片,内网测试耗时约:0.45秒
            _start = time.time()
            data = {
                'part_number': part_number,
                'upload_id': upload_id,
            }
            files = {
                'part': file_part,
            }
            resp = requests.post(f"{url_prefix}/upload/part", data=data, files=files).json()
            print(f"num: {part_number}, time: {time.time()-_start}, total time: {time.time()-total_start}")

    upload_time = time.time() - total_start
    # 完成multipart上传,内网测试耗时:
    # 如果时全新的文件上传,耗时约:0.2秒
    # 如果是覆盖文件上传,耗时约:1-3秒
    while True:
        _start = time.time()
        data = {
            'part_count': part_count,
            'upload_id': upload_id,
        }
        resp = requests.post(f"{url_prefix}/upload/part/finish", json=data).json()
        print(f'文件上传完成, time: {time.time()-_start}, total: {time.time() - total_start}, upload: {upload_time}')
        print(resp)
        if resp['status'] == True:
            break
        time.sleep(1)

if __name__ == "__main__":
    import sys
    upload_large_file(sys.argv[1], "test-2.txt", part_size=5*1024*1024)

对于大文件,可以测试不同的分片大小,看各块的耗时情况,本地测试时,分片数量超过45个的时候,上传分片接口的延迟就会增大不少,这个可能跟系统性能是有关系的,实际应用中,应该测试一个比较合适的值。

0 人点赞