在业务中,kafka的消费者服务非常常见。主要流程是从kafka中取出消息,处理消息。
本文使用kafka-go(github.com/segmentio/kafka-go),调研kafka优雅退出的方式和注意事项。
在这之前,先准备一个多 partitions的 kafka作为实验环境。
代码语言:txt复制kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic testkafka # 创建kafka topic
kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic testkafka # 删除topic
写入端
代码语言:txt复制func WriteMsg2Kafka() {
kafkawriter := &kafka.Writer{
Addr: kafka.TCP("127.0.0.1:9092"),
Topic: "testkafka",
Balancer: &kafka.RoundRobin{},
Async: true,
Compression: kafka.Snappy,
RequiredAcks: kafka.RequireOne,
Completion: func(messages []kafka.Message, err error) {
if err != nil {
logrus.Errorf("write kafka error:%v", err)
}
},
ErrorLogger: logrus.StandardLogger(),
}
i := 0
for {
i = 1
kafkawriter.WriteMessages(context.Background(), kafka.Message{
Value: []byte(fmt.Sprint(i)),
})
logrus.Infof("send msg")
time.Sleep(time.Millisecond * 10)
}
}
为了方便观察现象,写入端会向kafka中顺序写入 1、2、3...。这样在消费者就能知道是否丢失了消息。
消费者
代码语言:txt复制func ReadKafkaWithKafkago(ctx context.Context, task string) {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"127.0.0.1:9092"},
Topic: "testkafka",
CommitInterval: time.Second, // 重要的配置,如果不配置将严重影响写入性能
GroupID: "test1",
// Partition: partition.ID,
// MaxWait: time.Millisecond * 500,
// Logger: logrus.StandardLogger(),
QueueCapacity: 50,
})
loop:
for {
msg, err := reader.FetchMessage(ctx)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
logrus.Infof("kafka get context canceled:%v", err)
break loop
}
if errors.Is(err, io.EOF) {
// 当reader.Close后,进入这个分支
logrus.Infof("kafka get eof")
break loop
}
logrus.Errorf("kafka get msg error:%v", err)
continue
}
// logrus.Infof("%s", msg.Value)
t := &Testkafka{}
id, _ := datautils.ToInt(string(msg.Value))
t.Id = id
t.Task = task
t.Insert()
err = reader.CommitMessages(context.TODO(), msg)
if err != nil {
logrus.Errorf("commit error:%v", err)
}
}
reader.Close()
}
- 在这个函数中,传入了
ctx
用来控制消费者的生命周期。 - 当
ctx.Done
的时候,触发kafka get context canceled
调用。循环被终止。 - 在循环跳出后,调用
reader.Close()
- 为了验证是否丢数据,会将每条消息(id)写入到mysql的表中。
在main中,监听退出信号:
代码语言:txt复制 ctx, cancel := context.WithCancel(context.Background())
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
logrus.Infof("GraceFullyExit has exited, sig:%v", sig)
cancel() // 给kafka消费者发信号让它退出
}()
task := os.Args[2]
kafkatest.ReadKafkaWithKafkago(ctx, task)
time.Sleep(time.Second)
结论
- 使用
SELECT id FROM testkafka t1 WHERE NOT EXISTS (SELECT * FROM testkafka t2 WHERE t2.id = t1.id 1)
可以观察mysql的表是否ID连续。用以判定是否有数据丢失。 - 多次kill 1-N个消费者并重启消费者,不影响kafka数据消费的完整性。这个示例满足数据不丢失这一要求。
- 如果只有一个消费者,在kill掉并拉起时,不会有数据重消费的问题。
- 如果有多个消费者,kill掉其中一个会偶尔出现少量已入库的消息被重消费。具体原因不明,猜测原因和rebalance机制有关。