在上一篇《golang 源码分析:minio(part I)路由》中,我们分析了minio请求的路由;本篇接着看一个文件是如何落盘的。这里不考虑gateway的情况,从serverMain开始:
cmd/server-main.go
代码语言:javascript复制func serverMain(ctx *cli.Context) {
handler, err := configureServerHandler(globalEndpoints)
registerAPIRouter(router)
newObject, err := newObjectLayer(GlobalContext, globalEndpoints)
initServer(GlobalContext, newObject)
在注册完路由后,会创建一个object,然后调用initServer;initServer通过setObjectLayer把object保存到全局变量globalObjectAPI中:
代码语言:javascript复制func initServer(ctx context.Context, newObject ObjectLayer) error
setObjectLayer(newObject)
在创建object的时候,有两个分支(NewFSObjectLayer和newErasureServerPools),我们重点看只有一个endpoint的情况:
cmd/erasure-server-pool.go
代码语言:javascript复制func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServerPools) (ObjectLayer, error)
commonParityDrives = ecDrivesNoConfig(ep.DrivesPerSet)
err = storageclass.ValidateParity(commonParityDrives, ep.DrivesPerSet)
z.serverPools[i], err = newErasureSets(ctx, ep.Endpoints, storageDisks[i], formats[i], commonParityDrives, i)
cmd/server-main.go
代码语言:javascript复制func newObjectLayer(ctx context.Context, endpointServerPools EndpointServerPools) (newObject ObjectLayer, err error)
return NewFSObjectLayer(endpointServerPools[0].Endpoints[0].Path)
return newErasureServerPools(ctx, endpointServerPools)
object存储在全局变量globalObjectAPI里,对应的get和set方法定义在cmd/api-router.go。由于是全局变量,读取时需要加读锁:
代码语言:javascript复制func setObjectLayer(o ObjectLayer) {
globalObjectAPI = o
代码语言:javascript复制func newObjectLayerFn() ObjectLayer {
globalObjLayerMutex.RLock()
defer globalObjLayerMutex.RUnlock()
return globalObjectAPI
}
在registerAPIRouter方法内部会初始化api对象,它的ObjectAPI字段被赋值为newObjectLayerFn,这样就实现了object对象和router的关联,相关的操作最终都是通过调用object对象的方法来完成的:
代码语言:javascript复制func registerAPIRouter(router *mux.Router)
api := objectAPIHandlers{
ObjectAPI: newObjectLayerFn,
CacheAPI: newCachedObjectLayerFn,
}
下面以三个路由为例看下文件的操作相关流程
代码语言:javascript复制// GetObject - note gzip compression is *not* added due to Range requests.
router.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
collectAPIStats("getobject", maxClients(httpTraceHdrs(api.GetObjectHandler))))
代码语言:javascript复制router.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
collectAPIStats("putobjectpart", maxClients(gz(httpTraceHdrs(api.PutObjectPartHandler))))).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
代码语言:javascript复制router.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
collectAPIStats("putobject", maxClients(gz(httpTraceHdrs(api.PutObjectHandler)))))
globalObjectAPI 定义在cmd/object-api-common.go
代码语言:javascript复制var globalObjectAPI ObjectLayer
bucketMetaPrefix = "buckets"
和router对应的handler定义在:cmd/object-handlers.go
代码语言:javascript复制func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Request)
objectAPI := api.ObjectAPI()
api.getObjectHandler(ctx, objectAPI, bucket, object, w, r)
getObjectHandler先通过objectAPI.GetObjectInfo获取对象信息,再通过objectAPI.GetObjectNInfo读取对象,最后发送事件通知:
代码语言:javascript复制func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI ObjectLayer, bucket, object string, w http.ResponseWriter, r *http.Request)
getObjectInfo := objectAPI.GetObjectInfo
_, err = getObjectInfo(ctx, bucket, object, opts)
getObjectNInfo := objectAPI.GetObjectNInfo
gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, readLock, opts)
sendEvent(eventArgs{
EventName: event.ObjectAccessedGet,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
RespElements: extractRespElements(w),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
})
分片上传的处理流程也类似:
代码语言:javascript复制func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http.Request)
objectAPI := api.ObjectAPI()
mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
putObjectPart := objectAPI.PutObjectPart
partInfo, err := putObjectPart(ctx, bucket, object, uploadID, partID, pReader, opts)
全量上传:
代码语言:javascript复制func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Request)
objectAPI := api.ObjectAPI()
putObject = objectAPI.PutObject
objInfo, err := putObject(ctx, bucket, object, pReader, opts)
getObjectInfo := objectAPI.GetObjectInfo
_, err := updateObjectMetadataWithZipInfo(ctx, objectAPI, bucket, object, opts);
单机版本的minio 实现的存储对象定义在:cmd/fs-v1.go
代码语言:javascript复制func NewFSObjectLayer(fsPath string) (ObjectLayer, error)
fsPath, err = getValidPath(fsPath);
fsUUID := mustGetUUID()
err = initMetaVolumeFS(fsPath, fsUUID);
rlk, err := initFormatFS(ctx, fsPath)
fs := &FSObjects{
fsPath: fsPath,
metaJSONFile: fsMetaJSONFile,
fsUUID: fsUUID,
rwPool: &fsIOPool{
readersMap: make(map[string]*lock.RLockedFile),
},
nsMutex: newNSLock(false),
listPool: NewTreeWalkPool(globalLookupTimeout),
appendFileMap: make(map[string]*fsAppendFile),
diskMount: mountinfo.IsLikelyMountPoint(fsPath),
}
fs.fsFormatRlk = rlk
initMetaVolumeFS会在fsPath下创建minio自己的元数据目录(元数据bucket、tmp和multipart目录):
代码语言:javascript复制func initMetaVolumeFS(fsPath, fsUUID string) error
metaBucketPath := pathJoin(fsPath, minioMetaBucket)
err := os.MkdirAll(metaBucketPath, 0777);
metaTmpPath := pathJoin(fsPath, minioMetaTmpBucket, fsUUID)
err := os.MkdirAll(metaTmpPath, 0777);
metaMultipartPath := pathJoin(fsPath, minioMetaMultipartBucket)
os.MkdirAll(metaMultipartPath, 0777)
代码语言:javascript复制func (fs *FSObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (oi ObjectInfo, e error)
oi, err := fs.getObjectInfoWithLock(ctx, bucket, object)
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile)
err = fs.createFsJSON(object, fsMetaPath)
读取的时候需要加锁
代码语言:javascript复制func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object string) (oi ObjectInfo, err error)
lk := fs.NewNSLock(bucket, object)
_, err := fs.statBucketDir(ctx, bucket);
return fs.getObjectInfo(ctx, bucket, object)
最终调用fsStat函数获取文件的信息:
代码语言:javascript复制func (fs *FSObjects) statBucketDir(ctx context.Context, bucket string) (os.FileInfo, error)
bucketDir, err := fs.getBucketDir(ctx, bucket)
st, err := fsStatVolume(ctx, bucketDir)
fi, err := fsStat(ctx, volume)
代码语言:javascript复制func (fs *FSObjects) getObjectInfo(ctx context.Context, bucket, object string) (oi ObjectInfo, e error)
fi, err := fsStatDir(ctx, pathJoin(fs.fsPath, bucket, object))
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile)
rlk, err := fs.rwPool.Open(fsMetaPath)
_, rerr := fsMeta.ReadFrom(ctx, rlk.LockedFile)
fi, err := fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object))
return fsMeta.ToObjectInfo(bucket, object, fi), nil
创建文件的过程中,先在tmp目录下创建文件,等待文件创建完毕后,rename到目标目录,能够尽可能地减少锁冲突:
代码语言:javascript复制func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
return fs.putObject(ctx, bucket, object, r, opts)
创建文件的过程中,会先创建meta文件,保存文件的元数据信息:
代码语言:javascript复制func (fs *FSObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, retErr error)
fs.statBucketDir(ctx, bucket);
isObjectDir(object, data.Size())
if fi, err = fsStatDir(ctx, pathJoin(fs.fsPath, bucket, object)); err != nil
var wlk *lock.LockedFile
if bucket != minioMetaBucket {
bucketMetaDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix)
fsMetaPath := pathJoin(bucketMetaDir, bucket, object, fs.metaJSONFile)
wlk, err = fs.rwPool.Write(fsMetaPath)
wlk, err = fs.rwPool.Create(fsMetaPath)
fsTmpObjPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, tempObj)
bytesWritten, err := fsCreateFile(ctx, fsTmpObjPath, data, data.Size())
fsNSObjPath := pathJoin(fs.fsPath, bucket, object)
if err = fsRenameFile(ctx, fsTmpObjPath, fsNSObjPath);
_, err = fsMeta.WriteTo(wlk);
if err = jsonSave(lk, m);
fi, err := fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object))
cmd/fs-v1-helpers.go
代码语言:javascript复制func fsCreateFile(ctx context.Context, filePath string, reader io.Reader, fallocSize int64) (int64, error)
if err := checkPathLength(filePath);
writer, err := lock.Open(filePath, flags, 0666)
bytesWritten, err := xioutil.Copy(writer, reader)
cmd/object-api-utils.go文件里定义了minio文件系统中目录的名字:
代码语言:javascript复制const (
// MinIO meta bucket.
minioMetaBucket = ".minio.sys"
minioMetaTmpBucket = minioMetaBucket + "/tmp"
mpartMetaPrefix = "multipart"
minioMetaMultipartBucket = minioMetaBucket + SlashSeparator + mpartMetaPrefix
cmd/data-usage.go
代码语言:javascript复制dataUsageBucket = minioMetaBucket + SlashSeparator + bucketMetaPrefix
cmd/format-fs.go
代码语言:javascript复制func initFormatFS(ctx context.Context, fsPath string) (rlk *lock.RLockedFile, err error)
fsFormatPath := pathJoin(fsPath, minioMetaBucket, formatConfigFile)
err := formatFSFixDeploymentID(ctx, fsFormatPath); err != nil
formatBackend, err := formatMetaGetFormatBackendFS(rlk)
err = jsonLoad(rlk, format)
return jsonSave(wlk, format)
rlk, err := lock.RLockedOpenFile(fsFormatPath)
formatBackend, err := formatMetaGetFormatBackendFS(rlk)
wlk, err = lock.TryLockedOpenFile(fsFormatPath, os.O_RDWR, 0)
cmd/format-meta.go
代码语言:javascript复制formatConfigFile = "format.json"
每一个操作都会发送相应的通知,通知的目标(target)保存在一个map里,相关代码在cmd/notification.go:
代码语言:javascript复制func sendEvent(args eventArgs)
globalNotificationSys.Send(args)
func (sys *NotificationSys) Send(args eventArgs)
targetIDSet := sys.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name)
sys.targetList.Send(args.ToEvent(true), targetIDSet, sys.targetResCh)
internal/event/targetlist.go
代码语言:javascript复制type TargetList struct {
// Embedded read-write mutex; its Lock/RLock methods are promoted onto TargetList.
sync.RWMutex
// targets maps each TargetID to its Target.
targets map[TargetID]Target
}
元文件的定义如下:cmd/fs-v1-metadata.go
代码语言:javascript复制type fsMetaV1 struct {
// Version string, serialized under the JSON key "version".
Version string `json:"version"`
// checksums of blocks on disk.
Checksum FSChecksumInfoV1 `json:"checksum,omitempty"`
// Metadata map for current object.
Meta map[string]string `json:"meta,omitempty"`
// parts info for current object - used in encryption.
Parts []ObjectPartInfo `json:"parts,omitempty"`
}
cmd/xl-storage-format-v1.go
代码语言:javascript复制type ObjectPartInfo struct {
// ETag of the part; left out of the JSON output when empty (omitempty).
ETag string `json:"etag,omitempty"`
// Number is presumably the part number within a multipart upload — TODO confirm.
Number int `json:"number"`
// Size is serialized as "size"; presumably the stored size in bytes — TODO confirm.
Size int64 `json:"size"`
// ActualSize is serialized as "actualSize"; presumably the original size before
// any transformation such as compression or encryption — TODO confirm.
ActualSize int64 `json:"actualSize"`
}
cmd/fs-v1-helpers.go
代码语言:javascript复制func fsStatDir(ctx context.Context, statDir string) (os.FileInfo, error)
fi, err := fsStat(ctx, statDir)
分片上传相对复杂,代码路径在:cmd/fs-v1-multipart.go
代码语言:javascript复制func (fs *FSObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, e error)
if _, err := fs.statBucketDir(ctx, bucket); err != nil {
uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)
_, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile))
tmpPartPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, uploadID+"."+mustGetUUID()+"."+strconv.Itoa(partID))
bytesWritten, err := fsCreateFile(ctx, tmpPartPath, data, data.Size())
defer fsRemoveFile(ctx, tmpPartPath)
partPath := pathJoin(uploadIDDir, fs.encodePartFile(partID, etag, data.ActualSize()))
err = fsSimpleRenameFile(ctx, tmpPartPath, partPath);
go fs.backgroundAppend(ctx, bucket, object, uploadID)
fi, err := fsStatFile(ctx, partPath)