uber-go/fx 源码分析

2022-12-17 16:18:47 浏览数 (1)

https://github.com/uber-go/fx是一个依赖注入框架,它是基于依赖注入库dig实现的(参考:dig 源码分析)。它简化了依赖注入,消除了全局状态的维护和 func init()。首先看下如何使用:

最简单的方式:

代码语言:javascript复制
fx.New().Run()

或者

代码语言:javascript复制
    fx.New(
      fx.Provide(http.NewServeMux),
    ).Run()

或者

代码语言:javascript复制
func main(){
var module1 = fx.Options(
    fx.Provide(NewDb),
  )
  module := fx.Module("main",
    fx.Provide(
      NewService,
      NewServer,
    ),
  )

  fx.New(
    fx.Provide(http.NewServeMux),
    log.Module,
    fx.Invoke(registerHooks),
    module1,
    module,
  ).Run()
}

func registerHooks(
  lifecycle fx.Lifecycle, mux *http.ServeMux,
) {
  lifecycle.Append(
    fx.Hook{
      OnStart: func(ctx context.Context) error {
        go http.ListenAndServe(":8080", mux)
        return nil
      },
    },
  )
}

可以看到,它将Provide和Invoke 都抽象成options,作为New的参数,来进行容器的初始化,然后通过Run开启应用。在Invoke方法中可以指定程序运行钱的Hook OnSatrt和程序结束前的Hook OnStop。下面通过源码进行分析:

代码语言:javascript复制
func New(opts ...Option) *App {}
  app.root = &module{app: app}
  app.modules = append(app.modules, app.root)
  for _, opt := range opts {
    opt.apply(app.root)
  }
    app.lifecycle = &lifecycleWrapper{
    lifecycle.New(appLogger{app}, app.clock),
  }
    app.container = dig.New(
    dig.DeferAcyclicVerification(),
    dig.DryRun(app.validate),
  )
    for _, m := range app.modules {
    m.build(app, app.container)
  }
    for _, m := range app.modules {
    m.provideAll()
  }
    app.root.provide(provide{
    Target: func() Lifecycle { return app.lifecycle },
    Stack:  frames,
  })
  app.root.provide(provide{Target: app.shutdowner, Stack: frames})
  app.root.provide(provide{Target: app.dotGraph, Stack: frames})
  err := app.root.executeInvokes(); err != nil 
  dig.Visualize(app.container, &b, dig.VisualizeError(err))
}

首先看下New的参数

代码语言:javascript复制
type Option interface {
  fmt.Stringer
  apply(*module)
}

它通过 加载配置的方式来完成依赖注入的Provide和Invoke,这两个方法只需要实现了apply接口就行。

