Codis源码分析之Slots迁移异步篇

2020-12-30 16:08:06 浏览数 (1)

前面分析了Codis关于Slots迁移同步整体内容及同步迁移的一些分析:Codis源码分析之Slots迁移篇,再回顾同步迁移的逻辑:

1、向后端Redis Server发送SLOTSMGRTTAGSLOT命令随机迁移一个key到目标Redis Server中,这个过程会一直持续;

2、如果在迁移过程中如果要针对某个key进行修改,这个key正好在迁移,同步迁移的逻辑是先调用SLOTSMGRTTAGONE 将这个key迁移完成才能处理请求,以保证数据的一致性。

今天我们来分析下什么时候是同步/异步迁移如何设置,异步迁移的流程是怎么样的。

一、迁移方式是在哪里设置的

迁移是同步还是异步是保存在Slot的method字段中:

代码语言:javascript复制
type Slot struct {
  id   int
  lock struct {
    hold bool
    sync.RWMutex
  }
  refs sync.WaitGroup

  switched bool

  backend, migrate struct {
    id int
    bc *sharedBackendConn
  }
  replicaGroups [][]*sharedBackendConn

  method forwardMethod
}

可以设置为forwardSync和forwardSemiAsync,前者对应同步,后者对应异步,Proxy在初始化时会设置为同步:

代码语言:javascript复制
func NewRouter(config *Config) *Router {
  s := &Router{config: config}
  s.pool.primary = newSharedBackendConnPool(config, config.BackendPrimaryParallel)
  s.pool.replica = newSharedBackendConnPool(config, config.BackendReplicaParallel)
  for i := range s.slots {
    s.slots[i].id = i
    //默认同步转发
    s.slots[i].method = &forwardSync{}
  }
  return s
}

在context的toSlot方法中会根据ctx.method设置ForwardMethod:

