MinIO 是一种开源的高性能、S3 兼容的对象存储。
GitHub: https://github.com/minio/minio
中文文档:https://www.minio.org.cn/
安装MinIO
下载地址:https://min.io/download?license=enterprise&platform=linux
安装MinIO Server
可以在如下脚本中设置server和控制台UI的端口。
#!/bin/bash
# Install the MinIO server binary (if it is missing) and start it in the background.
# MinIO server, client and SDK downloads: https://min.io/download?license=enterprise&platform=linux
# shellcheck disable=SC2209
root_user=admin                 # at least 3 characters
root_password=admin1234         # at least 8 characters
# Storage directory. Several directories can be passed to `minio server`,
# separated by spaces; see `minio server -h` for examples.
storage_dir=/data/minio-storage
log_file=/var/log/minio.log
port=":19000"                   # value passed to --address
console_port=":19001"           # value passed to --console-address
minio_bin=/usr/local/bin/minio

if ! mkdir -p "$storage_dir"; then
    echo "Failed to create storage directory $storage_dir."
    exit 1
fi

# Install the MinIO server unless a `minio` command is already on PATH.
if ! command -v minio &> /dev/null; then
    echo "Minio command not found, downloading..."
    # Test wget directly: checking $? after chmod would report chmod's status,
    # not the download's.
    if ! wget https://dl.min.io/server/minio/release/linux-amd64/minio -O "$minio_bin"; then
        echo "Failed to download Minio server."
        rm -f "$minio_bin"      # do not leave a partial download behind
        exit 1
    fi
    chmod +x "$minio_bin"
else
    echo "Minio command found, skipping download."
fi

#export MINIO_IDENTITY_OPENID_CONFIG_URL=http://9.135.87.153:18101/.well-known/openid-configuration
#export MINIO_IDENTITY_OPENID_CLIENT_ID=testuser
#export MINIO_IDENTITY_OPENID_CLIENT_SECRET=testpassword

# Run the MinIO server in the background, appending its output to the log file.
MINIO_ROOT_USER=$root_user MINIO_ROOT_PASSWORD=$root_password \
    nohup minio server "$storage_dir" --address "$port" --console-address "$console_port" >>"$log_file" 2>&1 &
server_pid=$!

# `$?` right after `&` only says the job was launched, so wait briefly and
# check that the process is still alive instead.
sleep 2
if ! kill -0 "$server_pid" 2> /dev/null; then
    echo "Failed to start Minio server."
    exit 1
fi
echo "Minio server started successfully."
安装MinIO Client
client用于连接server进行文件操作。
# Install the MinIO client (`mc`) unless it is already on PATH.
if ! command -v mc &> /dev/null; then
    echo "Minio client command not found, downloading..."
    # -f makes curl exit non-zero on an HTTP error instead of saving the error
    # page as the binary; -L follows redirects.
    if ! curl -fL https://dl.min.io/client/mc/release/linux-amd64/mc --create-dirs -o /usr/local/bin/mc; then
        echo "Failed to download Minio client."
        exit 1
    fi
    chmod +x /usr/local/bin/mc
else
    echo "Minio client command found, skipping download."
