https://github.com/uber-go/fx是一个依赖注入框架,它是基于依赖注入库dig实现的(参考:dig 源码分析)。它简化了依赖注入,消除了全局状态的维护和 func init()。首先看下如何使用:
最简单的方式:
代码语言:javascript复制fx.New().Run()
或者
代码语言:javascript复制 fx.New(
fx.Provide(http.NewServeMux),
).Run()
或者
代码语言:javascript复制func main(){
var module1 = fx.Options(
fx.Provide(NewDb),
)
module := fx.Module("main",
fx.Provide(
NewService,
NewServer,
),
)
fx.New(
fx.Provide(http.NewServeMux),
log.Module,
fx.Invoke(registerHooks),
module1,
module,
).Run()
}
func registerHooks(
lifecycle fx.Lifecycle, mux *http.ServeMux,
) {
lifecycle.Append(
fx.Hook{
OnStart: func(ctx context.Context) error {
go http.ListenAndServe(":8080", mux)
return nil
},
},
)
}
可以看到,它将Provide和Invoke 都抽象成options,作为New的参数,来进行容器的初始化,然后通过Run开启应用。在Invoke方法中可以指定程序运行钱的Hook OnSatrt和程序结束前的Hook OnStop。下面通过源码进行分析:
代码语言:javascript复制func New(opts ...Option) *App {}
app.root = &module{app: app}
app.modules = append(app.modules, app.root)
for _, opt := range opts {
opt.apply(app.root)
}
app.lifecycle = &lifecycleWrapper{
lifecycle.New(appLogger{app}, app.clock),
}
app.container = dig.New(
dig.DeferAcyclicVerification(),
dig.DryRun(app.validate),
)
for _, m := range app.modules {
m.build(app, app.container)
}
for _, m := range app.modules {
m.provideAll()
}
app.root.provide(provide{
Target: func() Lifecycle { return app.lifecycle },
Stack: frames,
})
app.root.provide(provide{Target: app.shutdowner, Stack: frames})
app.root.provide(provide{Target: app.dotGraph, Stack: frames})
err := app.root.executeInvokes(); err != nil
dig.Visualize(app.container, &b, dig.VisualizeError(err))
}
首先看下New的参数
代码语言:javascript复制type Option interface {
fmt.Stringer
apply(*module)
}
它通过 加载配置的方式来完成依赖注入的Provide和Invoke,这两个方法只需要实现了apply接口就行。
代码语言:javascript复制func Provide(constructors ...interface{}) Option {
return provideOption{
Targets: constructors,
Stack: fxreflect.CallerStack(1, 0),
}
}
代码语言:javascript复制type provideOption struct {
Targets []interface{}
Stack fxreflect.Stack
}
代码语言:javascript复制 func (o provideOption) apply(mod *module){
for _, target := range o.Targets
{
mod.provides = append(mod.provides, provide{
Target: target,
Stack: o.Stack,
})
}
将对应的注入函数append到module的provides里面。其中module的定义如下:
代码语言:javascript复制type module struct {
parent *module
name string
scope *dig.Scope
provides []provide
invokes []invoke
decorators []decorator
modules []*module
app *App
}
对应的Invoke,定义如下:
代码语言:javascript复制func Invoke(funcs ...interface{}) Option {
return invokeOption{
Targets: funcs,
Stack: fxreflect.CallerStack(1, 0),
}
}
代码语言:javascript复制type invokeOption struct {
Targets []interface{}
Stack fxreflect.Stack
}
代码语言:javascript复制func (o invokeOption) apply(mod *module) {
for _, target := range o.Targets {
mod.invokes = append(mod.invokes, invoke{
Target: target,
Stack: o.Stack,
})
}
}
同样也是append 到module的invokes里面。
解释完入参,我们回过头来看看,App结构的定义。
代码语言:javascript复制type App struct {
err error
clock fxclock.Clock
lifecycle *lifecycleWrapper
container *dig.Container
root *module
modules []*module
// Used to setup logging within fx.
log fxevent.Logger
logConstructor *provide // set only if fx.WithLogger was used
// Timeouts used
startTimeout time.Duration
stopTimeout time.Duration
// Decides how we react to errors when building the graph.
errorHooks []ErrorHandler
validate bool
// Used to signal shutdowns.
donesMu sync.Mutex // guards dones and shutdownSig
dones []chan os.Signal
shutdownSig os.Signal
osExit func(code int) // os.Exit override; used for testing only
}
可以看到root属性是module类型的指针,modules 是module类型的指针数组,回过头来看构造函数New,它调用option的apply方法的时候传入的正是root属性。可见依赖注入需要的所有的数据都绑定到app的root属性上了。
接着初始化了lifecycle 属性
代码语言:javascript复制type lifecycleWrapper struct{ *lifecycle.Lifecycle }
代码语言:javascript复制func (l *lifecycleWrapper) Append(h Hook) {
l.Lifecycle.Append(lifecycle.Hook{
OnStart: h.OnStart,
OnStop: h.OnStop,
})
}
然后通过dig.New初始化了依赖注入的容器,也就是container属性。接着对所有的modules执行了build和provideAll()方法,注意这里的modules绑定了我们前文提到的依赖注入的provider和invoke。然后给app.root绑定了三个全局的provide方法:Lifecycle,shutdowner,dotGraph。最后执行了 app.root.executeInvokes(),如果执行出错回生成dot格式的调用图,方便可视化查看。其实是调用了dig的dig.Visualize
代码语言:javascript复制func (m *module) build(app *App, root *dig.Container)
if m.parent == nil {
m.scope = root.Scope(m.name)
// TODO: Once fx.Decorate is in-place,
// use the root container instead of subscope.
} else {
parentScope := m.parent.scope
m.scope = parentScope.Scope(m.name)
}
for _, mod := range m.modules {
mod.build(app, root)
}
主要是做了scope等属性的赋值操作,对于子孙module进行递归调用build方法。
代码语言:javascript复制func (m *module) provideAll()
for _, p := range m.provides {
m.provide(p)
}
for _, m := range m.modules {
m.provideAll()
}
provideAll也是同样进行了递归操作,调用了provide方法
代码语言:javascript复制func (m *module) provide(p provide)
if err := runProvide(m.scope, p, dig.FillProvideInfo(&info), dig.Export(true)); err != nil
代码语言:javascript复制func runProvide(c container, p provide, opts ...dig.ProvideOption) error
constructor := p.Target
case annotated
ctor, err := constructor.Build()
if err := c.Provide(ctor, opts...); err != nil {
case Annotated:
if err := c.Provide(ann.Target, opts...); err != nil {
default:
c.Provide(constructor, opts...)
最终调用了容器的Provide方法,也就是dig的Provide流程,实现了依赖的构造函数的注入。
接着看下executeInvokes
代码语言:javascript复制func (m *module) executeInvokes() error {
for _, invoke := range m.invokes {
if err := m.executeInvoke(invoke); err != nil {
return err
}
}
for _, m := range m.modules {
if err := m.executeInvokes(); err != nil {
return err
}
}
return nil
}
它同样是对孩子节点实现了递归调用。自己调用了executeInvoke方法
代码语言:javascript复制func (m *module) executeInvoke(i invoke) (err error) {
fnName := fxreflect.FuncName(i.Target)
m.app.log.LogEvent(&fxevent.Invoking{
FunctionName: fnName,
ModuleName: m.name,
})
err = runInvoke(m.scope, i)
代码语言:javascript复制func runInvoke(c container, i invoke) error {
fn := i.Target
switch fn := fn.(type) {
case Option:
case annotated:
af, err := fn.Build()
if err != nil {
return err
}
return c.Invoke(af)
default:
return c.Invoke(fn)
最终调用了容器的Invoke方法,也就是dig的Invoke方法。至此完成了App的初始化流程,接着开始运行App,也就是调用Run接口。
代码语言:javascript复制func (app *App) Run() {}
if code := app.run(app.Done()); code != 0 {
app.exit(code)
}
代码语言:javascript复制func (app *App) run(done <-chan os.Signal) (exitCode int)
err := app.Start(startCtx)
sig := <-done
err := app.Stop(stopCtx);
内部调用了Start和Stop两个接口,chan 收到done的信号后开始执行Stop
代码语言:javascript复制func (app *App) Done() <-chan os.Signal
signal.Notify(c, os.Interrupt, _sigINT, _sigTERM)
代码语言:javascript复制func (app *App) Start(ctx context.Context) (err error)
withTimeout(ctx, &withTimeoutParams{
hook: _onStartHook,
callback: app.start,
lifecycle: app.lifecycle,
log: app.log,
})
代码语言:javascript复制func (app *App) Stop(ctx context.Context) (err error)
withTimeout(ctx, &withTimeoutParams{
hook: _onStopHook,
callback: app.lifecycle.Stop,
lifecycle: app.lifecycle,
log: app.log,
})
默认运行两个函数执行15s
代码语言:javascript复制func withTimeout(ctx context.Context, param *withTimeoutParams) error
c <- param.callback(ctx)
控制超时时间,令一个协程里调用
if param.hook == _onStartHook {
r = param.lifecycle.startHookRecords()
} else {
r = param.lifecycle.stopHookRecords()
}
在函数里面执行力callback方法,对于start来说就是
代码语言:javascript复制func (app *App) start(ctx context.Context) error
err := app.lifecycle.Start(ctx);
stopErr := app.lifecycle.Stop(ctx)
对于stop来说就是 app.lifecycle.Stop
lifecycle的定义如下:
代码语言:javascript复制type Lifecycle struct {
clock fxclock.Clock
logger fxevent.Logger
hooks []Hook
numStarted int
startRecords HookRecords
stopRecords HookRecords
runningHook Hook
mu sync.Mutex
}
它有一个Append接口,我们可以添加Hook
代码语言:javascript复制 func (l *Lifecycle) Append(hook Hook) {
// Save the caller's stack frame to report file/line number.
if f := fxreflect.CallerStack(2, 0); len(f) > 0 {
hook.callerFrame = f[0]
}
l.hooks = append(l.hooks, hook)
}
其中通过函数 frames := runtime.CallersFrames(pcs)来获取当前的栈帧
代码语言:javascript复制 func (l *Lifecycle) Start(ctx context.Context) error {
for _, hook := range l.hooks
{
l.runningHook = hook
runtime, err := l.runStartHook(ctx, hook)
l.startRecords = append(l.startRecords, HookRecord{
代码语言:javascript复制 func (l *Lifecycle) Stop(ctx context.Context) error{
hook := l.hooks[l.numStarted-1]
runtime, err := l.runStopHook(ctx, hook)
相应的函数如下
代码语言:javascript复制 func (l *Lifecycle) runStartHook(ctx context.Context, hook Hook) (runtime time.Duration, err error)
err = hook.OnStart(ctx)
代码语言:javascript复制func (l *Lifecycle) runStopHook(ctx context.Context, hook Hook) (runtime time.Duration, err error)
err = hook.OnStop(ctx)
内部就是运行了我们注册额开始或者结束的hook
代码语言:javascript复制type Hook struct {
OnStart func(context.Context) error
OnStop func(context.Context) error
}
当然也兼容dig的In写法
代码语言:javascript复制type In = dig.In
总结下:fx.New(...)是fx初始化的地方,启动时主要里面包含[]Option,启动后生成根对象app;provide是将多个构造函数放到fx中,支持组合放入,放入之后fx会对每个option展开,然后一一放入fx中,效果和单个放入每个option一样,但是方便了option过多时的管理。fx中还有一个lifecycle的概念,直接隶属于根对象app,用于完成对象启动和停止时的Hook(钩子)操作,主要包含OnStart和OnStop两个方法,用户可以在要初始化的对象中定义自己的OnStart/OnStop方法,然后append到app.lifecycle.Hooks中。它带来一个比较大的便利是我们可以在module里面定义Module变量,在注入的时候直接注入这个Module变量,这个Module是Options的返回值,本质上是Provid的返回值
代码语言:javascript复制var Module = fx.Options(
fx.Provide(ProvideLogger),
)