confluent-kafka-go Source Code Analysis

2022-08-02 19:13:58

confluent-kafka-go is known as the fastest of the Kafka clients. Why? Because it is very lightweight: it is a cgo wrapper around librdkafka, so what actually runs is essentially a C client.

Part I: Installation and Configuration

Installing the client

```shell
go get -u github.com/confluentinc/confluent-kafka-go
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
sudo make install
```

Installing Kafka

```shell
brew install kafka
brew services start kafka
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

cd /usr/local/Cellar/kafka/2.6.0_1/
./bin/kafka-topics --create  --zookeeper localhost:2181 --partitions 1 --replication-factor 1  --topic test

./bin/kafka-topics --list --zookeeper localhost:2181
test

./bin/kafka-console-producer --broker-list localhost:9092 --topic test
>This is a message

./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
```

At this point Kafka is installed. However, connecting directly from Go produces an error:

```text
1617546888.931|FAIL|rdkafka#producer-1| [thrd:bogon:9092/0]: bogon:9092/0: Failed to resolve 'bogon:9092': nodename nor servname provided, or not known (after 2ms in state CONNECT)
```

For external clients to reach Kafka, advertised.listeners must be configured. "PLAINTEXT" is the protocol; the available values are PLAINTEXT and SSL.

```properties
# vi /usr/local/etc/kafka/server.properties
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://localhost:9092
```

Restart Kafka:

```shell
% brew services  restart kafka
Stopping `kafka`... (might take a while)
==> Successfully stopped `kafka` (label: homebrew.mxcl.kafka)
==> Successfully started `kafka` (label: homebrew.mxcl.kafka)
```

Configuration is now complete. Next, let's produce and consume from Go.

Producer:

```go
package main

import (
  "fmt"

  "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

  p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
  if err != nil {
    panic(err)
  }

  defer p.Close()

  // Delivery report handler for produced messages
  go func() {
    for e := range p.Events() {
      switch ev := e.(type) {
      case *kafka.Message:
        if ev.TopicPartition.Error != nil {
          fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
        } else {
          fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
        }
      }
    }
  }()

  // Produce messages to topic (asynchronously)
  topic := "test"
  for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
    p.Produce(&kafka.Message{
      TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
      Value:          []byte(word),
    }, nil)
  }

  // Wait for message deliveries before shutting down
  p.Flush(15 * 1000)
}
```
Consumer:

```go
package main

import (
  "fmt"

  "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

  c, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": "localhost",
    "group.id":          "myGroup",
    "auto.offset.reset": "earliest",
  })

  if err != nil {
    panic(err)
  }

  c.SubscribeTopics([]string{"test", "^aRegex.*[Tt]opic"}, nil)

  for {
    msg, err := c.ReadMessage(-1)
    if err == nil {
      fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
    } else {
      // The client will automatically try to recover from all errors.
      fmt.Printf("Consumer error: %v (%v)\n", err, msg)
    }
  }

  c.Close()
}
```

Run them, and it works:

```shell
go run exp1/producer/main.go
Delivered message to test[0]@4
Delivered message to test[0]@5

go run exp1/consumer/main.go
Consumer error: Subscribed topic not available: ^aRegex.*[Tt]opic: Broker: Unknown topic or partition (<nil>)
Message on test[0]@4: Welcome
Message on test[0]@5: to
```

Part II: Source Code Analysis

The producer mainly calls three APIs:

1. kafka.NewProducer

2. for e := range p.Events(), listening for producer events in a goroutine

3. p.Produce, producing messages

The consumer likewise calls three main APIs:

1. kafka.NewConsumer

2. c.SubscribeTopics

3. msg, err := c.ReadMessage(-1), consuming messages

Let's look at the layout of the confluent-kafka-go source tree:

```shell
cd ~/go/pkg/mod/gopkg.in/confluentinc/confluent-kafka-go.v1@v1.6.1
ls
CHANGELOG.md    LICENSE         README.md       examples        kafka           kafkatest       mk
```

The core source code lives in the kafka directory:

```text
% cd kafka && tree
.
|____kafka_test.go
|____log.go
|____adminoptions.go
|____header.go
|____config.go
|____metadata.go
|____error.go
|____stats_event_test.go
|____event.go
|____misc.go
|____consumer_performance_test.go
|____time.go
|____txn_integration_test.go
|____build_glibc_linux.go
|____producer.go
|____error_gen.go
|____header_test.go
|____offset.go
|____message.go
|____build_musl_linux.go
|____build_darwin.go
|____go_rdkafka_generr
| |____go_rdkafka_generr.go
|____librdkafka_vendor
| |____rdkafka.h
| |____librdkafka_musl_linux.a
| |____README.md
| |____librdkafka_darwin.a
| |____.gitignore
| |____import.sh
| |____librdkafka_glibc_linux.a
| |____bundle-import.sh
| |____LICENSES.txt
| |____librdkafka.go
|____message_test.go
|____adminapi_test.go
|____event_test.go
|____handle.go
|____glue_rdkafka.h
|____producer_performance_test.go
|____README.md
|____testconf-example.json
|____select_rdkafka.h
|____kafka.go
|____.gitignore
|____context.go
|____consumer.go
|____producer_test.go
|____testhelpers.go
|____consumer_test.go
|____adminapi.go
|____generated_errors.go
|____00version.go
|____metadata_test.go
|____error_test.go
|____api.html
|____config_test.go
|____integration_test.go
|____testhelpers_test.go
|____build_dynamic.go
```

