Codis源码分析之Slots迁移篇

2020-12-14 10:55:24 浏览数 (1)

一、Slots迁移的场景&主要面临的问题

为什么需要Slots迁移,或者说在什么场景下需要迁移?主要是为了扩容,Codis以Slot为单位将整个集群分成了1024个Slots,因此如果在运行过程中想增加服务器,就需要将原有的一些Slots迁移到新的服务器上。

迁移主要的问题:

1、Slot中Key的处理

一个Slot下可能有很多key,因此整个Slot迁移是需要时间的,因此整个Slot在迁移过程中key就有不同的情况,有的正在迁,有的还没迁,有的则已经迁走,针对这些不同状态的key, Codis是如何保证数据的一致性的。

2、大key的处理

像一个list或hash,可能成员成千上万,如何保证迁移的原子性和一致性。

3、如何保障系统的可用性

因为迁移是耗时的,是用同步还是异步,如何在系统可用性和数据一致性做权衡?

二、迁移代码分析

1、入口

迁移一般在Fe界面上由管理员发起,一般来说是迁移一个范围:

后端入口为apiServer::SlotCreateActionRange,这个函数只做一些基本的参数验证,实际调用Topom的SlotCreateActionRange:

代码语言:javascript复制
func (s *Topom) SlotCreateActionRange(beg, end int, gid int, must bool) error {
  //省略一些代码

  var pending []int
  for sid := beg; sid <= end; sid   {
    m, err := ctx.getSlotMapping(sid)
    if err != nil {
      return err
    }
    if m.Action.State != models.ActionNothing {
      if !must {
        continue
      }
      return errors.Errorf("slot-[%d] action already exists", sid)
    }
    if m.GroupId == g.Id {
      if !must {
        continue
      }
      return errors.Errorf("slot-[%d] already in group-[%d]", sid, g.Id)
    }
    pending = append(pending, m.Id)
  }

  for _, sid := range pending {
    m, err := ctx.getSlotMapping(sid)
    if err != nil {
      return err
    }
    defer s.dirtySlotsCache(m.Id)
    //更改状态
    m.Action.State = models.ActionPending
    m.Action.Index = ctx.maxSlotActionIndex()   1
    m.Action.TargetId = g.Id
    //更新Zookeeper的状态
    if err := s.storeUpdateSlotMapping(m); err != nil {
      return err
    }
  }
  return nil
}

先会检查一些状态,如该Slot是否正在迁移,目标Group和当前Group是否一致,后面重点逻辑是将状态改为ActionPending,然后保存到Zk中就返回给用户了。

到上面肯定还没迁完,应该是有后台程序扫描这个状态然后进行迁移,这个入口为Topom::ProcessSlotAction,这个协程随着Dashboard启动的时候启动:

代码语言:javascript复制
go func() {
    for !s.IsClosed() {
      if s.IsOnline() {
        if err := s.ProcessSlotAction(); err != nil {
          log.WarnErrorf(err, "process slot action failed")
          time.Sleep(time.Second * 5)
        }
      }
      time.Sleep(time.Second)
    }
  }()

具体代码如下:

代码语言:javascript复制
func (s *Topom) ProcessSlotAction() error {
  for s.IsOnline() {
    var (
      marks = make(map[int]bool)
      plans = make(map[int]bool)
    )
    var accept = func(m *models.SlotMapping) bool {
      if marks[m.GroupId] || marks[m.Action.TargetId] {
        return false
      }
      if plans[m.Id] {
        return false
      }
      return true
    }
    var update = func(m *models.SlotMapping) bool {
      if m.GroupId != 0 {
        marks[m.GroupId] = true
      }
      marks[m.Action.TargetId] = true
      plans[m.Id] = true
      return true
    }
    var parallel = math2.MaxInt(1, s.config.MigrationParallelSlots)
    for parallel > len(plans) {
    //状态转移在这里完成
      _, ok, err := s.SlotActionPrepareFilter(accept, update)
      if err != nil {
        return err
      } else if !ok {
        break
      }
    }
    if len(plans) == 0 {
      return nil
    }
    var fut sync2.Future
    for sid, _ := range plans {
      fut.Add()
      go func(sid int) {
        log.Warnf("slot-[%d] process action", sid)
        //重点,真正的数据迁移
        var err = s.processSlotAction(sid)
        if err != nil {
          status := fmt.Sprintf("[ERROR] Slot[d]: %s", sid, err)
          s.action.progress.status.Store(status)
        } else {
          s.action.progress.status.Store("")
        }
        fut.Done(strconv.Itoa(sid), err)
      }(sid)
    }
    for _, v := range fut.Wait() {
      if v != nil {
        return v.(error)
      }
    }
    time.Sleep(time.Millisecond * 10)
  }
  return nil
}
代码语言:javascript复制
func (s *Topom) SlotActionPrepareFilter(accept, update func(m *models.SlotMapping) bool) (int, bool, error) {
  //省略一些代码

  switch m.Action.State {

  case models.ActionPending:
    m.Action.State = models.ActionPreparing
    if err := s.storeUpdateSlotMapping(m); err != nil {
      return 0, false, err
    }

    fallthrough
  case models.ActionPreparing:
    m.Action.State = models.ActionPrepared
    if err := s.resyncSlotMappings(ctx, m); err != nil {
      return 0, false, err
    }
    if err := s.storeUpdateSlotMapping(m); err != nil {
      return 0, false, err
    }

    fallthrough
  case models.ActionPrepared:
    m.Action.State = models.ActionMigrating
    if err := s.resyncSlotMappings(ctx, m); err != nil {
      log.Warnf("slot-[%d] resync to migrating failed", m.Id)
      return 0, false, err
    }
    if err := s.storeUpdateSlotMapping(m); err != nil {
      return 0, false, err
    }

    fallthrough
  case models.ActionMigrating:
    return m.Id, true, nil
  case models.ActionFinished:
    return m.Id, true, nil
  default:
    return 0, false, errors.Errorf("slot-[%d] action state is invalid", m.Id)
  }
}

