golang源码分析:sarama kafka client(part IV:reblance)

2022-08-03 13:49:44 浏览数 (1)

代码语言:javascript复制
Sarama Go客户端存在以下已知问题:
当Topic新增分区时,Sarama Go客户端无法感知并消费新增分区,需要客户端重启后,才能消费到新增分区。
当Sarama Go客户端同时订阅两个以上的Topic时,有可能会导致部分分区无法正常消费消息。
当Sarama Go客户端的消费位点重置策略设置为Oldest(earliest)时,如果客户端宕机或服务端版本升级,由于Sarama Go客户端自行实现OutOfRange机制,有可能会导致客户端从最小位点开始重新消费所有消息。
解决方案
建议尽早将Sarama Go客户端替换为Confluent Go客户端。

看到这句话我就犯嘀咕了,Sarama Go客户端无法感知并消费新增分区,需要客户端重启后,才能消费到新增分区。作为一个star 7.7k的项目,竟然有这么严重的缺陷,我广大gopher的容忍度也是可以的,真是这样吗?

看源码之前,我们先看下kafka的reblance协议的发展历程:

1, Zookeeper 的 Watcher 实现

在0.9版本之前,在 Kafka 最初始的解决方案中,是依赖 Zookeeper 的 Watcher 实现的。该方案中,每个 Consumer Group 在 Zookeeper 下都维护了一个对应的 /consumers/{group_id}/ids 路径,该路径下使用临时节点记录该 Consumer Group 中的 Consumer Id,这个 Consumer Id 临时节点在 Consumer 启动时创建。另外,kafka 还会创建 owners 和 offsets 两个节点,这两个节点与 ids 节点同级,其中 owners 记录了 consumer 与 partition 的分配关系;offsets 节点用来记录了对应 Consumer Group 在相应 partition 上的消费位置。Group内每个Consumer通过在ZK内抢注节点来决定消费哪些Partition,并注册对Group和Broker相关节点的监听,以获知消费环境的变化(其他Consumer掉线、Broker宕机等),进而触发Rebalance;

问题:

  • 羊群效应(herd effect)——一个被监听的ZK节点发生变化,导致大量的通知发送给所有监听者(即Consumer);
  • 脑裂(split brain)——ZK只保证最终一致性,不同的Consumer在同一时刻可能看到不同的Group和Broker状态,造成Rebalance混乱。

2, Eager Rebalance Protocol

Kafka 在后续版本对 Rebalance 方案进行了改进(也就是 Eager Rebalance Protocol),改进方案的核心设计思想是:将全部的 consumer group 分成多个子集,每个 consumer group 集合在 broker 对应一个 GroupCoordinator,由 GroupCoordinator 管理对应 consumer groups 的 rebalance(每个 broker 都拥有成为 GroupCoordinator 的能力)。

动时,都会顺带启动一个Group Coordinator实例。每个Consumer Group在初始化时,都会分配给一个Group Coordinator实例来管理消费关系和Offset

Group Coordinator提交Offset时也不再是向ZK写,而是写入那个广为人知的特殊Topic——__consumer_offsets里。key是group-topic-partition格式的,value为Offset值。

那么该如何确定一个Consumer Group被分配给哪个Group Coordinator呢?Kafka根据groupId.hashCode() % offsets.topic.num.partitions取绝对值来得出该Group的Offset信息写入__consumer_offsets的分区号,并将Group分配给该分区Leader所在的Broker上的那个Group Coordinator。

整个Rebalance分为两个大步骤:JOIN和SYNC。

JOIN:所有Consumer都会向Coordinator发送join-group,请求重新加入Group(那些原本已经在Group内的也不例外),同时放弃掉已分配给自己的Partition。

SYNC:这一步需要做的事情是:Coordinator在所有Consumer里选择一个担任Leader,并由Leader调用Partition分配规则来确定消费对应关系。各个Consumer发送sync-group请求。Leader发送的请求里包含有已经确定的消费分配信息,其他Consumer的请求为空。Coordinator将消费分配信息原样封装在sync-group响应中,并投递给各个Consumer,最终使Group内所有成员都获知自己该消费的Partition。

问题:

stop-the-world问题——需要收回(revoke)所有Partition再重新分配(reassign),在此时间内,所有Consumer都无法进行消费。如果Rebalance时间长,会造成lag。

