golang源码分析:etcd(5)

2023-08-09 15:16:02 浏览数 (1)

在分析完etcd的client如何使用后,我们看下etcd的client源码。etcd的client是通过gRPC和server通信的,其中kv相关操作的接口定义位于etcd/api模块的api/v3@v3.5.6/etcdserverpb/rpc.pb.go

代码语言:javascript复制
// KVClient is the client-side interface for the key-value RPCs.
// Every method takes a context, a request message and optional
// grpc.CallOption values, and returns the matching response message or an error.
type KVClient interface {
  // Range gets the keys in the range from the key-value store.
  Range(ctx context.Context, in *RangeRequest, opts ...grpc.CallOption) (*RangeResponse, error)
  // Put puts the given key into the key-value store.
  // A put request increments the revision of the key-value store
  // and generates one event in the event history.
  Put(ctx context.Context, in *PutRequest, opts ...grpc.CallOption) (*PutResponse, error)
  // DeleteRange deletes the given range from the key-value store.
  // A delete request increments the revision of the key-value store
  // and generates a delete event in the event history for every deleted key.
  DeleteRange(ctx context.Context, in *DeleteRangeRequest, opts ...grpc.CallOption) (*DeleteRangeResponse, error)
  // Txn processes multiple requests in a single transaction.
  // A txn request increments the revision of the key-value store
  // and generates events with the same revision for every completed request.
  // It is not allowed to modify the same key several times within one txn.
  Txn(ctx context.Context, in *TxnRequest, opts ...grpc.CallOption) (*TxnResponse, error)
  // Compact compacts the event history in the etcd key-value store. The key-value
  // store should be periodically compacted or the event history will continue to grow
  // indefinitely.
  Compact(ctx context.Context, in *CompactionRequest, opts ...grpc.CallOption) (*CompactionResponse, error)
}

比如其中的Put接口

代码语言:javascript复制
// Put invokes the "/etcdserverpb.KV/Put" method on the client's connection,
// sending in as the request and decoding the reply into a fresh PutResponse.
// It returns nil and the Invoke error when the call fails.
func (c *kVClient) Put(ctx context.Context, in *PutRequest, opts ...grpc.CallOption) (*PutResponse, error) {
  resp := &PutResponse{}
  if err := c.cc.Invoke(ctx, "/etcdserverpb.KV/Put", in, resp, opts...); err != nil {
    return nil, err
  }
  return resp, nil
}

etcd/client的config.go文件定义了配置相关结构体

代码语言:javascript复制
// Config holds the settings used to build a Client; it is the argument of New.
// Fields carrying a `json` tag are mapped to the given key when the struct is
// (un)marshalled as JSON.
type Config struct {
  // Endpoints is a list of URLs.
  Endpoints []string `json:"endpoints"`


  // AutoSyncInterval is the interval to update endpoints with its latest members.
  // 0 disables auto-sync. By default auto-sync is disabled.
  AutoSyncInterval time.Duration `json:"auto-sync-interval"`


  // DialTimeout is the timeout for failing to establish a connection.
  DialTimeout time.Duration `json:"dial-timeout"`


  // DialKeepAliveTime is the time after which client pings the server to see if
  // transport is alive.
  DialKeepAliveTime time.Duration `json:"dial-keep-alive-time"`


  // DialKeepAliveTimeout is the time that the client waits for a response for the
  // keep-alive probe. If the response is not received in this time, the connection is closed.
  DialKeepAliveTimeout time.Duration `json:"dial-keep-alive-timeout"`


  // MaxCallSendMsgSize is the client-side request send limit in bytes.
  // If 0, it defaults to 2.0 MiB (2 * 1024 * 1024).
  // Make sure that "MaxCallSendMsgSize" < server-side default send/recv limit.
  // ("--max-request-bytes" flag to etcd or "embed.Config.MaxRequestBytes").
  MaxCallSendMsgSize int


  // MaxCallRecvMsgSize is the client-side response receive limit.
  // If 0, it defaults to "math.MaxInt32", because range response can
  // easily exceed request send limits.
  // Make sure that "MaxCallRecvMsgSize" >= server-side default send/recv limit.
  // ("--max-request-bytes" flag to etcd or "embed.Config.MaxRequestBytes").
  MaxCallRecvMsgSize int


  // TLS holds the client secure credentials, if any.
  TLS *tls.Config


  // Username is a user name for authentication.
  Username string `json:"username"`


  // Password is a password for authentication.
  Password string `json:"password"`


  // RejectOldCluster when set will refuse to create a client against an outdated cluster.
  RejectOldCluster bool `json:"reject-old-cluster"`


  // DialOptions is a list of dial options for the grpc client (e.g., for interceptors).
  // For example, pass "grpc.WithBlock()" to block until the underlying connection is up.
  // Without this, Dial returns immediately and connecting the server happens in background.
  DialOptions []grpc.DialOption


  // Context is the default client context; it can be used to cancel grpc dial out and
  // other operations that do not have an explicit context.
  Context context.Context


  // Logger sets client-side logger.
  // If nil, fallback to building LogConfig.
  Logger *zap.Logger


  // LogConfig configures client-side logger.
  // If nil, use the default logger.
  // TODO: configure gRPC logger
  LogConfig *zap.Config


  // PermitWithoutStream when set will allow client to send keepalive pings to server without any active streams(RPCs).
  PermitWithoutStream bool `json:"permit-without-stream"`


  // TODO: support custom balancer picker
}

