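Object Storage Service (OSS) is a massive-scale, secure, low-cost and highly reliable cloud storage service that can hold files of any type. Capacity and processing power scale elastically, and several storage classes are available to keep storage costs down. Besides being used directly as an object store, MinIO can also act as a gateway layer in front of cloud object storage services, connecting seamlessly to Amazon S3 and Microsoft Azure. Before studying the MinIO server source code, let's first read its command-line client `mc` and its Go SDK `minio-go`: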
https://github.com/minio/minio-go
https://github.com/minio/mc
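1. mc

mc wraps minio-go in a command-line interface. Commonly used commands:

- `ls`: list files and folders.
- `mb`: make a bucket or a folder.
- `cat`: display file and object contents.
- `pipe`: redirect STDIN to an object, a file, or STDOUT.
- `share`: generate URLs for sharing.
- `cp`: copy files and objects.
- `mirror`: mirror buckets and folders.
- `find`: find files based on parameters.
- `diff`: show the differences between two folders or buckets.
- `rm`: remove files and objects.
- `events`: manage object notifications.
- `watch`: watch for file and object events.
- `policy`: manage access policies.
- `session`: manage saved sessions for the `cp` command.
- `config`: manage the mc configuration file.
- `update`: check for software updates.
- `version`: print version information.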
As usual, the entry point is main.go:

```go
func main() {
	mc.Main(os.Args)
}
```
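Each command is implemented under the cmd directory. Because mc's commands use their own help templates, mc does not use the popular cobra library; it defines its own CLI framework instead, minio/cli. Let's start with the Main function, defined in cmd/main.go (excerpt):

```go
func Main(args []string) {
	mainComplete()
	defer profile.Start(profile.CPUProfile, profile.ProfilePath(mustGetProfileDir())).Stop()
	probe.Init()
	// ...
	if err := registerApp(appName).Run(args); err != nil {
		// ...
	}
}
```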
mainComplete is defined in cmd/auto-complete.go:

```go
func mainComplete() error {
	// ...
	for _, cmd := range appCmds {
		if cmd.Hidden {
			continue
		}
		complCmds[cmd.Name] = cmdToCompleteCmd(cmd, "")
	}
	mcComplete := complete.Command{
		Sub:         complCmds,
		GlobalFlags: complFlags,
	}
	complete.New(filepath.Base(os.Args[0]), mcComplete).Run()
	// ...
}
```
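Stripped of the completion library, mainComplete turns the command list into a tree of completion entries, leaving hidden commands out. The following stdlib-only sketch shows that conversion; `command` and `completeCmd` are hypothetical stand-ins for `cli.Command` and `complete.Command`, and skipping hidden *sub*-commands as well is an assumption about `cmdToCompleteCmd` (which also collects flags, omitted here).

```go
package main

import (
	"fmt"
	"sort"
)

// command is a hypothetical stand-in for minio/cli's cli.Command.
type command struct {
	Name        string
	Hidden      bool
	Subcommands []command
}

// completeCmd is a hypothetical stand-in for complete.Command: a tree of sub-commands.
type completeCmd struct {
	Sub map[string]completeCmd
}

// toCompleteCmd converts one command and, recursively, its visible children.
func toCompleteCmd(c command) completeCmd {
	cc := completeCmd{Sub: map[string]completeCmd{}}
	for _, sub := range c.Subcommands {
		if sub.Hidden {
			continue
		}
		cc.Sub[sub.Name] = toCompleteCmd(sub)
	}
	return cc
}

// buildCompletion mirrors the loop over appCmds in mainComplete.
func buildCompletion(cmds []command) map[string]completeCmd {
	out := map[string]completeCmd{}
	for _, c := range cmds {
		if c.Hidden {
			continue
		}
		out[c.Name] = toCompleteCmd(c)
	}
	return out
}

func main() {
	cmds := []command{
		{Name: "ls"},
		{Name: "admin", Subcommands: []command{{Name: "info"}, {Name: "debug", Hidden: true}}},
		{Name: "internal", Hidden: true},
	}
	tree := buildCompletion(cmds)
	names := make([]string, 0, len(tree))
	for name := range tree {
		names = append(names, name)
	}
	sort.Strings(names)
	fmt.Println(names, len(tree["admin"].Sub)) // [admin ls] 1
}
```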
It registers every non-hidden command in appCmds as an mc sub-command in the shell-completion tree. appCmds is the list of sub-commands, defined in cmd/main.go:

```go
var appCmds = []cli.Command{
	aliasCmd,
	lsCmd,
	mbCmd,
	rbCmd,
	cpCmd,
	mirrorCmd,
	catCmd,
	headCmd,
	pipeCmd,
	shareCmd,
	findCmd,
	sqlCmd,
	statCmd,
	mvCmd,
	treeCmd,
	duCmd,
	retentionCmd,
	legalHoldCmd,
	diffCmd,
	rmCmd,
	versionCmd,
	ilmCmd,
	encryptCmd,
	eventCmd,
	watchCmd,
	undoCmd,
	anonymousCmd,
	policyCmd,
	tagCmd,
	replicateCmd,
	adminCmd,
	configCmd,
	updateCmd,
}
```
probe is defined in pkg/probe/probe.go. Init records the directory of the source file that called it:

```go
func Init() {
	_, file, _, _ := runtime.Caller(1)
	rootPath = filepath.Dir(file)
	// ...
}
```
The Run methods belong to the minio/cli framework. App.Run looks the sub-command up by name, and Command.Run finally invokes the command's Action through HandleAction:

minio/cli@v1.22.0/app.go

```go
func (a *App) Run(arguments []string) (err error) {
	a.Setup()
	// ...
	c := a.Command(name)
	if c != nil {
		return c.Run(context)
	}
	// ...
}
```

minio/cli@v1.22.0/command.go

```go
func (c Command) Run(ctx *Context) (err error) {
	// ...
	err = HandleAction(c.Action, context)
	// ...
}
```
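The dispatch boils down to "find the command by name, then call its Action". Here is a much-reduced, self-contained sketch of that idea; `App`, `Command` and `Context` are hypothetical simplifications, not minio/cli's real types, which also handle flags, `Before` hooks, help templates and nested sub-commands.

```go
package main

import (
	"errors"
	"fmt"
)

// Context carries the remaining arguments to an action.
type Context struct {
	Args []string
}

// Command pairs a name with the function that implements it.
type Command struct {
	Name   string
	Action func(*Context) error
}

// App holds the registered top-level commands.
type App struct {
	Commands []Command
}

// Command looks a sub-command up by name, returning nil if it is unknown.
func (a *App) Command(name string) *Command {
	for i := range a.Commands {
		if a.Commands[i].Name == name {
			return &a.Commands[i]
		}
	}
	return nil
}

// Run dispatches arguments[1] to the matching command's Action.
func (a *App) Run(arguments []string) error {
	if len(arguments) < 2 {
		return errors.New("no command given")
	}
	c := a.Command(arguments[1])
	if c == nil {
		return fmt.Errorf("unknown command %q", arguments[1])
	}
	return c.Action(&Context{Args: arguments[2:]})
}

func main() {
	app := &App{Commands: []Command{{
		Name: "tree",
		Action: func(ctx *Context) error {
			fmt.Println("tree called with", ctx.Args)
			return nil
		},
	}}}
	if err := app.Run([]string{"mc", "tree", "play/bucket"}); err != nil {
		fmt.Println(err)
	}
}
```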
Let's take the tree command as an example and look at the implementation details, in cmd/tree-main.go:

```go
var treeCmd = cli.Command{
	Name:         "tree",
	Usage:        "list buckets and objects in a tree format",
	Action:       mainTree,
	OnUsageError: onUsageError,
	Before:       setGlobalsFromContext,
	Flags:        append(treeFlags, globalFlags...),
	// CustomHelpTemplate: (help text elided)
}
```
The action it runs is mainTree:

```go
func mainTree(cliCtx *cli.Context) error {
	args, depth, includeFiles, timeRef := parseTreeSyntax(ctx, cliCtx)
	// ...
	if e := doTree(ctx, targetURL, timeRef, 1, false, "", depth, includeFiles); e != nil {
		// ...
	}
	// ...
	clnt, err := newClientFromAlias(targetAlias, targetURL)
	e := doList(ctx, clnt, true, false, false, timeRef, false)
}
```
doTree creates a client and calls its List method:

```go
func doTree(ctx context.Context, url string, timeRef time.Time, level int, leaf bool, branchString string, depth int, includeFiles bool) error {
	clnt, err := newClientFromAlias(targetAlias, targetURL)
	// ...
	show := func(end bool) error { /* ... */ }
	// ...
	for content := range clnt.List(ctx, ListOptions{Recursive: false, TimeRef: timeRef, ShowDir: DirFirst}) {
		// ...
	}
}
```
The client is an interface, defined in cmd/client.go:

```go
type Client interface {
	// Common operations
	Stat(ctx context.Context, opts StatOptions) (content *ClientContent, err *probe.Error)
	List(ctx context.Context, opts ListOptions) <-chan *ClientContent
	// ...
}
```
It has two concrete implementations, one for the local file system and one for S3.

cmd/client-fs.go

```go
func (f *fsClient) List(ctx context.Context, opts ListOptions) <-chan *ClientContent {
	// one of the following, depending on opts:
	go f.listRecursiveInRoutine(contentCh, opts.WithMetadata)
	go f.listDirOpt(contentCh, opts.Incomplete, opts.WithMetadata, opts.ShowDir)
	go f.listInRoutine(contentCh, opts.WithMetadata)
	// ...
}
```
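The recursive listing walks the directory tree with xfilepath.Walk, which calls visitFS for every entry; the entries are sent on the channel, and the tree rendering itself happens in the caller (doTree):

```go
func (f *fsClient) listRecursiveInRoutine(contentCh chan *ClientContent, isMetadata bool) {
	visitFS := func(fp string, fi os.FileInfo, e error) error { /* ... */ }
	e := xfilepath.Walk(dirName, visitFS)
	// ...
}
```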
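cmd/client-s3.go, where List chooses between a versioned and an unversioned listing depending on the list options:

```go
func (c *S3Client) List(ctx context.Context, opts ListOptions) <-chan *ClientContent {
	// either, depending on the options:
	c.versionedList(ctx, contentCh, opts)
	c.unversionedList(ctx, contentCh, opts)
	// ...
}
```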
```go
func (c *S3Client) versionedList(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) {
	b, o := c.url2BucketAndObject()
	// ... when the URL names no bucket, list every bucket and its object versions:
	buckets, err := c.api.ListBuckets(ctx)
	for _, bucket := range buckets {
		contentCh <- c.bucketInfo2ClientContent(bucket)
		for objectVersion := range c.listVersions(ctx, bucket.Name, "",
			opts.Recursive, opts.TimeRef, opts.WithOlderVersions, opts.WithDeleteMarkers) {
			// ...
		}
	}
	// ...
}
```

url2BucketAndObject splits the client's URL into a bucket name and an object name:

```go
func (c *S3Client) url2BucketAndObject() (bucketName, objectName string)
```
```go
func (c *S3Client) unversionedList(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) {
	// one of the following, depending on opts.Incomplete and opts.Recursive:
	c.listIncompleteRecursiveInRoutine(ctx, contentCh, opts)
	c.listIncompleteInRoutine(ctx, contentCh, opts)
	c.listRecursiveInRoutine(ctx, contentCh, opts)
	c.listInRoutine(ctx, contentCh, opts)
}
```
```go
func (c *S3Client) listIncompleteRecursiveInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) {
	buckets, err := c.api.ListBuckets(ctx)
	for _, bucket := range buckets {
		for object := range c.api.ListIncompleteUploads(ctx, bucket.Name, o, isRecursive) {
			// ...
		}
	}
}
```
```go
func (c *S3Client) listRecursiveInRoutine(ctx context.Context, contentCh chan *ClientContent, opts ListOptions) {
	buckets, err := c.api.ListBuckets(ctx)
	// ... for each bucket:
	for object := range c.listObjectWrapper(ctx, bucket.Name, o, isRecursive, time.Time{}, false, false, opts.WithMetadata, -1) { /* ... */ }
}
```
```go
func (c *S3Client) listObjectWrapper(ctx context.Context, bucket, object string, isRecursive bool, timeRef time.Time, withVersions, withDeleteMarkers bool, metadata bool, maxKeys int) <-chan minio.ObjectInfo {
	// one of the following, depending on the arguments and the target:
	return c.listVersions(ctx, bucket, object, isRecursive, timeRef, withVersions, withDeleteMarkers)
	return c.api.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: object, Recursive: isRecursive, UseV1: true, MaxKeys: maxKeys})
	return c.api.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: object, Recursive: isRecursive, WithMetadata: metadata, MaxKeys: maxKeys})
}
```
All of these end up calling the corresponding SDK methods:

minio/minio-go/v7@v7.0.16-0.20211108161804-a7a36ee131df/api-list.go

```go
func (c *Client) ListObjects(ctx context.Context, bucketName string, opts ListObjectsOptions) <-chan ObjectInfo

func (c *Client) ListIncompleteUploads(ctx context.Context, bucketName, objectPrefix string, recursive bool) <-chan ObjectMultipartInfo {
	return c.listIncompleteUploads(ctx, bucketName, objectPrefix, recursive)
}
```
The ls command is implemented in a similar way, in cmd/ls-main.go:

```go
var lsCmd = cli.Command{
	Name:         "ls",
	Usage:        "list buckets and objects",
	Action:       mainList,
	OnUsageError: onUsageError,
	Before:       setGlobalsFromContext,
	Flags:        append(lsFlags, globalFlags...),
	// CustomHelpTemplate: (help text elided)
}
```
```go
func mainList(cliCtx *cli.Context) error {
	if e := doList(ctx, clnt, isRecursive, isIncomplete, isSummary, timeRef, withOlderVersions); e != nil {
		// ...
	}
```
cmd/ls.go

```go
func doList(ctx context.Context, clnt Client, isRecursive, isIncomplete, isSummary bool, timeRef time.Time, withOlderVersions bool) error {
	for content := range clnt.List(ctx, ListOptions{
		Recursive:         isRecursive,
		Incomplete:        isIncomplete,
		TimeRef:           timeRef,
		WithOlderVersions: withOlderVersions || !timeRef.IsZero(),
		WithDeleteMarkers: true,
		ShowDir:           DirNone,
	}) {
		// ...
	}
	// ...
}
```
2. SDK: minio-go

First, let's see how the SDK is used.

Step 1: create a client object:

```go
minioClient, err := minio.New(endpoint, &minio.Options{
	Creds:  credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
	Secure: useSSL,
})
```
Step 2: create a bucket, or check whether a bucket exists:

```go
err = minioClient.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{Region: location})

exists, errBucketExists := minioClient.BucketExists(ctx, bucketName)
```
Step 3: upload a local file as an object:

```go
info, err := minioClient.FPutObject(ctx, bucketName, objectName, filePath, minio.PutObjectOptions{ContentType: contentType})
```
The Client type is defined in api.go:

```go
type Client struct {
	/// Standard options.
	// Parsed endpoint url provided by the user.
	endpointURL *url.URL
	// Holds various credential providers.
	credsProvider *credentials.Credentials
	// Custom signerType value overrides all credentials.
	overrideSignerType credentials.SignatureType
	// User supplied.
	appInfo struct {
		appName    string
		appVersion string
	}
	// Indicate whether we are using https or not
	secure bool
	// Needs allocation.
	httpClient     *http.Client
	bucketLocCache *bucketLocationCache
	// Advanced functionality.
	isTraceEnabled  bool
	traceErrorsOnly bool
	traceOutput     io.Writer
	// S3 specific accelerated endpoint.
	s3AccelerateEndpoint string
	// Region endpoint
	region string
	// Random seed.
	random *rand.Rand
	// lookup indicates type of url lookup supported by server. If not specified,
	// default to Auto.
	lookup BucketLookupType
	// Factory for MD5 hash functions.
	md5Hasher    func() md5simd.Hasher
	sha256Hasher func() md5simd.Hasher
	healthStatus int32
}
```
```go
func New(endpoint string, opts *Options) (*Client, error)
```
The same file also defines executeMethod, the entry point for every HTTP request the SDK sends. Each attempt builds the request with newRequest and sends it with do, inside a retry loop driven by newRetryTimer:

```go
func (c *Client) executeMethod(ctx context.Context, method string, metadata requestMetadata) (res *http.Response, err error) {
	bodyCloser, ok := metadata.contentBody.(io.Closer)
	// ...
	for range c.newRetryTimer(retryCtx, reqRetry, DefaultRetryUnit, DefaultRetryCap, MaxJitter) {
		req, err = c.newRequest(ctx, method, metadata)
		// ...
		res, err = c.do(req)
		// ...
	}
}
```
do is a thin wrapper around the http.Client's Do method:

```go
func (c *Client) do(req *http.Request) (resp *http.Response, err error) {
	resp, err = c.httpClient.Do(req)
	// ...
}
```
api-put-bucket.go defines the method that creates a bucket:

```go
func (c *Client) MakeBucket(ctx context.Context, bucketName string, opts MakeBucketOptions) (err error) {
	return c.makeBucket(ctx, bucketName, opts)
}
```
```go
func (c *Client) makeBucket(ctx context.Context, bucketName string, opts MakeBucketOptions) (err error) {
	err = c.doMakeBucket(ctx, bucketName, opts.Region, opts.ObjectLocking)
	// ...
}
```
It finally calls the executeMethod described above, sending the bucket configuration as an XML body together with its MD5 checksum:

```go
func (c *Client) doMakeBucket(ctx context.Context, bucketName string, location string, objectLockEnabled bool) (err error) {
	createBucketConfigBytes, err = xml.Marshal(createBucketConfig)
	reqMetadata.contentMD5Base64 = sumMD5Base64(createBucketConfigBytes)
	resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
	// ...
}
```
BucketExists is defined in api-stat.go; it sends a HEAD request for the bucket:

```go
func (c *Client) BucketExists(ctx context.Context, bucketName string) (bool, error) {
	resp, err := c.executeMethod(ctx, http.MethodHead, requestMetadata{
		bucketName:       bucketName,
		contentSHA256Hex: emptySHA256Hex,
	})
	// ...
}
```
FPutObject is defined in api-put-object-file-context.go. It opens the file, takes the size from Stat, and hands both to PutObject:

```go
func (c *Client) FPutObject(ctx context.Context, bucketName, objectName, filePath string, opts PutObjectOptions) (info UploadInfo, err error) {
	fileReader, err := os.Open(filePath)
	// ...
	fileStat, err := fileReader.Stat()
	fileSize := fileStat.Size()
	return c.PutObject(ctx, bucketName, objectName, fileReader, fileSize, opts)
}
```
api-put-object.go

```go
func (c *Client) PutObject(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64,
	opts PutObjectOptions) (info UploadInfo, err error) {
	// ...
	return c.putObjectCommon(ctx, bucketName, objectName, reader, objectSize, opts)
}
```
Depending on the object size, the endpoint (Google Cloud Storage is special-cased) and the signature version, the object is uploaded in a single request, as a multipart upload, or as a streaming multipart upload:

```go
func (c *Client) putObjectCommon(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
	if size > int64(maxMultipartPutObjectSize) {
		// ... return an "entity too large" error
	}
	if s3utils.IsGoogleEndpoint(*c.endpointURL) {
		return c.putObject(ctx, bucketName, objectName, reader, size, opts)
	}
	// ...
	if c.overrideSignerType.IsV2() {
		if size >= 0 && size < int64(partSize) || opts.DisableMultipart {
			return c.putObject(ctx, bucketName, objectName, reader, size, opts)
		}
		return c.putObjectMultipart(ctx, bucketName, objectName, reader, size, opts)
	}
	// ...
	return c.putObjectMultipartStream(ctx, bucketName, objectName, reader, size, opts)
}
```
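The size check at the top compares against the constant maxMultipartPutObjectSize, which caps a single object at 5 TiB.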
api-put-object-streaming.go

```go
func (c *Client) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
	// ...
	return c.putObjectDo(ctx, bucketName, objectName, readSeeker, md5Base64, "", size, opts)
}

func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string, reader io.Reader, md5Base64, sha256Hex string, size int64, opts PutObjectOptions) (UploadInfo, error) {
	// ...
	resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
	// ...
}
```
api-put-object-multipart.go

```go
func (c *Client) putObjectMultipart(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64,
	opts PutObjectOptions) (info UploadInfo, err error) {
	info, err = c.putObjectMultipartNoStream(ctx, bucketName, objectName, reader, opts)
	// ... if multipart uploads turn out to be unavailable, fall back to a single PUT:
	return c.putObject(ctx, bucketName, objectName, reader, size, opts)
	// ...
}
```
A multipart upload computes the part size and the number of parts (the size is passed as -1, i.e. unknown), obtains an upload ID, uploads the parts one by one, and finally completes the upload so that the server assembles the parts:

```go
func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error) {
	totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)
	// ...
	uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
	// ...
	for partNumber <= totalPartsCount {
		// ...
		objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber,
			md5Base64, sha256Hex, int64(length), opts.ServerSideEncryption)
		// ...
	}
	uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{})
	// ...
}
```
Each part is sent with a PUT request, and the completion request is a POST:

```go
func (c *Client) uploadPart(ctx context.Context, bucketName, objectName, uploadID string, reader io.Reader,
	partNumber int, md5Base64, sha256Hex string, size int64, sse encrypt.ServerSide) (ObjectPart, error) {
	resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
	// ...
}

func (c *Client) completeMultipartUpload(ctx context.Context, bucketName, objectName, uploadID string,
	complete completeMultipartUpload, opts PutObjectOptions) (UploadInfo, error) {
	resp, err := c.executeMethod(ctx, http.MethodPost, reqMetadata)
	// ...
}
```
Besides basic uploads, the examples directory contains usage examples for other APIs:

examples/minio/listen-notification.go

```go
minioClient.ListenNotification(context.Background(), "PREFIX", "SUFFIX", []string{
	"s3:BucketCreated:*",
	"s3:BucketRemoved:*",
	"s3:ObjectCreated:*",
	"s3:ObjectAccessed:*",
	"s3:ObjectRemoved:*",
})
```

examples/minio/listenbucketnotification.go

```go
minioClient.ListenBucketNotification(context.Background(), "YOUR-BUCKET", "PREFIX", "SUFFIX", []string{
	"s3:ObjectCreated:*",
	"s3:ObjectAccessed:*",
	"s3:ObjectRemoved:*",
})
```

examples/minio/putobjectsnowball.go

```go
minioClient.ListObjects(context.Background(), YOURBUCKET, lopts)
```

examples/minio/getbucketreplicationmetrics.go

```go
s3Client.TraceOn(os.Stderr)
m, err := s3Client.GetBucketReplicationMetrics(context.Background(), "bucket")
```
api-bucket-notification.go. Both listeners return a channel of events; ListenBucketNotification also goes through executeMethod, using a long-lived GET request:

```go
func (c *Client) ListenNotification(ctx context.Context, prefix, suffix string, events []string) <-chan notification.Info

func (c *Client) ListenBucketNotification(ctx context.Context, bucketName, prefix, suffix string, events []string) <-chan notification.Info {
	// ...
	resp, err := c.executeMethod(ctx, http.MethodGet, requestMetadata{
		bucketName:       bucketName,
		queryValues:      urlValues,
		contentSHA256Hex: emptySHA256Hex,
	})
	// ...
}
```
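That covers the source code of mc and the SDK. Overall, they wrap MinIO's HTTP API in an HTTP client layer, adding parameter validation and retry logic along the way.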