整体架构
Name Server
管理Broker实例的注册,提供心跳检测机制
路由管理: Producer和Conumser通过NameServer可以获取整个Broker集群的路由信息
生产者 Producer
以生产者组的形式出现,一个生产者组可以同时发送多个主题的消息
Broker
存储消息、转发消息
Consumer消费者
以消费组的形式出现
同一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息
消息模型
- 消息写入能力的水平扩展,RocketMQ 对 Topic进行了分区,这种操作被称为队列(MessageQueue)
- ConsumerGroup下的消费者主要有两种负载均衡模式,即广播模式,和集群模式(一般使用这个)
- 集群模式下,同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费
- 广播模式下,同一个 ConsumerGroup 中的每个Consumer 实例都处理全部的队列
可靠性
生产者可靠性 - 重试策略
如果同步模式发送失败,则轮转到下一个Broker进行重试,重试2次
如果异步模式发送失败,则轮转到当前Broker进行重试,重试2次
Broker 可靠性 - 刷盘与同步机制
消息写入能力水平扩展,RocketMQ 对 Topic进行了分区,这种操作被称为队列(MessageQueue)
ConsumerGroup下的消费者主要有两种负载均衡模式,即 广播模式 ,和 集群模式(一般使用这个)
集群模式下,同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费
广播模式下,同一个 ConsumerGroup 中的每个 Consumer 实例都处理全部的队列
可靠性
生产者可靠性 - 重试策略
如果同步模式发送失败,则轮转到下一个Broker进行重试,重试2次
如果异步模式发送失败,则轮转到当前Broker进行重试,重试2次
Broker 可靠性 - 刷盘与同步机制
刷盘机制
刷盘方式 | 说明 | 特点 |
---|---|---|
同步刷盘 | 写PageCache,立即刷盘,刷盘完成,返回成功 | 数据安全,吞吐量不大 |
异步刷盘 | 写PageCache,返回成功 依靠刷盘机制刷盘 PageCache中的消息积累到一定的量 或定时触发一次写磁盘操作 | 吞吐量大,性能高,PageCache可能丢失 |
同步机制
同步机制 | 说明 | 特地 |
---|---|---|
同步复制(推荐) | 主从,都写入成功后,返回成功 | 易恢复,写入延迟大,降低系统吞吐量 |
异步复制 | 写主成功,就返回成功 | 数据可能丢失,写入性能高,系统吞吐量大 |
消息者可靠性 - 重试策略
- 返回CONSUME_SUCCESS才算消费完成
- 16次消费都失败,进入死信队列
- CONSUME_LATER按不同messageDelayLevel时间进行再次消费,最长时间为2个小时
Exactly Once需要依托于本地事务表
首选选定唯一键,msgId,或者业务唯一键,例如订单Id
如果 本地事务表中,没有就插入之后执行消费。
实例- 事务消息,顺序消息,tag过滤
一般使用pull模式消费,一个应用一个topic,多个tags模式
pom
代码语言:javascript复制<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
配置
代码语言:javascript复制#nameserver 的ip:host
rocketmq.name-server = ip:host
#消费者不配置
rocketmq.producer.group= wenlei-producer-group
普通消息,带tag,keys
代码语言:javascript复制//普通消息,带tag,keys
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult commonMsg() {
Message message = MessageBuilder.withPayload("消息体")
.setHeader("KEYS", "我是Key").build();
//topic:tag 标记要发送的tag
SendResult sendResult = rocketMQTemplate
.syncSend("wenlei-topic:tag1", message);
log.info("sendResult:{},{},sendStatus{}",
sendResult.getMsgId(),keys,sendResult.getSendStatus().name());
return sendResult;
}
顺序消息
代码语言:javascript复制public SendResult order() {
String shardingKey = UUID.randomUUID().toString();
Message message = MessageBuilder
.withPayload("顺序消息体").setHeader("KEYS", shardingKey).build();
SendResult sendResult = rocketMQTemplate
.syncSendOrderly("wenlei-topic:tag1", message,shardingKey);
log.info("sendResult:{},{},sendStatus{}"
,sendResult.getMsgId(),shardingKey
,sendResult.getSendStatus().name());
return sendResult;
}
事务消息
一个rocketMQTemplate 只能有一个RocketMQLocalTransactionListener, 下面是做额外的ExtRocketMQTemplate
代码语言:javascript复制@ExtRocketMQTemplateConfiguration
@Component("extRocketMQTemplate")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
代码语言:javascript复制RocketMQLocalTransactionListener 执行本地事务,查询本地事务的状态。
@Slf4j
// 绑定extRocketMQTemplate
@RocketMQTransactionListener(
rocketMQTemplateBeanName ="extRocketMQTemplate")
public class TransactionMsgListener
implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState
executeLocalTransaction(Message msg, Object arg) {
try {
log.info("本地的业务工作");
return RocketMQLocalTransactionState.COMMIT;
}catch (Exception e){
e.printStackTrace();
return RocketMQLocalTransactionState.UNKNOWN;
}
}
@Override
public RocketMQLocalTransactionState
checkLocalTransaction(Message msg) {
log.info("本地的业务工作的状态");
if(成功状态){
return RocketMQLocalTransactionState.COMMIT;
}else if(失败状态){
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.UNKNOWN;
}
}
代码语言:javascript复制// 发送事务消息
public TransactionSendResult tranction() {
String transactionId = UUID.randomUUID().toString();
TransactionSendResult result = this.extRocketMQTemplate
.sendMessageInTransaction("wenlei-topic:tag2",
MessageBuilder.withPayload(param)
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.build(), param);
return result;
}
按tag消费
代码语言:javascript复制@Component
@RocketMQMessageListener(
// topic:消息的发送者使用同一个topic
topic = "wenlei-topic",
//group:在RocketMQ中消费者和发送者组没有关系
consumerGroup = "test-group",
//tag:设置为 * 时,表示全部。
selectorExpression = "tag1 || tag2 || tag3",
//消费模式:默认 CLUSTERING ( CLUSTERING:负载均衡 )
//( BROADCASTING:广播机制 ) 一般不用
messageModel = MessageModel.CLUSTERING )
@Slf4j
public class MyConsumer implements RocketMQListener<MessageExt> {
@Override public void onMessage(MessageExt message) {
log.info("consumer:{},tag:{},keys:{}",
new String(message.getBody(), Charset.forName("utf8")),
message.getTags(),message.getKeys());
}
}