In this article we continue with the consumer side of the sarama Kafka client, starting from an example:
package main

import (
	"fmt"
	"log"
	"sync"

	"github.com/Shopify/sarama"
)

// Consumer example
func main() {
	// Create a consumer instance
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Print(err)
		return
	}
	// Get all partitions of the topic
	partitionList, err := consumer.Partitions("test")
	if err != nil {
		log.Println(err)
		return
	}
	var wg sync.WaitGroup
	// Iterate over all partitions
	for _, partition := range partitionList {
		// ConsumePartition consumes the given partition of the topic, starting from the
		// given offset, and returns a PartitionConsumer for that partition
		pc, err := consumer.ConsumePartition("test", partition, sarama.OffsetNewest)
		if err != nil {
			log.Println(err)
			return
		}
		// Remember to close it when we are done
		defer pc.AsyncClose()
		wg.Add(1)
		// Receive the messages asynchronously
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
	}
	wg.Wait()
}
The example has three parts:
1. sarama.NewConsumer creates a consumer.
2. consumer.ConsumePartition consumes messages from the given topic and partition.
3. msg := range pc.Messages() receives the messages.
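The example above passes a nil config, so consumer errors are only logged. As a side note, here is a minimal sketch (assuming the same local broker and "test" topic) of enabling Consumer.Return.Errors and draining the error channel of a PartitionConsumer:
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Assumption: a broker on localhost:9092 and a topic named "test", as in the example above.
	config := sarama.NewConfig()
	// Forward partition-consumer errors to the Errors() channel instead of only logging them.
	config.Consumer.Return.Errors = true

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.AsyncClose()

	// Drain the error channel in its own goroutine so it never blocks the consumer.
	go func() {
		for consumeErr := range pc.Errors() {
			log.Println("consume error:", consumeErr)
		}
	}()

	for msg := range pc.Messages() {
		log.Printf("partition:%d offset:%d value:%s", msg.Partition, msg.Offset, msg.Value)
	}
}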
If you don't want to deal with all the partitions yourself, you can also use a consumer group instead:
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"

	"github.com/Shopify/sarama"
)

type consumerGroupHandler struct {
	name string
}

func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("%s Message topic:%q partition:%d offset:%d value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
		// Mark the message as consumed manually
		sess.MarkMessage(msg, "")
	}
	return nil
}

func handleErrors(group *sarama.ConsumerGroup, wg *sync.WaitGroup) {
	wg.Done()
	for err := range (*group).Errors() {
		fmt.Println("ERROR", err)
	}
}

func consume(group *sarama.ConsumerGroup, wg *sync.WaitGroup, name string) {
	fmt.Println(name, "start")
	wg.Done()
	ctx := context.Background()
	for {
		topics := []string{"test"}
		handler := consumerGroupHandler{name: name}
		err := (*group).Consume(ctx, topics, handler)
		fmt.Println("consume group end")
		if err != nil {
			panic(err)
		}
	}
}

func main() {
	var wg sync.WaitGroup
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = false
	config.Version = sarama.V0_10_2_0
	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	if err != nil {
		panic(err)
	}
	defer client.Close()
	group1, err := sarama.NewConsumerGroupFromClient("c1", client)
	if err != nil {
		panic(err)
	}
	group2, err := sarama.NewConsumerGroupFromClient("c2", client)
	if err != nil {
		panic(err)
	}
	group3, err := sarama.NewConsumerGroupFromClient("c3", client)
	if err != nil {
		panic(err)
	}
	defer group1.Close()
	defer group2.Close()
	defer group3.Close()
	wg.Add(3)
	go consume(&group1, &wg, "c1")
	go consume(&group2, &wg, "c2")
	go consume(&group3, &wg, "c3")
	// Wait until all three consumers have started
	wg.Wait()
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	<-signals
}
Let's start the source-code walkthrough with NewConsumerGroup as the entry point:
consumer_group.go
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
	client, err := NewClient(addrs, config)
	c, err := newConsumerGroup(groupID, client)
}
It first creates a client and then builds a consumerGroup object:
type ConsumerGroup interface {
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

	// Errors returns a read channel of errors that occurred during the consumer life-cycle.
	// By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan error

	// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
	// this function before the object passes out of scope, as it will otherwise leak memory.
	Close() error
}
type consumerGroup struct {
	client Client

	config   *Config
	consumer Consumer
	groupID  string
	memberID string
	errors   chan error

	lock      sync.Mutex
	closed    chan none
	closeOnce sync.Once

	userData []byte
}
func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
	consumer, err := NewConsumerFromClient(client)
}
Creating the consumerGroup also creates a consumer object:
consumer.go
func NewConsumerFromClient(client Client) (Consumer, error) {
	cli := &nopCloserClient{client}
	return newConsumer(cli)
}
func newConsumer(client Client) (Consumer, error) {
}
type consumer struct {
	conf            *Config
	children        map[string]map[int32]*partitionConsumer
	brokerConsumers map[*Broker]*brokerConsumer
	client          Client
	lock            sync.Mutex
}
Once the ConsumerGroup is created, consumption starts; the corresponding interface method is Consume:
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
	c.client.RefreshMetadata(topics...) // refresh the metadata
	sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
	go c.loopCheckPartitionNumbers(topics, sess)
}
RefreshMetadata fetches the corresponding metadata; the code lives in client.go:
func (client *client) RefreshMetadata(topics ...string) error {
	return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
}
func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
	broker = client.any()
	req := &MetadataRequest{Topics: topics, AllowAutoTopicCreation: allowAutoTopicCreation}
	response, err := broker.GetMetadata(req)
	shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
}
Each partition-to-consumer assignment is called a "claim", and the lifecycle of one round of ConsumerGroupClaims is called a session. A session ends when the ctx is cancelled or when a partition rebalance happens. The session requires the client to keep a heartbeat with the coordinator: the original Kafka client has a session.timeout.ms setting for this, and the client must send heartbeats to the coordinator within that window, otherwise it is considered dead and a rebalance is triggered.
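In sarama the heartbeat and session timeout are plain config fields. A minimal sketch of setting them (the field names are real sarama config fields; the package name, helper name, and values are only illustrative):
package kafkaconf // hypothetical package name, just for this sketch

import (
	"time"

	"github.com/Shopify/sarama"
)

// NewGroupConfig builds a config for consumer-group usage.
func NewGroupConfig() *sarama.Config {
	config := sarama.NewConfig()
	// The member must heartbeat the coordinator within the session timeout,
	// otherwise it is considered dead and a rebalance is triggered.
	config.Consumer.Group.Session.Timeout = 10 * time.Second   // counterpart of session.timeout.ms
	config.Consumer.Group.Heartbeat.Interval = 3 * time.Second // keep well below the session timeout
	return config
}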
func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
	coordinator, err := c.client.Coordinator(c.groupID)
	join, err := c.joinGroupRequest(coordinator, topics)
	groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
	return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}
func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
	offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
	go sess.heartbeatLoop()
	// start consuming
	for topic, partitions := range claims {
		for _, partition := range partitions {
			sess.waitGroup.Add(1)
			go func(topic string, partition int32) {
				sess.consume(topic, partition)
			}(topic, partition)
		}
	}
}
type consumerGroupSession struct {
	parent       *consumerGroup
	memberID     string
	generationID int32
	handler      ConsumerGroupHandler

	claims  map[string][]int32
	offsets *offsetManager
	ctx     context.Context
	cancel  func()

	waitGroup       sync.WaitGroup
	releaseOnce     sync.Once
	hbDying, hbDead chan none
}
It then calls sess.consume(topic, partition):
func (s *consumerGroupSession) consume(topic string, partition int32) {
	// create new claim
	claim, err := newConsumerGroupClaim(s, topic, partition, offset)
	s.handler.ConsumeClaim(s, claim)
}
func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
	pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
}
type consumerGroupClaim struct {
	topic     string
	partition int32
	offset    int64
	PartitionConsumer
}
which calls ConsumePartition to consume the corresponding partition:
consumer.go
func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
	child := &partitionConsumer{...}
	if err := child.chooseStartingOffset(offset); err != nil {...}
	if leader, err = c.client.Leader(child.topic, child.partition); err != nil {...}
	if err := c.addChild(child); err != nil {...}
	// inside addChild:
	//   c.children[child.topic] = topicChildren
	//   topicChildren[child.partition] = child
	go withRecover(child.dispatcher)
	go withRecover(child.responseFeeder)
	child.broker = c.refBrokerConsumer(leader)
	// inside refBrokerConsumer:
	//   bc := c.brokerConsumers[broker]
	//   bc.refs++
	child.broker.input <- child
}
This creates a partitionConsumer object:
type partitionConsumer struct {
	highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG

	consumer *consumer
	conf     *Config
	broker   *brokerConsumer
	messages chan *ConsumerMessage
	errors   chan *ConsumerError
	feeder   chan *FetchResponse

	preferredReadReplica int32

	trigger, dying chan none
	closeOnce      sync.Once
	topic          string
	partition      int32
	responseResult error
	fetchSize      int32
	offset         int64
	retries        int32
}
It also starts two goroutines, and these two goroutines are the core of the consumer.
1. First, dispatcher, which mainly maintains the subscription state:
func (child *partitionConsumer) dispatcher() {
	for range child.trigger {
		if err := child.dispatch(); err != nil {
			...
		}
	}
	child.consumer.unrefBrokerConsumer(child.broker)
	child.consumer.removeChild(child)
	close(child.feeder)
}
Let's look at the dispatch method called from the dispatcher goroutine:
func (child *partitionConsumer) dispatch() error {
	if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
		return err
	}
	broker, err := child.preferredBroker()
	child.broker = child.consumer.refBrokerConsumer(broker)
	child.broker.input <- child
}
It first obtains a brokerConsumer object:
type brokerConsumer struct {
	consumer         *consumer
	broker           *Broker
	input            chan *partitionConsumer
	newSubscriptions chan []*partitionConsumer
	subscriptions    map[*partitionConsumer]none
	wait             chan none
	acks             sync.WaitGroup
	refs             int
}
func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
	bc = c.newBrokerConsumer(broker)
}
func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
	go withRecover(bc.subscriptionManager)
	go withRecover(bc.subscriptionConsumer)
}
which starts two more goroutines:
func (bc *brokerConsumer) subscriptionManager() {
	select {
	case event, ok := <-bc.input:
		buffer = append(buffer, event)
	case bc.newSubscriptions <- buffer:
		buffer = nil
	}
}
When a new subscription request arrives on input it is appended to a buffer slice, which is then handed over on newSubscriptions. Note that newSubscriptions is not a buffered channel; it is an unbuffered channel whose element type is a slice.
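To make the mechanism concrete, here is a stripped-down, standalone sketch of the same pattern (not the actual sarama code): items arriving on an input channel are collected into a slice, and the whole slice is handed over in one send on an unbuffered channel, using a nil channel to disable the send case while the buffer is empty:
package main

import "fmt"

// collect batches items arriving on input and hands each batch over as a single
// slice on batches; batches is unbuffered and its element type is a slice,
// mirroring the input / newSubscriptions pair described above.
func collect(input <-chan int, batches chan<- []int) {
	var buffer []int
	for {
		// Sending on a nil channel blocks forever, which is how the "hand over the
		// batch" case is disabled while the buffer is empty.
		var out chan<- []int
		if len(buffer) > 0 {
			out = batches
		}
		select {
		case v, ok := <-input:
			if !ok {
				if len(buffer) > 0 {
					batches <- buffer
				}
				close(batches)
				return
			}
			buffer = append(buffer, v)
		case out <- buffer:
			buffer = nil
		}
	}
}

func main() {
	input := make(chan int)
	batches := make(chan []int)
	go collect(input, batches)
	go func() {
		for i := 0; i < 5; i++ {
			input <- i
		}
		close(input)
	}()
	for b := range batches {
		fmt.Println("batch:", b)
	}
}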
func (bc *brokerConsumer) subscriptionConsumer() {
	for newSubscriptions := range bc.newSubscriptions {
		bc.updateSubscriptions(newSubscriptions)
		response, err := bc.fetchNewMessages()
		bc.acks.Add(len(bc.subscriptions))
		for child := range bc.subscriptions {
			child.feeder <- response
		}
		bc.acks.Wait()
		bc.handleResponses()
	}
}
On every round it first applies any subscription changes, then calls fetchNewMessages and pushes the response into the feeder channel of every subscribed partitionConsumer.
func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
	for child := range bc.subscriptions {
		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
	}
	return bc.broker.Fetch(request)
}
Fetch simply sends the request to the broker and returns the messages:
func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
	response := new(FetchResponse)
	err := b.sendAndReceive(request, response)
}
2. Next, the responseFeeder goroutine:
func (child *partitionConsumer) responseFeeder() {
feederLoop:
	// the main loop that receives the responses fetched from the broker
	for response := range child.feeder {
		for i, msg := range msgs {
			select {
			case child.messages <- msg:
				...
			case <-expiryTicker.C:
				// processing took too long: hand ourselves back to the broker and move on
				child.broker.input <- child
				continue feederLoop
			}
		}
	}
}
This is the consumer's main message loop: it keeps taking responses from feeder, pushes the messages into messages, and when it is done it puts itself back onto the broker's input channel. subscriptionManager then picks it up from input again and pulls new messages from Kafka, completing the full message cycle.
Finally, the Messages interface:
func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
	return child.messages
}
Very simple: it just exposes the messages channel from which the processed messages are read.
To summarize:
partitionConsumer starts two goroutines, dispatcher and responseFeeder:
1. The dispatcher goroutine tracks broker changes; it is the control side, dealing mostly with metadata. It listens on the child.trigger channel to detect partition leader changes, and the updates on trigger come from the brokerConsumer object. The final line child.broker.input <- child effectively adds the partitionConsumer to the brokerConsumer's subscriptions.
2. responseFeeder tracks the arrival of messages; it is the data side. The child.feeder channel is also fed by the brokerConsumer: responseFeeder processes the responses coming from the brokerConsumer and forwards the messages to the messages channel.
One config option worth noting is child.conf.Consumer.MaxProcessingTime, which defaults to 100ms. According to the comments, if a write to the messages channel has not succeeded within 100ms, the consumer stops sending further fetch requests to the broker.
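A minimal sketch of tuning this knob (Consumer.MaxProcessingTime is a real sarama config field; the package name, helper name, and value below are only illustrative):
package kafkaconf // hypothetical package name, just for this sketch

import (
	"time"

	"github.com/Shopify/sarama"
)

// NewSlowHandlerConfig raises MaxProcessingTime for consumers whose handlers are slow.
func NewSlowHandlerConfig() *sarama.Config {
	config := sarama.NewConfig()
	// If a write to the Messages() channel does not succeed within this window,
	// sarama stops issuing further fetch requests to the broker until the
	// consumer catches up (the default is 100ms).
	config.Consumer.MaxProcessingTime = 500 * time.Millisecond
	return config
}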