golang源码分析:etcd(13)

2023-09-20 08:29:07 浏览数 (3)

我们来看下lease目录,了解下租约是如何实现的。首先我们还是从server的初始化地方开始:server/etcdserver/server.go,调用了NewLessor来初始化租约管理器

代码语言:javascript复制
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
      srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
    MinLeaseTTL:                int64(math.Ceil(minTTL.Seconds())),
    CheckpointInterval:         cfg.LeaseCheckpointInterval,
    CheckpointPersist:          cfg.LeaseCheckpointPersist,
    ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
  })

对应源码位于server/lease/lessor.go定义了租约管理相关接口和具体实现:

代码语言:javascript复制
// TxnDelete is the handle the lessor uses to delete leased items from the
// store (the Lessor interface describes the lessor "creating new TxnDeletes"
// for revoked or expired leases).
type TxnDelete interface {
  // DeleteRange deletes key, or the range [key, end) when end is given.
  // NOTE(review): n and rev are presumably the number of deleted keys and
  // the store revision after the delete — confirm against the store API.
  DeleteRange(key, end []byte) (n, rev int64)
  // End is called when the caller is done with the TxnDelete.
  // NOTE(review): presumably commits/releases the underlying txn — confirm.
  End()
}
代码语言:javascript复制
// Lessor is the lease manager interface: it grants, revokes, renews and
// looks up leases, attaches/detaches items to them, and reports expired
// leases through ExpiredLeasesC.
type Lessor interface {
  // SetRangeDeleter lets the lessor create TxnDeletes to the store.
  // Lessor deletes the items in the revoked or expired lease by creating
  // new TxnDeletes.
  SetRangeDeleter(rd RangeDeleter)


  // SetCheckpointer sets the Checkpointer the lessor uses to checkpoint
  // leases.
  SetCheckpointer(cp Checkpointer)


  // Grant grants a lease that expires at least after TTL seconds.
  Grant(id LeaseID, ttl int64) (*Lease, error)
  // Revoke revokes a lease with given ID. The item attached to the
  // given lease will be removed. If the ID does not exist, an error
  // will be returned.
  Revoke(id LeaseID) error


  // Checkpoint applies the remainingTTL of a lease. The remainingTTL is used in Promote to set
  // the expiry of leases to less than the full TTL when possible.
  Checkpoint(id LeaseID, remainingTTL int64) error


  // Attach attaches given leaseItem to the lease with given LeaseID.
  // If the lease does not exist, an error will be returned.
  Attach(id LeaseID, items []LeaseItem) error


  // GetLease returns LeaseID for given item.
  // If no lease found, NoLease value will be returned.
  GetLease(item LeaseItem) LeaseID


  // Detach detaches given leaseItem from the lease with given LeaseID.
  // If the lease does not exist, an error will be returned.
  Detach(id LeaseID, items []LeaseItem) error


  // Promote promotes the lessor to be the primary lessor. Primary lessor manages
  // the expiration and renew of leases.
  // Newly promoted lessor renew the TTL of all lease to extend + previous TTL.
  Promote(extend time.Duration)


  // Demote demotes the lessor from being the primary lessor.
  Demote()


  // Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist,
  // an error will be returned.
  Renew(id LeaseID) (int64, error)


  // Lookup gives the lease at a given lease id, if any
  Lookup(id LeaseID) *Lease


  // Leases lists all leases.
  Leases() []*Lease


  // ExpiredLeasesC returns a chan that is used to receive expired leases.
  ExpiredLeasesC() <-chan []*Lease


  // Recover recovers the lessor state from the given backend and RangeDeleter.
  Recover(b backend.Backend, rd RangeDeleter)


  // Stop stops the lessor for managing leases. The behavior of calling Stop multiple
  // times is undefined.
  Stop()
}
代码语言:javascript复制
// lessor is the concrete lease manager that newLessor builds and NewLessor
// returns as a Lessor.
type lessor struct {
  // NOTE(review): mu presumably guards the mutable lessor state below
  // (leaseMap, itemMap, demotec, ...) — confirm against the methods.
  mu sync.RWMutex


  // demotec is set when the lessor is the primary.
  // demotec will be closed if the lessor is demoted.
  demotec chan struct{}


  // leaseMap indexes every known lease by its ID.
  leaseMap             map[LeaseID]*Lease
  // leaseExpiredNotifier tracks lease expiry times; Renew registers the
  // refreshed expiry of a lease here.
  leaseExpiredNotifier *LeaseExpiredNotifier
  // NOTE(review): leaseCheckpointHeap presumably orders leases by their
  // next scheduled checkpoint — confirm.
  leaseCheckpointHeap  LeaseQueue
  // itemMap maps an attached item to the ID of the lease it belongs to.
  itemMap              map[LeaseItem]LeaseID


  // When a lease expires, the lessor will delete the
  // leased range (or key) by the RangeDeleter.
  rd RangeDeleter


  // When a lease's deadline should be persisted to preserve the remaining TTL across leader
  // elections and restarts, the lessor will checkpoint the lease by the Checkpointer.
  cp Checkpointer


  // backend to persist leases. We only persist lease ID and expiry for now.
  // The leased items can be recovered by iterating all the keys in kv.
  b backend.Backend


  // minLeaseTTL is the minimum lease TTL that can be granted for a lease. Any
  // requests for shorter TTLs are extended to the minimum TTL.
  minLeaseTTL int64


  // expiredC carries batches of expired leases; it is the channel handed
  // out by ExpiredLeasesC.
  expiredC chan []*Lease
  // stopC is a channel whose closure indicates that the lessor should be stopped.
  stopC chan struct{}
  // doneC is a channel whose closure indicates that the lessor is stopped.
  doneC chan struct{}


  // lg is the logger passed to NewLessor.
  lg *zap.Logger


  // Wait duration between lease checkpoints.
  checkpointInterval time.Duration
  // the interval to check if the expired lease is revoked
  expiredLeaseRetryInterval time.Duration
  // whether lessor should always persist remaining TTL (always enabled in v3.6).
  checkpointPersist bool
  // cluster is used to adapt lessor logic based on cluster version
  cluster cluster
}
代码语言:javascript复制
// LessorConfig carries the tunables passed to NewLessor.
type LessorConfig struct {
  // MinLeaseTTL is the minimum lease TTL; NewServer fills it with
  // ceil(minTTL.Seconds()), i.e. a value in seconds.
  MinLeaseTTL                int64
  // NOTE(review): presumably the wait between lease checkpoints
  // (lessor.checkpointInterval) — confirm in newLessor.
  CheckpointInterval         time.Duration
  // ExpiredLeasesRetryInterval is set by NewServer to the server request
  // timeout. NOTE(review): presumably how often revocation of an expired
  // lease is re-checked — confirm in newLessor.
  ExpiredLeasesRetryInterval time.Duration
  // NOTE(review): presumably whether the remaining TTL is always persisted
  // (lessor.checkpointPersist) — confirm in newLessor.
  CheckpointPersist          bool
}

