实现binlog增量同步(Incremental dumping)需要哪些步骤呢?获取配置,初始化同步器,找到上一次同步位置,开启同步,并处理解析到的事件,整体流程如下:
代码语言:javascript复制 cfg := replication.BinlogSyncerConfig{}
// Minimal incremental-sync flow: build the config, create the syncer, start
// syncing from the last known position, then print every event received.
syncer := replication.NewBinlogSyncer(cfg)
// Keyed fields make it explicit which value is the file name and which is
// the offset inside that file.
streamer, err := syncer.StartSync(mysql.Position{Name: binlogFile, Pos: binlogPos})
if err != nil {
	// Without a streamer there is nothing to read from.
	panic(err)
}
for {
	ev, err := streamer.GetEvent(context.Background())
	if err != nil {
		// GetEvent returns a nil event together with the error, so stop
		// here instead of dereferencing ev below.
		panic(err)
	}
	// Dump event
	ev.Dump(os.Stdout)
}
配置定义如下,定义了binlog同步需要的必要参数:
代码语言:javascript复制// BinlogSyncerConfig is the configuration for BinlogSyncer.
type BinlogSyncerConfig struct {
// ServerID is the unique ID in cluster.
ServerID uint32
// Flavor is "mysql" or "mariadb", if not set, use "mysql" default.
Flavor string
// Host is for MySQL server host.
Host string
// Port is for MySQL server port.
Port uint16
// User is for MySQL user.
User string
// Password is for MySQL password.
Password string
// Localhost is the local hostname reported when registering as a slave.
// If not set, use os.Hostname() instead.
Localhost string
// Charset is for MySQL client character set
Charset string
// SemiSyncEnabled enables semi-sync or not.
SemiSyncEnabled bool
// RawModeEnabled is for not parsing binlog event.
RawModeEnabled bool
// If not nil, use the provided tls.Config to connect to the database using TLS/SSL.
TLSConfig *tls.Config
// Use replication.Time structure for timestamp and datetime.
// We will use Local location for timestamp and UTC location for datetime.
ParseTime bool
// If ParseTime is false, convert TIMESTAMP into this specified timezone. If
// ParseTime is true, this option will have no effect and TIMESTAMP data will
// be parsed into the local timezone and a full time.Time struct will be
// returned.
//
// Note that MySQL TIMESTAMP columns are offset from the machine local
// timezone while DATETIME columns are offset from UTC. This is consistent
// with documented MySQL behaviour as it return TIMESTAMP in local timezone
// and DATETIME in UTC.
//
// Setting this to UTC effectively equalizes the TIMESTAMP and DATETIME time
// strings obtained from MySQL.
TimestampStringLocation *time.Location
// Use decimal.Decimal structure for decimals.
UseDecimal bool
// RecvBufferSize sets the size in bytes of the operating system's receive buffer associated with the connection.
RecvBufferSize int
// HeartbeatPeriod is the master heartbeat period.
HeartbeatPeriod time.Duration
// ReadTimeout is the read timeout.
ReadTimeout time.Duration
// MaxReconnectAttempts is the maximum number of attempts to re-establish a
// broken connection; zero or a negative number means infinite retry.
// This setting has no effect if DisableRetrySync is true.
MaxReconnectAttempts int
// DisableRetrySync disables re-sync after a broken connection.
DisableRetrySync bool
// Only works when MySQL/MariaDB variable binlog_checksum=CRC32.
// For MySQL, binlog_checksum was introduced since 5.6.2, but CRC32 was set as default value since 5.6.6 .
// https://dev.mysql.com/doc/refman/5.6/en/replication-options-binary-log.html#option_mysqld_binlog-checksum
// For MariaDB, binlog_checksum was introduced since MariaDB 5.3, but CRC32 was set as default value since MariaDB 10.2.1 .
// https://mariadb.com/kb/en/library/replication-and-binary-log-server-system-variables/#binlog_checksum
VerifyChecksum bool
// DumpCommandFlag is used to send binlog dump command. Default 0, aka BINLOG_DUMP_NEVER_STOP.
// For MySQL, BINLOG_DUMP_NEVER_STOP and BINLOG_DUMP_NON_BLOCK are available.
// https://dev.mysql.com/doc/internals/en/com-binlog-dump.html#binlog-dump-non-block
// For MariaDB, BINLOG_DUMP_NEVER_STOP, BINLOG_DUMP_NON_BLOCK and BINLOG_SEND_ANNOTATE_ROWS_EVENT are available.
// https://mariadb.com/kb/en/library/com_binlog_dump/
// https://mariadb.com/kb/en/library/annotate_rows_event/
DumpCommandFlag uint16
// Option is a hook for settings that live outside BinlogSyncerConfig; it runs
// between establishing the MySQL connection and COM_REGISTER_SLAVE.
// For MariaDB: slave_gtid_ignore_duplicates, skip_replication, slave_until_gtid.
Option func(*client.Conn) error
// Set Logger
Logger loggers.Advanced
// Set Dialer
Dialer client.Dialer
// RowsEventDecodeFunc is an optional callback taking a *RowsEvent and the raw
// bytes. NOTE(review): presumably a custom rows-event decoder; confirm in the parser.
RowsEventDecodeFunc func(*RowsEvent, []byte) error
// DiscardGTIDSet, when true, stops parseEvent from attaching the current GTID
// set (event.GSet) to XIDEvent and QueryEvent.
DiscardGTIDSet bool
}
然后初始化同步器,它的定义如下,github.com/go-mysql-org/go-mysql@v1.7.0/replication/binlogsyncer.go:
代码语言:javascript复制// BinlogSyncer syncs binlog event from server.
type BinlogSyncer struct {
// m is a read/write mutex. NOTE(review): presumably guards the syncer's
// mutable state; its users are not shown in this excerpt.
m sync.RWMutex
// cfg is the configuration passed to NewBinlogSyncer.
cfg BinlogSyncerConfig
// c is the connection to the server; registerSlave creates it and onStream
// reads packets from it.
c *client.Conn
wg sync.WaitGroup
// parser decodes raw packets into binlog events (see parseEvent).
parser *BinlogParser
// nextPos is updated on every RotateEvent and is the position used when
// retrying the sync and when replying with a semi-sync ACK.
nextPos Position
// prevGset and currGset are the GTID sets maintained by parseEvent.
prevGset, currGset GTIDSet
// instead of GTIDSet.Clone, use this to speed up calculate prevGset
prevMySQLGTIDEvent *GTIDEvent
running bool
// ctx is checked by onStream and parseEvent to stop when cancelled.
ctx context.Context
cancel context.CancelFunc
// lastConnectionID is the connection ID recorded by registerSlave.
lastConnectionID uint32
// retryCount is the reconnect-attempt counter touched by onStream's retry loop.
retryCount int
}
初始化同步器的时候会指定解析器,以及失败处理器github.com/go-mysql-org/go-mysql@v1.7.0/replication/binlogsyncer.go:
// NewBinlogSyncer creates a BinlogSyncer for cfg: it stores the configuration,
// creates a fresh BinlogParser and applies cfg.Flavor to that parser.
// NOTE(review): the rest of the function (remaining setup and the return) is
// elided in this excerpt.
代码语言:javascript复制func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer {
b := new(BinlogSyncer)
b.cfg = cfg
b.parser = NewBinlogParser()
b.parser.SetFlavor(cfg.Flavor)
解析器位于github.com/go-mysql-org/go-mysql@v1.7.0/replication/parser.go
// NewBinlogParser returns a BinlogParser whose tables map is allocated and
// empty; every other field keeps its zero value.
代码语言:javascript复制func NewBinlogParser() *BinlogParser {
	return &BinlogParser{
		tables: make(map[uint64]*TableMapEvent),
	}
}
// BinlogParser holds the state used to decode raw binlog data into events.
代码语言:javascript复制type BinlogParser struct {
// "mysql" or "mariadb", if not set, use "mysql" by default
flavor string
// format is the FormatDescriptionEvent kept by the parser.
// NOTE(review): presumably the most recent one seen; confirm in the parse code.
format *FormatDescriptionEvent
// tables maps a uint64 key to a TableMapEvent; allocated by NewBinlogParser.
// NOTE(review): the key is presumably the table ID; confirm.
tables map[uint64]*TableMapEvent
// for rawMode, we only parse FormatDescriptionEvent and RotateEvent
rawMode bool
parseTime bool
timestampStringLocation *time.Location
// used to start/stop processing
stopProcessing uint32
useDecimal bool
ignoreJSONDecodeErr bool
verifyChecksum bool
// rowsEventDecodeFunc has the same signature as
// BinlogSyncerConfig.RowsEventDecodeFunc.
rowsEventDecodeFunc func(*RowsEvent, []byte) error
}
开启同步的时候需要给定上次同步的位置:
github.com/go-mysql-org/go-mysql@v1.7.0/mysql/position.go
代码语言:javascript复制// For binlog filename position based replication
type Position struct {
// Name is the binlog file name.
Name string
// Pos is the offset inside that binlog file.
Pos uint32
}
github.com/go-mysql-org/go-mysql@v1.7.0/replication/binlogsyncer.go定位到指定位置后就开启同步
// StartSync starts syncing from the given binlog position: it prepares the
// connection with prepareSyncPos and returns the streamer created by
// startDumpStream.
// NOTE(review): abridged excerpt. The error-handling body of the if
// statement and the closing braces are not shown.
代码语言:javascript复制func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) {
if err := b.prepareSyncPos(pos); err != nil {
return b.startDumpStream(), nil
定位到同步位置,包括如下准备工作:注册从库,启用半同步。
// prepareSyncPos runs prepare and then sends the binlog dump command for pos.
// NOTE(review): abridged excerpt. The error-handling bodies and the return
// are not shown.
代码语言:javascript复制func (b *BinlogSyncer) prepareSyncPos(pos Position) error {
if err := b.prepare(); err != nil {
if err := b.writeBinlogDumpCommand(pos); err != nil {
// prepare registers this syncer as a slave and then enables semi-sync.
// NOTE(review): abridged excerpt. The error-handling bodies and the return
// are not shown.
代码语言:javascript复制func (b *BinlogSyncer) prepare() error {
if err := b.registerSlave(); err != nil {
if err := b.enableSemiSync(); err != nil {
// registerSlave opens a new connection and records its connection ID, queries
// the server's BINLOG_CHECKSUM variable, sets @master_binlog_checksum, sends
// the register-slave command and reads the OK packet, and finally sets
// @slave_uuid / @replica_uuid.
// NOTE(review): abridged excerpt. The conditions linking these steps, the
// declarations of err and serverUUID, and all error-handling bodies are not shown.
代码语言:javascript复制func (b *BinlogSyncer) registerSlave() error {
b.c, err = b.newConnection(b.ctx)
b.lastConnectionID = b.c.GetConnectionID()
if r, err := b.c.Execute("SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'"); err != nil {
// Read row 0, column 1 of the result as a string.
s, _ := r.GetString(0, 1)
if _, err = b.c.Execute(`SET @master_binlog_checksum='NONE'`); err != nil {
if err = b.writeRegisterSlaveCommand(); err != nil {
if _, err = b.c.ReadOKPacket(); err != nil {
if _, err = b.c.Execute(fmt.Sprintf("SET @slave_uuid = '%s', @replica_uuid = '%s'", serverUUID, serverUUID)); err != nil {
// enableSemiSync queries the rpl_semi_sync_master_enabled variable and sets
// @rpl_semi_sync_slave = 1 on the connection.
// NOTE(review): abridged excerpt. The condition linking the query to the SET
// statement and the error handling are not shown.
代码语言:javascript复制func (b *BinlogSyncer) enableSemiSync() error {
if r, err := b.c.Execute("SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled';"); err != nil {
_, err := b.c.Execute(`SET @rpl_semi_sync_slave = 1;`)
开启同步后会开启一个独立的协程来解析binlog:
// startDumpStream creates a BinlogStreamer and starts onStream in a new
// goroutine with that streamer as its argument.
// NOTE(review): abridged excerpt. The return statement is not shown.
代码语言:javascript复制func (b *BinlogSyncer) startDumpStream() *BinlogStreamer {
s := NewBinlogStreamer()
go b.onStream(s)
// NewBinlogStreamer allocates a BinlogStreamer whose event channel is buffered
// with a capacity of 10240 events.
// NOTE(review): abridged excerpt. The remaining initialisation and the return
// are not shown.
代码语言:javascript复制func NewBinlogStreamer() *BinlogStreamer {
s := new(BinlogStreamer)
s.ch = make(chan *BinlogEvent, 10240)
同步的过程,会先将解析的结果放入BinlogStreamer的channel里面,然后在for循环中消费这个channel
代码语言:javascript复制// BinlogStreamer gets the streaming event.
type BinlogStreamer struct {
// ch carries parsed events from the syncer to GetEvent.
ch chan *BinlogEvent
// ech carries an error to the consumer; GetEvent receives from it.
ech chan error
// err holds the error most recently received from ech by GetEvent.
err error
}
解析binlog过程中,会对不同事件进行不同处理,然后放入channel里
// onStream is the read loop started by startDumpStream. It reads packets from
// the connection and dispatches on the first byte: OK packets go to
// parseEvent, and an error packet closes the streamer with that error.
// NOTE(review): abridged excerpt. The conditions around the inner retry loop
// (apparently the reconnect path after a failed read; confirm), the EOF
// branch and the closing braces are not shown.
代码语言:javascript复制func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
for {
data, err := b.c.ReadPacket()
for {
select {
case <-b.ctx.Done():
// The syncer was cancelled: close the streamer and stop.
s.close()
return
case <-time.After(time.Second):
// Count this attempt, then try to re-establish the sync.
b.retryCount++
if err = b.retrySync(); err != nil {
switch data[0] {
case OK_HEADER:
if err = b.parseEvent(s, data); err != nil {
s.closeWithError(err)
return
}
case ERR_HEADER:
err = b.c.HandleErrorPacket(data)
s.closeWithError(err)
return
case EOF_HEADER:
// retrySync resets the parser and prepares the sync again, either from the
// previous GTID set (b.prevGset) or from the next position (b.nextPos).
// NOTE(review): abridged excerpt. The condition choosing between the two
// paths and the error handling are not shown.
代码语言:javascript复制func (b *BinlogSyncer) retrySync() error {
b.parser.Reset()
if err := b.prepareSyncGTID(b.prevGset); err != nil {
if err := b.prepareSyncPos(b.nextPos); err != nil {
核心函数是parseEvent,放入channel就在这个函数里:
// parseEvent parses one raw packet with b.parser, updates the syncer's
// position and GTID bookkeeping according to the event type, and then sends
// the event on the streamer's channel.
// NOTE(review): abridged excerpt. The error check after Parse, the
// definitions of getCurrentGtidSet, needACK and needStop, and the end of the
// function are not shown.
代码语言:javascript复制func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
e, err := b.parser.Parse(data)
switch event := e.Event.(type) {
case *RotateEvent:
// Record the next binlog file name and offset carried by the rotate event.
b.nextPos.Name = string(event.NextLogName)
b.nextPos.Pos = uint32(event.Position)
b.cfg.Logger.Infof("rotate to %s", b.nextPos)
case *GTIDEvent:
// No GTID tracking when there is no previous GTID set.
if b.prevGset == nil {
break
}
// Start currGset as a copy of prevGset the first time it is needed.
if b.currGset == nil {
b.currGset = b.prevGset.Clone()
}
// Add this event's GTID to the current set.
u, _ := uuid.FromBytes(event.SID)
b.currGset.(*MysqlGTIDSet).AddGTID(u, event.GNO)
// Add the previously seen GTID event to prevGset, then remember this event
// for the next round.
if b.prevMySQLGTIDEvent != nil {
u, _ = uuid.FromBytes(b.prevMySQLGTIDEvent.SID)
b.prevGset.(*MysqlGTIDSet).AddGTID(u, b.prevMySQLGTIDEvent.GNO)
}
b.prevMySQLGTIDEvent = event
case *MariadbGTIDEvent:
// No GTID tracking when there is no previous GTID set.
if b.prevGset == nil {
break
}
if b.currGset == nil {
b.currGset = b.prevGset.Clone()
}
// Snapshot currGset before adding the new GTID so that it can become prevGset.
prev := b.currGset.Clone()
err = b.currGset.(*MariadbGTIDSet).AddSet(&event.GTID)
if err != nil {
return errors.Trace(err)
}
// right after reconnect we will see same gtid as we saw before, thus currGset will not get changed
if !b.currGset.Equal(prev) {
b.prevGset = prev
}
case *XIDEvent:
// Attach the current GTID set unless the configuration discards it.
if !b.cfg.DiscardGTIDSet {
event.GSet = getCurrentGtidSet()
}
case *QueryEvent:
// Attach the current GTID set unless the configuration discards it.
if !b.cfg.DiscardGTIDSet {
event.GSet = getCurrentGtidSet()
}
}
// Deliver the event to the streamer, or flag a stop if the syncer's context
// is cancelled first.
select {
case s.ch <- e:
case <-b.ctx.Done():
needStop = true
}
// Reply with a semi-sync ACK for the next position when one is required.
if needACK {
err := b.replySemiSyncACK(b.nextPos)
// replySemiSyncACK writes a packet to the connection whose byte at index pos
// is SemiSyncIndicator.
// NOTE(review): abridged excerpt. The construction of data and pos, the use
// of p, and the return are not shown.
代码语言:javascript复制func (b *BinlogSyncer) replySemiSyncACK(p Position) error {
data[pos] = SemiSyncIndicator
err := b.c.WritePacket(data)
for {
	ev, err := streamer.GetEvent(context.Background())
	if err != nil {
		// GetEvent returns a nil event together with the error, so stop
		// here instead of dereferencing ev below.
		panic(err)
	}
	// Dump event
	ev.Dump(os.Stdout)
}
接着就来到了最后读取解析到的event的过程,它就是简单的channel的读取过程:
// GetEvent blocks until one of three things happens and returns accordingly:
// an event is available on the event channel (returned with a nil error), an
// error arrives on the error channel (stored in s.err and returned with a nil
// event), or ctx is done (ctx.Err() is returned with a nil event).
代码语言:javascript复制func (s *BinlogStreamer) GetEvent(ctx context.Context) (*BinlogEvent, error) {
	select {
	case c := <-s.ch:
		return c, nil
	case s.err = <-s.ech:
		return nil, s.err
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
读取后我们可以进行简单dump
// Dump writes the event to w: first the header's dump, then the dump of the
// event body.
代码语言:javascript复制func (e *BinlogEvent) Dump(w io.Writer) {
e.Header.Dump(w)
e.Event.Dump(w)
}
// BinlogStreamer is the consumer-facing side of the sync: events and errors
// reach the caller through its channels.
代码语言:javascript复制type BinlogStreamer struct {
// ch carries parsed events to GetEvent and the related accessors.
ch chan *BinlogEvent
// ech carries an error to the consumer.
ech chan error
// err holds the error most recently received from ech by GetEvent.
err error
}
类似的函数还有:
// GetEventWithStartTime receives an event from the event channel and returns
// it when its header timestamp, compared in Unix seconds, is at or after
// startTime.
// NOTE(review): abridged excerpt. What happens to earlier events, the other
// select cases and the end of the function are not shown.
代码语言:javascript复制func (s *BinlogStreamer) GetEventWithStartTime(ctx context.Context, startTime time.Time) (*BinlogEvent, error) {
startUnix := startTime.Unix()
select {
case c := <-s.ch:
if int64(c.Header.Timestamp) >= startUnix {
return c, nil
// DumpEvents removes the events that are buffered in the event channel at the
// moment of the call and returns them in channel order. The count is taken
// once up front, so events queued while draining are left in the channel.
代码语言:javascript复制func (s *BinlogStreamer) DumpEvents() []*BinlogEvent {
	count := len(s.ch)
	events := make([]*BinlogEvent, 0, count)
	for i := 0; i < count; i++ {
		events = append(events, <-s.ch)
	}
	return events
}
// AddEventToStreamer pushes ev onto the streamer's event channel. It blocks
// until either the event is accepted, in which case it returns nil, or an
// error arrives on the error channel, in which case that error is returned.
代码语言:javascript复制func (s *BinlogStreamer) AddEventToStreamer(ev *BinlogEvent) error {
	select {
	case streamErr := <-s.ech:
		return streamErr
	case s.ch <- ev:
		return nil
	}
}