go: 优雅处理kafka消费退出

2023-11-30 15:57:48 浏览数 (1)

在业务中,kafka的消费者服务非常常见。主要流程是从kafka中取出消息,处理消息。

本文使用kafka-go(github.com/segmentio/kafka-go),调研kafka优雅退出的方式和注意事项。

在这之前,先准备一个多 partitions的 kafka作为实验环境。

代码语言:txt复制
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic testkafka # 创建kafka topic
kafka-topics.sh --delete --bootstrap-server localhost:9092  --topic testkafka # 删除topic

写入端

代码语言:txt复制
func WriteMsg2Kafka() {
	kafkawriter := &kafka.Writer{
		Addr:         kafka.TCP("127.0.0.1:9092"),
		Topic:        "testkafka",
		Balancer:     &kafka.RoundRobin{},
		Async:        true,
		Compression:  kafka.Snappy,
		RequiredAcks: kafka.RequireOne,
		Completion: func(messages []kafka.Message, err error) {
			if err != nil {
				logrus.Errorf("write kafka error:%v", err)
			}
		},
		ErrorLogger: logrus.StandardLogger(),
	}
	i := 0
	for {
		i  = 1
		kafkawriter.WriteMessages(context.Background(), kafka.Message{
			Value: []byte(fmt.Sprint(i)),
		})
		logrus.Infof("send msg")
		time.Sleep(time.Millisecond * 10)
	}
}

为了方便观察现象,写入端会向kafka中顺序写入 1、2、3...。这样在消费者就能知道是否丢失了消息。

消费者

代码语言:txt复制
func ReadKafkaWithKafkago(ctx context.Context, task string) {
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:        []string{"127.0.0.1:9092"},
		Topic:          "testkafka",
		CommitInterval: time.Second, // 重要的配置,如果不配置将严重影响写入性能
		GroupID:        "test1",
		// Partition: partition.ID,
		// MaxWait: time.Millisecond * 500,
		// Logger:        logrus.StandardLogger(),
		QueueCapacity: 50,
	})

loop:
	for {
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
				logrus.Infof("kafka get context canceled:%v", err)
				break loop
			}
			if errors.Is(err, io.EOF) {
				// 当reader.Close后,进入这个分支
				logrus.Infof("kafka get eof")
				break loop
			}
			logrus.Errorf("kafka get msg error:%v", err)
			continue
		}
		// logrus.Infof("%s", msg.Value)

		t := &Testkafka{}
		id, _ := datautils.ToInt(string(msg.Value))
		t.Id = id
		t.Task = task
		t.Insert()
		err = reader.CommitMessages(context.TODO(), msg)
		if err != nil {
			logrus.Errorf("commit error:%v", err)
		}
	}
	reader.Close()
}
  1. 在这个函数中,传入了ctx用来控制消费者的生命周期。
  2. ctx.Done的时候,触发kafka get context canceled调用。循环被终止。
  3. 在循环跳出后,调用reader.Close()
  4. 为了验证是否丢数据,会将每条消息(id)写入到mysql的表中。

在main中,监听退出信号:

代码语言:txt复制
  ctx, cancel := context.WithCancel(context.Background())
  sigs := make(chan os.Signal, 1)
  signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
  go func() {
    sig := <-sigs
    logrus.Infof("GraceFullyExit has exited, sig:%v", sig)
    cancel() // 给kafka消费者发信号让它退出
  }()
  task := os.Args[2]
  kafkatest.ReadKafkaWithKafkago(ctx, task)
  time.Sleep(time.Second)

结论

  1. 使用SELECT id FROM testkafka t1 WHERE NOT EXISTS (SELECT * FROM testkafka t2 WHERE t2.id = t1.id 1)可以观察mysql的表是否ID连续。用以判定是否有数据丢失。
  2. 多次kill 1-N个消费者并重启消费者,不影响kafka数据消费的完整性。这个示例满足数据不丢失这一要求。
  3. 如果只有一个消费者,在kill掉并拉起时,不会有数据重消费的问题。
  4. 如果有多个消费者,kill掉其中一个会偶尔出现少量已入库的消息被重消费。具体原因不明,猜测原因和rebalance机制有关。

0 人点赞