The librdkafka_vendor directory contains the C library pre-built for each supported platform, loaded by static linking by default. If you are on a platform that is not covered, there are two options:

1. Build a static library for your platform, place it under librdkafka_vendor, adjust bundle-import.sh, and rebuild.

2. Build and install librdkafka yourself, then build your application with dynamic linking: go build -tags dynamic

The two most important files in this directory are consumer.go and producer.go.

Let's start with producer.go.

1. func NewProducer first initializes a Producer struct:

```go
type Producer struct {
  events         chan Event
  produceChannel chan *Message
  handle         handle

  // Terminates the poller() goroutine
  pollerTermChan chan bool
}
```

It then parses the configuration the producer depends on and initializes the producer through a series of cgo calls into C functions.

2. It registers the events the producer cares about:

```go
C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_DR|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR|C.RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH)
```

3. It creates the producer instance:

```go
p.handle.rk = C.rd_kafka_new(C.RD_KAFKA_PRODUCER, cConf, cErrstr, 256)
```

4. It gets a reference to the producer's main event queue:

```go
p.handle.rkq = C.rd_kafka_queue_get_main(p.handle.rk)
```

5. It starts a goroutine to poll producer events:

```go
go func() {
  poller(p, p.pollerTermChan)
  p.handle.waitGroup.Done()
}()
```

Inside, poller loops calling:

```go
_, term := p.handle.eventPoll(p.events, 100, 1000, termChan)
```

The event dispatch handler is defined in event.go:

```go
func (h *handle) eventPoll(channel chan Event, timeoutMs int, maxEvents int, termChan chan bool) (Event, bool) {
```

It, too, wraps the cgo event API:

```go
rkev := C._rk_queue_poll(h.rkq, C.int(timeoutMs), &evtype, &fcMsg, prevRkev)
```

and handles the full range of Kafka events: fetch, rebalance, error, and so on.

6. It starts a goroutine to produce messages:

```go
go func() {
  producer(p)
  p.handle.waitGroup.Done()
}()
```

which drains the produce channel:

```go
func channelProducer(p *Producer) {
  for m := range p.produceChannel {
    err := p.produce(m, C.RD_KAFKA_MSG_F_BLOCK, nil)
```

Underneath is a cgo call:

```go
C.do_produce(p.handle.rk, crkt,
    C.int32_t(msg.TopicPartition.Partition),
    C.int(msgFlags)|C.RD_KAFKA_MSG_F_COPY,
    valIsNull, unsafe.Pointer(&valp[0]), C.size_t(valLen),
    keyIsNull, unsafe.Pointer(&keyp[0]), C.size_t(keyLen),
    C.int64_t(timestamp),
    (*C.tmphdr_t)(unsafe.Pointer(&tmphdrs[0])), C.size_t(tmphdrsCnt),
    (C.uintptr_t)(cgoid))
```

Both goroutines above merely register themselves at startup; they block waiting on their channels and begin processing only when a signal arrives.

7. Events() simply returns the event channel:

```go
func (p *Producer) Events() chan Event {
  return p.events
}
```

8. The public Produce path and the channel-based path registered at initialization call the same underlying function; the difference is that the channel-based path sits waiting for messages to arrive.

```go
func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event) error {
```

Now let's turn to consumer.go.

1. func NewConsumer initializes a Consumer struct:

```go
type Consumer struct {
  events             chan Event
  handle             handle
  eventsChanEnable   bool
  readerTermChan     chan bool
  rebalanceCb        RebalanceCb
  appReassigned      bool
  appRebalanceEnable bool // Config setting
}
```

2. It parses the configuration it depends on.

3. It registers the events to listen for:

```go
C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_REBALANCE|C.RD_KAFKA_EVENT_OFFSET_COMMIT|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR|C.RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH)
```

4. It creates the consumer instance:

```go
c.handle.rk = C.rd_kafka_new(C.RD_KAFKA_CONSUMER, cConf, cErrstr, 256)
```

5. It redirects the main event queue to the consumer:

```go
C.rd_kafka_poll_set_consumer(c.handle.rk)
```

6. It gets a reference to the consumer group's event queue:

```go
c.handle.rkq = C.rd_kafka_queue_get_consumer(c.handle.rk)
```

7. It starts a goroutine to read messages:

```go
go func() {
  consumerReader(c, c.readerTermChan)
  c.handle.waitGroup.Done()
}()
```

Inside, it waits for events and checks for termination, just like the producer:

```go
_, term := c.handle.eventPoll(c.events, 100, 1000, termChan)
```

8. SubscribeTopics subscribes to topics:

```go
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)
```

which wraps:

```go
ctopics := C.rd_kafka_topic_partition_list_new(C.int(len(topics)))
C.rd_kafka_topic_partition_list_add(ctopics, ctopic, C.RD_KAFKA_PARTITION_UA)
e := C.rd_kafka_subscribe(c.handle.rk, ctopics)
```

9. It fetches messages by polling:

```go
ev := c.Poll(timeoutMs)

switch e := ev.(type) {
case *Message:
```

Poll in turn calls:

```go
ev, _ := c.handle.eventPoll(nil, timeoutMs, 1, nil)
```

so it, too, ends up in event.go's eventPoll to fetch the message:

```go
rkev := C._rk_queue_poll(h.rkq, C.int(timeoutMs), &evtype, &fcMsg, prevRkev)
```
