golang源码分析:sarama kafka client(part II:消费者)

2022-08-02 19:27:58 浏览数 (1)

这一讲,我们接着介绍下sarama kafka client的消费者的实现,先从例子开始:

代码语言:javascript复制
package main

import (
  "fmt"
  "log"
  "sync"

  "github.com/Shopify/sarama"
)

// 消费者练习

func main() {
  // 生成消费者 实例
  consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
  if err != nil {
    log.Print(err)
    return
  }
  // 拿到 对应主题下所有分区
  partitionList, err := consumer.Partitions("test")
  if err != nil {
    log.Println(err)
    return
  }

  var wg sync.WaitGroup
  wg.Add(1)
  // 遍历所有分区
  for partition := range partitionList {
    //消费者 消费 对应主题的 具体 分区 指定 主题 分区 offset  return 对应分区的对象
    pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest)
    if err != nil {
      log.Println(err)
      return
    }

    // 运行完毕记得关闭
    defer pc.AsyncClose()

    // 去出对应的 消息
    // 通过异步 拿到 消息
    go func(sarama.PartitionConsumer) {
      defer wg.Done()
      for msg := range pc.Messages() {
        fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
      }
    }(pc)
  }
  wg.Wait()
}

分三个部分:

1,sarama.NewConsumer ,创建一个consumer

2,consumer.ConsumePartition 从指定topic,指定分区消费消息

3, msg := range pc.Messages() 获取消息

如果不需要拿到所有的分区,也可以只指定comsumer group

代码语言:javascript复制
package main

import (
  "context"
  "fmt"
  "os"
  "os/signal"
  "sync"

  "github.com/Shopify/sarama"
)

type consumerGroupHandler struct {
  name string
}

func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  for msg := range claim.Messages() {
    fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%sn", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
    // 手动确认消息
    sess.MarkMessage(msg, "")
  }
  return nil
}

func handleErrors(group *sarama.ConsumerGroup, wg *sync.WaitGroup) {
  wg.Done()
  for err := range (*group).Errors() {
    fmt.Println("ERROR", err)
  }
}

func consume(group *sarama.ConsumerGroup, wg *sync.WaitGroup, name string) {
  fmt.Println(name   "start")
  wg.Done()
  ctx := context.Background()
  for {
    topics := []string{"test"}
    handler := consumerGroupHandler{name: name}
    err := (*group).Consume(ctx, topics, handler)
    fmt.Println("consume group end")
    if err != nil {
      panic(err)
    }
  }
}

func main() {
  var wg sync.WaitGroup
  config := sarama.NewConfig()
  config.Consumer.Return.Errors = false
  config.Version = sarama.V0_10_2_0
  client, err := sarama.NewClient([]string{"localhost:9092"}, config)
  defer client.Close()
  if err != nil {
    panic(err)
  }
  group1, err := sarama.NewConsumerGroupFromClient("c1", client)
  if err != nil {
    panic(err)
  }
  group2, err := sarama.NewConsumerGroupFromClient("c2", client)
  if err != nil {
    panic(err)
  }
  group3, err := sarama.NewConsumerGroupFromClient("c3", client)
  if err != nil {
    panic(err)
  }
  defer group1.Close()
  defer group2.Close()
  defer group3.Close()
  wg.Add(3)
  go consume(&group1, &wg, "c1")
  go consume(&group2, &wg, "c2")
  go consume(&group3, &wg, "c3")
  wg.Wait()
  signals := make(chan os.Signal, 1)
  signal.Notify(signals, os.Interrupt)
  select {
  case <-signals:
  }
}

我们从NewConsumerGroup作为入口开始源码分析:

consumer_group.go

代码语言:javascript复制
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
    client, err := NewClient(addrs, config)
    c, err := newConsumerGroup(groupID, client)
}

先创建一个client,然后生成一个consumerGroup 对象:

代码语言:javascript复制
type ConsumerGroup interface {
  Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
  // Errors returns a read channel of errors that occurred during the consumer life-cycle.
  // By default, errors are logged and not returned over this channel.
  // If you want to implement any custom error handling, set your config's
  // Consumer.Return.Errors setting to true, and read from this channel.
  Errors() <-chan error
  // Close stops the ConsumerGroup and detaches any running sessions. It is required to call
  // this function before the object passes out of scope, as it will otherwise leak memory.
  Close() error
}
代码语言:javascript复制
type consumerGroup struct {
  client Client

  config   *Config
  consumer Consumer
  groupID  string
  memberID string
  errors   chan error

  lock      sync.Mutex
  closed    chan none
  closeOnce sync.Once

  userData []byte
}
代码语言:javascript复制
func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
  consumer, err := NewConsumerFromClient(client) 
}

