前面分析了Codis关于Slots迁移同步整体内容及同步迁移的一些分析:Codis源码分析之Slots迁移篇,再回顾同步迁移的逻辑:
1、向后端Redis Server发送SLOTSMGRTTAGSLOT命令随机迁移一个key到目标Redis Server中,这个过程会一直持续;
2、如果在迁移过程中如果要针对某个key进行修改,这个key正好在迁移,同步迁移的逻辑是先调用SLOTSMGRTTAGONE 将这个key迁移完成才能处理请求,以保证数据的一致性。
今天我们来分析下什么时候是同步/异步迁移如何设置,异步迁移的流程是怎么样的。
一、迁移方式是在哪里设置的
迁移是同步还是异步是保存在Slot的method字段中:
代码语言:javascript复制type Slot struct {
id int
lock struct {
hold bool
sync.RWMutex
}
refs sync.WaitGroup
switched bool
backend, migrate struct {
id int
bc *sharedBackendConn
}
replicaGroups [][]*sharedBackendConn
method forwardMethod
}
可以设置为forwardSync和forwardSemiAsync,前者对应同步,后者对应异步,Proxy在初始化时会设置为同步:
代码语言:javascript复制func NewRouter(config *Config) *Router {
s := &Router{config: config}
s.pool.primary = newSharedBackendConnPool(config, config.BackendPrimaryParallel)
s.pool.replica = newSharedBackendConnPool(config, config.BackendReplicaParallel)
for i := range s.slots {
s.slots[i].id = i
//默认同步转发
s.slots[i].method = &forwardSync{}
}
return s
}
在context的toSlot方法中会根据ctx.method设置ForwardMethod:
代码语言:javascript复制func (ctx *context) toSlot(m *models.SlotMapping, p *models.Proxy) *models.Slot {
slot := &models.Slot{
Id: m.Id,
Locked: ctx.isSlotLocked(m),
ForwardMethod: ctx.method,
}
而ctx.method字段在context初始化时根据配置MigrationMethod生成:
代码语言:javascript复制func (s *Topom) newContext() (*context, error) {
if s.online {
if err := s.refillCache(); err != nil {
return nil, err
} else {
ctx := &context{}
ctx.slots = s.cache.slots
//读取配置
ctx.method, _ = models.ParseForwardMethod(s.config.MigrationMethod)
return ctx, nil
}
} else {
return nil, ErrNotOnline
}
}
这个对应dashboard配置文件中的migration_method:
代码语言:javascript复制 # Set arguments for data migration (only accept 'sync' & 'semi-async').
25 migration_method = "semi-async"
二、同步和异步的处理逻辑的差异
1、处理客户端请求的差别
上面分析了同步还是异步转发取决于配置文件,这个配置是在Slot一级,关于Slot相关操作,包括如何转发后端命令都是有区别的,为了详细地分析差别, 我们看两个实现:forwardSync和forwardSemiAsync的区别,两者的区别如下:
1、forwardSemiAsync增加了重试,因为需要异步等待key迁移完成,对应的是Forward方法;
2、最主要的是process方法,这个方法都由Forward调用;
再回顾下前面讲的内容,一个正常请求的调用链大概如下:
代码语言:javascript复制Session::loopReader
Session::handleRequest
Router::dispatch
slot.forward
slot.method.Forward
即请求首先处理读事件,读取客户发送过来的请求数据,按Redis协议编码、解码;然后将给Session的handleRequest,后者再转给Router,Router再交给Slot,而Slot最后交由上面说的两个转发方法。
关于同步的处理上一篇文章 Codis源码分析之Slots迁移篇 已经分析了会检查当前Slot是否在迁移中,如果是则调用SLOTSMGRTTAGONE命令迁移当前key,并且必须等待迁移完成才往下处理请求,以保证数据的一致性:
代码语言:javascript复制func (d *forwardSync) process(s *Slot, r *Request, hkey []byte) (*BackendConn, error) {
//如果正在迁移,查询这个key是否迁移完成
if s.migrate.bc != nil && len(hkey) != 0 {
if err := d.slotsmgrt(s, hkey, r.Database, r.Seed16()); err != nil {
log.Debugf("slot-d migrate from = %s to %s failed: hash key = '%s', database = %d, error = %s",
s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, r.Database, err)
return nil, err
}
}
r.Group = &s.refs
r.Group.Add(1)
return d.forward2(s, r), nil
}
再来看异步的处理方法:
代码语言:javascript复制func (d *forwardSemiAsync) process(s *Slot, r *Request, hkey []byte) (_ *BackendConn, retry bool, _ error) {
if s.migrate.bc != nil && len(hkey) != 0 {
resp, moved, err := d.slotsmgrtExecWrapper(s, hkey, r.Database, r.Seed16(), r.Multi)
switch {
case err != nil:
log.Debugf("slot-d migrate from = %s to %s failed: hash key = '%s', error = %s",
s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, err)
return nil, false, err
case !moved:
switch {
case resp != nil:
r.Resp = resp
return nil, false, nil
}
return nil, true, nil
}
}
r.Group = &s.refs
r.Group.Add(1)
return d.forward2(s, r), false, nil
}
调用slotsmgrtExecWrapper来处理命令:
代码语言:javascript复制func (d *forwardHelper) slotsmgrtExecWrapper(s *Slot, hkey []byte, database int32, seed uint, multi []*redis.Resp) (_ *redis.Resp, moved bool, _ error) {
m := &Request{}
m.Multi = make([]*redis.Resp, 0, 2 len(multi))
m.Multi = append(m.Multi,
redis.NewBulkBytes([]byte("SLOTSMGRT-EXEC-WRAPPER")),
redis.NewBulkBytes(hkey),
)
//省略代码
}
可以看到是调用redis命令SLOTSMGRT-EXEC-WRAPPER来处理请求,再看下这个命令的C实现:
再跟进去,看slotsmgrtExecWrapperCommand函数
代码语言:javascript复制void
slotsmgrtExecWrapperCommand(client *c) {
//查找命令是否存在
struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr);
if (cmd == NULL) {
addReplyLongLong(c, -1);
addReplyErrorFormat(c,"invalid command specified (%s)",
(char *)c->argv[2]->ptr);
return;
}
if ((cmd->arity > 0 && cmd->arity != c->argc - 2) || (c->argc - 2 < -cmd->arity)) {
addReplyLongLong(c, -1);
addReplyErrorFormat(c, "wrong number of arguments for command (%s)",
(char *)c->argv[2]->ptr);
return;
}
if (lookupKeyWrite(c->db, c->argv[1]) == NULL) {
addReplyLongLong(c, 0);
addReplyError(c, "the specified key doesn't exist");
return;
}
//如果正在迁移并且当前命令是写命令则返回错误
if (!(cmd->flags & CMD_READONLY) && getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, c->argv[1], 0) != 0) {
addReplyLongLong(c, 1);
addReplyError(c, "the specified key is being migrated");
return;
} else {
addReplyLongLong(c, 2);
robj **argv = zmalloc(sizeof(robj *) * (c->argc - 2));
for (int i = 2; i < c->argc; i ) {
argv[i - 2] = c->argv[i];
incrRefCount(c->argv[i]);
}
for (int i = 0; i < c->argc; i ) {
decrRefCount(c->argv[i]);
}
zfree(c->argv);
c->argc = c->argc - 2;
c->argv = argv;
c->cmd = cmd;
call(c, CMD_CALL_FULL & ~CMD_CALL_PROPAGATE);
}
}
可以看到SLOTSMGRT-EXEC-WRAPPER会判断当前操作的命令是否为写命令,并且这个key是否在迁移或阻塞中,如果是则返回错误,这种情况下需要Proxy进行重试。
2、迁移数据的差别
前面分析了同步和异步迁移数据调用的方法不同:
代码语言:javascript复制switch method {
case models.ForwardSync:
do = func() (int, error) {
return c.MigrateSlot(sid, dest)
}
case models.ForwardSemiAsync:
var option = &redis.MigrateSlotAsyncOption{
MaxBulks: s.config.MigrationAsyncMaxBulks,
MaxBytes: s.config.MigrationAsyncMaxBytes.AsInt(),
NumKeys: s.config.MigrationAsyncNumKeys,
Timeout: math2.MinDuration(time.Second*5,
s.config.MigrationTimeout.Duration()),
}
do = func() (int, error) {
return c.MigrateSlotAsync(sid, dest, option)
}
同步调用的方法前面已经分析过了,看下异步迁移的方法,即MigrateSlotAsync:
代码语言:javascript复制func (c *Client) MigrateSlotAsync(slot int, target string, option *MigrateSlotAsyncOption) (int, error) {
host, port, err := net.SplitHostPort(target)
if err != nil {
return 0, errors.Trace(err)
}
if reply, err := c.Do("SLOTSMGRTTAGSLOT-ASYNC", host, port, int(option.Timeout/time.Millisecond),
option.MaxBulks, option.MaxBytes, slot, option.NumKeys); err != nil {
return 0, errors.Trace(err)
} else {
//
}
}
可以看到是调用SLOTSMGRTTAGSLOT-ASYNC命令进行迁移,Redis Server实现的逻辑比较复杂这里就不具体分析了,大概过程如下:
1)源Redis对key进行序列化异步发送给目标Redis;
2)目标Redis通过Restore还原后回复给源Redis;
3)源Redis收到目标Redis确认后标记这个key迁移完成,迁移下一个key;
另外说下大key的处理,对于大key,如一个长度为1W的list,Codis会将key分拆成多个命令,因为通过不断的rpush最终的结果一样;
Codis会在每一个拆分后的指令中加上一个临时TTL;
等全部拆分的指令执行成功才会删除本地的key;
因此即使中途迁移失败,已迁移成功的key也会超时自动删除,最终效果就好比迁移没有发生一样。
三、总结
1、同步还是异步迁移取决于dashboard配置文件migration_method;
2、同步和异步有两个区别:
一是处理请求的不同,如果当前要操作的key所属Slot正在迁移,同步处理会发送命令等待后端迁移完成才往下操作,异步则是将当前请求封装成一次SLOTSMGRT-EXEC-WRAPPER调用,并且将操作命令及参数都发送过去,后者会判断这个key是否在迁移或阻塞,如果是并且当前为写命令则直接返回失败,由Proxy重试。
二是迁移逻辑不同,同步会调用SLOTSMGRTTAGSLOT迁移,异步则是调用SLOTSMGRTTAGSLOT-ASYNC,前者每次随机迁移一个key,异步的过程则复杂得多,对于小key需要确认才算迁移完成,对于大key还会分拆成多条命令,以保证不阻塞主流程,并且在拆分后的命令都加上TTL,以保证如果中途失败目标Redis的key会及时清掉而不会产生脏数据。
Codis源码分析之Slots迁移篇
Codis Proxy初始化篇
Codis Proxy是如何处理一个请求的
Raft算法之集群成员变化篇
360 Atlas生产环境使用心得