在client.go中提供了创建一个client的函数:

代码语言:javascript复制
// New builds an etcd v3 client from cfg.
// It returns ErrNoAvailableEndpoints when cfg.Endpoints is empty; otherwise
// it hands a pointer to its own copy of cfg to newClient and returns that result.
func New(cfg Config) (*Client, error) {
  hasEndpoints := len(cfg.Endpoints) > 0
  if !hasEndpoints {
    return nil, ErrNoAvailableEndpoints
  }
  return newClient(&cfg)
}

先创建client,然后通过client创建各种结构体,最后启动一个协程进行同步操作:

代码语言:javascript复制
func newClient(cfg *Config) (*Client, error) {
        client := &Client{
    conn:     nil,
    cfg:      *cfg,
    creds:    creds,
    ctx:      ctx,
    cancel:   cancel,
    mu:       new(sync.RWMutex),
    callOpts: defaultCallOpts,
    lgMu:     new(sync.RWMutex),
  }
client.resolver = resolver.New(cfg.Endpoints...)
conn, err := client.dialWithBalancer()
client.conn = conn


client.Cluster = NewCluster(client)
client.KV = NewKV(client)
client.Lease = NewLease(client)
client.Watcher = NewWatcher(client)
client.Auth = NewAuth(client)
client.Maintenance = NewMaintenance(client)
err = client.getToken(ctx)
go client.autoSync()

其中client的定义如下:

代码语言:javascript复制
// Client provides and manages an etcd v3 client session.
// The embedded interfaces expose the per-service APIs directly on the Client;
// newClient fills each of them through the matching New* constructor.
type Client struct {
  Cluster
  KV
  Lease
  Watcher
  Auth
  Maintenance


  // conn is the grpc connection; newClient assigns it from dialWithBalancer.
  conn *grpc.ClientConn


  // cfg is a copy of the Config that was passed to newClient.
  cfg      Config
  creds    grpccredentials.TransportCredentials
  // resolver is created in newClient from the configured endpoints.
  resolver *resolver.EtcdManualResolver
  // mu is allocated in newClient.
  // NOTE(review): which fields it guards is not visible here — confirm.
  mu       *sync.RWMutex


  ctx    context.Context
  cancel context.CancelFunc


  // Username is a user name for authentication.
  Username string
  // Password is a password for authentication.
  Password        string
  authTokenBundle credentials.Bundle


  // callOpts starts out as defaultCallOpts and is copied into the
  // service wrappers created by NewCluster, NewKV and NewAuth.
  callOpts []grpc.CallOption


  // lgMu is allocated in newClient; presumably it guards lg — TODO confirm.
  lgMu *sync.RWMutex
  lg   *zap.Logger
}

cluster的初始化代码位于cluster.go

代码语言:javascript复制
// NewCluster returns a Cluster backed by RetryClusterClient(c).
// When c is non-nil the client's call options are copied onto the result.
func NewCluster(c *Client) Cluster {
  wrapper := new(cluster)
  wrapper.remote = RetryClusterClient(c)
  if c == nil {
    return wrapper
  }
  wrapper.callOpts = c.callOpts
  return wrapper
}

它的接口提供了如何操作集群的一系列函数:

代码语言:javascript复制
// Cluster is the client-side interface for inspecting and changing
// cluster membership. NewCluster returns a value of this type.
type Cluster interface {
  // MemberList lists the current cluster membership.
  MemberList(ctx context.Context) (*MemberListResponse, error)


  // MemberAdd adds a new member into the cluster.
  MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)


  // MemberAddAsLearner adds a new learner member into the cluster.
  MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)


  // MemberRemove removes an existing member from the cluster.
  MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error)


  // MemberUpdate updates the peer addresses of the member.
  MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error)


  // MemberPromote promotes a member from raft learner (non-voting) to raft voting member.
  MemberPromote(ctx context.Context, id uint64) (*MemberPromoteResponse, error)
}

