我们来看下lease目录,了解下租约是如何实现的。首先还是从server初始化的地方开始:server/etcdserver/server.go里的NewServer调用了lease.NewLessor来初始化租约管理器:
代码语言:javascript复制func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
CheckpointInterval: cfg.LeaseCheckpointInterval,
CheckpointPersist: cfg.LeaseCheckpointPersist,
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
})
对应源码位于server/lease/lessor.go,其中定义了租约管理相关的接口和具体实现:
代码语言:javascript复制type TxnDelete interface {
// DeleteRange deletes from the store. Revoke calls it once per key held by
// the lease being revoked, passing a nil end.
// NOTE(review): n and rev are presumably the number of deleted keys and the
// resulting revision — confirm against the store implementation.
DeleteRange(key, end []byte) (n, rev int64)
// End presumably finishes the delete transaction — TODO confirm; no call
// site is visible in this excerpt.
End()
}
代码语言:javascript复制type Lessor interface {
// SetRangeDeleter lets the lessor create TxnDeletes to the store.
// Lessor deletes the items in the revoked or expired lease by creating
// new TxnDeletes.
SetRangeDeleter(rd RangeDeleter)
// SetCheckpointer sets the Checkpointer used by the lessor.
// NOTE(review): presumably this is the callback through which remaining
// TTLs are checkpointed — confirm against the lessor implementation.
SetCheckpointer(cp Checkpointer)
// Grant grants a lease that expires at least after TTL seconds.
Grant(id LeaseID, ttl int64) (*Lease, error)
// Revoke revokes a lease with given ID. The item attached to the
// given lease will be removed. If the ID does not exist, an error
// will be returned.
Revoke(id LeaseID) error
// Checkpoint applies the remainingTTL of a lease. The remainingTTL is used in Promote to set
// the expiry of leases to less than the full TTL when possible.
Checkpoint(id LeaseID, remainingTTL int64) error
// Attach attaches given leaseItem to the lease with given LeaseID.
// If the lease does not exist, an error will be returned.
Attach(id LeaseID, items []LeaseItem) error
// GetLease returns LeaseID for given item.
// If no lease found, NoLease value will be returned.
GetLease(item LeaseItem) LeaseID
// Detach detaches given leaseItem from the lease with given LeaseID.
// If the lease does not exist, an error will be returned.
Detach(id LeaseID, items []LeaseItem) error
// Promote promotes the lessor to be the primary lessor. Primary lessor manages
// the expiration and renew of leases.
// Newly promoted lessor renew the TTL of all lease to extend previous TTL.
Promote(extend time.Duration)
// Demote demotes the lessor from being the primary lessor.
Demote()
// Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist,
// an error will be returned.
Renew(id LeaseID) (int64, error)
// Lookup gives the lease at a given lease id, if any
Lookup(id LeaseID) *Lease
// Leases lists all leases.
Leases() []*Lease
// ExpiredLeasesC returns a chan that is used to receive expired leases.
ExpiredLeasesC() <-chan []*Lease
// Recover recovers the lessor state from the given backend and RangeDeleter.
Recover(b backend.Backend, rd RangeDeleter)
// Stop stops the lessor for managing leases. The behavior of calling Stop multiple
// times is undefined.
Stop()
}
代码语言:javascript复制type lessor struct {
// mu is a reader/writer lock.
// NOTE(review): presumably guards the maps and queues below — confirm.
mu sync.RWMutex
// demotec is set when the lessor is the primary.
// demotec will be closed if the lessor is demoted.
demotec chan struct{}
// leaseMap indexes leases by ID; Grant inserts into it and Revoke deletes
// from it.
leaseMap map[LeaseID]*Lease
// leaseExpiredNotifier tracks lease expiry times; Renew registers or
// updates the renewed lease's expiry here.
leaseExpiredNotifier *LeaseExpiredNotifier
// leaseCheckpointHeap is a LeaseQueue.
// NOTE(review): presumably ordered by next checkpoint time — confirm.
leaseCheckpointHeap LeaseQueue
// itemMap maps a LeaseItem to a LeaseID.
itemMap map[LeaseItem]LeaseID
// When a lease expires, the lessor will delete the
// leased range (or key) by the RangeDeleter.
rd RangeDeleter
// When a lease's deadline should be persisted to preserve the remaining TTL across leader
// elections and restarts, the lessor will checkpoint the lease by the Checkpointer.
cp Checkpointer
// backend to persist leases. We only persist lease ID and expiry for now.
// The leased items can be recovered by iterating all the keys in kv.
b backend.Backend
// minLeaseTTL is the minimum lease TTL that can be granted for a lease. Any
// requests for shorter TTLs are extended to the minimum TTL.
minLeaseTTL int64
// expiredC carries batches of expired leases; ExpiredLeasesC returns it and
// revokeExpiredLeases sends on it.
expiredC chan []*Lease
// stopC is a channel whose closure indicates that the lessor should be stopped.
stopC chan struct{}
// doneC is a channel whose closure indicates that the lessor is stopped.
doneC chan struct{}
// lg is the zap logger handed to the constructor.
lg *zap.Logger
// Wait duration between lease checkpoints.
checkpointInterval time.Duration
// the interval to check if the expired lease is revoked
expiredLeaseRetryInterval time.Duration
// whether lessor should always persist remaining TTL (always enabled in v3.6).
checkpointPersist bool
// cluster is used to adapt lessor logic based on cluster version
cluster cluster
}
代码语言:javascript复制type LessorConfig struct {
// MinLeaseTTL is supplied by NewServer as math.Ceil(minTTL.Seconds()), i.e.
// a whole number of seconds.
MinLeaseTTL int64
// CheckpointInterval is taken from cfg.LeaseCheckpointInterval in NewServer.
CheckpointInterval time.Duration
// ExpiredLeasesRetryInterval is set to the server's ReqTimeout() in NewServer.
ExpiredLeasesRetryInterval time.Duration
// CheckpointPersist is taken from cfg.LeaseCheckpointPersist in NewServer.
CheckpointPersist bool
}
在初始化租约管理器的时候会传入backend.Backend作为持久化存储器,对应具体实现就是bolt
代码语言:javascript复制func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor {
// Delegates to newLessor; the concrete *lessor it returns is handed back to
// callers as the Lessor interface.
return newLessor(lg, b, cluster, cfg)
}
代码语言:javascript复制func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor
l := &lessor{
b: b,
l.initAndRecover()
go l.runLoop()
然后会起一个协程运行runLoop,不断找出过期的租约并通知给expiredC这个channel,同时调度到期的checkpoint:
代码语言:javascript复制func (le *lessor) runLoop() {
for {
le.revokeExpiredLeases()
le.checkpointScheduledLeases()
代码语言:javascript复制func (le *lessor) revokeExpiredLeases() {
if le.isPrimary() {
ls = le.findExpiredLeases(revokeLimit)
}
if len(ls) != 0 {
select {
case <-le.stopC:
return
case le.expiredC <- ls:
代码语言:javascript复制func (le *lessor) checkpointScheduledLeases() {
if le.isPrimary() {
cps = le.findDueScheduledCheckpoints(maxLeaseCheckpointBatchSize)
}
le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps})
代码语言:javascript复制func (le *lessor) ExpiredLeasesC() <-chan []*Lease {
// Exposes expiredC as receive-only; revokeExpiredLeases sends batches of
// expired leases on this channel.
return le.expiredC
}
当然这里也实现了核心的授予租约的函数Grant,并通过persistTo来持久化:
代码语言:javascript复制func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
l := &Lease{
if _, ok := le.leaseMap[id]; ok {
if le.isPrimary() {
l.refresh(0)
} else {
l.forever()
}
le.leaseMap[id] = l
l.persistTo(le.b)
代码语言:javascript复制func (l *Lease) persistTo(b backend.Backend) {
回收租约
代码语言:javascript复制func (le *lessor) Revoke(id LeaseID) error {
delete(le.leaseMap, id)
txn := le.rd()
keys := l.Keys()
sort.StringSlice(keys).Sort()
for _, key := range keys {
txn.DeleteRange([]byte(key), nil)
}
刷新租约(续约)
代码语言:javascript复制func (le *lessor) Renew(id LeaseID) (int64, error) {
l := le.leaseMap[id]
l.refresh(0)
item := &LeaseWithTime{id: l.ID, time: l.expiry}
le.leaseExpiredNotifier.RegisterOrUpdate(item)
代码语言:javascript复制func (le *lessor) Promote(extend time.Duration) {
文件里同时包含一个空实现的FakeLessor(例如Grant直接返回nil, nil):
代码语言:javascript复制type FakeLessor struct{} // FakeLessor has no fields and therefore carries no state.
代码语言:javascript复制func (fl *FakeLessor) Grant(id LeaseID, ttl int64) (*Lease, error) { return nil, nil } // Stub: ignores id and ttl and always returns (nil, nil).
代码语言:javascript复制type FakeTxnDelete struct {
// Embedding backend.BatchTx promotes its methods onto FakeTxnDelete.
backend.BatchTx
}
看完租约管理器,我们看下租约的具体实现server/lease/lease.go
代码语言:javascript复制type Lease struct {
// ID identifies the lease; the lessor keys its leaseMap by this value.
ID LeaseID
ttl int64 // time to live of the lease in seconds
remainingTTL int64 // remaining time to live in seconds, if zero valued it is considered unset and the full ttl should be used
// expiryMu protects concurrent accesses to expiry
expiryMu sync.RWMutex
// expiry is time when lease should expire. no expiration when expiry.IsZero() is true
expiry time.Time
// mu protects concurrent accesses to itemSet
mu sync.RWMutex
// itemSet is the set of LeaseItems held by this lease; the empty-struct
// value type means only key membership is stored.
itemSet map[LeaseItem]struct{}
// NOTE(review): revokec is presumably closed when the lease is revoked —
// confirm; no use is visible in this excerpt.
revokec chan struct{}
}
它是通过persistTo函数把租约持久化到backend存储的:
代码语言:javascript复制func (l *Lease) persistTo(b backend.Backend) {
tx := b.BatchTx()
tx.LockInsideApply()
schema.MustUnsafePutLease(tx, &lpb)
租约本身带有ttl,它关联的是k/v中的key:绑定到租约上的每个key用LeaseItem表示,保存在Lease的itemSet里:
代码语言:javascript复制type LeaseItem struct {
// Key is the item's key held as a string. LeaseItem is the element type of
// Lease.itemSet and the key type of lessor.itemMap.
Key string
}
目录下还实现了一个http协议的租约服务和对应的客户端操作函数:server/lease/leasehttp/http.go提供了两个路径:
代码语言:javascript复制LeasePrefix = "/leases" // path that ServeHTTP routes to Lessor.Renew
LeaseInternalPrefix = "/leases/internal" // path that ServeHTTP routes to Lessor.Lookup
代码语言:javascript复制type leaseHandler struct {
// l is the Lessor on which ServeHTTP calls Renew and Lookup.
l lease.Lessor
// waitch returns a receive-only channel.
// NOTE(review): presumably used to wait for the server to catch up before
// answering — confirm; no call site is visible in this excerpt.
waitch func() <-chan struct{}
}
这两个路径分别对应了租约的刷新(Renew)和查找(Lookup):
代码语言:javascript复制func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case LeasePrefix:
ttl, rerr := h.l.Renew(lease.LeaseID(lreq.ID))
case LeaseInternalPrefix:
l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
代码语言:javascript复制func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundTripper) (int64, error) {
req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(lreq))
代码语言:javascript复制func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string, rt http.RoundTripper) (*leasepb.LeaseInternalResponse, error) {
req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(lreq))
server/lease/leasepb/lease.pb.go定义了租约结构,对应的proto文件是server/lease/leasepb/lease.proto:
代码语言:javascript复制message Lease {
// The three fields parallel ID, ttl and remainingTTL on the Go Lease struct.
// NOTE(review): TTL and RemainingTTL are presumably in seconds, like their
// Go counterparts — confirm.
int64 ID = 1;
int64 TTL = 2;
int64 RemainingTTL = 3;
}
代码语言:javascript复制message LeaseInternalRequest {
// Wraps the etcdserverpb request; ServeHTTP reads LeaseTimeToLiveRequest.ID
// from it to look up the lease.
etcdserverpb.LeaseTimeToLiveRequest LeaseTimeToLiveRequest = 1;
}
代码语言:javascript复制message LeaseInternalResponse {
// Wraps the etcdserverpb response; TimeToLiveHTTP returns a pointer to this
// message type.
etcdserverpb.LeaseTimeToLiveResponse LeaseTimeToLiveResponse = 1;
}
server/lease/lease_queue.go里面实现了一个基于堆(container/heap)的优先队列,用来维护租约的有序性:
代码语言:javascript复制type LeaseWithTime struct {
// id is the lease this entry refers to.
id LeaseID
// time is the timestamp queued for the lease; Renew sets it to the lease's
// expiry.
time time.Time
// index is passed to heap.Fix by RegisterOrUpdate, i.e. it is the entry's
// current position inside the LeaseQueue.
index int
}
代码语言:javascript复制type LeaseQueue []*LeaseWithTime // slice of entries that heap.Push and heap.Fix operate on
代码语言:javascript复制func (pq *LeaseQueue) Push(x interface{}) {
代码语言:javascript复制type LeaseExpiredNotifier struct {
// m indexes the queued entries by lease ID so RegisterOrUpdate can find an
// existing entry without scanning the queue.
m map[LeaseID]*LeaseWithTime
// queue is the heap holding the same entries; it is modified through
// heap.Push and heap.Fix.
queue LeaseQueue
}
通过map加堆(优先队列)的形式,实现了按租约ID快速定位、就地更新时间并重新调整顺序的能力(结构上类似lru常见的map加队列组合,但这里是按时间排序,而不是按最近使用淘汰):
代码语言:javascript复制func (mq *LeaseExpiredNotifier) RegisterOrUpdate(item *LeaseWithTime) {
if old, ok := mq.m[item.id]; ok {
old.time = item.time
heap.Fix(&mq.queue, old.index)
} else {
heap.Push(&mq.queue, item)
mq.m[item.id] = item
}
server/lease/metrics.go里面提供了监控的能力,把租约相关指标提供给prometheus来进行监控:
代码语言:javascript复制leaseGranted = prometheus.NewCounter(prometheus.CounterOpts{
// Prometheus joins Namespace, Subsystem and Name with underscores, so the
// exported metric is etcd_debugging_lease_granted_total.
Namespace: "etcd_debugging",
Subsystem: "lease",
Name: "granted_total",
Help: "The total number of granted leases.",
})
以上就是etcd的lease目录下相关代码的核心实现。