在分析完gravity的原理和如何使用以后,我们开始分析下它的源码。gravity有5个入口,代表了5个工具。
1,cmd/dcp/main.go
代码语言:javascript复制go func() {
closed <- dcp.StartLocal(&barrierConfig, collectorConfigs, &checkerConfig, shutDown, alarm)
}()
2,cmd/gravity/main.go 这个是我们同步工具的入口:它先注册里一系列插件,然后启动http服务 ,并监控配置文件的变化
代码语言:javascript复制log.RegisterExitHandler(func() {
hplugin.CleanupClients()
})
server, err := app.NewServer(cfg.PipelineConfig)
go func() {
http.Handle("/metrics", promhttp.Handler())
http.HandleFunc("/reset", resetHandler(server, cfg.PipelineConfig))
http.HandleFunc("/status", statusHandler(server, cfg.PipelineConfig.PipelineName, hash))
http.HandleFunc("/healthz", healthzHandler(server))
err = http.ListenAndServe(cfg.HttpAddr, nil)
err = server.Start()
watcher, err := fsnotify.NewWatcher()
err = watcher.Add(cfg.ConfigFile)
3,cmd/padder/main.go
代码语言:javascript复制if err := padder.Pad(cfg.PadderConfig); err != nil {
4,cmd/prometheus_etcd_sd/main.go:从etcd获取元信息,并监控集群状态:
代码语言:javascript复制client, err := utils.CreateEtcdClient(*etcdServer)
srvs = services{}
res, err := client.Get(context.TODO(), *servicesPrefix, etcd.WithPrefix())
for _, kv := range res.Kvs {
srvs.handle(kv, srvs.update)
}
srvs.persist()
updates := client.Watch(context.TODO(), *servicesPrefix, etcd.WithPrefix())
for resp := range updates {
for _, res := range resp.Events {
log.Infoln(res.Type, string(res.Kv.Key), string(res.Kv.Value))
h := srvs.update
if res.Type == mvccpb.DELETE {
h = srvs.delete
}
srvs.handle(res.Kv, h)
}
srvs.persist()
}
5,cmd/verifier/main.go:检查源,目标的合法性。
代码语言:javascript复制source, err := utils.CreateDBConnection(c.Source)
target, err := utils.CreateDBConnection(c.Target)
sourceTables, err := schema_store.GetTablesFromDB(source, c.Source.Schema)
targetTables, err := schema_store.GetTablesFromDB(target, c.Target.Schema)
targetTables = filter(targetTables, c.TargetTable)
if !reflect.DeepEqual(sourceTables, targetTables) {
首先看下dcp调用的StartLocal函数,它定义位于dcp/local_server.go:它首先调用barrier.Start,创建表来记录同步的位置信息。然后启动collector,支持mysql和grpc两种。然后启动协程调用c.Consume(m)方法来消费Msg信息。
代码语言:javascript复制func StartLocal(barrierConfig *barrier.Config, collectorConfigs []collector.Config, checkerConfig *checker.Config, shutdown chan struct{}, alarm chan<- checker.Result) error {
barrier.Start(barrierConfig, shutdown)
allMsg := make(chan *dcp.Message, 100)
for _, cfg := range collectorConfigs {
case *collector.MysqlConfig:
c = collector.NewMysqlCollector(cfg)
case *collector.GrpcConfig:
c = collector.NewGrpc(cfg)
c.Start()
go func(c <-chan *dcp.Message) {
for m := range c {
allMsg <- m
}
}(c.GetChan())
collectors = append(collectors, c)
for {
select {
case m := <-allMsg:
c.Consume(m)
首先看下其中一种collector:mysql collector,位于dcp/collector/mysql.go
代码语言:javascript复制func NewMysqlCollector(config *MysqlConfig) Interface {
syncer := replication.NewBinlogSyncer(cfg)
gtid, err := mysql.ParseMysqlGTIDSet(id)
schemaTables := make(map[string]tagInfo)
for _, tagConfig := range config.TagConfigs {
for _, v := range tagConfig.Tables {
schemaTables[schemaTbl2Key(v.Schema, v.Table)] = tagInfo{
tag: tagConfig.Tag,
primaryKeyIdx: v.PrimaryKeyIdx,
}
}
}
它首先解析GTID,然后开始根据配置,组装tag信息。它的Satrt方法开始处理数据同步:
代码语言:javascript复制func (c *Mysql) Start() {
go c.mainLoop()
}
根据binlog不同事件,进行不同处理:
代码语言:javascript复制func (c *Mysql) mainLoop() {
for {
rawEvent, err := c.streamer.GetEvent(context.Background())
switch evt := rawEvent.Event.(type) {
case *replication.RowsEvent:
schemaName, tableName := string(evt.Table.Schema), string(evt.Table.Table)
switch rawEvent.Header.EventType {
case replication.WRITE_ROWS_EVENTv0, replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
for _, row := range evt.Rows {
if isBarrier {
for _, v := range c.tagInfo {
msg := &dcp.Message{
Tag: v.tag,
Id: c.getNextGtid(),
Timestamp: uint64(rawEvent.Header.Timestamp),
}
grpc collector也类似dcp/collector/grpc.go
代码语言:javascript复制func (s *Grpc) Start() {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port))
go s.server.Serve(lis)
代码语言:javascript复制func (s *Grpc) Process(stream dcp.DCPService_ProcessServer) error {
for {
msg, err := stream.Recv()
s.output <- msg
err = stream.Send(&dcp.Response{
Id: msg.Id,
})
最后看下Consume方法dcp/checker/checker.go
代码语言:javascript复制func (s *checker) Consume(msg *dcp.Message) {
idx, ok := s.tagIndex[msg.Tag]
s.buffers[idx].Write(msg)
上述过程中解析gtid使用了go-mysql库的函数github.com/siddontang/go-mysql@v0.0.0-20190312052122-c6ab05a85eb8/mysql/mysql_gtid.go
代码语言:javascript复制func ParseMysqlGTIDSet(str string) (GTIDSet, error) {
if set, err := ParseUUIDSet(sp[i]); err != nil {
通用开始同步消息的函数StartSyncGTID也是来自这个库 github.com/siddontang/go-mysql@v0.0.0-20190312052122-c6ab05a85eb8/replication/binlogsyncer.go
代码语言:javascript复制func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) {
if err := b.prepare(); err != nil {
switch b.cfg.Flavor {
case MariaDBFlavor:
err = b.writeBinlogDumpMariadbGTIDCommand(gset)
default:
// default use MySQL
err = b.writeBinlogDumpMysqlGTIDCommand(gset)
return b.startDumpStream(), nil
代码语言:javascript复制func (b *BinlogSyncer) startDumpStream() *BinlogStreamer {
s := newBinlogStreamer()
b.wg.Add(1)
go b.onStream(s)
记录同步状态需要初始化一个mysql表,它的实现位于dcp/barrier/barrier.go
代码语言:javascript复制func Start(config *Config, shutdown chan struct{}) {
_, err = db.Exec("CREATE database if not EXISTS " DB_NAME)
_, err = db.Exec(`
CREATE TABLE IF NOT EXISTS ` DB_TABLE_NAME ` (
id int(11) unsigned NOT NULL AUTO_INCREMENT,
offset BIGINT unsigned NOT NULL DEFAULT 0,
ts timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`)
r, err := db.Exec("INSERT IGNORE INTO " DB_TABLE_NAME "(id, offset) VALUES (" RECORD_ID ", 0);")
go barrierLoop(ticker, db, shutdown)
库名和表名定义如下:
代码语言:javascript复制const (
DB_NAME = "drc"
TABLE_NAME = "barrier"
DB_TABLE_NAME = DB_NAME "." TABLE_NAME
RECORD_ID = "1"
OFFSET_COLUMN_SEQ = 1
)
了解完第一个命令后,我们重点分析下,核心命令gravity,为了找到核心命令入口,我们先看下Makefile
代码语言:javascript复制default: build
build:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/gravity cmd/gravity/main.go
#$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/padder cmd/padder/main.go
在cmd/gravity/main.go里调用了 pkg/app/server.go的NewServer
代码语言:javascript复制func NewServer(pipelineConfig config.PipelineConfigV3) (*Server, error) {
server, err := ParsePlugins(pipelineConfig)
if p, err := newer.NewPositionCache(); err != nil {
它依次执行了Ouput、Scheduler、PositionCache、Input等插件:
代码语言:javascript复制func (s *Server) Start() error {
switch o := s.Output.(type) {
case core.SynchronousOutput:
if err := o.Start(); err != nil {
case core.AsynchronousOutput:
if err := o.Start(s.Scheduler); err != nil {
if err := s.Scheduler.Start(s.Output); err != nil {
if err := s.PositionCache.Start(); err != nil {
if err := s.Input.Start(s.Emitter, s.Output.GetRouter(), s.PositionCache); err != nil {
插件的接口定义在pkg/core/output.go
代码语言:javascript复制type SynchronousOutput interface {
Start() error
Output
}
type AsynchronousOutput interface {
Start(msgAcker MsgAcker) error
Output
}
type Output interface {
Execute(msgs []*Msg) error
GetRouter() Router
Close()
}
插件的注册和解析位于pkg/app/server.go
代码语言:javascript复制func ParsePlugins(pipelineConfig config.PipelineConfigV3) (*Server, error)
plugin, err := registry.GetPlugin(registry.OutputPlugin, pipelineConfig.OutputPlugin.Type)
if err := plugin.Configure(pipelineConfig.PipelineName, pipelineConfig.OutputPlugin.Config); err != nil {
plugin, err = registry.GetPlugin(registry.SchedulerPlugin, pipelineConfig.SchedulerPlugin.Type)
fs, err := filters.NewFilters(pipelineConfig.FilterPlugins)
e, err := emitter.NewEmitter(fs, server.Scheduler)
input, err := newInput(pipelineConfig.PipelineName, inputPlugins)
比如Input插件 ,它是通过Type和name从map里面定位的。
代码语言:javascript复制func newInput(pipelineName string, inputConfig config.InputConfig) (core.Input, error) {
plugin, err := registry.GetPlugin(registry.InputPlugin, inputConfig.Type)
pkg/registry/registry.go
代码语言:javascript复制func GetPlugin(pluginType PluginType, name string) (Plugin, error) {
plugins, ok := registry[pluginType]
p, ok := plugins[name]
return p(), nil
代码语言:javascript复制var registry map[PluginType]map[string]PluginFactory
代码语言:javascript复制func RegisterPluginFactory(pluginType PluginType, name string, v PluginFactory) {
registry[pluginType][name] = v
如果是单例,直接返回,否则利用反射复制一份:
代码语言:javascript复制func RegisterPlugin(pluginType PluginType, name string, v Plugin, singleton bool) {
if singleton {
pf = func() Plugin {
return v
}
} else {
pf = func() Plugin {
return reflect.New(reflect.TypeOf(v).Elem()).Interface().(Plugin)
}
}
注册插件的时候调用
代码语言:javascript复制RegisterPluginFactory(pluginType, name, pf)
插件的类型定义如下:
代码语言:javascript复制const (
InputPlugin PluginType = "input"
PositionRepo PluginType = "positionRepo"
OutputPlugin PluginType = "output"
FilterPlugin PluginType = "filters"
MatcherPlugin PluginType = "matcher"
SchedulerPlugin PluginType = "scheduler"
SQLExecutionEnginePlugin PluginType = "sqlExecutionEngine"
)
比如filter插件的注册pkg/filters/accept_filter.go
代码语言:javascript复制func init() {
registry.RegisterPlugin(registry.FilterPlugin, AcceptFilterName, &acceptFilterFactoryType{}, true)
}
input插件的注册:pkg/inputs/mysql/input.go
代码语言:javascript复制func init() {
registry.RegisterPlugin(registry.InputPlugin, Name, &input{}, false)
}
代码语言:javascript复制const Name = "mysql"
代码语言:javascript复制type input struct {
core.Input
}
代码语言:javascript复制func (i *input) Configure(pipelineName string, data map[string]interface{}) error {
代码语言:javascript复制func (i *input) NewPositionCache() (position_cache.PositionCacheInterface, error) {
流式输入的插件定义在pkg/inputs/mysqlstream/input.go
代码语言:javascript复制const inputStreamKey = "mysqlstream"
const Name = "mysql-stream"
代码语言:javascript复制func (plugin *mysqlStreamInputPlugin) Start(emitter core.Emitter, router core.Router, positionCache position_cache.PositionCacheInterface) error {
sourceDB, err := utils.CreateDBConnection(plugin.cfg.Source)
sourceSchemaStore, err := schema_store.NewSimpleSchemaStoreFromDBConn(sourceDB)
plugin.binlogChecker, err = binlog_checker.NewBinlogChecker(
plugin.pipelineName,
plugin.probeDBConfig,
plugin.probeSQLAnnotation,
BinlogProbeInterval,
plugin.cfg.DisableBinlogChecker)
if err := plugin.binlogChecker.Start(); err != nil {
gravityServerID := utils.GenerateRandomServerID()
plugin.binlogTailer, err = NewBinlogTailer(
plugin.pipelineName,
plugin.cfg,
gravityServerID,
positionCache,
sourceSchemaStore,
sourceDB,
emitter,
router,
plugin.binlogChecker,
nil)
if err := plugin.binlogTailer.Start(); err != nil {
pkg/inputs/mysqlstream/binlog_tailer.go
代码语言:javascript复制if err := utils.CheckBinlogFormat(tailer.sourceDB); err != nil {
position, exist, err := tailer.positionCache.Get()
binlogPositionValue, ok := position.Value.(helper.BinlogPositionsValue)
streamer, err := tailer.getBinlogStreamer(binlogPositionValue.CurrentPosition.BinlogGTID)
currentPosition, err := GetCurrentPositionValue(tailer.positionCache)
for {
// ctx, cancel := context.WithTimeout(tailer.ctx, binlogSyncerTimeout)
e, err := streamer.GetEvent(tailer.ctx)
switch ev := e.Event.(type) {
case *replication.RotateEvent:
sourcePosition := gomysql.Position{Name: string(ev.NextLogName), Pos: uint32(ev.Position)}