文章目录- 一 源码环境搭建
- 1. 源码拉取
- 2 导入IDEA
- 3 调试
- 1)启动NameServer
- 2)启动Broker
- 3)发送消息
- 4)消费消息
- 二 RocketMQ源代码的目录结构
- 三 RocketMQ的设计理念和设计目标
- 设计理念
- 设计目标
- 1. 架构模式
- 2.顺序消息
- 3.消息过滤
- 4.消息存储
- 5.消息高可用性
- 6.消息到达(消费)低延迟
- 7.确保消息必须被消费一次
- 8.回溯消息
- 9.消息堆积
- 10.定时消息
- 11.消息重试机制
- 1. 源码拉取
- 2 导入IDEA
- 3 调试
- 1)启动NameServer
- 2)启动Broker
- 3)发送消息
- 4)消费消息
- 设计理念
- 设计目标
- 1. 架构模式
- 2.顺序消息
- 3.消息过滤
- 4.消息存储
- 5.消息高可用性
- 6.消息到达(消费)低延迟
- 7.确保消息必须被消费一次
- 8.回溯消息
- 9.消息堆积
- 10.定时消息
- 11.消息重试机制
一 源码环境搭建
依赖工具
- JDK :1.8
- Maven
- IntelliJ IDEA
1. 源码拉取
从官方仓库 https://github.com/apache/rocketmq clone
或者download
源码。
2 导入IDEA
执行安装
代码语言:javascript复制clean install -Dmaven.test.skip=true
3 调试
创建conf
配置文件夹,从distribution
拷贝broker.conf
和logback_broker.xml
和logback_namesrv.xml
1)启动NameServer
- 展开namesrv模块,右键NamesrvStartup.java
- 配置ROCKETMQ_HOME
- 重新启动 控制台打印结果
The Name Server boot success. serializeType=JSON
2)启动Broker
broker.conf
配置文件内容
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# namesrvAddr地址
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable=true
# 存储路径
storePathRootDir=E:\RocketMQ\data\rocketmq\dataDir
# commitLog路径
storePathCommitLog=E:\RocketMQ\data\rocketmq\dataDir\commitlog
# 消息队列存储路径
storePathConsumeQueue=E:\RocketMQ\data\rocketmq\dataDir\consumequeue
# 消息索引存储路径
storePathIndex=E:\RocketMQ\data\rocketmq\dataDir\index
# checkpoint文件路径
storeCheckpoint=E:\RocketMQ\data\rocketmq\dataDir\checkpoint
# abort文件存储路径
abortFile=E:\RocketMQ\data\rocketmq\dataDir\abort
- 创建数据文件夹
dataDir
- 启动
BrokerStartup
,配置broker.conf
和ROCKETMQ_HOME
3)发送消息
- 进入example模块的
org.apache.rocketmq.example.quickstart
- 指定Namesrv地址
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
- 运行
main
方法,发送消息
4)消费消息
- 进入example模块的
org.apache.rocketmq.example.quickstart
- 指定Namesrv地址
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("127.0.0.1:9876");
- 运行
main
方法,消费消息
二 RocketMQ源代码的目录结构
源码目录结构:
- broker: broker 模块(broke 启动进程)
- client :消息客户端,包含消息生产者、消息消费者相关类
- common :公共包
- dev :开发者信息(非源代码)
- distribution :部署实例文件夹(非源代码)
- example: RocketMQ 例代码
- filter :消息过滤相关基础类
- filtersrv:消息过滤服务器实现相关类(Filter启动进程)
- logappender:日志实现相关类
- namesrv:NameServer实现相关类(NameServer启动进程)
- openmessageing:消息开放标准
- remoting:远程通信模块,给予Netty
- srcutil:服务工具类
- store:消息存储实现相关类
- style:checkstyle相关实现
- test:测试相关类
- tools:工具类,监控命令相关实现类
三 RocketMQ的设计理念和设计目标
设计理念
RocketMQ设计基于主题的发布与订阅模式,其核心功能包括消息发送、消息存储(Broker)、消息消费,整体设计追求简单与性能第一,主要体现在如下三个方面。
首先,NameServer设计极其简单,摒弃了业界常用的使用Zookeeper充当信息管理的“注册中心”,而是自研NameServer来实现元数据的管理(Topic路由信息等)。从实际需求出发,因为 Topic路由信息无须在集群之间保持强一致,追求最终一致性,并且能容忍分钟级的不一致。正是基于此种情况,RocketMQ的NameServer集群之间互不通信,极大地降低了NameServer实现的复杂程度,对网络的要求也降低了不少,但是性能相比较Zookeeper有了极大的提升。
其次是高效的IO存储机制。RocketMQ追求消息发送的高吞吐量,RocketMQ的消息存储文件设计成文件组的概念,组内单个文件大小固定,方便引入内存映射机制,所有主题的消息存储基于顺序写,极大地提供了消息写性能,同时为了兼顾消息消费与消息查找,引入了消息消费队列文件与索引文件。
最后是容忍存在设计缺陷,适当将某些工作下放给RocketMQ使用者。消息中间件的实现者经常会遇到一个难题:如何保证消息一定能被消息消费者消费,并且保证只消费一次。RocketMQ的设计者给出的解决办法是不解决这个难题,而是退而求其次,只保证消息被消费者消费,但设计上允许消息被重复消费,这样极大地简化了消息中间件的内核,使得实现消息发送高可用变得非常简单与高效,消息重复问题由消费者在消息消费时实现幂等。
设计目标
RocketMQ作为一款消息中间件,需要解决如下问题。
1. 架构模式
RocketMQ与大部分消息中间件一样,采用发布订阅模式,基本的参与组件主要包括:消息发送者、消息服务器(消息存储)、消息消费、路由发现。
2.顺序消息
所谓顺序消息,就是消息消费者按照消息达到消息存储服务器的顺序消费。RocketMQ可以严格保证消息有序。
3.消息过滤
消息过滤是指在消息消费时,消息消费者可以对同一主题下的消息按照规则只消费自己感兴趣的消息。RocketMQ消息过滤支持在服务端与消费端的消息过滤机制。 1)消息在Broker端过滤。Broker只将消息消费者感兴趣的消息发送给消息消费者。 2)消息在消息消费端过滤,消息过滤方式完全由消息消费者自定义,但缺点是有很多无用的消息会从Broker传输到消费端。
4.消息存储
消息中间件的一个核心实现是消息的存储,对消息存储一般有如下两个维度的考量:消息堆积能力和消息存储性能。RocketMQ追求消息存储的高性能,引入内存映射机制,所有主题的消息顺序存储在同一个文件中。同时为了避免消息无限在消息存储服务器中累积,引入了消息文件过期机制与文件存储空间报警机制。
5.消息高可用性
通常影响消息可靠性的有以下几种情况。 1)Broker正常关机。 2)Broker异常Crash。 3)OS Crash。 4)机器断电,但是能立即恢复供电情况。 5)机器无法开机(可能是CPU、主板、内存等关键设备损坏)。 6)磁盘设备损坏。
针对上述情况,情况14的RocketMQ在同步刷盘机制下可以确保不丢失消息,在异步刷盘模式下,会丢失少量消息。情况56属于单点故障,一旦发生,该节点上的消息全部丢失,如果开启了异步复制机制,RoketMQ能保证只丢失少量消息,RocketMQ在后续版本中将引入双写机制,以满足消息可靠性要求极高的场合。
6.消息到达(消费)低延迟
RocketMQ在消息不发生消息堆积时,以长轮询模式实现准实时的消息推送模式。
7.确保消息必须被消费一次
RocketMQ通过消息消费确认机制(ACK)来确保消息至少被消费一次,但由于ACK消息有可能丢失等其他原因,RocketMQ无法做到消息只被消费一次,有重复消费的可能。
8.回溯消息
回溯消息是指消息消费端已经消费成功的消息,由于业务要求需要重新消费消息。RocketMQ支持按时间回溯消息,时间维度可精确到毫秒,可以向前或向后回溯。
9.消息堆积
消息中间件的主要功能是异步解耦,必须具备应对前端的数据洪峰,提高后端系统的可用性,必然要求消息中间件具备一定的消息堆积能力。RocketMQ消息存储使用磁盘文件(内存映射机制),并且在物理布局上为多个大小相等的文件组成逻辑文件组,可以无限循环使用。RocketMQ消息存储文件并不是永久存储在消息服务器端,而是提供了过期机制,默认保留3天。
10.定时消息
定时消息是指消息发送到Broker后,不能被消息消费端立即消费,要到特定的时间点或者等待特定的时间后才能被消费。如果要支持任意精度的定时消息消费,必须在消息服务端对消息进行排序,势必带来很大的性能损耗,故RocketMQ不支持任意进度的定时消息,而只支持特定延迟级别。
11.消息重试机制
消息重试是指消息在消费时,如果发送异常,消息中间件需要支持消息重新投递,RocketMQ支持消息重试机制。
参考书目《Rocketmq技术内幕》