RocketMQ架构设计
如图所示:
RocketMQ事务消息原理
依赖于TransactionListener接口
- executeLocalTransaction方法会在发送消息后调用,用于执行本地事务,如果本地事务执行成功,rocketmq再提交消息
- checkLocalTransaction用于对本地事务做检查,rocketmq依赖此方法做补偿
通过两个内部的topic来实现对消息的两阶段支持,
prepare:将消息(消息上带有事务标识)投递到一个名为RMS_SYS_TRANS_HALF_TOPIC的topic中, 而不是投递到真正的topic中。
commit/rollback:producer再通过TransactionListener的executeLocalTransaction方法执行本地事务,当producer的localTransaction处理成功或者失败后,producer会向broker发送commit或rollback命令,如果是commit,则broker会将投递到RMQ_SYS_TRANS_HALF_TOPIC中的消息投递到真实的topic中,然后再投递一表示删除的消息到RMQ_SYS_TRANS_OP_HALF_TOPIC中,表示当前事务已完成;如果是rollback,则没有投递到真实topic的过程,只需要投递表示删除的消息到RMQ_SYS_TRANS_OP_HALF_TOPIC。最后,消费者和消费普通的消息一样消费事务消息
- 第一阶段(prepare)失败:给应用返回发送消息失败
- 事务失败:发送回滚命令给broker,由broker执行消息的回滚
- Commit或rollback失败:由broker定时向producer发起事务检查,如果本地事务成功,则提交消息事务,否则回滚消息事务
事务状态的检查有两种情况:
- commit/rollback:broker会执行相应的commit/rollback操作
- 如果是TRANSACTION_NOT_TYPE,则一段时间后会再次检查,当检查的次数超过上限(默认15次)则丢弃消息
RocketMQ顺序消息原理
默认是不能保证的,需要程序保证发送和消费的是同一个queue,多线程消费也无法保证
发送顺序:发送端自己业务逻辑保证先后,发往一个固定的queue,生产者可以在消息体上设置消息的顺序
发送者实现MessageQueueSelector接口,选择一个queue进行发送,也可使用rocketmq提供的默认实
- 现SelectMessageQueueByHash:按参数的hashcode与可选队列进行求余选择
- SelectMessageQueueByRandom:随机选择
mq:queue本身就是顺序追加写,只需保证一个队列统一时间只有一个consumer消费,通过加锁实现,consumer上的顺序消费有一个定时任务、每隔一定时间向broker发送请求延长锁定
消费端:
- pull模式:消费者需要自己维护需要拉取的queue,一次拉取的消息都是顺序的,需要消费端自己保证顺序消费
- push模式:消费实例实现自MQPushConsumer接口,提供注册监听的方法消费消息,registerMessageListener、重载方法
- MessageListenerConcurrently:并行消费
- MessageListenerOrderly:串行消费,consumer会把消息放入本地队列并加锁,定时任务保证锁的同步
简述RocketMQ持久化机制
- commitLog:日志数据文件,被所有的queue共享,大小为1G,写满之后重新生成,顺序写
- consumeQueue:逻辑queue,消息先到达commitLog、然后异步转发到consumeQueue,包含queue在CommitLog中的物理位置偏移量Offset,消息实体内容的大小和Message Tag的hash值。大小约为600W个字节,写满之后重新生成,顺序写
- indexFile:通过key或者时间区间来查找CommitLog中的消息,文件名以创建的时间戳命名,固定的单个IndexFile大小为400M,可以保存2000W个索引
所有队列共用一个日志数据文件,避免了kafka的分区数过多、日志文件过多导致磁盘IO读写压力较大造成性能瓶颈,rocketmq的queue只存储少量数据、更加轻量化,对于磁盘的访问是串行化避免磁盘竞争,缺点在于:写入是顺序写,但读是随机的,先读ConsumeQueue,再读CommitLog,会降低消息读的效率
消息发送到broker后,会被写入commitLog,写之前加锁,保证顺序写入。然后转发到consumeQueue 息消费时先从consumeQueue读取消息在CommitLog中的起始物理偏移量Offset,消息大小、和消息Tag的HashCode值。在从CommitLog读取消息内容
- 同步刷盘,消息持久化到磁盘才会给生产者返回ack,可以保证消息可靠、但是会影响性能
- 异步刷盘:消息写入pageCache就返回ack给生产者,刷盘采用异步线程,降低读写延迟提高性能和吞吐
RocketMQ如何保证不丢消息
生产者:
- 同步阻塞的方式发送消息,加上失败重试机制,可能broker存储失败,可以通过查询确认
- 异步发送需要重写回调方法,检查发送结果
- ack机制,可能存储CommitLog,存储ConsumerQueue失败,此时对消费者不可见
broker:同步刷盘、集群模式下采用同步复制、会等待slave复制完成才会返回确认
消费者:
- offset手动提交,消息消费保证幂等
定时任务实现原理
- 优先队列:基于小顶堆实现,每次新增任务需要进行堆化,取任务时取堆顶元素、调整堆架构,时间复杂度是O(logN)
- 时间轮算法:是一个环形队列,按照时间的单位区分,每个时间单位里面是一个链表、用来存储定时任务,像时钟一样轮询环形队列,取出链表中的任务执行,如果超出了环形队列的时间粒度、可以使用多级时间轮,即使用不同维度的时间单位,就跟时钟或者水表一样,这一层的走了一圈,下一层的才走了一格,时间复杂度为O(1)
往期相关精彩内容推荐:
- 分布式基础概念-消息中间件[Kafka]
- 分布式基础概念-消息中间件[RabbitMQ]_2
- 分布式基础概念-消息中间件[RabbitMQ]
- 直击灵魂的面试之MQ七连问
历史文章导航:
- 对线面试官系列
- 分布式基础概念系列