golang源码分析:go-mysql(2)自己实现一个canel

2023-09-06 19:21:18 浏览数 (1)

如何用golang自己实现一个canel呢,github.com/go-mysql-org/go-mysql给我们提供了这样的能力,它已经完成mysql协议的解析,并将解析后同步从库的过程实现,加入了几个插件点,实现自己的canel只需要实现这几个插件点即可完成我们自定义的同步工具。下面我们结合源码分析一下如何实现。实现一个canel需要下面四步:初始化配置,创建canel实例,设置事件处理函数,开始处理。

代码语言:javascript复制
cfg := canal.NewDefaultConfig()
c, err := canal.NewCanal(cfg)
c.SetEventHandler(&MyEventHandler{})
c.Run()

配置的定义位于github.com/go-mysql-org/go-mysql@v1.7.0/canal/config.go,主要用配置同步的时候源,目标表的一些参数,用于控制同步过程的顺利进行:

代码语言:javascript复制
func NewDefaultConfig() *Config {
代码语言:javascript复制
type Config struct {
  Addr     string `toml:"addr"`
  User     string `toml:"user"`
  Password string `toml:"password"`


  Charset         string        `toml:"charset"`
  ServerID        uint32        `toml:"server_id"`
  Flavor          string        `toml:"flavor"`
  HeartbeatPeriod time.Duration `toml:"heartbeat_period"`
  ReadTimeout     time.Duration `toml:"read_timeout"`


  // IncludeTableRegex or ExcludeTableRegex should contain database name
  // Only a table which matches IncludeTableRegex and dismatches ExcludeTableRegex will be processed
  // eg, IncludeTableRegex : [".*\.canal"], ExcludeTableRegex : ["mysql\..*"]
  //     this will include all database's 'canal' table, except database 'mysql'
  // Default IncludeTableRegex and ExcludeTableRegex are empty, this will include all tables
  IncludeTableRegex []string `toml:"include_table_regex"`
  ExcludeTableRegex []string `toml:"exclude_table_regex"`


  // discard row event without table meta
  DiscardNoMetaRowEvent bool `toml:"discard_no_meta_row_event"`


  Dump DumpConfig `toml:"dump"`


  UseDecimal bool `toml:"use_decimal"`
  ParseTime  bool `toml:"parse_time"`


  TimestampStringLocation *time.Location


  // SemiSyncEnabled enables semi-sync or not.
  SemiSyncEnabled bool `toml:"semi_sync_enabled"`


  // maximum number of attempts to re-establish a broken connection, zero or negative number means infinite retry.
  // this configuration will not work if DisableRetrySync is true
  MaxReconnectAttempts int `toml:"max_reconnect_attempts"`


  // whether disable re-sync for broken connection
  DisableRetrySync bool `toml:"disable_retry_sync"`


  // Set TLS config
  TLSConfig *tls.Config


  //Set Logger
  Logger loggers.Advanced


  //Set Dialer
  Dialer client.Dialer
}

详细可以参考源码来进行参数的设置。

初始化canel实例的过程位于:github.com/go-mysql-org/go-mysql@v1.7.0/canal/canal.go

代码语言:javascript复制
func NewCanal(cfg *Config) (*Canal, error) {
  c.dumpDoneCh = make(chan struct{})
  c.eventHandler = &DummyEventHandler{}
  c.parser = parser.New()
  c.tables = make(map[string]*schema.Table)
  c.master = &masterInfo{logger: c.cfg.Logger}
  if err = c.prepareDumper(); err != nil {
  if err = c.prepareSyncer(); err != nil {
  if err := c.checkBinlogRowFormat(); err != nil {
  if n := len(c.cfg.IncludeTableRegex); n > 0 {
  if n := len(c.cfg.ExcludeTableRegex); n > 0 {

先会初始一个canel对象,然后给他一系列默认值,比如哑事件处理器。然后指定解析器,和同步器。其中canel的定义如下:

代码语言:javascript复制
type Canal struct {
  m sync.Mutex

  cfg *Config

  parser     *parser.Parser
  master     *masterInfo
  dumper     *dump.Dumper
  dumped     bool
  dumpDoneCh chan struct{}
  syncer     *replication.BinlogSyncer

  eventHandler EventHandler

  connLock sync.Mutex
  conn     *client.Conn

  tableLock          sync.RWMutex
  tables             map[string]*schema.Table
  errorTablesGetTime map[string]time.Time

  tableMatchCache   map[string]bool
  includeTableRegex []*regexp.Regexp
  excludeTableRegex []*regexp.Regexp

  delay *uint32

  ctx    context.Context
  cancel context.CancelFunc
}

接着我们重点看下哑事件处理器的实现:github.com/go-mysql-org/go-mysql@v1.7.0/canal/handler.go

代码语言:javascript复制
type DummyEventHandler struct {
}
代码语言:javascript复制
func (h *DummyEventHandler) OnRotate(*replication.EventHeader, *replication.RotateEvent) error {
  return nil
}

它实现了EventHandler的所有接口方法,但是内部没有任何操作。 EventHandler,接口有8个方法,如果我们需要自定义实现一个canel,自定义这8个方法即可:

代码语言:javascript复制
type EventHandler interface {
  OnRotate(header *replication.EventHeader, rotateEvent *replication.RotateEvent) error
  // OnTableChanged is called when the table is created, altered, renamed or dropped.
  // You need to clear the associated data like cache with the table.
  // It will be called before OnDDL.
  OnTableChanged(header *replication.EventHeader, schema string, table string) error
  OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error
  OnRow(e *RowsEvent) error
  OnXID(header *replication.EventHeader, nextPos mysql.Position) error
  OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error
  // OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
  OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error
  String() string
}

看完Canel的对象初始化过程,我们接着需要设置事件处理器,替换哑事件处理器:c.SetEventHandler(&MyEventHandler{}):

代码语言:javascript复制
func (c *Canal) SetEventHandler(h EventHandler) {
  c.eventHandler = h
}

事件处理器应该如何定义呢?我们可以从哑事件处理器继承,然后实现我们关注的接口即可,比如下面的例子:

代码语言:javascript复制
type MyEventHandler struct {
  canal.DummyEventHandler
}
func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error {
  log.Infof("%s %vn", e.Action, e.Rows)
  return nil
}

完成上述准备工作之后就来到了事件处理器的运行这一步:c.Run(),源码位于:github.com/go-mysql-org/go-mysql@v1.7.0/canal/canal.go

代码语言:javascript复制
func (c *Canal) Run() error {
  return c.run()
}

它首先记录运行开始时间,然dump需要同步的内容,最后开始同步到从存储。

代码语言:javascript复制
func (c *Canal) run() error {
        c.master.UpdateTimestamp(uint32(time.Now().Unix()))
        err := c.tryDump()
        if err := c.runSyncBinlog(); err != nil {

dump输出的过程位于:github.com/go-mysql-org/go-mysql@v1.7.0/canal/dump.go,它首先会解析已经同步的位置和GTID集合,然后开始同步。

代码语言:javascript复制
func (c *Canal) tryDump() error {
  pos := c.master.Position()
  gset := c.master.GTIDSet()
return c.dump()

这里补充下mysql binlog同步的基础知识:MySQL有2种方式指定复制同步的方式,分别为:

  • 基于binlog文件名及位点的指定方式- 匿名事务(Anonymous_gtid_log_event)
  • 基于GTID(全局事务ID)的指定方式- GTID事务(Gtid_log_event)

而基于GTID的方式一方面在一主多从的架构下主从切换有着明显优势外,对于日常复制异常的故障诊断方面也更为方便,从MySQL 5.7.6之后便开始支持动态开启和关闭GTID模式,其参数GTID_MODE有以下取值

OFF - 只允许匿名事务被复制同步

OFF_PERMISSIVE - 新产生的事务都是匿名事务,但也允许有GTID事务被复制同步

ON_PERMISSIVE - 新产生的都是GTID事务,但也允许有匿名事务被复制同步

ON - 只允许GTID事务被复制同步

GTID的解析过程如下:github.com/go-mysql-org/go-mysql@v1.7.0/mysql/gtid.go

代码语言:javascript复制
func ParseGTIDSet(flavor string, s string) (GTIDSet, error) {
 switch flavor {
  case MySQLFlavor:
    return ParseMysqlGTIDSet(s)
  case MariaDBFlavor:
    return ParseMariadbGTIDSet(s)

mysql和marindb的实现方式还有些许差异:github.com/go-mysql-org/go-mysql@v1.7.0/mysql/mysql_gtid.go

代码语言:javascript复制
func ParseMysqlGTIDSet(str string) (GTIDSet, error) {
 if set, err := ParseUUIDSet(sp[i]); err != nil {  
代码语言:javascript复制
type MysqlGTIDSet struct {
  Sets map[string]*UUIDSet
}

上面刚好对应着两种同步方式。接着看下,数据序列化dump的详细过程:

代码语言:javascript复制
func (c *Canal) dump() error {
  c.master.UpdateTimestamp(uint32(time.Now().Unix()))
  h := &dumpParseHandler{c: c}
  if c.master.GTIDSet() != nil {
  gset, err := c.GetMasterGTIDSet()
  if c.cfg.Dump.SkipMasterData {
  pos, err := c.GetMasterPos()
  if err := c.dumper.DumpAndParse(h); err != nil {
  pos := mysql.Position{Name: h.name, Pos: uint32(h.pos)}
  c.master.Update(pos)
  c.master.UpdateGTIDSet(h.gset)
  if err := c.eventHandler.OnPosSynced(nil, pos, c.master.GTIDSet(), true); err != nil {
  c.master.UpdateGTIDSet(h.gset)

在dump的过程中,先解析已经同步的位置或者GTID,然后开启解析同步,同步完成后,更新已经同步的位置和GTID集合,最后调用我们的eventHandler处理位置同步完成事件。binlog同步位置信息记录在

代码语言:javascript复制
type masterInfo struct {
  sync.RWMutex

  pos mysql.Position

  gset mysql.GTIDSet

  timestamp uint32

  logger loggers.Advanced
}

解析同步的过程用到了dumpParseHandler,它的定义如下:

代码语言:javascript复制
type dumpParseHandler struct {
  c    *Canal
  name string
  pos  uint64
  gset mysql.GTIDSet
}

实现了接口

代码语言:javascript复制
type ParseHandler interface {
  // Parse CHANGE MASTER TO MASTER_LOG_FILE=name, MASTER_LOG_POS=pos;
  BinLog(name string, pos uint64) error
  GtidSet(gtidsets string) error
  Data(schema string, table string, values []string) error
}

下面是它的具体实现:

代码语言:javascript复制
func (h *dumpParseHandler) GtidSet(gtidsets string) (err error) {
          err = h.gset.Update(gtidsets)
          h.gset, err = mysql.ParseGTIDSet("mysql", gtidsets)
代码语言:javascript复制
func (h *dumpParseHandler) BinLog(name string, pos uint64) error {
  h.name = name
  h.pos = pos
  return nil
}

重点关注下Data函数的实现:

代码语言:javascript复制
func (h *dumpParseHandler) Data(db string, table string, values []string) error {
  tableInfo, err := h.c.GetTable(db, table)
  for i, v := range values {
    if v == "NULL" {
      vs[i] = nil
    } else if v == "_binary ''" {
      vs[i] = []byte{}
    } else if v[0] != ''' {
    if tableInfo.Columns[i].Type == schema.TYPE_NUMBER || tableInfo.Columns[i].Type == schema.TYPE_MEDIUM_INT {
       } else if tableInfo.Columns[i].Type == schema.TYPE_FLOAT {
      f, err := strconv.ParseFloat(v, 64)
      } else if tableInfo.Columns[i].Type == schema.TYPE_DECIMAL {
  if h.c.cfg.UseDecimal {
    } else if strings.HasPrefix(v, "0x") {
     buf, err := hex.DecodeString(v[2:])
    events := newRowsEvent(tableInfo, InsertAction, [][]interface{}{vs}, nil)
return h.c.eventHandler.OnRow(events)

它完成了binlog的解析,将它反序列化到golang的运行时。然后触发了第二个事件OnRow。

具体dump的过程位于github.com/go-mysql-org/go-mysql@v1.7.0/dump/dumper.go,解析过程位于一个独立的协程中,会跳过配置中指定的表格:

代码语言:javascript复制
func (d *Dumper) DumpAndParse(h ParseHandler) error {
    r, w := io.Pipe()
      go func() {
        err := Parse(r, h, !d.masterDataSkipped)
    err := d.Dump(w)

解析过程在一个for循环中:

代码语言:javascript复制
func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error {
  for {
    line, err := rb.ReadString('n')
        line = strings.TrimRightFunc(line, func(c rune) bool {
      return c == 'r' || c == 'n'
    })


    if err := h.GtidSet(gtidStr); err != nil {
    if err = h.BinLog(name, pos); err != nil && err != ErrSkip {
    if err = h.Data(db, table, values); err != nil && err != ErrSkip {

解析完后就是dump的过程,它直接复用了mysqldump工具:

代码语言:javascript复制
func (d *Dumper) Dump(w io.Writer) error {
        _, err := w.Write([]byte(fmt.Sprintf("USE `%s`;n", d.TableDB)))
        cmd := exec.Command(d.ExecutionPath, args...)
        return cmd.Run()
代码语言:javascript复制
// Unlick mysqldump, Dumper is designed for parsing and syning data easily.
type Dumper struct {
  // mysqldump execution path, like mysqldump or /usr/bin/mysqldump, etc...
  ExecutionPath string


  Addr     string
  User     string
  Password string
  Protocol string


  // Will override Databases
  Tables  []string
  TableDB string


  Databases []string


  Where   string
  Charset string


  IgnoreTables map[string][]string


  ExtraOptions []string


  ErrOut io.Writer


  masterDataSkipped bool
  maxAllowedPacket  int
  hexBlob           bool


  // see detectColumnStatisticsParamSupported
  isColumnStatisticsParamSupported bool
}

dump完成后就是同步的过程:github.com/go-mysql-org/go-mysql@v1.7.0/canal/sync.go

代码语言:javascript复制
func (c *Canal) runSyncBinlog() error {
      s, err := c.startSyncer()
      
  for {
      ev, err := s.GetEvent(c.ctx)
      c.updateReplicationDelay(ev)
      if ev.Header.LogPos == 0 {
      switch e := ev.Event.(type) {
      case *replication.RotateEvent:
        fakeRotateLogName := string(e.NextLogName)
        if fakeRotateLogName != c.master.Position().Name {
      pos := c.master.Position()
      switch e := ev.Event.(type) {
      case *replication.RotateEvent:
        if err = c.eventHandler.OnRotate(ev.Header, e); err != nil {
      case *replication.RowsEvent:
      // we only focus row based event
      err = c.handleRowsEvent(ev)
      case *replication.XIDEvent:
      savePos = true
      // try to save the position later
      if err := c.eventHandler.OnXID(ev.Header, pos);
        c.master.UpdateGTIDSet(e.GSet)
      case *replication.MariadbGTIDEvent:
      // try to save the GTID later
      gtid, err := mysql.ParseMariadbGTIDSet(e.GTID.String())
        if err := c.eventHandler.OnGTID(ev.Header, gtid); err != nil {
      case *replication.GTIDEvent:
      u, _ := uuid.FromBytes(e.SID)
      gtid, err := mysql.ParseMysqlGTIDSet(fmt.Sprintf("%s:%d", u.String(), e.GNO))
        if err := c.eventHandler.OnGTID(ev.Header, gtid); err != nil {
        case *replication.QueryEvent:
          stmts, _, err := c.parser.Parse(string(e.Query), "", "")
            for _, stmt := range stmts {
            nodes := parseStmt(stmt)
            for _, node := range nodes {
              if node.db == "" {
                node.db = string(e.Schema)
              }
              if err = c.updateTable(ev.Header, node.db, node.table); 
            if len(nodes) > 0 {
              savePos = true
              force = true
              // Now we only handle Table Changed DDL, maybe we will support more later.
              if err = c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
              if savePos {
      c.master.Update(pos)
      c.master.UpdateTimestamp(ev.Header.Timestamp)
     if err := c.eventHandler.OnPosSynced(ev.Header, pos, c.master.GTIDSet(), force); err != nil {

先同步位置信息,然后开始同步具体内容,同步过程中处理完内容后开始分发不同的事件处理函数,交给我们的handler来进行处理。同步位置信息的函数定义如下:

代码语言:javascript复制
func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
  s, err := c.syncer.StartSync(pos)
  s, err := c.syncer.StartSyncGTID(gset)

同步行变更信息一般是我们重点关注的,它的实现如下,解析mysql binlog协议,然后转化成,插入、更新、删除事件。最后交给我们的OnRow事件来处理。

代码语言:javascript复制
func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
 switch e.Header.EventType {
  case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
    action = InsertAction
  case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
     action = DeleteAction
  case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
      action = UpdateAction
      events := newRowsEvent(t, action, ev.Rows, e.Header)
  return c.eventHandler.OnRow(events)

总结下,mysql binlog同步的过程主要有三步:1,解析同步快照信息,即位置信息,然后解析binlog,最后同步给下游消费者。go-mysql将上述复杂过程完全包裹起来,抽象出了8个方法,我们只需要按需实现这8个方法中的某些我们需要的,就可以完成自定义的canel工具。

0 人点赞