Kafka 并发消费单个 partition

2023-05-01 09:38:27 浏览数 (2)

背景

Kafka 可以通过多个 partition 实现并发消费,但是针对单个 partition,offset 必须按顺序提交。假如消息的顺序为 1、2、3,如果先提交 3,就等于把 1、2 也一并提交了(提交某个 offset 表示它之前的消息都已消费完成)。所以不能在并发执行后各自立即提交。

解决思路

记录接收消息的顺序到listA,然后并发执行,执行成功的消息,记录到setB。起个goroutine定时提交,顺序遍历listA,依次判断该消息是否在setB里,如果不在,就把前面遍历的那部分提交,然后等待下一次定时执行。

实现

代码语言:go
// ConsumerGroupRepo wraps a kafka.Reader and keeps two singly linked lists of
// messages: one in the order messages were fetched, and one holding the
// messages that callers have reported through CommitMsg. The commit method
// compares the two lists to decide which fetched messages can be committed
// in order.
type ConsumerGroupRepo struct {
    // reader is the underlying Kafka reader used to fetch and commit messages.
    reader *kafka.Reader
    // msgChan carries fetched messages from Consume to whoever reads
    // GetMsgChan; Consume closes it when it returns.
    msgChan chan *kafka.Message

    // queueLock guards the four list pointers below.
    queueLock sync.Mutex
    // fetchMsgHeader/fetchMsgTail delimit the list of fetched messages, in
    // fetch order.
    fetchMsgHeader *msgNode
    fetchMsgTail   *msgNode
    // commitMsgHeader/commitMsgTail delimit the list of messages passed to
    // CommitMsg, in call order.
    commitMsgHeader *msgNode
    commitMsgTail   *msgNode
}

// msgNode is one element of a singly linked list of Kafka messages.
type msgNode struct {
    // msg points at the message held by this node; commit matches nodes of the
    // two lists by comparing these pointers.
    msg  *kafka.Message
    // next is the following node, or nil at the end of the list.
    next *msgNode
}

// NewConsumerGroupRepo builds a ConsumerGroupRepo whose reader joins the
// consumer group groupID on the given brokers and subscribes to topics.
// Both message lists start empty and the message channel is unbuffered.
func NewConsumerGroupRepo(brokers []string, groupID string, topics []string) *ConsumerGroupRepo {
    cfg := kafka.ReaderConfig{
        Brokers:     brokers,
        GroupID:     groupID,
        GroupTopics: topics,
        MinBytes:    10e3, // 10KB
        MaxBytes:    10e6, // 10MB
    }
    // The list head/tail pointers are left at their zero value (nil).
    repo := &ConsumerGroupRepo{
        reader:  kafka.NewReader(cfg),
        msgChan: make(chan *kafka.Message),
    }
    return repo
}

// Consume fetches messages from the reader in a loop. Each message is first
// appended to the fetch-order list (under queueLock) and then handed to the
// workers through the message channel, so the list always reflects the order
// in which messages were received.
//
// It returns the error from FetchMessage if fetching fails, or a
// cancellation error once ctx is done. The message channel is closed when
// Consume returns, which ends any range loop over GetMsgChan.
func (c *ConsumerGroupRepo) Consume(ctx context.Context) error {
    defer close(c.msgChan)
    for {
        // Stop promptly if the context was cancelled between iterations.
        select {
        case <-ctx.Done():
            return errors.New("kafka consume stop, context cancel error")
        default:
        }

        m, err := c.reader.FetchMessage(ctx)
        if err != nil {
            return err
        }

        // The list node and the channel share the same pointer; commit relies
        // on pointer equality to match a processed message to its node.
        node := &msgNode{msg: &m}
        c.queueLock.Lock()
        if c.fetchMsgHeader == nil {
            c.fetchMsgHeader = node
        } else {
            c.fetchMsgTail.next = node
        }
        c.fetchMsgTail = node
        c.queueLock.Unlock()

        // Hand the message to a worker, but do not block forever if the
        // context is cancelled while no worker is ready to receive.
        select {
        case c.msgChan <- &m:
        case <-ctx.Done():
            return errors.New("kafka consume stop, context cancel error")
        }
    }
}

// Commit runs the periodic commit loop: once per second it calls commit to
// commit every fetched message that has been reported through CommitMsg, in
// fetch order. It returns the first error from commit, or a cancellation
// error as soon as ctx is done.
func (c *ConsumerGroupRepo) Commit(ctx context.Context) error {
    // A ticker lets the loop wake on cancellation immediately instead of
    // finishing a full sleep first.
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return errors.New("kafka commit stop, context cancel error")
        case <-ticker.C:
            if err := c.commit(ctx); err != nil {
                return err
            }
        }
    }
}

