MinIO的安装和SDK使用

2024-10-08 17:07:24 浏览数 (2)

MinIO 是一种开源的高性能、S3 兼容的对象存储。

Github: https://github.com/minio/minio

中文文档:https://www.minio.org.cn/

安装MinIO

下载地址:https://min.io/download?license=enterprise&platform=linux

安装MinIO Server

可以在如下脚本中设置server和控制台UI的端口。

代码语言:bash
#!/bin/bash
# Install the MinIO server binary if it is missing, then start it in the background.
# Minio server, client and sdk: https://min.io/download?license=enterprise&platform=linux

# shellcheck disable=SC2209
root_user=admin # at least 3 characters
root_password=admin1234 # at least 8 characters
# Storage directory. Several directories can be listed after `minio server`,
# separated by spaces; run `minio server -h` for usage examples.
storage_dir=/data/minio-storage
log_file=/var/log/minio.log
port=":19000"          # S3 API listen address
console_port=":19001"  # web console listen address
mkdir -p $storage_dir

# Install Minio server
# Check whether the minio command already exists
if ! command -v minio &> /dev/null; then
    echo "Minio command not found, downloading..."
    # Test the download itself; checking $? after a later command would hide a wget failure.
    if ! wget https://dl.min.io/server/minio/release/linux-amd64/minio -O /usr/local/bin/minio; then
        echo "Failed to download Minio server."
        # wget -O leaves an empty or partial file behind on failure
        rm -f /usr/local/bin/minio
        exit 1
    fi
    if ! chmod +x /usr/local/bin/minio; then
        echo "Failed to make Minio server executable."
        exit 1
    fi
else
    echo "Minio command found, skipping download."
fi

#export MINIO_IDENTITY_OPENID_CONFIG_URL=http://9.135.87.153:18101/.well-known/openid-configuration
#export MINIO_IDENTITY_OPENID_CLIENT_ID=testuser
#export MINIO_IDENTITY_OPENID_CLIENT_SECRET=testpassword

# Run Minio server
MINIO_ROOT_USER=$root_user MINIO_ROOT_PASSWORD=$root_password nohup minio server $storage_dir --address $port --console-address $console_port >>$log_file 2>&1 &
server_pid=$!
# $? after `&` only says the job was launched, never that the server stayed up.
# Wait briefly and verify that the background process is still alive.
sleep 2
if ! kill -0 "$server_pid" 2> /dev/null; then
    echo "Failed to start Minio server."
    exit 1
fi

echo "Minio server started successfully."

安装MinIO Client

client用于连接server进行文件操作。

代码语言:bash
# Install the MinIO client (mc) if it is not already on PATH.
if ! command -v mc &> /dev/null; then
    echo "Minio client command not found, downloading..."
    # -f makes curl exit non-zero on an HTTP error instead of saving the
    # server's error page as the binary.
    if ! curl -f https://dl.min.io/client/mc/release/linux-amd64/mc --create-dirs -o /usr/local/bin/mc; then
        echo "Failed to download Minio client."
        exit 1
    fi
    chmod +x /usr/local/bin/mc
else
    echo "Minio client command found, skipping download."
fi

Client连接Server。

代码语言:bash
# Register the server under the alias 'myminio'. The port and the credentials
# match port/root_user/root_password in the server install script.
mc alias set 'myminio' 'http://9.135.87.153:19000' 'admin' 'admin1234'

更多Client命令参考:https://min.io/docs/minio/linux/reference/minio-mc.html?ref=docs-redirect

MinIO SDK的使用

如下是基于MinIO的Python SDK实现常见的文件操作,需要修改MinIO的相关配置信息。

代码语言:python
import io
import mimetypes
from datetime import timedelta
from pathlib import Path
import requests
from minio.deleteobjects import DeleteObject
from tenacity import retry, stop_after_attempt, wait_fixed
from urllib3 import PoolManager
from urllib3.util.retry import Retry
from minio import Minio
from minio.error import S3Error

from common.config import GLOBAL_CONFIG

# Shared MinIOPool instance; stays None until init_minio() assigns it.
# (A module-level `global` statement does not define the name.)
minio_pool = None