在初始化租约管理器的时候会传入backend.Backend作为持久化存储器,对应具体实现就是bolt

代码语言:javascript复制
// NewLessor builds a lessor with newLessor and returns it through the
// Lessor interface.
func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor {
  impl := newLessor(lg, b, cluster, cfg)
  return impl
}
代码语言:javascript复制
func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor 

      l := &lessor{
        b:                         b,
      l.initAndRecover()
      go l.runLoop()

然后会起一个协程(runLoop),循环查找已过期的租约,并把它们通过expiredC这个channel通知出去;同时也会处理到期的checkpoint

代码语言:javascript复制
func (le *lessor) runLoop() {
      for {
    le.revokeExpiredLeases()
    le.checkpointScheduledLeases()
代码语言:javascript复制
func (le *lessor) revokeExpiredLeases() {
        if le.isPrimary() {
    ls = le.findExpiredLeases(revokeLimit)
  }
        if len(ls) != 0 {
    select {
    case <-le.stopC:
      return
    case le.expiredC <- ls:
代码语言:javascript复制
func (le *lessor) checkpointScheduledLeases() {
          if le.isPrimary() {
      cps = le.findDueScheduledCheckpoints(maxLeaseCheckpointBatchSize)
    }
      le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps})
代码语言:javascript复制
// ExpiredLeasesC exposes the lessor's expiredC channel as receive-only so
// callers can consume batches of expired leases.
func (le *lessor) ExpiredLeasesC() <-chan []*Lease {
  var expired <-chan []*Lease = le.expiredC
  return expired
}

当然这里也实现了核心授予租期的函数,通过persistTo 来持久化

代码语言:javascript复制
func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
      l := &Lease{
      if _, ok := le.leaseMap[id]; ok {
        if le.isPrimary() {
    l.refresh(0)
  } else {
    l.forever()
  }
        le.leaseMap[id] = l
  l.persistTo(le.b)    
代码语言:javascript复制
func (l *Lease) persistTo(b backend.Backend) {

回收租约

代码语言:javascript复制
func (le *lessor) Revoke(id LeaseID) error {
      delete(le.leaseMap, id)
      txn := le.rd()
      keys := l.Keys()
  sort.StringSlice(keys).Sort()
  for _, key := range keys {
    txn.DeleteRange([]byte(key), nil)
  }

刷新租约(续约)

代码语言:javascript复制
func (le *lessor) Renew(id LeaseID) (int64, error) {
      l := le.leaseMap[id]
        l.refresh(0)
  item := &LeaseWithTime{id: l.ID, time: l.expiry}
  le.leaseExpiredNotifier.RegisterOrUpdate(item)
代码语言:javascript复制
func (le *lessor) Promote(extend time.Duration) {

文件里同时包含一个空实现的FakeLessor(方法不做任何事情,例如Grant直接返回nil, nil)

代码语言:javascript复制
// FakeLessor is a stateless stub lessor; its Grant method returns (nil, nil)
// without doing any work.
type FakeLessor struct{}
代码语言:javascript复制
// Grant is a no-op: it ignores id and ttl and returns a nil lease with a
// nil error.
func (fl *FakeLessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
  var none *Lease
  return none, nil
}
代码语言:javascript复制
// FakeTxnDelete wraps a backend.BatchTx by embedding it.
// NOTE(review): presumably the stub TxnDelete that pairs with FakeLessor —
// confirm against its methods.
type FakeTxnDelete struct {
  backend.BatchTx
}

看完租约管理器,我们看下租约的具体实现server/lease/lease.go

代码语言:javascript复制
// Lease is a single lease: an ID, its TTL bookkeeping, an expiry time and
// the set of items attached to it.
type Lease struct {
  // ID identifies the lease; the lessor keys its leaseMap by it.
  ID           LeaseID
  ttl          int64 // time to live of the lease in seconds
  remainingTTL int64 // remaining time to live in seconds, if zero valued it is considered unset and the full ttl should be used
  // expiryMu protects concurrent accesses to expiry
  expiryMu sync.RWMutex
  // expiry is time when lease should expire. no expiration when expiry.IsZero() is true
  expiry time.Time


  // mu protects concurrent accesses to itemSet
  mu      sync.RWMutex
  // itemSet holds the items attached to this lease (used as a set).
  itemSet map[LeaseItem]struct{}
  // NOTE(review): revokec is presumably closed once the lease has been
  // revoked — confirm against Revoke.
  revokec chan struct{}
}

它通过persistTo函数把租约持久化到backend存储

代码语言:javascript复制
func (l *Lease) persistTo(b backend.Backend) {
      tx := b.BatchTx()
      tx.LockInsideApply()
      schema.MustUnsafePutLease(tx, &lpb)

租约本身带有ttl,而LeaseItem表示绑定(Attach)到租约上的key;租约被回收(Revoke)时,这些key会被一并删除

代码语言:javascript复制
// LeaseItem is an item that can be attached to a lease (see Lessor.Attach).
// It is identified by its Key; Revoke deletes the keys of a revoked lease
// from the store.
type LeaseItem struct {
  Key string
}

目录下实现了一个http协议的租约服务和对应的客户端操作函数:server/lease/leasehttp/http.go提供了俩路径

代码语言:javascript复制
LeasePrefix         = "/leases"
LeaseInternalPrefix = "/leases/internal"
代码语言:javascript复制
// leaseHandler is the HTTP handler for the lease endpoints; ServeHTTP
// forwards to l.Renew for LeasePrefix and l.Lookup for LeaseInternalPrefix.
type leaseHandler struct {
  // l is the lessor the requests are served from.
  l      lease.Lessor
  // NOTE(review): waitch presumably returns a channel the handler waits on
  // before answering — confirm against ServeHTTP.
  waitch func() <-chan struct{}
}

分别对应了租约的刷新和查找

代码语言:javascript复制
func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
      switch r.URL.Path {
        case LeasePrefix:
          ttl, rerr := h.l.Renew(lease.LeaseID(lreq.ID))
        case LeaseInternalPrefix:
          l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
代码语言:javascript复制
func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundTripper) (int64, error) {
      req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(lreq))    
代码语言:javascript复制
func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string, rt http.RoundTripper) (*leasepb.LeaseInternalResponse, error) {
      req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(lreq))

server/lease/leasepb/lease.pb.go定义了租约结构,对应的proto文件是server/lease/leasepb/lease.proto

代码语言:javascript复制
// Lease is the protobuf form of a lease: its ID, TTL and remaining TTL.
// NOTE(review): presumably the record persistTo writes to the backend —
// confirm.
message Lease {
  int64 ID = 1;
  int64 TTL = 2;
  int64 RemainingTTL = 3;
}
代码语言:javascript复制
// LeaseInternalRequest wraps an etcdserverpb.LeaseTimeToLiveRequest; the
// internal lease HTTP endpoint reads the lease ID from it.
message LeaseInternalRequest {
  etcdserverpb.LeaseTimeToLiveRequest LeaseTimeToLiveRequest = 1;
}
代码语言:javascript复制
// LeaseInternalResponse wraps an etcdserverpb.LeaseTimeToLiveResponse; it is
// the result type of TimeToLiveHTTP.
message LeaseInternalResponse {
  etcdserverpb.LeaseTimeToLiveResponse LeaseTimeToLiveResponse = 1;
}

server/lease/lease_queue.go里面实现了一个队列,用来维护租约的有序性

代码语言:javascript复制
// LeaseWithTime is a queue entry pairing a lease ID with a time.
type LeaseWithTime struct {
  id    LeaseID
  // time is the timestamp the entry is queued under (Renew stores the
  // lease expiry here).
  time  time.Time
  // index is the entry's position in the LeaseQueue; it is passed to
  // heap.Fix when the entry's time changes.
  index int
}
代码语言:javascript复制
// LeaseQueue is a slice of *LeaseWithTime that is driven through
// container/heap (heap.Push / heap.Fix).
type LeaseQueue []*LeaseWithTime
代码语言:javascript复制
func (pq *LeaseQueue) Push(x interface{}) {
代码语言:javascript复制
// LeaseExpiredNotifier keeps one queue entry per lease: m finds the entry by
// lease ID and queue keeps the entries in heap order.
type LeaseExpiredNotifier struct {
  // m indexes the queued entries by lease ID.
  m     map[LeaseID]*LeaseWithTime
  // queue is the heap holding the same entries.
  queue LeaseQueue
}

通过map加堆(container/heap)的形式实现:map用于按租约ID快速找到对应条目,时间更新后再用heap.Fix调整它在队列中的位置

代码语言:javascript复制
func (mq *LeaseExpiredNotifier) RegisterOrUpdate(item *LeaseWithTime) {
        if old, ok := mq.m[item.id]; ok {
    old.time = item.time
    heap.Fix(&mq.queue, old.index)
  } else {
    heap.Push(&mq.queue, item)
    mq.m[item.id] = item
  }

server/lease/metrics.go里面提供了监控的能力,把租约相关指标提供给prometheus来进行监控

代码语言:javascript复制
// leaseGranted counts granted leases; with the namespace, subsystem and name
// below Prometheus exposes it as etcd_debugging_lease_granted_total.
leaseGranted = prometheus.NewCounter(prometheus.CounterOpts{
    Namespace: "etcd_debugging",
    Subsystem: "lease",
    Name:      "granted_total",
    Help:      "The total number of granted leases.",
  })

以上就是etcd的lease目录下相关代码的核心实现。

1 人点赞