架构
- RocketMQ包含四个组件NameServer, Broker, Consumer, Producer
- NameServer类似注册中心, Broker接收存储消息, Consumer和Producer在项目内定义
- Broker向所有NameServer注册自己, 持续发送心跳包
- Consumer和Producer向NameServer保持长连接, 每隔30s向NameServer获取所有Topic的队列情况
- Consumer和Producer向NameServer获取Broker的地址, 进行消息收发, 也会与所有关联的Broker保持长连接
- 一个Broker内有多个Queue, 一个Queue内会存在多个Topic的消息, 一个Topic也会映射到多个Broker
- Queue内存储的并非消息内容, 而是指向CommitLog的索引
- 消息在每个Broker内以Queue的形式存储
特性
基于rocketmq-spring-boot-starter
- 发送普通消息, 延时消息和顺序消息, 事务消息
- 事务消息, Producer先把消息发送到Broker, 此时的消息状态为半消息, 之后Producer再对消息进行二次确认(Commit或Rollback), Consumer才能消费该条消息, Broker会定时扫描长时间没有进行二次确认的消息, 主动向Producer进行消息回查
- 普通和延时可以并行消费, 顺序消息按照先入先出的顺序进行消费
- 发送失败重试, 失败后重试指定次数
- 消费重试按异常类型可以分为异常重试和超时重试
- 超时重试: Consumer处理时间过长, 在超时时间内没有返回给Broker消费状态, 那么Broker也会自动重试(通过System.exit(-1)重现类似场景, Thread.sleep()无法复现)
- 异常重试, 根据消费者返回的状态判断消费是否成功, 按消息类型可以分为两种重试机制
- 顺序消息: 失败后默认1秒重试一次, 直到成功; 顺序消息与普通消息可能存放在一个Queue中, 由于顺序消息的消费特性, 当顺序消息被消费时, 会锁住当前Queue, 若该消息消费失败, 则同一Queue内后续的消息会阻塞到该消息消费成功为止, 对应状态枚举为ConsumeOrderlyStatus
- 普通消息: 失败后有限次数的重试, 重试过程不阻塞Queue, 间隔时间依次递增, 对应状态枚举为ConsumeConcurrentlyStatus
- 顺序消息 消息会被发送到同一个broker中, 消费者进行消费时, 会锁住当前队列, 以保证消费顺序