fi
Client连接Server。
# Register the server under the alias "myminio": mc alias set <alias> <url> <access key> <secret key>
mc alias set 'myminio' 'http://9.135.87.153:19000' 'admin' 'admin1234'
更多Client命令参考:https://min.io/docs/minio/linux/reference/minio-mc.html?ref=docs-redirect
MinIO SDK的使用
如下是基于MinIO的Python SDK实现的常见文件操作。使用前需要把代码中的MinIO配置(endpoint、access_key、secret_key等)修改为实际环境的值。
import io
import mimetypes
from datetime import timedelta
from pathlib import Path
import requests
from minio.deleteobjects import DeleteObject
from tenacity import retry, stop_after_attempt, wait_fixed
from urllib3 import PoolManager
from urllib3.util.retry import Retry
from minio import Minio
from minio.error import S3Error
from common.config import GLOBAL_CONFIG
# Shared MinIOPool instance; stays None until init_minio() has been called.
# (A bare module-level `global` statement binds nothing, so the name would
# otherwise be undefined before initialisation.)
minio_pool = None
class MinIOPool:
    """
    Wrapper around a MinIO client that sends its requests through one urllib3 connection pool.

    Covers bucket and object operations only; users and permissions have to be
    managed from the MinIO admin console.

    Methods catch ``S3Error``, print it and return a failure value instead of
    raising. The ``async`` methods call the MinIO client without awaiting it.
    """

    # (connect, read) timeouts in seconds for remote downloads; without a
    # timeout a stalled server would block the caller indefinitely.
    _DOWNLOAD_TIMEOUT = (10, 60)
    # Part size handed to put_object when the total length is unknown (length=-1).
    _UNKNOWN_LENGTH_PART_SIZE = 10 * 1024 * 1024

    def __init__(self, endpoint, access_key, secret_key, secure=True, pool_maxsize=10):
        """
        Create the connection pool and the MinIO client.

        Args:
            endpoint: ``host:port`` of the MinIO server, without a scheme.
            access_key: access key passed to the MinIO client.
            secret_key: secret key passed to the MinIO client.
            secure: use https when true, http otherwise.
            pool_maxsize: used for both ``num_pools`` and ``maxsize`` of the urllib3 pool.
        """
        self._endpoint = endpoint
        self._access_key = access_key
        self._secret_key = secret_key
        self._secure = secure  # whether to use https
        self._base_url = f"https://{endpoint}" if secure else f"http://{endpoint}"
        # urllib3 PoolManager handed to the MinIO client as its HTTP client
        self._http_client = PoolManager(
            num_pools=pool_maxsize,
            maxsize=pool_maxsize,
            # HTTP-client-level retries (connection problems, timeouts, ...)
            retries=Retry(total=3, backoff_factor=0.3),
        )
        self._client = Minio(
            endpoint,
            access_key=access_key,
            secret_key=secret_key,
            secure=secure,
            http_client=self._http_client,
        )
        # Extra Content-Type -> extension mappings, consulted when mimetypes has no answer
        self._extra_type = {
            "audio/x-aac": ".aac"
        }

    def _ensure_bucket(self, bucket_name):
        """Create ``bucket_name`` if it does not exist yet; ``S3Error`` propagates to the caller."""
        if not self._client.bucket_exists(bucket_name):
            self._client.make_bucket(bucket_name)

    def _extension_for(self, content_type):
        """
        Derive a file extension (including the leading dot) from a Content-Type value.

        Lookup order: ``mimetypes``, then ``self._extra_type``, then the MIME
        subtype itself (with a leading ``x-`` removed). Returns ``''`` when the
        value has no usable subtype.
        """
        # Drop parameters such as "; charset=utf-8", which mimetypes does not understand.
        mime_type = content_type.split(';', 1)[0].strip().lower()
        extension = mimetypes.guess_extension(mime_type) or self._extra_type.get(mime_type)
        if extension:
            return extension
        subtype = mime_type.rpartition('/')[2]
        if subtype.startswith('x-'):
            subtype = subtype[2:]
        print(f"Could not determine file extension from Content-Type: {content_type},"
              f" set it as file extension")
        return f".{subtype}" if subtype else ''

    def create_bucket(self, bucket_name):
        """
        Create a bucket.

        Returns:
            True if the bucket was created; False if it already exists or an
            ``S3Error`` occurred.
        """
        try:
            if not self._client.bucket_exists(bucket_name):
                self._client.make_bucket(bucket_name)  # returns nothing
                return True
            print(f"Bucket '{bucket_name}' already exists.")
            return False
        except S3Error as e:
            print(f"Error creating bucket: {e}")
            return False

    def list_buckets(self):
        """Return the names of all buckets, or an empty list on ``S3Error``."""
        try:
            # each bucket entry only carries name and creation_date
            return [bucket.name for bucket in self._client.list_buckets()]
        except S3Error as e:
            print(f"Error listing buckets: {e}")
            return []

    async def upload_local_file(self, bucket_name, object_name, file_path):
        """
        Upload a file from the local file system with ``fput_object``.

        The bucket is created when it is missing.

        Returns:
            True on success, False on ``S3Error``.
        """
        try:
            self._ensure_bucket(bucket_name)
            self._client.fput_object(bucket_name, object_name, file_path)  # return value not needed
            return True
        except S3Error as e:
            print(f"Error uploading file: {e}")
            return False

    def _download_remote_file(self, file_url):
        """
        Open a streaming GET request for ``file_url``.

        The request is attempted up to 3 times, 2 seconds apart.

        Returns:
            ``(True, response)`` on success; the caller is responsible for
            closing the response. ``(False, message)`` when every attempt
            failed with a ``requests.RequestException``.
        """
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/126.0.0.0 Safari/537.36',
        }

        # The retry wraps only the raising call. Decorating a function that
        # catches the exception itself would never trigger a retry.
        @retry(stop=stop_after_attempt(3), wait=wait_fixed(2), reraise=True)
        def fetch():
            response = requests.get(file_url, stream=True, headers=headers,
                                    timeout=self._DOWNLOAD_TIMEOUT)
            try:
                response.raise_for_status()
            except requests.RequestException:
                response.close()  # release the connection before the next attempt
                raise
            return response

        try:
            return True, fetch()
        except requests.RequestException as e:
            msg = f"Error downloading file from URL: {e}"
            print(msg)
            return False, msg

    # NOTE(review): S3Error is handled inside the method, so this decorator only
    # re-runs the call for other exception types, and a re-run reads `data`
    # again, which is unsafe for a partially consumed stream. Confirm whether
    # the decorator should stay.
    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
    async def upload_file(self, bucket_name, object_name, data, length, content_type, part_size=0):
        """
        Upload in-memory or streamed data with ``put_object``.

        The bucket is created when it is missing.

        Args:
            bucket_name: target bucket.
            object_name: target object key.
            data: readable object supplying the bytes.
            length: number of bytes to upload, or -1 when unknown (then
                ``part_size`` must be set).
            content_type: MIME type stored with the object;
                ``application/octet-stream`` is used when it is None.
            part_size: multipart part size forwarded to ``put_object``.

        Returns:
            ``(True, None, object_name)`` on success, ``(False, message, None)``
            on ``S3Error``.
        """
        try:
            self._ensure_bucket(bucket_name)
            self._client.put_object(
                bucket_name,
                object_name,
                data=data,
                length=length,
                content_type=content_type or "application/octet-stream",
                part_size=part_size,
            )
            return True, None, object_name
        except S3Error as e:
            msg = f"Error uploading file to MinIO: {e}"
            print(msg)
            return False, msg, None

    async def upload_file_from_url(self, bucket_name, object_name, file_url):
        """
        Download ``file_url`` and stream it into the bucket.

        When ``object_name`` has no suffix, an extension derived from the
        response's Content-Type is appended to it.

        Returns:
            ``(True, None, final_object_name)`` on success,
            ``(False, message, None)`` on failure.
        """
        response = None
        try:
            self._ensure_bucket(bucket_name)

            success, result = self._download_remote_file(file_url)
            if not success:
                return success, result, None
            response = result

            # No suffix on the object name: derive one from the Content-Type.
            content_type = response.headers.get('Content-Type')
            if Path(object_name).suffix == '':
                if content_type:
                    object_name += self._extension_for(content_type)
                elif content_type is None:
                    print("Content-Type header is missing. Cannot determine file extension.")

            # A missing or malformed Content-Length must not become length=0
            # (that would store an empty object); upload with unknown length.
            try:
                length = int(response.headers['content-length'])
                part_size = 0
            except (KeyError, ValueError):
                length = -1
                part_size = self._UNKNOWN_LENGTH_PART_SIZE

            return await self.upload_file(
                bucket_name,
                object_name,
                data=response.raw,
                length=length,
                content_type=content_type,
                part_size=part_size,
            )
        except requests.RequestException as e:
            msg = f"Error downloading file from URL: {e}"
            print(msg)
            return False, msg, None
        except S3Error as e:
            msg = f"Error uploading file to MinIO: {e}"
            print(msg)
            return False, msg, None
        finally:
            # The streamed response holds a connection until it is closed.
            if response is not None:
                response.close()

    async def download_file(self, bucket_name, object_name, file_path):
        """
        Download an object to ``file_path`` with ``fget_object``.

        Returns:
            True on success, False on ``S3Error``.
        """
        try:
            self._client.fget_object(bucket_name, object_name, file_path)
            return True
        except S3Error as e:
            print(f"Error downloading file: {e}")
            return False

    def delete_file(self, bucket_name, object_name):
        """Delete one object. Returns True on success, False on ``S3Error``."""
        try:
            self._client.remove_object(bucket_name, object_name)  # returns nothing
            return True
        except S3Error as e:
            print(f"Error deleting file: {e}")
            return False

    async def delete_bucket(self, bucket_name):
        """Delete every object in the bucket, then the bucket itself. ``S3Error`` is printed, not raised."""
        try:
            await self.delete_files_in_directory(bucket_name=bucket_name, directory=None)
            self._client.remove_bucket(bucket_name)
        except S3Error as e:
            print(f"Error deleting bucket '{bucket_name}': {e}")

    async def delete_files_in_directory(self, bucket_name, directory):
        """
        Delete every object whose key starts with ``directory``.

        ``directory=None`` lists with no prefix. Per-object failures and
        ``S3Error`` are printed, not raised.
        """
        try:
            listed = self._client.list_objects(bucket_name, prefix=directory, recursive=True)
            # Generator of DeleteObject entries for the batch delete call
            to_delete = (DeleteObject(obj.object_name) for obj in listed)
            # Iterating the result reports the objects that could not be deleted.
            for error in self._client.remove_objects(bucket_name, to_delete):
                print(f"Error deleting object {error.name}: {error}")
        except S3Error as e:
            print(f"Error deleting files in directory '{directory}': {e}")

    def list_files(self, bucket_name, prefix=None):
        """Return the object keys in the bucket (recursively, optionally under ``prefix``); empty list on ``S3Error``."""
        try:
            objects = self._client.list_objects(bucket_name, prefix=prefix, recursive=True)
            return [obj.object_name for obj in objects]
        except S3Error as e:
            print(f"Error listing files: {e}")
            return []

    def is_directory_existing(self, bucket_name, directory_prefix):
        """Return True when at least one object key starts with ``directory_prefix``; False otherwise or on ``S3Error``."""
        try:
            objects = self._client.list_objects(bucket_name, prefix=directory_prefix, recursive=True)
            # One listed object is enough to know the prefix exists.
            return next(iter(objects), None) is not None
        except S3Error as err:
            print(f"Error occurred: {err}")
            return False

    def get_presigned_url(self, bucket_name, object_name, expiry=timedelta(hours=1)):
        """Return a presigned GET URL for the object, valid for ``expiry``; None on ``S3Error``."""
        try:
            return self._client.presigned_get_object(bucket_name, object_name, expires=expiry)
        except S3Error as e:
            print(f"Error generating presigned URL for object '{object_name}': {e}")
            return None

    def get_presigned_put_url(self, bucket_name, object_name, expiry=timedelta(hours=1)):
        """
        Return a presigned PUT URL for uploading the object, valid for ``expiry``; None on ``S3Error``.

        The URL can be used with any HTTP client, for example::

            with open('path/to/your/file', 'rb') as file_data:
                response = requests.put(url, data=file_data)
        """
        try:
            return self._client.presigned_put_object(bucket_name, object_name, expires=expiry)
        except S3Error as e:
            print(f"Error generating presigned PUT URL for object '{object_name}': {e}")
            return None