代码语言:javascript复制
func (ctx *context) toSlot(m *models.SlotMapping, p *models.Proxy) *models.Slot {
  slot := &models.Slot{
    Id:     m.Id,
    Locked: ctx.isSlotLocked(m),

    ForwardMethod: ctx.method,
  }

而ctx.method字段在context初始化时根据配置MigrationMethod生成:

代码语言:javascript复制
func (s *Topom) newContext() (*context, error) {
  if s.online {
    if err := s.refillCache(); err != nil {
      return nil, err
    } else {
      ctx := &context{}
      ctx.slots = s.cache.slots
      //读取配置
      ctx.method, _ = models.ParseForwardMethod(s.config.MigrationMethod)
      return ctx, nil
    }
  } else {
    return nil, ErrNotOnline
  }
}

这个对应dashboard配置文件中的migration_method:

代码语言:javascript复制
 # Set arguments for data migration (only accept 'sync' & 'semi-async').
 25 migration_method = "semi-async"

二、同步和异步的处理逻辑的差异

1、处理客户端请求的差别

上面分析了同步还是异步转发取决于配置文件,这个配置是在Slot一级,关于Slot相关操作,包括如何转发后端命令都是有区别的,为了详细地分析差别, 我们看两个实现:forwardSync和forwardSemiAsync的区别,两者的区别如下:

1、forwardSemiAsync增加了重试,因为需要异步等待key迁移完成,对应的是Forward方法;

2、最主要的是process方法,这个方法都由Forward调用;

再回顾下前面讲的内容,一个正常请求的调用链大概如下:

代码语言:javascript复制
Session::loopReader
Session::handleRequest
Router::dispatch
slot.forward
slot.method.Forward

即请求首先处理读事件,读取客户发送过来的请求数据,按Redis协议编码、解码;然后将给Session的handleRequest,后者再转给Router,Router再交给Slot,而Slot最后交由上面说的两个转发方法。

关于同步的处理上一篇文章 Codis源码分析之Slots迁移篇 已经分析了会检查当前Slot是否在迁移中,如果是则调用SLOTSMGRTTAGONE命令迁移当前key,并且必须等待迁移完成才往下处理请求,以保证数据的一致性:

代码语言:javascript复制
func (d *forwardSync) process(s *Slot, r *Request, hkey []byte) (*BackendConn, error) {
  //如果正在迁移,查询这个key是否迁移完成
  if s.migrate.bc != nil && len(hkey) != 0 {
    if err := d.slotsmgrt(s, hkey, r.Database, r.Seed16()); err != nil {
      log.Debugf("slot-d migrate from = %s to %s failed: hash key = '%s', database = %d, error = %s",
        s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, r.Database, err)
      return nil, err
    }
  }
  r.Group = &s.refs
  r.Group.Add(1)
  return d.forward2(s, r), nil
}

再来看异步的处理方法:

代码语言:javascript复制
func (d *forwardSemiAsync) process(s *Slot, r *Request, hkey []byte) (_ *BackendConn, retry bool, _ error) {
  if s.migrate.bc != nil && len(hkey) != 0 {
    resp, moved, err := d.slotsmgrtExecWrapper(s, hkey, r.Database, r.Seed16(), r.Multi)
    switch {
    case err != nil:
      log.Debugf("slot-d migrate from = %s to %s failed: hash key = '%s', error = %s",
        s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, err)
      return nil, false, err
    case !moved:
      switch {
      case resp != nil:
        r.Resp = resp
        return nil, false, nil
      }
      return nil, true, nil
    }
  }
  r.Group = &s.refs
  r.Group.Add(1)
  return d.forward2(s, r), false, nil
}

调用slotsmgrtExecWrapper来处理命令:

代码语言:javascript复制
func (d *forwardHelper) slotsmgrtExecWrapper(s *Slot, hkey []byte, database int32, seed uint, multi []*redis.Resp) (_ *redis.Resp, moved bool, _ error) {
  m := &Request{}
  m.Multi = make([]*redis.Resp, 0, 2 len(multi))
  m.Multi = append(m.Multi,
    redis.NewBulkBytes([]byte("SLOTSMGRT-EXEC-WRAPPER")),
    redis.NewBulkBytes(hkey),
  )
  //省略代码
  }

可以看到是调用redis命令SLOTSMGRT-EXEC-WRAPPER来处理请求,再看下这个命令的C实现:

再跟进去,看slotsmgrtExecWrapperCommand函数

代码语言:javascript复制
void
slotsmgrtExecWrapperCommand(client *c) {
    //查找命令是否存在
    struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr);
    if (cmd == NULL) {
        addReplyLongLong(c, -1);
        addReplyErrorFormat(c,"invalid command specified (%s)",
                (char *)c->argv[2]->ptr);
        return;
    }
    if ((cmd->arity > 0 && cmd->arity != c->argc - 2) || (c->argc - 2 < -cmd->arity)) {
        addReplyLongLong(c, -1);
        addReplyErrorFormat(c, "wrong number of arguments for command (%s)",
                (char *)c->argv[2]->ptr);
        return;
    }
    if (lookupKeyWrite(c->db, c->argv[1]) == NULL) {
        addReplyLongLong(c, 0);
        addReplyError(c, "the specified key doesn't exist");
        return;
    }
    //如果正在迁移并且当前命令是写命令则返回错误
    if (!(cmd->flags & CMD_READONLY) && getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, c->argv[1], 0) != 0) {
        addReplyLongLong(c, 1);
        addReplyError(c, "the specified key is being migrated");
        return;
    } else {
        addReplyLongLong(c, 2);
        robj **argv = zmalloc(sizeof(robj *) * (c->argc - 2));
        for (int i = 2; i < c->argc; i   ) {
            argv[i - 2] = c->argv[i];
            incrRefCount(c->argv[i]);
        }
        for (int i = 0; i < c->argc; i   ) {
            decrRefCount(c->argv[i]);
        }
        zfree(c->argv);
        c->argc = c->argc - 2;
        c->argv = argv;
        c->cmd = cmd;
        call(c, CMD_CALL_FULL & ~CMD_CALL_PROPAGATE);
    }
}