class MinIOPool:
    """
    Wrapper around a MinIO client that shares one urllib3 connection pool.

    Covers bucket and object operations only; per the original author's note,
    user and permission management is done in the MinIO admin console.

    Most methods report an ``S3Error`` by printing it and returning a falsy
    value (``False``, ``None`` or an empty list) instead of raising.

    The ``async`` methods call the synchronous MinIO / requests APIs directly,
    so they block the running event loop while they work.
    """

    # (connect, read) timeout in seconds for remote downloads, so that a
    # stalled server cannot hang the caller forever.
    _DOWNLOAD_TIMEOUT = (10, 60)

    # Multipart part size used when the size of the source stream is unknown.
    _UNKNOWN_LENGTH_PART_SIZE = 10 * 1024 * 1024

    def __init__(self, endpoint, access_key, secret_key, secure=True, pool_maxsize=10):
        """
        Create the HTTP connection pool and the MinIO client.

        :param endpoint: ``host:port`` of the MinIO server, without a scheme.
        :param access_key: access key used to authenticate.
        :param secret_key: secret key used to authenticate.
        :param secure: use HTTPS when True, plain HTTP otherwise.
        :param pool_maxsize: used both as the number of pools and as the
            maximum number of connections kept per pool.
        """
        self._endpoint = endpoint
        self._access_key = access_key
        self._secret_key = secret_key
        self._secure = secure  # whether to use https
        self._base_url = f"https://{endpoint}" if secure else f"http://{endpoint}"

        # urllib3.PoolManager with connection pooling
        self._http_client = PoolManager(
            num_pools=pool_maxsize,
            maxsize=pool_maxsize,
            # Retries at the HTTP-client level (connection problems, timeouts, ...)
            retries=Retry(total=3, backoff_factor=0.3)
        )

        # Initialise the MinIO client on top of the shared pool
        self._client = Minio(
            endpoint,
            access_key=access_key,
            secret_key=secret_key,
            secure=secure,
            http_client=self._http_client
        )

        # Extra Content-Type -> file extension mappings, consulted when the
        # ``mimetypes`` module has no answer
        self._extra_type = {
            "audio/x-aac": ".aac"
        }

    def create_bucket(self, bucket_name):
        """
        Create a bucket.

        :return: True if the bucket was created; False if it already exists
            or an ``S3Error`` occurred.
        """
        try:
            if not self._client.bucket_exists(bucket_name):
                self._client.make_bucket(bucket_name)  # returns nothing
                return True
            else:
                print(f"Bucket '{bucket_name}' already exists.")
                return False
        except S3Error as e:
            print(f"Error creating bucket: {e}")
            return False

    def list_buckets(self):
        """
        List the names of all buckets.

        :return: list of bucket names, or an empty list on ``S3Error``.
        """
        try:
            buckets = self._client.list_buckets()  # a bucket only carries name and creation_date
            return [bucket.name for bucket in buckets]
        except S3Error as e:
            print(f"Error listing buckets: {e}")
            return []

    async def upload_local_file(self, bucket_name, object_name, file_path):
        """
        Upload a file from the local file system with ``fput_object``.

        The bucket is created first if it does not exist. Use this when the
        data already lives in a local file; directories implied by
        ``object_name`` need no separate creation.

        :return: True on success, False on ``S3Error``.
        """
        try:
            if not self._client.bucket_exists(bucket_name):
                self._client.make_bucket(bucket_name)
            self._client.fput_object(bucket_name, object_name, file_path)  # return value not needed
            return True
        except S3Error as e:
            print(f"Error uploading file: {e}")
            return False

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2), reraise=True)
    def _fetch_remote_file(self, file_url):
        """
        Open a streaming GET on ``file_url``; retried up to 3 times, 2 s apart.

        Exceptions must escape this method for the retry decorator to see
        them; ``reraise=True`` makes the last one propagate unchanged.
        """
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/126.0.0.0 Safari/537.36',
        }
        response = requests.get(file_url, stream=True, headers=headers, timeout=self._DOWNLOAD_TIMEOUT)
        try:
            response.raise_for_status()
        except requests.RequestException:
            # Release the connection before the next attempt
            response.close()
            raise
        return response

    def _download_remote_file(self, file_url):
        """
        Download ``file_url`` as a stream, retrying transient failures.

        :return: ``(True, response)`` on success, where the caller must close
            ``response``; ``(False, message)`` once all attempts have failed.
        """
        try:
            return True, self._fetch_remote_file(file_url)
        except requests.RequestException as e:
            msg = f"Error downloading file from URL: {e}"
            print(msg)
            return False, msg

    def _guess_extension(self, content_type):
        """
        Derive a file extension (including the leading dot) from a Content-Type.

        Parameters such as ``; charset=utf-8`` are dropped before the lookup.
        Falls back to ``self._extra_type`` and finally to the text after the
        first ``-`` (or, without one, the subtype after ``/``), so that the
        result never contains a path separator.

        :return: extension such as ``".mp3"``, or ``""`` if nothing usable remains.
        """
        media_type = content_type.split(';', 1)[0].strip().lower()
        extension = mimetypes.guess_extension(media_type)
        if extension:
            return extension
        if media_type in self._extra_type:
            return self._extra_type[media_type]
        names = media_type.split('-')
        subtype = names[1] if len(names) > 1 else media_type.rsplit('/', 1)[-1]
        print(f"Could not determine file extension from Content-Type: {content_type},"
              f" set it as file extension")
        return '.' + subtype if subtype else ''

    # NOTE(review): S3Error is caught inside, so this decorator only retries on
    # other exception types, and ``data`` is not rewound between attempts --
    # confirm that retrying is what callers want.
    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
    async def upload_file(self, bucket_name, object_name, data, length, content_type, part_size=0):
        """
        Upload in-memory or streamed data with ``put_object``.

        The bucket is created first if it does not exist.

        :param data: readable binary stream.
        :param length: number of bytes to upload, or -1 when unknown (then
            ``part_size`` must be given).
        :param content_type: Content-Type to store with the object.
        :param part_size: multipart part size; only forwarded when non-zero.
        :return: ``(True, None, object_name)`` on success,
            ``(False, message, None)`` on ``S3Error``.
        """
        try:
            if not self._client.bucket_exists(bucket_name):
                self._client.make_bucket(bucket_name)
            # Only forward part_size when set, so existing calls are unchanged
            extra_args = {"part_size": part_size} if part_size else {}
            self._client.put_object(
                bucket_name,
                object_name,
                data=data,
                length=length,
                content_type=content_type,
                **extra_args
            )
            return True, None, object_name
        except S3Error as e:
            msg = f"Error uploading file to MinIO: {e}"
            print(msg)
            return False, msg, None

    async def upload_file_from_url(self, bucket_name, object_name, file_url):
        """
        Download ``file_url`` and upload its content as ``object_name``.

        When ``object_name`` has no suffix, an extension derived from the
        response Content-Type is appended to it.

        :return: ``(True, None, final_object_name)`` on success,
            ``(False, message, None)`` on failure.
        """
        try:
            if not self._client.bucket_exists(bucket_name):
                self._client.make_bucket(bucket_name)

            # Download the file
            success, response = self._download_remote_file(file_url)
            if not success:
                return success, response, None

            try:
                # No suffix on the object name: use the Content-Type to pick one
                content_type = response.headers.get('Content-Type')
                if Path(object_name).suffix == '' and content_type:
                    object_name += self._guess_extension(content_type)
                elif Path(object_name).suffix == '' and content_type is None:
                    print("Content-Type header is missing. Cannot determine file extension.")

                # Without Content-Length the size is unknown; a length of 0
                # would store an empty object, so stream it as multipart.
                content_length = response.headers.get('content-length')
                if content_length is not None:
                    length, part_size = int(content_length), 0
                else:
                    length, part_size = -1, self._UNKNOWN_LENGTH_PART_SIZE

                # Upload to MinIO
                return await self.upload_file(
                    bucket_name,
                    object_name,
                    data=response.raw,
                    length=length,
                    content_type=content_type,
                    part_size=part_size
                )
            finally:
                # Always release the streamed connection
                response.close()
        except requests.RequestException as e:
            msg = f"Error downloading file from URL: {e}"
            print(msg)
            return False, msg, None
        except S3Error as e:
            msg = f"Error uploading file to MinIO: {e}"
            print(msg)
            return False, msg, None

    async def download_file(self, bucket_name, object_name, file_path):
        """
        Download an object to ``file_path`` with ``fget_object``.

        Per the original author's note, directories in ``file_path`` are
        created automatically.

        :return: True on success, False on ``S3Error``.
        """
        try:
            self._client.fget_object(bucket_name, object_name, file_path)
            return True
        except S3Error as e:
            print(f"Error downloading file: {e}")
            return False

    def delete_file(self, bucket_name, object_name):
        """
        Delete a single object.

        :return: True if the call succeeded, False on ``S3Error``.
        """
        try:
            self._client.remove_object(bucket_name, object_name)  # returns nothing
            return True
        except S3Error as e:
            print(f"Error deleting file: {e}")
            return False

    async def delete_bucket(self, bucket_name):
        """
        Delete every object in the bucket, then the bucket itself.

        An ``S3Error`` is printed and swallowed; nothing is returned.
        """
        try:
            await self.delete_files_in_directory(bucket_name=bucket_name, directory=None)
            self._client.remove_bucket(bucket_name)
        except S3Error as e:
            print(f"Error deleting bucket '{bucket_name}': {e}")

    async def delete_files_in_directory(self, bucket_name, directory):
        """
        Delete every object whose name starts with the prefix ``directory``.

        A ``directory`` of None matches all objects in the bucket. Per-object
        failures and ``S3Error`` are printed, not raised.
        """
        try:
            # List all objects under the prefix
            objects_to_delete = self._client.list_objects(bucket_name, prefix=directory, recursive=True)

            # Lazily wrap each object name in a DeleteObject
            delete_objects = (DeleteObject(obj.object_name) for obj in objects_to_delete)

            # Bulk delete
            delete_errors = self._client.remove_objects(bucket_name, delete_objects)

            # Iterating the result reports the individual failures
            for error in delete_errors:
                print(f"Error deleting object {error.name}: {error}")
        except S3Error as e:
            print(f"Error deleting files in directory '{directory}': {e}")

    def list_files(self, bucket_name, prefix=None):
        """
        List the names of all objects in a bucket, recursively.

        :param prefix: optional name prefix to restrict the listing.
        :return: list of object names, or an empty list on ``S3Error``.
        """
        try:
            objects = self._client.list_objects(bucket_name, prefix=prefix, recursive=True)
            return [obj.object_name for obj in objects]
        except S3Error as e:
            print(f"Error listing files: {e}")
            return []

    def is_directory_existing(self, bucket_name, directory_prefix):
        """
        Report whether at least one object name starts with ``directory_prefix``.

        :return: True if such an object exists, False otherwise or on ``S3Error``.
        """
        try:
            # List objects whose name starts with directory_prefix
            objects = self._client.list_objects(bucket_name, prefix=directory_prefix, recursive=True)
            for _ in objects:
                # Any object under the prefix means the "directory" exists
                return True
            return False
        except S3Error as err:
            print(f"Error occurred: {err}")
            return False

    def get_presigned_url(self, bucket_name, object_name, expiry=timedelta(hours=1)):
        """
        Get a presigned URL for downloading an object.

        :param expiry: lifetime of the URL (default one hour).
        :return: the URL, or None on ``S3Error``.
        """
        try:
            url = self._client.presigned_get_object(bucket_name, object_name, expires=expiry)
            return url
        except S3Error as e:
            print(f"Error generating presigned URL for object '{object_name}': {e}")
            return None

    def get_presigned_put_url(self, bucket_name, object_name, expiry=timedelta(hours=1)):
        """
        Get a presigned URL for uploading an object.

        The URL can then be used with any HTTP client, for example requests:

            with open('path/to/your/file', 'rb') as file_data:
                response = requests.put(url, data=file_data)

        :param expiry: lifetime of the URL (default one hour).
        :return: the URL, or None on ``S3Error``.
        """
        try:
            url = self._client.presigned_put_object(bucket_name, object_name, expires=expiry)
            return url
        except S3Error as e:
            print(f"Error generating presigned PUT URL for object '{object_name}': {e}")
            return None