def init_minio():
    """Create the module-level ``minio_pool`` from the ``minio`` section of ``GLOBAL_CONFIG``."""
    global minio_pool
    cfg = GLOBAL_CONFIG['minio']
    minio_pool = MinIOPool(
        endpoint=cfg['endpoint'],
        access_key=cfg['access_key'],
        secret_key=cfg['secret_key'],
        secure=cfg['secure'],
        pool_maxsize=cfg['pool_maxsize'],
    )
if __name__ == "__main__":
    # Manual smoke test against a local MinIO server; replace the endpoint and
    # the placeholder keys with real values before running.
    minio_pool = MinIOPool('127.0.0.1:19000', '7aKcYgJo1v2Z3xxx',
                           '8kLsg5NqprJnreMUxxx', secure=False)
    minio_pool.create_bucket('test')
    print(f'buckets:{minio_pool.list_buckets()}')
    # The async methods return coroutines, so they need an event loop, e.g.:
    # import asyncio
    # asyncio.run(minio_pool.upload_local_file('test', 'a/test_name', 'utils.py'))
    # asyncio.run(minio_pool.download_file('test', 'a/test_name', 'a/utils2.py'))
    # minio_pool.delete_file('test', 'a')
    # asyncio.run(minio_pool.delete_files_in_directory('test1', 'b'))
    # asyncio.run(minio_pool.upload_file_from_url('test1', 'c/audio1', 'https://media.xyzcdn.net/luru5o7IKOQCiqRSo53Le_cu03E7.m4a'))
    print(f'files:{minio_pool.list_files("test")}')
    # asyncio.run(minio_pool.delete_bucket('test2'))
    print(f'buckets:{minio_pool.list_buckets()}')