golang 源码分析:minio(part I)路由

2022-08-03 13:52:02 浏览数 (1)

MinIO的命令行启动只有2个命令,一个是server、一个是gateway,分别用于启动服务和网关,而整个MinIO的启动是从main.go文件开始的

引入了两个包

代码语言:javascript复制
_ "github.com/minio/minio/internal/init"
_  "github.com/minio/minio/cmd/gateway"

看下对应的init函数:

internal/init/init.go

internal/init/init_darwin_amd64.go

代码语言:javascript复制
 cpuid.CPU.Disable(cpuid.AVX512F

而在gateway包里面,则通过匿名导入引入了几个常用的网关实现

cmd/gateway/gateway.go

代码语言:javascript复制
      _ "github.com/minio/minio/cmd/gateway/nas"
      _ "github.com/minio/minio/cmd/gateway/azure"
      _ "github.com/minio/minio/cmd/gateway/s3"
      _ "github.com/minio/minio/cmd/gateway/hdfs"
      _ "github.com/minio/minio/cmd/gateway/gcs"

以s3为例

cmd/gateway/s3/gateway-s3.go

代码语言:javascript复制
func init() {
  const s3GatewayTemplate = `NAME:
  {{.HelpName}} - {{.Usage}}
   
   
   minio.RegisterGatewayCommand(cli.Command{
    Name:               minio.S3BackendGateway,
    Usage:              "Amazon Simple Storage Service (S3)",
    Action:             s3GatewayMain,
    CustomHelpTemplate: s3GatewayTemplate,
    HideHelpCommand:    true,
  })

调用了RegisterGatewayCommand将对应的命令注册为gatewayCmd的子命令。而对应的处理函数为:

代码语言:javascript复制
func s3GatewayMain(ctx *cli.Context)
   minio.StartGateway(ctx, &S3{
    host:  args.First(),
    debug: env.Get("_MINIO_SERVER_DEBUG", config.EnableOff) == config.EnableOn,
  })

调用了StartGateway方法,它的实现在cmd/gateway-main.go

代码语言:javascript复制
func StartGateway(ctx *cli.Context, gw Gateway) 
      router := mux.NewRouter().SkipClean(true).UseEncodedPath()
      registerSTSRouter(router)
      registerAdminRouter(router, false)
      registerAPIRouter(router)
      httpServer := xhttp.NewServer(addrs, setCriticalErrorHandler(corsHandler(router)), getCert)
      newAllSubsystems()
      buckets, err := newObject.ListBuckets(GlobalContext)
      globalConsoleSrv, err = initConsoleServer()
      globalConsoleSrv.Serve()

注册了STS,Admin和API等router,然后起了一个httpserver,最后调用Serve方法开启端口监听。

sts路由注册函数实现在cmd/sts-handlers.go

代码语言:javascript复制
func registerSTSRouter(router *mux.Router)
    stsRouter.Methods(http.MethodPost).MatcherFunc(func(r *http.Request, rm *mux.RouteMatch) bool {
    ctypeOk := wildcard.MatchSimple("application/x-www-form-urlencoded*", r.Header.Get(xhttp.ContentType))
    noQueries := len(r.URL.RawQuery) == 0
    return ctypeOk && noQueries
  }).HandlerFunc(httpTraceAll(sts.AssumeRoleWithSSO))
    
    stsRouter.Methods(http.MethodPost).HandlerFunc(httpTraceAll(sts.AssumeRoleWithClientGrants)).
    Queries(stsAction, clientGrants).
    Queries(stsVersion, stsAPIVersion).
    Queries(stsToken, "{Token:.*}")

这里定义了sts常见的一些接口

代码语言:javascript复制
    const (
  // STS API version.
  stsAPIVersion             = "2011-06-15"
  stsVersion                = "Version"
  stsAction                 = "Action"
  stsPolicy                 = "Policy"

admin路由定义了一系列后台操作的接口 cmd/admin-router.go

代码语言:javascript复制
func registerAdminRouter(router *mux.Router, enableConfigOps bool)
      adminRouter.Methods(http.MethodPost).Path(adminVersion + "/service").HandlerFunc(gz(httpTraceAll(adminAPI.ServiceHandler))).Queries("action", "{action:.*}")

API router定义了我们真正操作对象存储的接口,代码位置:cmd/api-router.go

代码语言:javascript复制
func registerAPIRouter(router *mux.Router) 
      gz, err := gzhttp.NewWrapper(gzhttp.MinSize(1000), gzhttp.CompressionLevel(gzip.BestSpeed))
      routers = append(routers, apiRouter.PathPrefix("/{bucket}").Subrouter())
      
      router.Methods(http.MethodHead).Path("/{object:.+}").HandlerFunc(
      collectAPIStats("headobject", maxClients(gz(httpTraceAll(api.HeadObjectHandler)))))

以"/{object:.+}" 为例子,对应的handlerFunc其实被middleware包裹了很多层:cmd/handler-api.go

代码语言:javascript复制
    func maxClients(f http.HandlerFunc) http.HandlerFunc 
      f.ServeHTTP(w, r)

github.com/klauspost/compress@v1.13.6/gzhttp/gzip.go

代码语言:javascript复制
func NewWrapper(opts ...option) (func(http.Handler) http.HandlerFunc, error) 
      h.ServeHTTP(gwcn, r)

cmd/handler-utils.go

代码语言:javascript复制
func httpTraceAll(f http.HandlerFunc) http.HandlerFunc 
      f.ServeHTTP(w, r)

最后调用到HeadObjectHandler这个处理函数,代码位置:cmd/object-handlers.go

代码语言:javascript复制
func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Request) 
      objectAPI := api.ObjectAPI()
      api.headObjectInArchiveFileHandler(ctx, objectAPI, bucket, object, w, r)
      api.headObjectHandler(ctx, objectAPI, bucket, object, w, r)

对应于s3,它的实现在:cmd/s3-zip-handlers.go

代码语言:javascript复制
func (api objectAPIHandlers) headObjectInArchiveFileHandler(ctx context.Context, objectAPI ObjectLayer, bucket, object string, w http.ResponseWriter, r *http.Request) 
      _, err = getObjectInfo(ctx, bucket, zipPath, opts)
      file, err := zipindex.FindSerialized(zipInfo, object)

cmd/object-api-interface.go

代码语言:javascript复制
type ObjectLayer interface {
  // Locking operations on object.
  NewNSLock(bucket string, objects ...string) RWLocker
  GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)

可以看到最终调用的是minio的SDK(minio-go),去请求远程的s3服务,代码位置:cmd/gateway/s3/gateway-s3.go

代码语言:javascript复制
func (l *s3Objects) GetObjectInfo(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) 
      oi, err := l.Client.StatObject(ctx, bucket, object, miniogo.StatObjectOptions{
    ServerSideEncryption: opts.ServerSideEncryption,
  })

github.com/minio/minio-go/v7@v7.0.15/core.go

代码语言:javascript复制
// StatObject forwards ctx, bucketName, objectName and opts unchanged to the
// unexported c.statObject and returns that call's ObjectInfo and error as-is.
func (c Core) StatObject(ctx context.Context, bucketName, objectName string, opts StatObjectOptions) (ObjectInfo, error) {
  return c.statObject(ctx, bucketName, objectName, opts)
}

接着,我们回到main.go,main函数实现很简单:

代码语言:javascript复制
// main is the process entry point; it hands the raw command-line arguments
// (os.Args) to minio.Main and does nothing else.
func main() {
  minio.Main(os.Args)
}

cmd/main.go

代码语言:javascript复制
func Main(args []string)
        if err := newApp(appName).Run(args); err != nil {

创建了一个app对象

代码语言:javascript复制
func newApp(name string) *cli.App 
    commandsTree := trie.NewTrie()
    registerCommand := func(command cli.Command)
    commands = append(commands, command)
    commandsTree.Insert(command.Name)
    
    findClosestCommands := func(command string) []string {
    for _, value := range commandsTree.Walk(commandsTree.Root()) {
    registerCommand(serverCmd)
    registerCommand(gatewayCmd)
    app := cli.NewApp()

其中父子命令以前缀树的形式存储:trie/trie.go

代码语言:javascript复制
// NewTrie returns a pointer to a new Trie whose root is a fresh node obtained
// from newNode and whose size is 0.
func NewTrie() *Trie {
  return &Trie{
    root: newNode(),
    size: 0,
  }
}

app对象定义在cli包里cli/app.go

代码语言:javascript复制
type App struct {
  // The name of the program. Defaults to path.Base(os.Args[0])
  Name string  
代码语言:javascript复制
func NewApp() *App {

可以看到在newApp方法里注册了serverCmd和gatewayCmd,首先看下serverCmd,代码位置:cmd/server-main.go

代码语言:javascript复制
var serverCmd = cli.Command{
  Name:   "server",
  Usage:  "start object storage server",
  Flags:  append(ServerFlags, GlobalFlags...),
  Action: serverMain,
  CustomHelpTemplate:

对应的处理方法是:

代码语言:javascript复制
func serverMain(ctx *cli.Context)
      bitrotSelfTest()
      erasureSelfTest()
      compressSelfTest()
      globalConsoleSys.SetNodeName(globalLocalNodeName)
      newAllSubsystems()
        globalNotificationSys = NewNotificationSys(globalEndpoints)
        globalBucketMetadataSys = NewBucketMetadataSys()
        globalBucketMetadataSys.Reset()
        globalBucketMonitor = bandwidth.NewMonitor(GlobalContext, totalNodeCount())
        globalConfigSys = NewConfigSys()
        globalIAMSys = NewIAMSys()
        globalPolicySys = NewPolicySys()
        globalLifecycleSys = NewLifecycleSys()
        globalBucketSSEConfigSys = NewBucketSSEConfigSys()
        globalBucketObjectLockSys = NewBucketObjectLockSys()
        globalBucketQuotaSys = NewBucketQuotaSys()
        globalBucketVersioningSys = NewBucketVersioningSys()
        globalBucketVersioningSys.Reset()
        globalBucketTargetSys = NewBucketTargetSys()
        globalTierConfigMgr = NewTierConfigMgr()
      checkUpdate(getMinioMode())
      setMaxResources()
        sys.GetMaxThreads()
        sys.GetMaxOpenFileLimit()
         sys.GetMaxMemoryLimit()
      handler, err := configureServerHandler(globalEndpoints)
      httpServer := xhttp.NewServer(addrs, setCriticalErrorHandler(corsHandler(handler)), getCert)
      setHTTPServer(httpServer)
      newObject, err := newObjectLayer(GlobalContext, globalEndpoints)
      initBackgroundExpiry(GlobalContext, newObject)
       err = initServer(GlobalContext, newObject); err != nil
      go globalIAMSys.Init(GlobalContext, newObject, globalEtcdClient, globalRefreshIAMInterval)
      initDataScanner(GlobalContext, newObject)
      initBackgroundReplication(GlobalContext, newObject)
      initBackgroundTransition(GlobalContext, newObject)
      globalTierJournal, err = initTierDeletionJournal(GlobalContext)
      cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
      setCacheObjectLayer(cacheAPI)
      globalConsoleSrv, err = initConsoleServer()
      globalConsoleSrv.Serve()

核心思想是注册路由,最后启动http server,而对应的gatewayCmd的实现在

cmd/gateway-main.go

代码语言:javascript复制
gatewayCmd = cli.Command{
    Name:            "gateway",
    Usage:           "start object storage gateway",
    Flags:           append(ServerFlags, GlobalFlags...),
    HideHelpCommand: true,
  }

可以看到,它并没有对应的处理方法,很奇怪对吧,那是因为对于不同的对象存储服务,它是以子命令的形式注册进来的,注册的位置在init函数中,也就是前面介绍的注册逻辑。注册函数如下:

代码语言:javascript复制
// RegisterGatewayCommand appends ServerFlags and then GlobalFlags to
// cmd.Flags, and adds the resulting command to gatewayCmd.Subcommands.
// cmd is received by value, so the Flags field is reassigned on this
// function's copy before that copy is appended. It always returns nil.
func RegisterGatewayCommand(cmd cli.Command) error {
  cmd.Flags = append(append(cmd.Flags, ServerFlags...), GlobalFlags...)
  gatewayCmd.Subcommands = append(gatewayCmd.Subcommands, cmd)
  return nil
}

注册完路由会启动一个httpserver,minio对httpserver 进行了简单的包装,代码位置是 internal/http/server.go

代码语言:javascript复制
func NewServer(addrs []string, handler http.Handler, getCert certs.GetCertificateFunc) *Server

路由用的是现成的路由包:github.com/gorilla/mux@v1.8.0/mux.go

代码语言:javascript复制
// NewRouter returns a pointer to a new Router whose namedRoutes field is
// initialized to an empty map from string to *Route.
func NewRouter() *Router {
  return &Router{namedRoutes: make(map[string]*Route)}
}

serverCmd的路由在cmd/routers.go,同样注册了STS,admin和api路由:

代码语言:javascript复制
func configureServerHandler(endpointServerPools EndpointServerPools) (http.Handler, error) 
    registerAdminRouter(router, true)
    registerSTSRouter(router)
    registerAPIRouter(router)
    router.Use(globalHandlers...)

以api router为例看下具体实现:

代码语言:javascript复制
func registerAPIRouter(router *mux.Router)
    routers = append(routers, apiRouter.MatcherFunc(func(r *http.Request, match *mux.RouteMatch) bool
    routers = append(routers, apiRouter.Host("{bucket:.+}." + domainName).Subrouter())
    routers = append(routers, apiRouter.PathPrefix("/{bucket}").Subrouter())
    
    router.Methods(http.MethodHead).Path("/{object:.+}").HandlerFunc(
      collectAPIStats("headobject", maxClients(gz(httpTraceAll(api.HeadObjectHandler)))))

cmd/object-handlers.go

代码语言:javascript复制
func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Request)
      api.headObjectInArchiveFileHandler(ctx, objectAPI, bucket, object, w, r)
      api.headObjectHandler(ctx, objectAPI, bucket, object, w, r)

cmd/s3-zip-handlers.go

代码语言:javascript复制
func (api objectAPIHandlers) headObjectInArchiveFileHandler(ctx context.Context, objectAPI ObjectLayer, bucket, object string, w http.ResponseWriter, r *http.Request) 
      getObjectInfo := objectAPI.GetObjectInfo
      _, err = getObjectInfo(ctx, bucket, zipPath, opts)

cmd/object-api-interface.go

代码语言:javascript复制
type ObjectLayer interface {
  // Locking operations on object.
  NewNSLock(bucket string, objects ...string) RWLocker

整体来看,serverCmd和gatewayCmd的路由实现是非常相似的。

0 人点赞