back-and-forth问题——如果多次触发Rebalance,很有可能造成一个Consumer消费的Partition被分配给其他Consumer,然后又分配回来,做了无用功。

3,Incremental(增量)Rebalance

在 kafka 2.4 版本中,为了进一步减少 rebalance 带来的 Stop The World,提出了 Incremental Cooperative Rebalance 协议。其核心思想就是使用将一次全局的 rebalance,改成多次小规模 rebalance,最终收敛到 rebalance 的状态。

Incremental Cooperative Rebalance 协议,该协议最核心的思想就是:

consumer 比较新旧两个 partition 分配结果,只停止消费回收(revoke)的 partition,对于两次都分配给自己的 partition,consumer 根本没有必要停止消费,这也就解决了 Stop The World 的问题。通过多轮的局部 rebalance 来最终实现全局的 rebalance 。

源码实现:

从官方的例子开始,源码位置examples/consumergroup/main.go

代码语言:javascript复制
    for {
      // `Consume` should be called inside an infinite loop, when a
      // server-side rebalance happens, the consumer session will need to be
      // recreated to get the new claims
      if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
        log.Panicf("Error from consumer: %v", err)
      }
      // check if context was cancelled, signaling that the consumer should stop
      if ctx.Err() != nil {
        return
      }
      consumer.ready = make(chan bool)
    }

看下注释:Consume函数应该在一个死循环里调用,如果发生了服务端reblance,客户端需要重建session。 接着看下Consume函数的内部实现consumer_group.go

代码语言:javascript复制
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
  // Refresh metadata for requested topics
  if err := c.client.RefreshMetadata(topics...); err != nil {
    return err
  }

  // Init session
  sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
    // loop check topic partition numbers changed
  // will trigger rebalance when any topic partitions number had changed
  // avoid Consume function called again that will generate more than loopCheckPartitionNumbers coroutine
  go c.loopCheckPartitionNumbers(topics, sess)

  // Wait for session exit signal
  <-sess.ctx.Done() 

1,获取元数据

元数据的同步可以参考我前面的分享golang源码分析:sarama kafka client(part III:client的角色)

2,创建新的session

代码语言:javascript复制
func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
    coordinator, err := c.client.Coordinator(c.groupID)
      join, err := c.joinGroupRequest(coordinator, topics)
    }
  // Prepare distribution plan if we joined as the leader
  var plan BalanceStrategyPlan
  if join.LeaderId == join.MemberId {
    members, err := join.GetMembers()
    plan, err = c.balance(members)
  }
  // Sync consumer group
  groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
  members, err := groupRequest.GetMemberAssignment()
   return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}

先获取协调者,然后join group,如果自己是leader,计算分区分配计划,然后,sync 给其他consumer

代码语言:javascript复制
func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
  strategy := c.config.Consumer.Group.Rebalance.Strategy
  return strategy.Plan(members, topics)
}

根据配置的分区算法进行分区,Partition的分配有内置的三种算法实现(range、round-robin、sticky)

RangeAssignor策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每一个topic,RangeAssignor策略会将消费组内所有订阅这个topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。

① 假设n=一个topic的分区数 / 订阅此topic消费者数量,

② m=分区数%消费者数量,

③ 那么前m个消费者每个分配n 1个分区,后面的(消费者数量-m)个消费者每个分配n个分区。

RoundRobinAssignor策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询消费者方式逐个将分区分配给每个消费者。

StickyAssignor策略,“sticky”这个单词可以翻译为“粘性的”,Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:

① 分区的分配要尽可能的均匀;

② 分区的分配尽可能的与上次分配的保持相同。

当两者发生冲突时,第一个目标优先于第二个目标。

3,起一个检查分区数的协程

代码语言:javascript复制
func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) {
    oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics)
      for {
         newTopicToPartitionNum, err := c.topicToPartitionNumbers(topics)
           for topic, num := range oldTopicToPartitionNum {
              if newTopicToPartitionNum[topic] != num {
                return // trigger the end of the session on exit
              }
          }
     }

4,等待协程的退出

总结:可以看到,只有在分区数发生变化的情况下,上述协程才会退出,走到外面的大循环,进行reblance,如果某个客户端掉线,只要分区数不变,是不会reblance的,果然坑爹。

0 人点赞