golang源码分析:sarama kafka client(part III:client的角色)

2022-08-02 19:28:16 浏览数 (1)

理解client的角色对我们理解kafka和sarama非常有帮助。下面将一一详细介绍:

我们用到了各种各样的client,返回的对象都是一个Broker的指针,本质上讲,我们通过kafka client 最终都是和broker通信,所以用Broker对象封装和kafka的连接,表示Client。不同场景下,Client有不同的角色,角色是通过元数据来确定的。

代码语言:javascript复制
func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, error) 
  metadata := client.cachedMetadata(topic, partitionID)
  err := client.RefreshMetadata(topic)
代码语言:javascript复制
func (client *client) RefreshMetadata(topics ...string) error
  return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
代码语言:javascript复制
func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error 
    broker := client.any()
    response, err := broker.GetMetadata(req)
    err := b.sendAndReceive(request, response)
    shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
  client.deregisterBroker(broker)
   _ = broker.Close()

可以看到,获取元数据的过程是随机选择一个broker(没有元数据不知道身份),然后获取元数据,存储下来。

1,Controller

首先是Controller,Kafka集群中,首先会选举出一个broker作为controller,然后该controller负责跟其他broker进行协调topic创建,partition主副本选举,topic删除等事务

代码语言:javascript复制
func (client *client) Controller() (*Broker, error){
       client.refreshMetadata()
       controller := client.cachedController()
       _ = controller.Open(client.conf)
}
代码语言:javascript复制
func (client *client) cachedController() *Broker {
    return client.brokers[client.controllerID]
 }

通过元数据,获取controllerId,然后通过controllerID找到Controller,和前面介绍的一样,topic和partation相关的增删操作会用到controller,主要在admin.go中

代码语言:javascript复制
func NewClusterAdminFromClient(client Client) (ClusterAdmin, error)
    _, err := client.Controller()
func (ca *clusterAdmin) Controller() (*Broker, error)
    return ca.client.Controller()
func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
    b, err := ca.Controller()
func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
    controller, err := ca.Controller()
func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
    b, err := ca.Controller()
func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error
    b, err := ca.Controller()

2,Coordinator

每个KafkaServer都有一个GroupCoordinator实例,管理多个消费者组,主要用于offset位移管理和Consumer Rebalance。

代码语言:javascript复制
func (client *client) Coordinator(consumerGroup string) (*Broker, error) 
  _ = coordinator.Open(client.conf)

获取元数据后,就会获取Coordinator

代码语言:javascript复制
func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) 
    broker := client.any();
    request := new(FindCoordinatorRequest)
    request.CoordinatorKey = consumerGroup
    request.CoordinatorType = CoordinatorGroup
  response, err := broker.FindCoordinator(request)

调整消费组的地方会用到,admin.go

代码语言:javascript复制
func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error)
    controller, err := ca.client.Coordinator(group)
    response, err := broker.DescribeGroups(&DescribeGroupsRequest{
      Groups: brokerGroups,
    })
func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
    return coordinator.FetchOffset(request)
func (ca *clusterAdmin) DeleteConsumerGroup(group string) error
    coordinator, err := ca.client.Coordinator(group)
    resp, err := coordinator.DeleteGroups(request)

创建session的时候也会用到consumer_group.go

代码语言:javascript复制
func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
  coordinator, err := c.client.Coordinator(c.groupID)
  return c.newSession(ctx, topics, handler, retries)
}

离开也会用到

代码语言:javascript复制
func (c *consumerGroup) leave() error {
  c.lock.Lock()
  defer c.lock.Unlock()
  if c.memberID == "" {
    return nil
  }

  coordinator, err := c.client.Coordinator(c.groupID)

心跳维持

代码语言:javascript复制
func (s *consumerGroupSession) heartbeatLoop() {
  for {
    coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
    }
}

3,any 任意broker

代码语言:javascript复制
func (client *client) any() *Broker
  _ = client.seedBrokers[0].Open(client.conf)
  _ = broker.Open(client.conf)

4,Leader

写相关的操作都是先写到leader partation,然后同步到副本分区。

代码语言:javascript复制
func (client *client) Leader(topic string, partitionID int32) (*Broker, error)
    leader, err := client.cachedLeader(topic, partitionID)
代码语言:javascript复制
func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, error)
  _ = b.Open(client.conf)

使用的地方如下:

admin.go

代码语言:javascript复制
func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error  

async_producer.go

代码语言:javascript复制
func (pp *partitionProducer) dispatch() 
func (pp *partitionProducer) updateLeader() error
func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) 

consumer.go

代码语言:javascript复制
func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) 
    child.broker = c.refBrokerConsumer(leader)
      bc = c.newBrokerConsumer(broker)
func (child *partitionConsumer) preferredBroker() (*Broker, error)  

上面就是client的四个角色和应用场景,理解他们的含义对理解kafka和sarama有重要的意义。

0 人点赞