可以看到SLOTSMGRT-EXEC-WRAPPER会判断当前操作的命令是否为写命令,并且这个key是否在迁移或阻塞中,如果是则返回错误,这种情况下需要Proxy进行重试。

2、迁移数据的差别

前面分析了同步和异步迁移数据调用的方法不同:

代码语言:javascript复制
switch method {
      case models.ForwardSync:
        do = func() (int, error) {
          return c.MigrateSlot(sid, dest)
        }
      case models.ForwardSemiAsync:
        var option = &redis.MigrateSlotAsyncOption{
          MaxBulks: s.config.MigrationAsyncMaxBulks,
          MaxBytes: s.config.MigrationAsyncMaxBytes.AsInt(),
          NumKeys:  s.config.MigrationAsyncNumKeys,
          Timeout: math2.MinDuration(time.Second*5,
            s.config.MigrationTimeout.Duration()),
        }
        do = func() (int, error) {
          return c.MigrateSlotAsync(sid, dest, option)
        }

同步调用的方法前面已经分析过了,看下异步迁移的方法,即MigrateSlotAsync:

代码语言:javascript复制
func (c *Client) MigrateSlotAsync(slot int, target string, option *MigrateSlotAsyncOption) (int, error) {
  host, port, err := net.SplitHostPort(target)
  if err != nil {
    return 0, errors.Trace(err)
  }
  if reply, err := c.Do("SLOTSMGRTTAGSLOT-ASYNC", host, port, int(option.Timeout/time.Millisecond),
    option.MaxBulks, option.MaxBytes, slot, option.NumKeys); err != nil {
    return 0, errors.Trace(err)
  } else {
   //
  }
}

可以看到是调用SLOTSMGRTTAGSLOT-ASYNC命令进行迁移,Redis Server实现的逻辑比较复杂这里就不具体分析了,大概过程如下:

1)源Redis对key进行序列化异步发送给目标Redis;

2)目标Redis通过Restore还原后回复给源Redis;

3)源Redis收到目标Redis确认后标记这个key迁移完成,迁移下一个key;

另外说下大key的处理,对于大key,如一个长度为1W的list,Codis会将key分拆成多个命令,因为通过不断的rpush最终的结果一样;

Codis会在每一个拆分后的指令中加上一个临时TTL;

等全部拆分的指令执行成功才会删除本地的key;

因此即使中途迁移失败,已迁移成功的key也会超时自动删除,最终效果就好比迁移没有发生一样。

三、总结

1、同步还是异步迁移取决于dashboard配置文件migration_method;

2、同步和异步有两个区别:

一是处理请求的不同,如果当前要操作的key所属Slot正在迁移,同步处理会发送命令等待后端迁移完成才往下操作,异步则是将当前请求封装成一次SLOTSMGRT-EXEC-WRAPPER调用,并且将操作命令及参数都发送过去,后者会判断这个key是否在迁移或阻塞,如果是并且当前为写命令则直接返回失败,由Proxy重试。

二是迁移逻辑不同,同步会调用SLOTSMGRTTAGSLOT迁移,异步则是调用SLOTSMGRTTAGSLOT-ASYNC,前者每次随机迁移一个key,异步的过程则复杂得多,对于小key需要确认才算迁移完成,对于大key还会分拆成多条命令,以保证不阻塞主流程,并且在拆分后的命令都加上TTL,以保证如果中途失败目标Redis的key会及时清掉而不会产生脏数据。

Codis源码分析之Slots迁移篇

Codis Proxy初始化篇

Codis Proxy是如何处理一个请求的

Raft算法之集群成员变化篇

360 Atlas生产环境使用心得

0 人点赞