而kv.go里面定义了如何初始化kv,以及kv的接口

代码语言:javascript复制
// NewKV returns a KV backed by RetryKVClient(c).
// When c is non-nil the client's call options are copied onto the result.
func NewKV(c *Client) KV {
  wrapper := new(kv)
  wrapper.remote = RetryKVClient(c)
  if c == nil {
    return wrapper
  }
  wrapper.callOpts = c.callOpts
  return wrapper
}
代码语言:javascript复制
// KV is the client-side interface for key-value operations.
// NewKV returns a value of this type.
type KV interface {
  // Put puts a key-value pair into etcd.
  // Note that key,value can be plain bytes array and string is
  // an immutable representation of that bytes array.
  // To get a string of bytes, do string([]byte{0x10, 0x20}).
  Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)


  // Get retrieves keys.
  // By default, Get will return the value for "key", if any.
  // When passed WithRange(end), Get will return the keys in the range [key, end).
  // When passed WithFromKey(), Get returns keys greater than or equal to key.
  // When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
  // if the required revision is compacted, the request will fail with ErrCompacted.
  // When passed WithLimit(limit), the number of returned keys is bounded by limit.
  // When passed WithSort(), the keys will be sorted.
  Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)


  // Delete deletes a key, or optionally using WithRange(end), [key, end).
  Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)


  // Compact compacts etcd KV history before the given rev.
  Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)


  // Do applies a single Op on KV without a transaction.
  // Do is useful when creating arbitrary operations to be issued at a
  // later time; the user can range over the operations, calling Do to
  // execute them. Get/Put/Delete, on the other hand, are best suited
  // for when the operation should be issued at the time of declaration.
  Do(ctx context.Context, op Op) (OpResponse, error)


  // Txn creates a transaction.
  Txn(ctx context.Context) Txn
}

具体实现层面,kv包装了一个pb.KVClient(即remote字段),Put等操作先构造对应的Op,再调用kv自身的Do方法,由Do通过remote发起rpc请求

代码语言:javascript复制
    // kv is the concrete type returned by NewKV.
    type kv struct {
  // remote is the KV client the operations are sent through;
  // NewKV sets it to RetryKVClient(c).
  remote   pb.KVClient
  // callOpts are the call options passed along with each remote call;
  // NewKV copies them from the Client when one is given.
  callOpts []grpc.CallOption
}      
代码语言:javascript复制
func NewKV(c *Client) KV {
  api := &kv{remote: RetryKVClient(c)}  
代码语言:javascript复制
// Put builds a put Op from key, val and opts, runs it through Do, and returns
// the put response carried by the result together with the error after it
// has been passed through toErr.
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
  putOp := OpPut(key, val, opts...)
  resp, doErr := kv.Do(ctx, putOp)
  return resp.put, toErr(ctx, doErr)
}

op.go中定义了Op结构体

代码语言:javascript复制
// Op represents an Operation that kv can execute.
// The fields are grouped by the operation kinds that use them; t selects the kind.
type Op struct {
  t   opType
  key []byte
  end []byte


  // for range
  limit        int64
  sort         *SortOption
  serializable bool
  keysOnly     bool
  countOnly    bool
  minModRev    int64
  maxModRev    int64
  minCreateRev int64
  maxCreateRev int64


  // for range, watch
  rev int64


  // for watch, put, delete
  prevKV bool


  // for watch
  // fragmentation should be disabled by default
  // if true, split watch events when total exceeds
  // "--max-request-bytes" flag value + 512-byte
  fragment bool


  // for put
  ignoreValue bool
  ignoreLease bool


  // progressNotify is for progress updates.
  progressNotify bool
  // createdNotify is for created event
  createdNotify bool
  // filters for watchers
  filterPut    bool
  filterDelete bool


  // for put
  val     []byte
  leaseID LeaseID


  // txn
  cmps    []Cmp
  thenOps []Op
  elseOps []Op


  isOptsWithFromKey bool
  isOptsWithPrefix  bool
}

其中操作类型是一个枚举值

