confluent-kafka-go是已知的kafka 客户端中最快的,为什么呢?因为它非常轻量,通过cgo 对librdkafka做了一个封装,所以本质上运行的是一个c客户端。
一、安装和配置
安装
代码语言:javascript复制go get -u github.com/confluentinc/confluent-kafka-go
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
sudo make install
安装kafka
代码语言:javascript复制brew install kafka
brew services start kafka
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
cd /usr/local/Cellar/kafka/2.6.0_1/
./bin/kafka-topics --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic test
./bin/kafka-topics --list --zookeeper localhost:2181
test
./bin/kafka-console-producer --broker-list localhost:9092 --topic test
>This is a message
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
至此,kafka已经安装完毕。但是如果我们直接用go来连接会报错
代码语言:javascript复制1617546888.931|FAIL|rdkafka#producer-1| [thrd:bogon:9092/0]: bogon:9092/0: Failed to resolve 'bogon:9092': nodename nor servname provided, or not known (after 2ms in state CONNECT)
如果外界需要访问kafka需要配置advertised.listeners,"PLAINTEXT"表示协议,可选的值有PLAINTEXT和SSL
代码语言:javascript复制vi /usr/local/etc/kafka/server.properties
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://localhost:9092
重启kfaka
代码语言:javascript复制% brew services restart kafka
Stopping `kafka`... (might take a while)
==> Successfully stopped `kafka` (label: homebrew.mxcl.kafka)
==> Successfully started `kafka` (label: homebrew.mxcl.kafka)
已经配置完毕,接着,我们通过go代码来进行生产和消费
代码语言:javascript复制package main
import (
"fmt"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
func main() {
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
if err != nil {
panic(err)
}
defer p.Close()
// Delivery report handler for produced messages
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %vn", ev.TopicPartition)
} else {
fmt.Printf("Delivered message to %vn", ev.TopicPartition)
}
}
}
}()
// Produce messages to topic (asynchronously)
topic := "test"
for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(word),
}, nil)
}
// Wait for message deliveries before shutting down
p.Flush(15 * 1000)
}
代码语言:javascript复制package main
import (
"fmt"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
func main() {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost",
"group.id": "myGroup",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
c.SubscribeTopics([]string{"test", "^aRegex.*[Tt]opic"}, nil)
for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Message on %s: %sn", msg.TopicPartition, string(msg.Value))
} else {
// The client will automatically try to recover from all errors.
fmt.Printf("Consumer error: %v (%v)n", err, msg)
}
}
c.Close()
}
运行一下,发现已经成功了
代码语言:javascript复制go run exp1/producer/main.go
Delivered message to test[0]@4
Delivered message to test[0]@5
go run exp1/consumer/main.go
Consumer error: Subscribed topic not available: ^aRegex.*[Tt]opic: Broker: Unknown topic or partition (<nil>)
Message on test[0]@4: Welcome
Message on test[0]@5: to
二、源码分析
生产者主要调用了三个接口
1, kafka.NewProducer
2, for e := range p.Events() 在协程中监听生产者事件
3, p.Produce 生产消息
消费者也主要调用了三个接口
1, kafka.NewConsumer
2,c.SubscribeTopics
3, msg, err := c.ReadMessage(-1) 消费消息
下面我们看下confluent-kafka-go 源码的结构
代码语言:javascript复制cd ~/go/pkg/mod/gopkg.in/confluentinc/confluent-kafka-go.v1@v1.6.1
ls
CHANGELOG.md LICENSE README.md examples kafka kafkatest mk
核心源码在kafka这个目录下
代码语言:javascript复制% cd kafka && tree
.
|____kafka_test.go
|____log.go
|____adminoptions.go
|____header.go
|____config.go
|____metadata.go
|____error.go
|____stats_event_test.go
|____event.go
|____misc.go
|____consumer_performance_test.go
|____time.go
|____txn_integration_test.go
|____build_glibc_linux.go
|____producer.go
|____error_gen.go
|____header_test.go
|____offset.go
|____message.go
|____build_musl_linux.go
|____build_darwin.go
|____go_rdkafka_generr
| |____go_rdkafka_generr.go
|____librdkafka_vendor
| |____rdkafka.h
| |____librdkafka_musl_linux.a
| |____README.md
| |____librdkafka_darwin.a
| |____.gitignore
| |____import.sh
| |____librdkafka_glibc_linux.a
| |____bundle-import.sh
| |____LICENSES.txt
| |____librdkafka.go
|____message_test.go
|____adminapi_test.go
|____event_test.go
|____handle.go
|____glue_rdkafka.h
|____producer_performance_test.go
|____README.md
|____testconf-example.json
|____select_rdkafka.h
|____kafka.go
|____.gitignore
|____context.go
|____consumer.go
|____producer_test.go
|____testhelpers.go
|____consumer_test.go
|____adminapi.go
|____generated_errors.go
|____00version.go
|____metadata_test.go
|____error_test.go
|____api.html
|____config_test.go
|____integration_test.go
|____testhelpers_test.go
|____build_dynamic.go
librdkafka_vendor目录下面是针对不同平台,编译的c语言的包,默认是静态链接的加载方式,如果是一个位置的平台,有两种解决方法:
1,编译一个静态链接库,放在librdkafka_vendor 下面,修改bundle-import.sh 文件,编译
2,编译librdkafka成功后,在编译调用代码的时候,指定为动态加载 -tag dynamic
这个目录下最核心的主要有两个文件consumer.go、producer.go
首先看下producer.go
1,func NewProducer首先初始化了一个 Producer的结构体
代码语言:javascript复制type Producer struct {
events chan Event
produceChannel chan *Message
handle handle
// Terminates the poller() goroutine
pollerTermChan chan bool
}
然后解析一系列依赖的配置,接着通过cgo调用一系列c函数来实现producer的初始化。
2,注册生产者关心的一系列事件
代码语言:javascript复制 C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_DR|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR|C.RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH)
3,创建生产者实例
代码语言:javascript复制p.handle.rk = C.rd_kafka_new(C.RD_KAFKA_PRODUCER, cConf, cErrstr, 256)
4,获取生产队列的主replication
代码语言:javascript复制p.handle.rkq = C.rd_kafka_queue_get_main(p.handle.rk)
5,起协程监听生产者事件
代码语言:javascript复制 go func() {
poller(p, p.pollerTermChan)
p.handle.waitGroup.Done()
}()
代码语言:javascript复制_, term := p.handle.eventPoll(p.events, 100, 1000, termChan)
在event.go文件里定义了事件的分发处理函数
代码语言:javascript复制func (h *handle) eventPoll(channel chan Event, timeoutMs int, maxEvents int, termChan chan bool) (Event, bool) {
里面也是对cgo的事件进行了封装
代码语言:javascript复制rkev := C._rk_queue_poll(h.rkq, C.int(timeoutMs), &evtype, &fcMsg, prevRkev)
处理了kafka的一系列事件fetch、reblance、error等等。
6,起协程生产消息
代码语言:javascript复制 go func() {
producer(p)
p.handle.waitGroup.Done()
}()
代码语言:javascript复制func channelProducer(p *Producer) {
for m := range p.produceChannel {
err := p.produce(m, C.RD_KAFKA_MSG_F_BLOCK, nil)
里面是cgo函数
代码语言:javascript复制C.do_produce(p.handle.rk, crkt,
C.int32_t(msg.TopicPartition.Partition),
C.int(msgFlags)|C.RD_KAFKA_MSG_F_COPY,
valIsNull, unsafe.Pointer(&valp[0]), C.size_t(valLen),
keyIsNull, unsafe.Pointer(&keyp[0]), C.size_t(keyLen),
C.int64_t(timestamp),
(*C.tmphdr_t)(unsafe.Pointer(&tmphdrs[0])), C.size_t(tmphdrsCnt),
(C.uintptr_t)(cgoid))
上面两个协程都只是注册了事件,在协程里等待chanel的信号,channel里信号到达才开始处理。
7,Events()仅仅是返回了事件的channel
代码语言:javascript复制func (p *Producer) Events() chan Event {
return p.events
}
8,produce函数和初始化的时候注册的函数底层调用的是同一个,不同的是,初始化的时候需要等待事件的到来。
代码语言:javascript复制func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event) error {
接着看看consumer.go
1,func NewConsumer里面初始化了一个结构体
代码语言:javascript复制type Consumer struct {
events chan Event
handle handle
eventsChanEnable bool
readerTermChan chan bool
rebalanceCb RebalanceCb
appReassigned bool
appRebalanceEnable bool // Config setting
}
2,解析了一系列依赖的配置
3,注册需要监听的事件
代码语言:javascript复制C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_REBALANCE|C.RD_KAFKA_EVENT_OFFSET_COMMIT|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR|C.RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH)
4,初始化consumer实例
代码语言:javascript复制c.handle.rk = C.rd_kafka_new(C.RD_KAFKA_CONSUMER, cConf, cErrstr, 256)
5,将consumer注册到事件监听handler里面
代码语言:javascript复制C.rd_kafka_poll_set_consumer(c.handle.rk)
6,获取队列的主分区
代码语言:javascript复制c.handle.rkq = C.rd_kafka_queue_get_consumer(c.handle.rk)
7,起协程消费消息
代码语言:javascript复制 go func() {
consumerReader(c, c.readerTermChan)
c.handle.waitGroup.Done()
}()
里面等待事件的到来,确定是否终止,和producer一样
代码语言:javascript复制 _, term := c.handle.eventPoll(c.events, 100, 1000, termChan)
8,订阅topic
代码语言:javascript复制func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)
代码语言:javascript复制ctopics := C.rd_kafka_topic_partition_list_new(C.int(len(topics)))
C.rd_kafka_topic_partition_list_add(ctopics, ctopic, C.RD_KAFKA_PARTITION_UA)
e := C.rd_kafka_subscribe(c.handle.rk, ctopics)
9,通过poll获取事件消息
代码语言:javascript复制 ev := c.Poll(timeoutMs)
switch e := ev.(type) {
case *Message:
代码语言:javascript复制ev, _ := c.handle.eventPoll(nil, timeoutMs, 1, nil
最终也是调用event.go 的eventPoll来获取消息
代码语言:javascript复制rkev := C._rk_queue_poll(h.rkq, C.int(timeoutMs), &evtype, &fcMsg, prevRkev)