// commit commits the longest prefix of the fetch-order list whose messages
// have all been reported through CommitMsg. It returns nil when nothing is
// ready, otherwise the (wrapped) result of CommitMessages.
func (c *ConsumerGroupRepo) commit(ctx context.Context) error {
    ready := c.popCommittable()
    if len(ready) == 0 {
        return nil
    }
    err := c.reader.CommitMessages(ctx, ready...)
    return errors.Wrap(err, "commit kafka msg error")
}

// popCommittable removes, under queueLock, every leading node of the fetch
// list that has a matching node (same message pointer) in the commit list,
// removing the matching commit-list node as well. It stops at the first
// fetched message that has no match and returns the removed messages in
// fetch order.
func (c *ConsumerGroupRepo) popCommittable() []kafka.Message {
    c.queueLock.Lock()
    defer c.queueLock.Unlock()

    ready := make([]kafka.Message, 0, 32)
    for c.fetchMsgHeader != nil {
        head := c.fetchMsgHeader

        // Linear search of the commit list; a set would make this cheaper.
        var prev *msgNode
        cur := c.commitMsgHeader
        for cur != nil && cur.msg != head.msg {
            prev, cur = cur, cur.next
        }
        if cur == nil {
            // The oldest fetched message is not done yet: nothing after it
            // may be committed.
            break
        }

        ready = append(ready, *head.msg)
        // Advance the fetch list past the matched message.
        c.fetchMsgHeader = head.next

        // Unlink the matched node from the commit list.
        if prev == nil {
            // Matched the first commit node: move the head pointer.
            c.commitMsgHeader = cur.next
        } else {
            // Matched a node in the middle or at the end.
            prev.next = cur.next
        }
        // If the removed node was the tail, the tail falls back to its
        // predecessor (nil when the list is now empty).
        if cur == c.commitMsgTail {
            c.commitMsgTail = prev
        }
    }
    return ready
}

// GetMsgChan returns the channel on which Consume delivers fetched messages,
// narrowed to receive-only so callers cannot send on or close it.
func (c *ConsumerGroupRepo) GetMsgChan() <-chan *kafka.Message {
    var recvOnly <-chan *kafka.Message = c.msgChan
    return recvOnly
}

// CommitMsg records msg as ready to be committed by appending it to the tail
// of the commit list under queueLock. It does not talk to Kafka itself; the
// commit method later matches this entry against the fetch-order list.
func (c *ConsumerGroupRepo) CommitMsg(msg *kafka.Message) {
    node := &msgNode{msg: msg}

    c.queueLock.Lock()
    defer c.queueLock.Unlock()

    if c.commitMsgHeader != nil {
        // Non-empty list: link after the current tail.
        c.commitMsgTail.next = node
        c.commitMsgTail = node
        return
    }
    // Empty list: the new node is both head and tail.
    c.commitMsgHeader = node
    c.commitMsgTail = node
}

使用

代码语言:go
func (u *xxxServer) Start(ctx context.Context) error {
    u.wg.Add(u.xxxConcurrentNum   2)
    cancelCtx, cancel := context.WithCancel(context.Background())
    u.cancelFunc = cancel
    // 启动拉取消息goroutine
    go func() {
        defer u.wg.Done()
        if err := u.consumerGroupRepo.Consume(cancelCtx); err != nil {
            log.Errorf("kafka consumer error: % v", err)
            u.cancelFunc()
        }
    }()
    // 启动commit goroutine
    go func() {
        defer u.wg.Done()
        if err := u.consumerGroupRepo.Commit(cancelCtx); err != nil {
            log.Errorf("kafka commit error: % v", err)
            u.cancelFunc()
        }
    }()
    // 启动消费 goroutine
    for i := 0; i < u.xxxConcurrentNum; i   {
        go func() {
            defer u.wg.Done()
            for msg := range u.consumerGroupRepo.GetMsgChan() {
                if err := u.handleMsg(cancelCtx, msg); err != nil {
                    log.Errorf("handle kafka msg error: % v", err)
                    u.cancelFunc()
                }
                u.consumerGroupRepo.CommitMsg(msg)
            }
        }()
    }
    return nil
}

Post Views: 5

0 人点赞