代码语言:javascript复制
func Provide(constructors ...interface{}) Option {
  return provideOption{
    Targets: constructors,
    Stack:   fxreflect.CallerStack(1, 0),
  }
}
代码语言:javascript复制
type provideOption struct {
  Targets []interface{}
  Stack   fxreflect.Stack
}
代码语言:javascript复制
 func (o provideOption) apply(mod *module){
   for _, target := range o.Targets
    {
    mod.provides = append(mod.provides, provide{
      Target: target,
      Stack:  o.Stack,
    })
  }

将对应的注入函数append到module的provides里面。其中module的定义如下:

代码语言:javascript复制
type module struct {
  parent     *module
  name       string
  scope      *dig.Scope
  provides   []provide
  invokes    []invoke
  decorators []decorator
  modules    []*module
  app        *App
}

对应的Invoke,定义如下:

代码语言:javascript复制
func Invoke(funcs ...interface{}) Option {
  return invokeOption{
    Targets: funcs,
    Stack:   fxreflect.CallerStack(1, 0),
  }
}
代码语言:javascript复制
type invokeOption struct {
  Targets []interface{}
  Stack   fxreflect.Stack
}
代码语言:javascript复制
func (o invokeOption) apply(mod *module) {
  for _, target := range o.Targets {
    mod.invokes = append(mod.invokes, invoke{
      Target: target,
      Stack:  o.Stack,
    })
  }
}

同样也是append 到module的invokes里面。

解释完入参,我们回过头来看看,App结构的定义。

代码语言:javascript复制
type App struct {
  err       error
  clock     fxclock.Clock
  lifecycle *lifecycleWrapper

  container *dig.Container
  root      *module
  modules   []*module

  // Used to setup logging within fx.
  log            fxevent.Logger
  logConstructor *provide // set only if fx.WithLogger was used
  // Timeouts used
  startTimeout time.Duration
  stopTimeout  time.Duration
  // Decides how we react to errors when building the graph.
  errorHooks []ErrorHandler
  validate   bool
  // Used to signal shutdowns.
  donesMu     sync.Mutex // guards dones and shutdownSig
  dones       []chan os.Signal
  shutdownSig os.Signal

  osExit func(code int) // os.Exit override; used for testing only
}

可以看到root属性是module类型的指针,modules 是module类型的指针数组,回过头来看构造函数New,它调用option的apply方法的时候传入的正是root属性。可见依赖注入需要的所有的数据都绑定到app的root属性上了。

接着初始化了lifecycle 属性

代码语言:javascript复制
type lifecycleWrapper struct{ *lifecycle.Lifecycle }
代码语言:javascript复制
func (l *lifecycleWrapper) Append(h Hook) {
  l.Lifecycle.Append(lifecycle.Hook{
    OnStart: h.OnStart,
    OnStop:  h.OnStop,
  })
}

然后通过dig.New初始化了依赖注入的容器,也就是container属性。接着对所有的modules执行了build和provideAll()方法,注意这里的modules绑定了我们前文提到的依赖注入的provider和invoke。然后给app.root绑定了三个全局的provide方法:Lifecycle,shutdowner,dotGraph。最后执行了 app.root.executeInvokes(),如果执行出错回生成dot格式的调用图,方便可视化查看。其实是调用了dig的dig.Visualize

代码语言:javascript复制
func (m *module) build(app *App, root *dig.Container) 
    if m.parent == nil {
    m.scope = root.Scope(m.name)
    // TODO: Once fx.Decorate is in-place,
    // use the root container instead of subscope.
  } else {
    parentScope := m.parent.scope
    m.scope = parentScope.Scope(m.name)
  }
    for _, mod := range m.modules {
    mod.build(app, root)
  }

主要是做了scope等属性的赋值操作,对于子孙module进行递归调用build方法。

代码语言:javascript复制
func (m *module) provideAll()
    for _, p := range m.provides {
    m.provide(p)
  }
    for _, m := range m.modules {
    m.provideAll()
  }

provideAll也是同样进行了递归操作,调用了provide方法

代码语言:javascript复制
func (m *module) provide(p provide)
  if err := runProvide(m.scope, p, dig.FillProvideInfo(&info), dig.Export(true)); err != nil
代码语言:javascript复制
func runProvide(c container, p provide, opts ...dig.ProvideOption) error 
  constructor := p.Target
  case annotated
    ctor, err := constructor.Build()
    if err := c.Provide(ctor, opts...); err != nil {
  case Annotated:
    if err := c.Provide(ann.Target, opts...); err != nil {
  default:
    c.Provide(constructor, opts...)

最终调用了容器的Provide方法,也就是dig的Provide流程,实现了依赖的构造函数的注入。

接着看下executeInvokes

代码语言:javascript复制
func (m *module) executeInvokes() error {
  for _, invoke := range m.invokes {
    if err := m.executeInvoke(invoke); err != nil {
      return err
    }
  }

  for _, m := range m.modules {
    if err := m.executeInvokes(); err != nil {
      return err
    }
  }
  return nil
}

它同样是对孩子节点实现了递归调用。自己调用了executeInvoke方法

代码语言:javascript复制
func (m *module) executeInvoke(i invoke) (err error) {
  fnName := fxreflect.FuncName(i.Target)
  m.app.log.LogEvent(&fxevent.Invoking{
    FunctionName: fnName,
    ModuleName:   m.name,
  })
  err = runInvoke(m.scope, i)
代码语言:javascript复制
func runInvoke(c container, i invoke) error {
  fn := i.Target
  switch fn := fn.(type) {
 case Option:
 case annotated:
    af, err := fn.Build()
    if err != nil {
      return err
    }

    return c.Invoke(af)
  default:
    return c.Invoke(fn)

最终调用了容器的Invoke方法,也就是dig的Invoke方法。至此完成了App的初始化流程,接着开始运行App,也就是调用Run接口。

代码语言:javascript复制
func (app *App) Run() {}
    if code := app.run(app.Done()); code != 0 {
    app.exit(code)
    }
代码语言:javascript复制
func (app *App) run(done <-chan os.Signal) (exitCode int)
  err := app.Start(startCtx)
  sig := <-done
  err := app.Stop(stopCtx);

内部调用了Start和Stop两个接口,chan 收到done的信号后开始执行Stop

代码语言:javascript复制
func (app *App) Done() <-chan os.Signal
  signal.Notify(c, os.Interrupt, _sigINT, _sigTERM)
代码语言:javascript复制
func (app *App) Start(ctx context.Context) (err error)
  withTimeout(ctx, &withTimeoutParams{
    hook:      _onStartHook,
    callback:  app.start,
    lifecycle: app.lifecycle,
    log:       app.log,
  })
代码语言:javascript复制
func (app *App) Stop(ctx context.Context) (err error) 
  withTimeout(ctx, &withTimeoutParams{
    hook:      _onStopHook,
    callback:  app.lifecycle.Stop,
    lifecycle: app.lifecycle,
    log:       app.log,
  })

默认运行两个函数执行15s

代码语言:javascript复制
func withTimeout(ctx context.Context, param *withTimeoutParams) error
  c <- param.callback(ctx)
    控制超时时间,令一个协程里调用
    if param.hook == _onStartHook {
    r = param.lifecycle.startHookRecords()
  } else {
    r = param.lifecycle.stopHookRecords()
  }

在函数里面执行力callback方法,对于start来说就是

代码语言:javascript复制
func (app *App) start(ctx context.Context) error
  err := app.lifecycle.Start(ctx);
  stopErr := app.lifecycle.Stop(ctx)

对于stop来说就是 app.lifecycle.Stop

lifecycle的定义如下:

代码语言:javascript复制
type Lifecycle struct {
  clock        fxclock.Clock
  logger       fxevent.Logger
  hooks        []Hook
  numStarted   int
  startRecords HookRecords
  stopRecords  HookRecords
  runningHook  Hook
  mu           sync.Mutex
}

它有一个Append接口,我们可以添加Hook

代码语言:javascript复制
  func (l *Lifecycle) Append(hook Hook) {
  // Save the caller's stack frame to report file/line number.
  if f := fxreflect.CallerStack(2, 0); len(f) > 0 {
    hook.callerFrame = f[0]
  }
  l.hooks = append(l.hooks, hook)
}

其中通过函数 frames := runtime.CallersFrames(pcs)来获取当前的栈帧

代码语言:javascript复制
  func (l *Lifecycle) Start(ctx context.Context) error {
    for _, hook := range l.hooks
    {
      l.runningHook = hook
      runtime, err := l.runStartHook(ctx, hook)
      l.startRecords = append(l.startRecords, HookRecord{
代码语言:javascript复制
  func (l *Lifecycle) Stop(ctx context.Context) error{
    hook := l.hooks[l.numStarted-1]
    runtime, err := l.runStopHook(ctx, hook)

相应的函数如下

代码语言:javascript复制
  func (l *Lifecycle) runStartHook(ctx context.Context, hook Hook) (runtime time.Duration, err error) 
    err = hook.OnStart(ctx)
代码语言:javascript复制
func (l *Lifecycle) runStopHook(ctx context.Context, hook Hook) (runtime time.Duration, err error) 
    err = hook.OnStop(ctx)

内部就是运行了我们注册额开始或者结束的hook

代码语言:javascript复制
type Hook struct {
  OnStart func(context.Context) error
  OnStop  func(context.Context) error
}

当然也兼容dig的In写法

代码语言:javascript复制
type In = dig.In

总结下:fx.New(...)是fx初始化的地方,启动时主要里面包含[]Option,启动后生成根对象app;provide是将多个构造函数放到fx中,支持组合放入,放入之后fx会对每个option展开,然后一一放入fx中,效果和单个放入每个option一样,但是方便了option过多时的管理。fx中还有一个lifecycle的概念,直接隶属于根对象app,用于完成对象启动和停止时的Hook(钩子)操作,主要包含OnStart和OnStop两个方法,用户可以在要初始化的对象中定义自己的OnStart/OnStop方法,然后append到app.lifecycle.Hooks中。它带来一个比较大的便利是我们可以在module里面定义Module变量,在注入的时候直接注入这个Module变量,这个Module是Options的返回值,本质上是Provid的返回值

代码语言:javascript复制
var Module = fx.Options(
  fx.Provide(ProvideLogger),
)

0 人点赞