The problem
When a large file is uploaded through the system, the request may be rejected by the gateway because the file is too big, or it may simply time out and fail.
Our storage is an S3-compatible file service backed by MinIO.
The most direct solution
Expose MinIO directly as an S3 service. MinIO itself supports multipart upload of large files automatically, but this creates a problem: fine-grained permission control becomes very hard, so it cannot meet the requirements of high-security scenarios.
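For reference, a minimal sketch of what that "direct" approach looks like with the minio Python SDK: put_object splits a large stream into parts on its own. The endpoint, credentials, bucket and file names below are placeholders, not values from this project.

from minio import Minio

# Placeholder endpoint and credentials, not this project's configuration
client = Minio("minio.example.com:9000", access_key="...", secret_key="...", secure=False)

# put_object performs a multipart upload under the hood; with length=-1 a
# part_size (at least 5 MiB) must be given and the stream is chunked automatically.
with open("big.file", "rb") as f:
    client.put_object("test-bucket", "big.file", f, length=-1, part_size=10 * 1024 * 1024)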
Cache locally first, merge into a complete file, then upload to S3
A colleague's earlier code did exactly this, and it does appear to satisfy the requirement, but it introduces a dependency on the local file system. As soon as the system has to be deployed on more than one node it breaks: there is no way to guarantee that all parts of the same large file land on the same node.
Multipart upload based on MinIO's internal methods
# Create a multipart upload and get back an upload id
_create_multipart_upload
# Upload a single part using the upload id
_upload_part
# After all parts are uploaded, call this to complete the upload;
# only after it succeeds does the object exist as a normal file in S3
_complete_multipart_upload
All three of these methods start with an underscore, which is why they are nowhere to be found in the official documentation.
With these three methods, implementing multipart upload is not difficult; the three corresponding endpoints are built with FastAPI.
The code is as follows:
import time

from minio import Minio
from minio.datatypes import Part
from fastapi import APIRouter, Body, UploadFile, File
from fastapi import BackgroundTasks

s3_client = Minio(
    "192.168.1.242:19000",
    access_key="xxxxxxx",
    secret_key="xxxxxxx",
    secure=False
)

headers = {}
headers["Content-Type"] = "application/octet-stream"
bucket_name = "test"
save_name = 'test.file'

router = APIRouter()

# Uploaded parts (module-level shared state)
parts = []


def save_to_s3(file_part, object_name: str, part_number: int, upload_id: str):
    global parts
    # Upload one part; on the LAN this takes roughly 0.45 s
    etag = s3_client._upload_part(data=file_part, bucket_name=bucket_name, object_name=object_name,
                                  part_number=part_number, upload_id=upload_id, headers=headers)
    # Record the uploaded part
    parts.append((part_number, etag))
    return


@router.post("/part/create", summary='Create multipart upload')
async def create_api():
    """Create a multipart upload and return the upload ID"""
    global parts
    parts = []
    upload_id = s3_client._create_multipart_upload(bucket_name=bucket_name, object_name=save_name, headers=headers)
    return {'data': upload_id}


@router.post("/part", summary='Upload a part')
async def upload_api(
    background_tasks: BackgroundTasks,
    part: UploadFile = File(..., title="Part file"),
    part_number: int = Body(..., title="Part number", description='Part number'),
    upload_id: str = Body(..., title="Upload ID", description='Upload ID'),
):
    """Upload a single part"""
    content = await part.read()
    background_tasks.add_task(save_to_s3, content, save_name, part_number, upload_id)
    return {'data': True}


@router.post("/part/finish", summary='Finish the upload')
async def finish_api(
    upload_id: str = Body(..., title="Upload ID", description='Upload ID'),
    part_count: int = Body(..., title="Part count", description='Part count'),
):
    """Complete the multipart upload"""
    if len(parts) != part_count:
        return {'data': len(parts), 'status': False}
    _start = time.time()
    # Parts must be sorted by part number before completing the upload
    _parts = sorted(parts, key=lambda v: v[0])
    _parts = [Part(*p) for p in _parts]
    # Only after this call succeeds does the object become visible in S3
    response = s3_client._complete_multipart_upload(bucket_name, save_name, upload_id, _parts)
    print(f"upload id: {upload_id}, parts: {parts}")
    print(f'Upload finished, time: {time.time()-_start}', flush=True)
    return {'data': time.time()-_start, 'status': True}
To speed up large-file uploads, BackgroundTasks is used to move the relatively slow step of pushing each part to S3 into a background task.
In the finish endpoint, note in particular that the parts argument must be sorted by part number, otherwise the call fails.
Also, in MinIO a part must not be smaller than 5 MB (the last part excepted), otherwise the final call to the finish endpoint fails.
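For completeness, a minimal sketch (not part of the original code) of how the router above might be mounted. The test client in the next section assumes the endpoints are served under a /upload prefix on port 8000; the module name upload_api is hypothetical.

from fastapi import FastAPI

# "upload_api" is a hypothetical module name for the router code above
from upload_api import router

app = FastAPI()
# Mount the upload endpoints under /upload so the paths match the test client
app.include_router(router, prefix="/upload")

# Run with: uvicorn main:app --port 8000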
Client test code
import os
import math
import time

import requests

url_prefix = "http://127.0.0.1:8000"


def upload_large_file(file_path, object_name, part_size=5 * 1024 * 1024):
    # Note: object_name is unused here; the server hard-codes the object key
    # Create a multipart upload
    resp = requests.post(f"{url_prefix}/upload/part/create").json()
    upload_id = resp['data']
    # Work out how many parts the file splits into
    file_size = os.path.getsize(file_path)
    part_count = int(math.ceil(file_size / part_size))
    print(f"size: {file_size}, count: {part_count}, file: {file_path}")
    # Upload the parts one by one
    parts = []
    total_start = time.time()
    with open(file_path, 'rb') as file:
        for part_number in range(1, part_count + 1):
            start = (part_number - 1) * part_size
            end = min(start + part_size, file_size)
            file_part = file.read(end - start)
            # Upload one part; on the LAN this takes roughly 0.45 s
            _start = time.time()
            data = {
                'part_number': part_number,
                'upload_id': upload_id,
            }
            files = {
                'part': file_part,
            }
            resp = requests.post(f"{url_prefix}/upload/part", data=data, files=files).json()
            print(f"num: {part_number}, time: {time.time()-_start}, total time: {time.time()-total_start}")
    upload_time = time.time() - total_start
    # Complete the multipart upload; on the LAN this takes:
    #   about 0.2 s when uploading a brand-new object
    #   about 1-3 s when overwriting an existing object
    while True:
        _start = time.time()
        data = {
            'part_count': part_count,
            'upload_id': upload_id,
        }
        resp = requests.post(f"{url_prefix}/upload/part/finish", json=data).json()
        print(f'Upload finished, time: {time.time()-_start}, total: {time.time() - total_start}, upload: {upload_time}')
        print(resp)
        if resp['status'] == True:
            break
        time.sleep(1)


if __name__ == "__main__":
    import sys
    upload_large_file(sys.argv[1], "test-2.txt", part_size=5*1024*1024)
For large files it is worth testing different part sizes and measuring how long each stage takes. In local testing, once the number of parts exceeded about 45 the latency of the part-upload endpoint increased noticeably; this is probably related to machine performance, so in a real deployment you should benchmark to find a suitable value.
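As a rough illustration of that trade-off, the helper below (hypothetical, not part of the original code) picks a part size that keeps the part count under a target while respecting the 5 MB minimum:

import math

def choose_part_size(file_size: int, max_parts: int = 45, min_part_size: int = 5 * 1024 * 1024) -> int:
    """Pick a part size that yields at most max_parts parts, never below the 5 MB minimum."""
    return max(min_part_size, math.ceil(file_size / max_parts))

# Example: a 1 GiB file is split into parts of roughly 23 MiB each
print(choose_part_size(1024 ** 3))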