代码语言:javascript复制
// opType identifies the kind of operation an Op carries (the Op.t field).
type opType int

包括range,put,delete和事务

代码语言:javascript复制
const (
  // A default Op has opType 0, which is invalid.
  tRange opType = iota   1
  tPut
  tDeleteRange
  tTxn
)

调用Do的时候根据不同操作类型发起不同请求:

代码语言:javascript复制
func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
   case tPut:
    var resp *pb.PutResponse
    r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
    resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
    if err == nil {
      return OpResponse{put: (*PutResponse)(resp)}, nil
    }

retry.go定义了如何重试

代码语言:javascript复制
// RetryKVClient wraps a pb.KVClient created on c's connection in a
// retryKVClient and returns it as a pb.KVClient.
func RetryKVClient(c *Client) pb.KVClient {
  inner := pb.NewKVClient(c.conn)
  return &retryKVClient{kc: inner}
}

代码语言:javascript复制
// retryKVClient is the pb.KVClient wrapper returned by RetryKVClient.
type retryKVClient struct {
  // kc is the wrapped client, built by pb.NewKVClient on the Client's connection.
  kc pb.KVClient
}

lease.go里面封装了如何进行租约的操作:

代码语言:javascript复制
func NewLease(c *Client) Lease {
  return NewLeaseFromLeaseClient(RetryLeaseClient(c), c, c.cfg.DialTimeout time.Second)
}  
代码语言:javascript复制
func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease {

租约的核心接口包括赋予租约,续约,回收等

代码语言:javascript复制
// Lease is the client-side interface for lease operations: granting,
// revoking, querying and keeping leases alive. NewLease returns a value of this type.
type Lease interface {
  // Grant creates a new lease.
  Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)


  // Revoke revokes the given lease.
  Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)


  // TimeToLive retrieves the lease information of the given lease ID.
  TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)


  // Leases retrieves all leases.
  Leases(ctx context.Context) (*LeaseLeasesResponse, error)


  // KeepAlive attempts to keep the given lease alive forever. If the keepalive responses posted
  // to the channel are not consumed promptly the channel may become full. When full, the lease
  // client will continue sending keep alive requests to the etcd server, but will drop responses
  // until there is capacity on the channel to send more responses.
  //
  // If client keep alive loop halts with an unexpected error (e.g. "etcdserver: no leader") or
  // canceled by the caller (e.g. context.Canceled), KeepAlive returns a ErrKeepAliveHalted error
  // containing the error reason.
  //
  // The returned "LeaseKeepAliveResponse" channel closes if underlying keep
  // alive stream is interrupted in some way the client cannot handle itself;
  // given context "ctx" is canceled or timed out.
  //
  // TODO(v4.0): post errors to last keep alive message before closing
  // (see https://github.com/etcd-io/etcd/pull/7866)
  KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)


  // KeepAliveOnce renews the lease once. The response corresponds to the
  // first message from calling KeepAlive. If the response has a recoverable
  // error, KeepAliveOnce will retry the RPC with a new keep alive message.
  //
  // In most of the cases, KeepAlive should be used instead of KeepAliveOnce.
  KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)


  // Close releases all resources Lease keeps for efficient communication
  // with the etcd server.
  Close() error
}
代码语言:javascript复制
// lessor holds the state of a lease client.
// NOTE(review): presumably the concrete type behind the Lease interface —
// the constructor body is not shown here, confirm.
type lessor struct {
  mu sync.Mutex // guards all fields


  // donec is closed and loopErr is set when recvKeepAliveLoop stops
  donec   chan struct{}
  loopErr error


  // remote is the lease client the requests are sent through.
  remote pb.LeaseClient


  // stream is the keep-alive stream; streamCancel is its cancel function.
  stream       pb.Lease_LeaseKeepAliveClient
  streamCancel context.CancelFunc


  stopCtx    context.Context
  stopCancel context.CancelFunc


  // keepAlives maps a lease ID to its keepAlive bookkeeping.
  keepAlives map[LeaseID]*keepAlive


  // firstKeepAliveTimeout is the timeout for the first keepalive request
  // before the actual TTL is known to the lease client
  firstKeepAliveTimeout time.Duration


  // firstKeepAliveOnce ensures stream starts after first KeepAlive call.
  firstKeepAliveOnce sync.Once


  callOpts []grpc.CallOption


  lg *zap.Logger
}
代码语言:javascript复制
// keepAlive multiplexes a keepalive for a lease over multiple channels
type keepAlive struct {
  // chs are the send-only channels keep-alive responses are delivered on.
  // NOTE(review): chs and ctxs look like parallel slices, one entry per
  // KeepAlive caller — confirm against the code that appends to them.
  chs  []chan<- *LeaseKeepAliveResponse
  ctxs []context.Context
  // deadline is the time the keep alive channels close if no response
  deadline time.Time
  // nextKeepAlive is when to send the next keep alive message
  nextKeepAlive time.Time
  // donec is closed on lease revoke, expiration, or cancel.
  donec chan struct{}
}