创建consumerGroup的同时会创建consumer对象:

consumer.go

代码语言:javascript复制
func NewConsumerFromClient(client Client) (Consumer, error) {
  cli := &nopCloserClient{client}
  return newConsumer(cli)
}
代码语言:javascript复制
func newConsumer(client Client) (Consumer, error) {
}
代码语言:javascript复制
type consumer struct {
  conf            *Config
  children        map[string]map[int32]*partitionConsumer
  brokerConsumers map[*Broker]*brokerConsumer
  client          Client
  lock            sync.Mutex
}

创建完ConsumerGroup后我们就开始消费了,对应的接口是Consume

代码语言:javascript复制
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
   c.client.RefreshMetadata(topics...)//加载元数据
   sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
   go c.loopCheckPartitionNumbers(topics, sess)
}

RefreshMetadata用于获取对应元数据信息,代码在client.go

代码语言:javascript复制
func (client *client) RefreshMetadata(topics ...string) error {
  return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
}
代码语言:javascript复制
func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
    broker = client.any()
    req := &MetadataRequest{Topics: topics, AllowAutoTopicCreation: allowAutoTopicCreation}
    response, err := broker.GetMetadata(req)
    shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
}

每个 partition 与 consumer 的分配关系称作一个 “claim”;一组 ConsumerGroupClain 这一轮的生命周期称作一个 session。session 的退出发生在 ctx 退出,或者 partition rebalance。session 要求客户端与 coordinator 保持一定的心跳,原版 kafka 客户端为此有一条 session.timeout.ms 的配置,客户端需要在时间范围内对 coordinator 发送心跳,不然将视为该客户端退出而出发 Rebalance。

代码语言:javascript复制
func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
    coordinator, err := c.client.Coordinator(c.groupID)
    join, err := c.joinGroupRequest(coordinator, topics)
    groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
  return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}
代码语言:javascript复制
func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
    offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
    go sess.heartbeatLoop() 
      // start consuming
    for topic, partitions := range claims {
      for _, partition := range partitions {
        sess.waitGroup.Add(1)
        go func(topic string, partition int32) {
        sess.consume(topic, partition)
      }(topic, partition)
    }
  }
}
代码语言:javascript复制
type consumerGroupSession struct {
  parent       *consumerGroup
  memberID     string
  generationID int32
  handler      ConsumerGroupHandler

  claims  map[string][]int32
  offsets *offsetManager
  ctx     context.Context
  cancel  func()

  waitGroup       sync.WaitGroup
  releaseOnce     sync.Once
  hbDying, hbDead chan none
}

调用了 sess.consume(topic, partition) 这个接口:

代码语言:javascript复制
func (s *consumerGroupSession) consume(topic string, partition int32) {
  // create new claim
  claim, err := newConsumerGroupClaim(s, topic, partition, offset)
   s.handler.ConsumeClaim(s, claim)
}
代码语言:javascript复制
func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
  pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
}
代码语言:javascript复制
type consumerGroupClaim struct {
  topic     string
  partition int32
  offset    int64
  PartitionConsumer
}

调用了ConsumePartition消费对应的partition

consumer.go

代码语言:javascript复制
func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
    child := &partitionConsumer
    if err := child.chooseStartingOffset(offset); err != nil
      if leader, err = c.client.Leader(child.topic, child.partition); err != nil
      if err := c.addChild(child); err != nil
        c.children[child.topic] = topicChildren
        topicChildren[child.partition] = child
      go withRecover(child.dispatcher)
      go withRecover(child.responseFeeder)
      child.broker = c.refBrokerConsumer(leader)
        bc := c.brokerConsumers[broker]
        bc.refs  
      child.broker.input <- child
}

创建了一个partitionConsumer对象:

代码语言:javascript复制
type partitionConsumer struct {
  highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG

  consumer *consumer
  conf     *Config
  broker   *brokerConsumer
  messages chan *ConsumerMessage
  errors   chan *ConsumerError
  feeder   chan *FetchResponse

  preferredReadReplica int32

  trigger, dying chan none
  closeOnce      sync.Once
  topic          string
  partition      int32
  responseResult error
  fetchSize      int32
  offset         int64
  retries        int32
}