可以看到整个的状态变换过程如下:

ActionPending =》ActionPreparing =》ActionPrepared

=> ActionMigrating => ActionFinished

在ActionMigrating之前变更都只是更新Zk中的状态,ActionPreparing和ActionPrepared还会调用resyncSlotMappings通过Proxy重连新的Redis Server并且设置slot从哪迁移等信息:

代码语言:javascript复制
case models.ActionPrepared:
    fallthrough
  case models.ActionMigrating:
    slot.BackendAddr = ctx.getGroupMaster(m.Action.TargetId)
    slot.BackendAddrGroupId = m.Action.TargetId
    slot.MigrateFrom = ctx.getGroupMaster(m.GroupId)
    slot.MigrateFromGroupId = m.GroupId

然后看实际的数据迁移是怎么发生的,回到ProcessSlotAction方法

代码语言:javascript复制
var err = s.processSlotAction(sid)
代码语言:javascript复制
func (s *Topom) processSlotAction(sid int) error {
  var db int = 0
  for s.IsOnline() {
    if exec, err := s.newSlotActionExecutor(sid); err != nil {
      return err
    } else if exec == nil {
      time.Sleep(time.Second)
    } else {
      n, nextdb, err := exec(db)
      if err != nil {
        return err
      }
      log.Debugf("slot-[%d] action executor %d", sid, n)
      //迁移完成判断
      if n == 0 && nextdb == -1 {
        return s.SlotActionComplete(sid)
      }
      status := fmt.Sprintf("[OK] Slot[d]@DB[%d]=%d", sid, db, n)
      s.action.progress.status.Store(status)

      if us := s.GetSlotActionInterval(); us != 0 {
        time.Sleep(time.Microsecond * time.Duration(us))
      }
      db = nextdb
    }
  }
  return nil
}

通过newSlotActionExecutor得到执行器,