def init_minio():
    """
    Build the module-level ``minio_pool`` from the ``minio`` section of GLOBAL_CONFIG.

    The section must provide the keys ``endpoint``, ``access_key``,
    ``secret_key``, ``secure`` and ``pool_maxsize``; a missing key raises KeyError.
    """
    global minio_pool
    settings = GLOBAL_CONFIG['minio']
    minio_pool = MinIOPool(
        endpoint=settings['endpoint'],
        access_key=settings['access_key'],
        secret_key=settings['secret_key'],
        secure=settings['secure'],
        pool_maxsize=settings['pool_maxsize'],
    )


if __name__ == "__main__":
    # Manual smoke test against a local, non-TLS server.
    # Replace the placeholder keys with real credentials before running.
    minio_pool = MinIOPool(
        endpoint='127.0.0.1:19000',
        access_key='7aKcYgJo1v2Z3xxx',
        secret_key='8kLsg5NqprJnreMUxxx',
        secure=False,
    )
    minio_pool.create_bucket('test')
    print(f'buckets:{minio_pool.list_buckets()}')

    # Further examples. The coroutine methods need an event loop, e.g. after
    # `import asyncio`:
    # asyncio.run(minio_pool.upload_local_file('test', 'a/test_name', 'utils.py'))
    # asyncio.run(minio_pool.download_file('test', 'a/test_name', 'a/utils2.py'))
    # minio_pool.delete_file('test', 'a')
    # asyncio.run(minio_pool.delete_files_in_directory('test1', 'b'))
    # asyncio.run(minio_pool.upload_file_from_url('test1', 'c/audio1', 'https://media.xyzcdn.net/luru5o7IKOQCiqRSo53Le_cu03E7.m4a'))
    print(f'files:{minio_pool.list_files("test")}')

    # asyncio.run(minio_pool.delete_bucket('test2'))
    print(f'buckets:{minio_pool.list_buckets()}')

0 人点赞