watch.go定义了watch机制相关操作

代码语言:javascript复制
// NewWatcher creates a watch client on c's connection and returns the
// Watcher that NewWatchFromWatchClient builds from it.
func NewWatcher(c *Client) Watcher {
  watchClient := pb.NewWatchClient(c.conn)
  return NewWatchFromWatchClient(watchClient, c)
}
代码语言:javascript复制
func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
          w := &watcher{
    remote:  wc,
    streams: make(map[string]*watchGrpcStream),
  }

里面包含了一个string到grpc stream的映射,watch机制是通过grpc stream实现的。

代码语言:javascript复制
// Watcher is the client-side interface for watching keys.
// NewWatcher and NewWatchFromWatchClient return values of this type.
type Watcher interface {
  // Watch watches on a key or prefix. The watched events will be returned
  // through the returned channel. If revisions waiting to be sent over the
  // watch are compacted, then the watch will be canceled by the server, the
  // client will post a compacted error watch response, and the channel will close.
  // If the requested revision is 0 or unspecified, the returned channel will
  // return watch events that happen after the server receives the watch request.
  // If the context "ctx" is canceled or timed out, returned "WatchChan" is closed,
  // and "WatchResponse" from this closed channel has zero events and nil "Err()".
  // The context "ctx" MUST be canceled, as soon as watcher is no longer being used,
  // to release the associated resources.
  //
  // If the context is "context.Background/TODO", returned "WatchChan" will
  // not be closed and block until event is triggered, except when server
  // returns a non-recoverable error (e.g. ErrCompacted).
  // For example, when context passed with "WithRequireLeader" and the
  // connected server has no leader (e.g. due to network partition),
  // error "etcdserver: no leader" (ErrNoLeader) will be returned,
  // and then "WatchChan" is closed with non-nil "Err()".
  // In order to prevent a watch stream being stuck in a partitioned node,
  // make sure to wrap context with "WithRequireLeader".
  //
  // Otherwise, as long as the context has not been canceled or timed out,
  // watch will retry on other recoverable errors forever until reconnected.
  //
  // TODO: explicitly set context error in the last "WatchResponse" message and close channel?
  // Currently, client contexts are overwritten with "valCtx" that never closes.
  // TODO(v3.4): configure watch retry policy, limit maximum retry number
  // (see https://github.com/etcd-io/etcd/issues/8980)
  Watch(ctx context.Context, key string, opts ...OpOption) WatchChan


  // RequestProgress requests a progress notify response be sent in all watch channels.
  RequestProgress(ctx context.Context) error


  // Close closes the watcher and cancels all watch requests.
  Close() error
}  
代码语言:javascript复制
// watcher is the value constructed by NewWatchFromWatchClient; it keeps the
// watch client together with the grpc streams opened through it.
type watcher struct {
  // remote is the watch client handed to NewWatchFromWatchClient.
  remote   pb.WatchClient
  callOpts []grpc.CallOption


  // mu protects the grpc streams map
  mu sync.Mutex


  // streams holds all the active grpc streams keyed by ctx value.
  // It is created empty by NewWatchFromWatchClient.
  streams map[string]*watchGrpcStream
  lg      *zap.Logger
}

auth.go定义了认证机制:

代码语言:javascript复制
// NewAuth returns an Auth backed by RetryAuthClient(c).
// When c is non-nil the client's call options are copied onto the result.
func NewAuth(c *Client) Auth {
  wrapper := new(authClient)
  wrapper.remote = RetryAuthClient(c)
  if c == nil {
    return wrapper
  }
  wrapper.callOpts = c.callOpts
  return wrapper
}

maintenance.go类似

代码语言:javascript复制
func NewMaintenance(c *Client) Maintenance {
  api := &maintenance{

总结下,etcd client相关代码都是在grpc接口基础上的一系列封装,提供的接口简洁,功能单一、含义明确,整体的接口设计思路值得我们学习和模仿,当然这也是经过多个版本迭代的结果。

0 人点赞