一、Slots迁移的场景&主要面临的问题
为什么需要Slots迁移,或者说在什么场景下需要迁移?主要是为了扩容,Codis以Slot为单位将整个集群分成了1024个Slots,因此如果在运行过程中想增加服务器,就需要将原有的一些Slots迁移到新的服务器上。
迁移主要的问题:
1、Slot中Key的处理
一个Slot下可能有很多key,因此整个Slot迁移是需要时间的,因此整个Slot在迁移过程中key就有不同的情况,有的正在迁,有的还没迁,有的则已经迁走,针对这些不同状态的key, Codis是如何保证数据的一致性的。
2、大key的处理
像一个list或hash,可能成员成千上万,如何保证迁移的原子性和一致性。
3、如何保障系统的可用性
因为迁移是耗时的,是用同步还是异步,如何在系统可用性和数据一致性做权衡?
二、迁移代码分析
1、入口
迁移一般在Fe界面上由管理员发起,一般来说是迁移一个范围:
后端入口为apiServer::SlotCreateActionRange,这个函数只做一些基本的参数验证,实际调用Topom的SlotCreateActionRange:
代码语言:javascript复制func (s *Topom) SlotCreateActionRange(beg, end int, gid int, must bool) error {
//省略一些代码
var pending []int
for sid := beg; sid <= end; sid {
m, err := ctx.getSlotMapping(sid)
if err != nil {
return err
}
if m.Action.State != models.ActionNothing {
if !must {
continue
}
return errors.Errorf("slot-[%d] action already exists", sid)
}
if m.GroupId == g.Id {
if !must {
continue
}
return errors.Errorf("slot-[%d] already in group-[%d]", sid, g.Id)
}
pending = append(pending, m.Id)
}
for _, sid := range pending {
m, err := ctx.getSlotMapping(sid)
if err != nil {
return err
}
defer s.dirtySlotsCache(m.Id)
//更改状态
m.Action.State = models.ActionPending
m.Action.Index = ctx.maxSlotActionIndex() 1
m.Action.TargetId = g.Id
//更新Zookeeper的状态
if err := s.storeUpdateSlotMapping(m); err != nil {
return err
}
}
return nil
}
先会检查一些状态,如该Slot是否正在迁移,目标Group和当前Group是否一致,后面重点逻辑是将状态改为ActionPending,然后保存到Zk中就返回给用户了。
到上面肯定还没迁完,应该是有后台程序扫描这个状态然后进行迁移,这个入口为Topom::ProcessSlotAction,这个协程随着Dashboard启动的时候启动:
代码语言:javascript复制go func() {
for !s.IsClosed() {
if s.IsOnline() {
if err := s.ProcessSlotAction(); err != nil {
log.WarnErrorf(err, "process slot action failed")
time.Sleep(time.Second * 5)
}
}
time.Sleep(time.Second)
}
}()
具体代码如下:
代码语言:javascript复制func (s *Topom) ProcessSlotAction() error {
for s.IsOnline() {
var (
marks = make(map[int]bool)
plans = make(map[int]bool)
)
var accept = func(m *models.SlotMapping) bool {
if marks[m.GroupId] || marks[m.Action.TargetId] {
return false
}
if plans[m.Id] {
return false
}
return true
}
var update = func(m *models.SlotMapping) bool {
if m.GroupId != 0 {
marks[m.GroupId] = true
}
marks[m.Action.TargetId] = true
plans[m.Id] = true
return true
}
var parallel = math2.MaxInt(1, s.config.MigrationParallelSlots)
for parallel > len(plans) {
//状态转移在这里完成
_, ok, err := s.SlotActionPrepareFilter(accept, update)
if err != nil {
return err
} else if !ok {
break
}
}
if len(plans) == 0 {
return nil
}
var fut sync2.Future
for sid, _ := range plans {
fut.Add()
go func(sid int) {
log.Warnf("slot-[%d] process action", sid)
//重点,真正的数据迁移
var err = s.processSlotAction(sid)
if err != nil {
status := fmt.Sprintf("[ERROR] Slot[d]: %s", sid, err)
s.action.progress.status.Store(status)
} else {
s.action.progress.status.Store("")
}
fut.Done(strconv.Itoa(sid), err)
}(sid)
}
for _, v := range fut.Wait() {
if v != nil {
return v.(error)
}
}
time.Sleep(time.Millisecond * 10)
}
return nil
}
代码语言:javascript复制func (s *Topom) SlotActionPrepareFilter(accept, update func(m *models.SlotMapping) bool) (int, bool, error) {
//省略一些代码
switch m.Action.State {
case models.ActionPending:
m.Action.State = models.ActionPreparing
if err := s.storeUpdateSlotMapping(m); err != nil {
return 0, false, err
}
fallthrough
case models.ActionPreparing:
m.Action.State = models.ActionPrepared
if err := s.resyncSlotMappings(ctx, m); err != nil {
return 0, false, err
}
if err := s.storeUpdateSlotMapping(m); err != nil {
return 0, false, err
}
fallthrough
case models.ActionPrepared:
m.Action.State = models.ActionMigrating
if err := s.resyncSlotMappings(ctx, m); err != nil {
log.Warnf("slot-[%d] resync to migrating failed", m.Id)
return 0, false, err
}
if err := s.storeUpdateSlotMapping(m); err != nil {
return 0, false, err
}
fallthrough
case models.ActionMigrating:
return m.Id, true, nil
case models.ActionFinished:
return m.Id, true, nil
default:
return 0, false, errors.Errorf("slot-[%d] action state is invalid", m.Id)
}
}
可以看到整个的状态变换过程如下:
ActionPending =》ActionPreparing =》ActionPrepared
=> ActionMigrating => ActionFinished
在ActionMigrating之前变更都只是更新Zk中的状态,ActionPreparing和ActionPrepared还会调用resyncSlotMappings通过Proxy重连新的Redis Server并且设置slot从哪迁移等信息:
代码语言:javascript复制case models.ActionPrepared:
fallthrough
case models.ActionMigrating:
slot.BackendAddr = ctx.getGroupMaster(m.Action.TargetId)
slot.BackendAddrGroupId = m.Action.TargetId
slot.MigrateFrom = ctx.getGroupMaster(m.GroupId)
slot.MigrateFromGroupId = m.GroupId
然后看实际的数据迁移是怎么发生的,回到ProcessSlotAction方法
代码语言:javascript复制var err = s.processSlotAction(sid)
代码语言:javascript复制func (s *Topom) processSlotAction(sid int) error {
var db int = 0
for s.IsOnline() {
if exec, err := s.newSlotActionExecutor(sid); err != nil {
return err
} else if exec == nil {
time.Sleep(time.Second)
} else {
n, nextdb, err := exec(db)
if err != nil {
return err
}
log.Debugf("slot-[%d] action executor %d", sid, n)
//迁移完成判断
if n == 0 && nextdb == -1 {
return s.SlotActionComplete(sid)
}
status := fmt.Sprintf("[OK] Slot[d]@DB[%d]=%d", sid, db, n)
s.action.progress.status.Store(status)
if us := s.GetSlotActionInterval(); us != 0 {
time.Sleep(time.Microsecond * time.Duration(us))
}
db = nextdb
}
}
return nil
}
通过newSlotActionExecutor得到执行器,
代码语言:javascript复制switch method {
case models.ForwardSync:
do = func() (int, error) {
return c.MigrateSlot(sid, dest)
}
case models.ForwardSemiAsync:
var option = &redis.MigrateSlotAsyncOption{
MaxBulks: s.config.MigrationAsyncMaxBulks,
MaxBytes: s.config.MigrationAsyncMaxBytes.AsInt(),
NumKeys: s.config.MigrationAsyncNumKeys,
Timeout: math2.MinDuration(time.Second*5,
s.config.MigrationTimeout.Duration()),
}
do = func() (int, error) {
return c.MigrateSlotAsync(sid, dest, option)
}
可以看到迁移分同步和异步,看同步:
代码语言:javascript复制func (c *Client) MigrateSlot(slot int, target string) (int, error) {
host, port, err := net.SplitHostPort(target)
if err != nil {
return 0, errors.Trace(err)
}
mseconds := int(c.Timeout / time.Millisecond)
if reply, err := c.Do("SLOTSMGRTTAGSLOT", host, port, mseconds, slot); err != nil {
return 0, errors.Trace(err)
} else {
p, err := redigo.Ints(redigo.Values(reply, nil))
if err != nil || len(p) != 2 {
return 0, errors.Errorf("invalid response = %v", reply)
}
return p[1], nil
}
}
可以看到如果是同步迁移会调用SLOTSMGRTTAGSLOT命令进行迁移,这是一个Codis对Redis改造的命令,会随机迁移Slot下一个Key,所以在上面有判断是否迁移完成的:
代码语言:javascript复制func (s *Topom) processSlotAction(sid int) error {
var db int = 0
for s.IsOnline() {
if exec, err := s.newSlotActionExecutor(sid); err != nil {
return err
} else {
n, nextdb, err := exec(db)
//迁移完成判断
if n == 0 && nextdb == -1 {
return s.SlotActionComplete(sid)
}
}
}
return nil
}
即命令返回2个参数(第3个异常忽略),第1个表示迁移的数量,第2个表示下一个要迁移的数据库,如果前者为0后者为-1则表示迁移完成。迁移完成后调用SlotActionComplete标记迁移完成
代码语言:javascript复制case models.ActionFinished:
log.Warnf("slot-[%d] resync to finished", m.Id)
if err := s.resyncSlotMappings(ctx, m); err != nil {
log.Warnf("slot-[%d] resync to finished failed", m.Id)
return err
}
defer s.dirtySlotsCache(m.Id)
m = &models.SlotMapping{
Id: m.Id,
GroupId: m.Action.TargetId,
}
return s.storeUpdateSlotMapping(m)
2、迁移过程中Slot的key的读写处理
前面分析Proxy代码的时候讲过,一个请求会进入到Session的handleRequest:
代码语言:javascript复制func (s *Session) handleRequest(r *Request, d *Router) error {
opstr, flag, err := getOpInfo(r.Multi)
if err != nil {
return err
}
r.OpStr = opstr
r.OpFlag = flag
r.Broken = &s.broken
if flag.IsNotAllowed() {
return fmt.Errorf("command '%s' is not allowed", opstr)
}
switch opstr {
case "QUIT":
return s.handleQuit(r)
case "AUTH":
return s.handleAuth(r)
}
if !s.authorized {
if s.config.SessionAuth != "" {
r.Resp = redis.NewErrorf("NOAUTH Authentication required")
return nil
}
s.authorized = true
}
switch opstr {
case "SELECT":
return s.handleSelect(r)
case "PING":
return s.handleRequestPing(r, d)
case "INFO":
return s.handleRequestInfo(r, d)
case "MGET":
return s.handleRequestMGet(r, d)
case "MSET":
return s.handleRequestMSet(r, d)
case "DEL":
return s.handleRequestDel(r, d)
case "EXISTS":
return s.handleRequestExists(r, d)
case "SLOTSINFO":
return s.handleRequestSlotsInfo(r, d)
case "SLOTSSCAN":
return s.handleRequestSlotsScan(r, d)
case "SLOTSMAPPING":
return s.handleRequestSlotsMapping(r, d)
default:
return d.dispatch(r)
}
}
默认会走到d.dispatch,如果是同步的话会走下面的逻辑:
代码语言:javascript复制func (d *forwardSync) process(s *Slot, r *Request, hkey []byte) (*BackendConn, error) {
if s.migrate.bc != nil && len(hkey) != 0 {
if err := d.slotsmgrt(s, hkey, r.Database, r.Seed16()); err != nil {
log.Debugf("slot-d migrate from = %s to %s failed: hash key = '%s', database = %d, error = %s",
s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, r.Database, err)
return nil, err
}
}
r.Group = &s.refs
r.Group.Add(1)
return d.forward2(s, r), nil
}
如果slot正在迁移会调用slotsmgrt处理,
代码语言:javascript复制unc (d *forwardHelper) slotsmgrt(s *Slot, hkey []byte, database int32, seed uint) error {
m := &Request{}
m.Multi = []*redis.Resp{
redis.NewBulkBytes([]byte("SLOTSMGRTTAGONE")),
redis.NewBulkBytes(s.backend.bc.host),
redis.NewBulkBytes(s.backend.bc.port),
redis.NewBulkBytes([]byte("3000")),
redis.NewBulkBytes(hkey),
}
可以看到,如果当前处理的key所属的slot正在迁移,则调用SLOTSMGRTTAGSLOT命令将这个key迁移完成再返回给客户端,即必须要迁移这个key完成才返回给客户端。
三、总结
1、Slots迁移由管理员在Fe手动发起,发起后Codis只是将Slot状态变成
ActionPending;
2、Codis后台线程会扫描上述状态的Slots,依次进行以下状态的转换:
ActionPending => ActionPreparing => ActionPrepared =>
ActionMigrating;
3、ActionMigrating状态的Slots由Codis向Redis Server发送SLOTSMGRTTAGSLOT命令随机迁移一个key,这个过程会一直持续,直到Slot下所有Key迁移完成;
4、迁移过程中的Slot下的操作如果是同步则会先等待key迁移操作完成才往下操作,只要下层Redis Server执行是原子的,则可以保证整个过程的原子性。
可以看到,整个过程还是比较复杂的,特别是一些核心逻辑在Redis Server了,在Redis Server层如何保证操作的原子性和一致性,这个和异步迁移后面另外再讲述。