同时起了两个协程,这两个协程是核心

1,先看dispatcher,主要是维护订阅者信息

代码语言:javascript复制
    func (child *partitionConsumer) dispatcher() 
      for range child.trigger
        if err := child.dispatch(); err != nil {
        child.consumer.unrefBrokerConsumer(child.broker)
        child.consumer.removeChild(child)
        close(child.feeder

看下dispatcher协程里的dispatch方法

代码语言:javascript复制
 func (child *partitionConsumer) dispatch() error 
      if err := child.consumer.client.RefreshMetadata(child.topic); err != nil 
      broker, err := child.preferredBroker()
      child.broker = child.consumer.refBrokerConsumer(broker)
      child.broker.input <- child

先获得一个brokerConsumer 对象:

代码语言:javascript复制
type brokerConsumer struct {
  consumer         *consumer
  broker           *Broker
  input            chan *partitionConsumer
  newSubscriptions chan []*partitionConsumer
  subscriptions    map[*partitionConsumer]none
  wait             chan none
  acks             sync.WaitGroup
  refs             int
}
代码语言:javascript复制
func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
   bc = c.newBrokerConsumer(broker)
}
代码语言:javascript复制
func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer 
      go withRecover(bc.subscriptionManager)
      go withRecover(bc.subscriptionConsumer)

起了两个协程:

代码语言:javascript复制
func (bc *brokerConsumer) subscriptionManager(){
      case event, ok := <-bc.input:
         buffer = append(buffer, event)
      case bc.newSubscriptions <- buffer
         buffer = nil
 }

input里面有新的订阅请求,会appende到newSubscriptions 里面,不是带缓冲的channel,是一个chnel,里面是个slice

代码语言:javascript复制
func (bc *brokerConsumer) subscriptionConsumer() 
      for newSubscriptions := range bc.newSubscriptions {
          bc.updateSubscriptions(newSubscriptions)
          response, err := bc.fetchNewMessages()
          bc.acks.Add(len(bc.subscriptions))
          child.feeder <- response
          bc.acks.Wait()
          bc.handleResponses()

每次收到消费者变换的消息后,都会调用fetchNewMessages,然后放到feeder里面

代码语言:javascript复制
func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
     for child := range bc.subscriptions {
        request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
     }
     return bc.broker.Fetch(request)
}

Fetch就是请求broker,获取消息

代码语言:javascript复制
func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
  response := new(FetchResponse)
  err := b.sendAndReceive(request, response)
}

2,接着看下responseFeeder协程

代码语言:javascript复制
 func (child *partitionConsumer) responseFeeder() {
   feederLoop:
      从broker获取消息的大循环
      for response := range child.feeder
           for i, msg := range msgs {
               case child.messages <- msg:
                   child.broker.input <- child
                   continue feederLoop
 }

这是整个consumer的消息大循环,不断从feeder里面消费消息,放到messages里面,处理完毕以后将自己放回broker的input里面。

subscriptionManager会从input里面把它取出来,然后取kafka拉取消息,完成了完整的消息循环

最后看下Messages接口

代码语言:javascript复制
func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
  return child.messages
}

很简单,就是把处理好的消息从messages这个chanel里面取出来。

总结下:

partitonConsumer 会启动 dispatcher 和 responseFeeder 两个 goroutine:

1,dispatcher goroutine 用于跟踪 broker 的变化,偏元信息性质的控制侧,dispatcher 这个 goroutine 用于发现 broker 的变化。它会侦听 dispatcher.trigger 这个 channel 的通知,来发现 Partition 的 Leader 变化。而 trigger 这个 channel 的更新来自 brokerConsumer 对象。

最后 child.broker.input<- child 这一句,相当于使 partitionConsumer 加入 brokerConsumer 的订阅。

2, responseFeeder 用于跟踪消息的到来,偏数据侧。

child.feed 这个 channel 也是来自 brokerConsumer。大约是处理来自 brokerConsumer 的消息,转发给 messages chan。

值得留意有一个配置项目 child.conf.Consumer.MaxProcessingTime,默认值为 100ms,看注释它的意思是如果朝 messages chan 写入超过 100ms 仍未成功,则停止再向 Broker 发送 fetch 请求。

0 人点赞