ROCKETMQ极简介绍,顺序,事务示例

2023-10-16 15:16:08 浏览数 (2)

整体架构

Name Server

管理Broker实例的注册,提供心跳检测机制

路由管理: Producer和Conumser通过NameServer可以获取整个Broker集群的路由信息

生产者 Producer

以生产者组的形式出现,一个生产者组可以同时发送多个主题的消息

Broker

存储消息、转发消息

Consumer消费者

以消费组的形式出现

同一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息

消息模型

  • 消息写入能力的水平扩展,RocketMQ 对 Topic进行了分区,这种操作被称为队列(MessageQueue)
  • ConsumerGroup下的消费者主要有两种负载均衡模式,即广播模式,和集群模式(一般使用这个)
    • 集群模式下,同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费
    • 广播模式下,同一个 ConsumerGroup 中的每个Consumer 实例都处理全部的队列

可靠性

生产者可靠性 - 重试策略

如果同步模式发送失败,则轮转到下一个Broker进行重试,重试2次

如果异步模式发送失败,则轮转到当前Broker进行重试,重试2次

Broker 可靠性 - 刷盘与同步机制

消息写入能力水平扩展,RocketMQ 对 Topic进行了分区,这种操作被称为队列(MessageQueue)

ConsumerGroup下的消费者主要有两种负载均衡模式,即 广播模式 ,和 集群模式(一般使用这个)

集群模式下,同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费

广播模式下,同一个 ConsumerGroup 中的每个 Consumer 实例都处理全部的队列

可靠性

生产者可靠性 - 重试策略

如果同步模式发送失败,则轮转到下一个Broker进行重试,重试2次

如果异步模式发送失败,则轮转到当前Broker进行重试,重试2次

Broker 可靠性 - 刷盘与同步机制

刷盘机制

刷盘方式

说明

特点

同步刷盘

写PageCache,立即刷盘,刷盘完成,返回成功

数据安全,吞吐量不大

异步刷盘

写PageCache,返回成功 依靠刷盘机制刷盘 PageCache中的消息积累到一定的量 或定时触发一次写磁盘操作

吞吐量大,性能高,PageCache可能丢失

同步机制

同步机制

说明

特地

同步复制(推荐)

主从,都写入成功后,返回成功

易恢复,写入延迟大,降低系统吞吐量

异步复制

写主成功,就返回成功

数据可能丢失,写入性能高,系统吞吐量大

消息者可靠性 - 重试策略

  • 返回CONSUME_SUCCESS才算消费完成
  • 16次消费都失败,进入死信队列
  • CONSUME_LATER按不同messageDelayLevel时间进行再次消费,最长时间为2个小时

Exactly Once需要依托于本地事务表

首选选定唯一键,msgId,或者业务唯一键,例如订单Id

如果 本地事务表中,没有就插入之后执行消费。

实例- 事务消息,顺序消息,tag过滤

一般使用pull模式消费,一个应用一个topic,多个tags模式

pom

代码语言:javascript复制
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>

配置

代码语言:javascript复制
#nameserver 的ip:host
rocketmq.name-server = ip:host
#消费者不配置
rocketmq.producer.group= wenlei-producer-group

普通消息,带tag,keys

代码语言:javascript复制
//普通消息,带tag,keys
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult  commonMsg() {
    Message message = MessageBuilder.withPayload("消息体")
       .setHeader("KEYS", "我是Key").build();
  //topic:tag 标记要发送的tag
    SendResult sendResult = rocketMQTemplate
          .syncSend("wenlei-topic:tag1", message);
    log.info("sendResult:{},{},sendStatus{}",
             sendResult.getMsgId(),keys,sendResult.getSendStatus().name());
    return sendResult;
}

顺序消息

代码语言:javascript复制
public SendResult  order() { 
    String shardingKey =  UUID.randomUUID().toString();
    Message message = MessageBuilder
      .withPayload("顺序消息体").setHeader("KEYS", shardingKey).build();
    SendResult sendResult = rocketMQTemplate
      .syncSendOrderly("wenlei-topic:tag1", message,shardingKey);
    log.info("sendResult:{},{},sendStatus{}"
             ,sendResult.getMsgId(),shardingKey
             ,sendResult.getSendStatus().name());
    return sendResult;
}

事务消息

一个rocketMQTemplate 只能有一个RocketMQLocalTransactionListener, 下面是做额外的ExtRocketMQTemplate

代码语言:javascript复制
@ExtRocketMQTemplateConfiguration
@Component("extRocketMQTemplate")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}

RocketMQLocalTransactionListener 执行本地事务,查询本地事务的状态。

代码语言:javascript复制
@Slf4j
// 绑定extRocketMQTemplate
@RocketMQTransactionListener(
  rocketMQTemplateBeanName ="extRocketMQTemplate")
public class TransactionMsgListener 
  implements RocketMQLocalTransactionListener {
    @Override    
    public RocketMQLocalTransactionState
    executeLocalTransaction(Message msg, Object arg) {
        try {
           log.info("本地的业务工作");
            return RocketMQLocalTransactionState.COMMIT;
        }catch (Exception e){
            e.printStackTrace();
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

  @Override    
  public RocketMQLocalTransactionState 
    checkLocalTransaction(Message msg) {
        log.info("本地的业务工作的状态");
        if(成功状态){
            return RocketMQLocalTransactionState.COMMIT;
        }else if(失败状态){
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.UNKNOWN;
    }
}
代码语言:javascript复制
// 发送事务消息
public TransactionSendResult  tranction() {
    String transactionId = UUID.randomUUID().toString();
    TransactionSendResult result = this.extRocketMQTemplate
      .sendMessageInTransaction("wenlei-topic:tag2",
            MessageBuilder.withPayload(param)
            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
            .build(), param);
    return result;
}

按tag消费

代码语言:javascript复制
@Component
@RocketMQMessageListener(
        // topic:消息的发送者使用同一个topic      
          topic = "wenlei-topic",
        //group:在RocketMQ中消费者和发送者组没有关系        
         consumerGroup = "test-group",
        //tag:设置为 * 时,表示全部。       
         selectorExpression = "tag1 || tag2 || tag3",
        //消费模式:默认 CLUSTERING ( CLUSTERING:负载均衡 )
        //( BROADCASTING:广播机制 ) 一般不用     
          messageModel = MessageModel.CLUSTERING  )
@Slf4j
public class MyConsumer implements RocketMQListener<MessageExt> {
    @Override    public void onMessage(MessageExt message) {
        log.info("consumer:{},tag:{},keys:{}",
        new String(message.getBody(), Charset.forName("utf8")),
        message.getTags(),message.getKeys());
    }
}

0 人点赞