【最佳实践】巡检项:消费者创建与健康检查

2022-03-29 17:24:58 浏览数 (2)

在 TDMQ Pulsar 版控制台中,订阅代表一个具体的消费者以及其对某个 Topic 的订阅关系。当一个消费者订阅了某个 Topic 之后,则该 Topic 下的消息均可以被其消费。一个订阅可以订阅多个 Topic ,例如用户在一个 Topic 下创建了一个订阅后,其不仅会订阅当前的 Topic,还会订阅系统自动创建的重试队列 Topic。

本文档可以指导您使用 TDMQ Pulsar 版时,如何利用订阅管理对一个 Topic 下的订阅进行设置。

一、消费者检查操作步骤

查看订阅详情

  1. 登录 TDMQ Pulsar 版控制台,在左侧导航栏中单击Topic 管理
  2. 在 Topic 管理列表页中,找到需要管理订阅的 Topic,单击操作列的查看订阅,进入订阅列表。
  3. 在订阅列表中,一级列表可以看到订阅了当前 Topic 的所有订阅,二级展开后可以看到每个订阅的消费连接实例以及每个分片的消费进度。
消费者消费者

在这里可以看到对应订阅,以及订阅下面的消费者消息情况

二、订阅模式

为了适用不同场景的需求,Pulsar 支持四种订阅模式:Exclusive、Shared、Failover、Key_Shared。

四种模式四种模式

独占模式(Exclusive)

Exclusive 独占模式(默认模式):一个 Subscription 只能与一个 Consumer 关联,只有这个 Consumer 可以接收到 Topic 的全部消息,如果该 Consumer 出现故障了就会停止消费。

Exclusive 订阅模式下,同一个 Subscription 里只有一个 Consumer 能消费 Topic,如果多个 Consumer 订阅则会报错,适用于全局有序消费的场景。

代码语言:javascript复制
// 构建消费者
Consumer<byte[]> consumer = pulsarClient.newConsumer()
    // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
    .topic("persistent://pulsar-xxx/sdk_java/topic1")
    // 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
    .subscriptionName("sub_topic1")
    // 声明消费模式为exclusive(独占)模式
    .subscriptionType(SubscriptionType.Exclusive)
    .subscribe();

启动多个消费者将收到错误信息。

共享模式(Shared)

消息通过 round robin 轮询机制(也可以自定义)分发给不同的消费者,并且每个消息仅会被分发给一个消费者。当消费者断开连接,所有被发送给他,但没有被确认的消息将被重新安排,分发给其它存活的消费者。

代码语言:javascript复制
// 构建消费者
Consumer<byte[]> consumer = pulsarClient.newConsumer()
    // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
    .topic("persistent://pulsar-xxx/sdk_java/topic1")
    // 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
    .subscriptionName("sub_topic1")
    // 声明消费模式为 Shared(共享)模式
    .subscriptionType(SubscriptionType.Shared)
    .subscribe();

多个 Shared 模式消费者。

灾备模式(Failover)

当存在多个 consumer 时,将会按字典顺序排序,第一个 consumer 被初始化为唯一接受消息的消费者。当第一个 consumer 断开时,所有的消息(未被确认和后续进入的)将会被分发给队列中的下一个 consumer。

代码语言:javascript复制
// 构建消费者
Consumer<byte[]> consumer = pulsarClient.newConsumer()
    // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
    .topic("persistent://pulsar-xxx/sdk_java/topic1")
    // 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
    .subscriptionName("sub_topic1")
    // 声明消费模式为灾备模式
    .subscriptionType(SubscriptionType.Failover)
    .subscribe();

多个 Failover 模式消费者。

KEY 共享模式(Key_Shared)

当存在多个 consumer 时,将根据消息的 key 进行分发,key 相同的消息只会被分发到同一个消费者。

代码语言:javascript复制
// 构建消费者
Consumer<byte[]> consumer = pulsarClient.newConsumer()
    // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
    .topic("persistent://pulsar-xxx/sdk_java/topic1")
    // 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
    .subscriptionName("sub_topic1")
    // 声明消费模式为 Key_Shared(Key 共享)模式
    .subscriptionType(SubscriptionType.Key_Shared)
    .subscribe();

多个 Key_Shared 模式消费者。

0 人点赞