代码语言:javascript复制
switch method {
      case models.ForwardSync:
        do = func() (int, error) {
          return c.MigrateSlot(sid, dest)
        }
      case models.ForwardSemiAsync:
        var option = &redis.MigrateSlotAsyncOption{
          MaxBulks: s.config.MigrationAsyncMaxBulks,
          MaxBytes: s.config.MigrationAsyncMaxBytes.AsInt(),
          NumKeys:  s.config.MigrationAsyncNumKeys,
          Timeout: math2.MinDuration(time.Second*5,
            s.config.MigrationTimeout.Duration()),
        }
        do = func() (int, error) {
          return c.MigrateSlotAsync(sid, dest, option)
        }

可以看到迁移分同步和异步,看同步:

代码语言:javascript复制
func (c *Client) MigrateSlot(slot int, target string) (int, error) {
  host, port, err := net.SplitHostPort(target)
  if err != nil {
    return 0, errors.Trace(err)
  }
  mseconds := int(c.Timeout / time.Millisecond)
  if reply, err := c.Do("SLOTSMGRTTAGSLOT", host, port, mseconds, slot); err != nil {
    return 0, errors.Trace(err)
  } else {
    p, err := redigo.Ints(redigo.Values(reply, nil))
    if err != nil || len(p) != 2 {
      return 0, errors.Errorf("invalid response = %v", reply)
    }
    return p[1], nil
  }
}

可以看到如果是同步迁移会调用SLOTSMGRTTAGSLOT命令进行迁移,这是一个Codis对Redis改造的命令,会随机迁移Slot下一个Key,所以在上面有判断是否迁移完成的:

代码语言:javascript复制
func (s *Topom) processSlotAction(sid int) error {
  var db int = 0
  for s.IsOnline() {
    if exec, err := s.newSlotActionExecutor(sid); err != nil {
      return err
    } else {
      n, nextdb, err := exec(db)
      //迁移完成判断
      if n == 0 && nextdb == -1 {
        return s.SlotActionComplete(sid)
      }
  
    }
  }
  return nil
}

即命令返回2个参数(第3个异常忽略),第1个表示迁移的数量,第2个表示下一个要迁移的数据库,如果前者为0后者为-1则表示迁移完成。迁移完成后调用SlotActionComplete标记迁移完成

代码语言:javascript复制
case models.ActionFinished:

    log.Warnf("slot-[%d] resync to finished", m.Id)

    if err := s.resyncSlotMappings(ctx, m); err != nil {
      log.Warnf("slot-[%d] resync to finished failed", m.Id)
      return err
    }
    defer s.dirtySlotsCache(m.Id)

    m = &models.SlotMapping{
      Id:      m.Id,
      GroupId: m.Action.TargetId,
    }
    return s.storeUpdateSlotMapping(m)

2、迁移过程中Slot的key的读写处理

前面分析Proxy代码的时候讲过,一个请求会进入到Session的handleRequest:

代码语言:javascript复制
func (s *Session) handleRequest(r *Request, d *Router) error {
  opstr, flag, err := getOpInfo(r.Multi)
  if err != nil {
    return err
  }
  r.OpStr = opstr
  r.OpFlag = flag
  r.Broken = &s.broken

  if flag.IsNotAllowed() {
    return fmt.Errorf("command '%s' is not allowed", opstr)
  }

  switch opstr {
  case "QUIT":
    return s.handleQuit(r)
  case "AUTH":
    return s.handleAuth(r)
  }

  if !s.authorized {
    if s.config.SessionAuth != "" {
      r.Resp = redis.NewErrorf("NOAUTH Authentication required")
      return nil
    }
    s.authorized = true
  }

  switch opstr {
  case "SELECT":
    return s.handleSelect(r)
  case "PING":
    return s.handleRequestPing(r, d)
  case "INFO":
    return s.handleRequestInfo(r, d)
  case "MGET":
    return s.handleRequestMGet(r, d)
  case "MSET":
    return s.handleRequestMSet(r, d)
  case "DEL":
    return s.handleRequestDel(r, d)
  case "EXISTS":
    return s.handleRequestExists(r, d)
  case "SLOTSINFO":
    return s.handleRequestSlotsInfo(r, d)
  case "SLOTSSCAN":
    return s.handleRequestSlotsScan(r, d)
  case "SLOTSMAPPING":
    return s.handleRequestSlotsMapping(r, d)
  default:
    return d.dispatch(r)
  }
}

默认会走到d.dispatch,如果是同步的话会走下面的逻辑:

代码语言:javascript复制
func (d *forwardSync) process(s *Slot, r *Request, hkey []byte) (*BackendConn, error) {
  if s.migrate.bc != nil && len(hkey) != 0 {
    if err := d.slotsmgrt(s, hkey, r.Database, r.Seed16()); err != nil {
      log.Debugf("slot-d migrate from = %s to %s failed: hash key = '%s', database = %d, error = %s",
        s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, r.Database, err)
      return nil, err
    }
  }
  r.Group = &s.refs
  r.Group.Add(1)
  return d.forward2(s, r), nil
}

如果slot正在迁移会调用slotsmgrt处理,

代码语言:javascript复制
unc (d *forwardHelper) slotsmgrt(s *Slot, hkey []byte, database int32, seed uint) error {
  m := &Request{}
  m.Multi = []*redis.Resp{
    redis.NewBulkBytes([]byte("SLOTSMGRTTAGONE")),
    redis.NewBulkBytes(s.backend.bc.host),
    redis.NewBulkBytes(s.backend.bc.port),
    redis.NewBulkBytes([]byte("3000")),
    redis.NewBulkBytes(hkey),
  }

可以看到,如果当前处理的key所属的slot正在迁移,则调用SLOTSMGRTTAGSLOT命令将这个key迁移完成再返回给客户端,即必须要迁移这个key完成才返回给客户端。

三、总结

1、Slots迁移由管理员在Fe手动发起,发起后Codis只是将Slot状态变成

ActionPending;

2、Codis后台线程会扫描上述状态的Slots,依次进行以下状态的转换:

ActionPending => ActionPreparing => ActionPrepared =>

ActionMigrating;

3、ActionMigrating状态的Slots由Codis向Redis Server发送SLOTSMGRTTAGSLOT命令随机迁移一个key,这个过程会一直持续,直到Slot下所有Key迁移完成;

4、迁移过程中的Slot下的操作如果是同步则会先等待key迁移操作完成才往下操作,只要下层Redis Server执行是原子的,则可以保证整个过程的原子性。

可以看到,整个过程还是比较复杂的,特别是一些核心逻辑在Redis Server了,在Redis Server层如何保证操作的原子性和一致性,这个和异步迁移后面另外再讲述。

0 人点赞