1. 为什么是 RocketMQ
2. 使用场景
MQ 的基本用途
- 异步处理(如:订单支付后,扣减库存、增加积分,引入 MQ 异步去做,降低响应时间)
- 系统解耦(如:订单完成后,发送 MQ,下游依赖系统(库存、营销)自行调用)
- 削峰填谷(如:秒杀活动,请求进入MQ,服务器慢慢处理,多余请求抛弃)
- 日志收集(如:分布式系统调用日志记录)
MQ 本质:异步通信
3. 工作原理
RocketMQ 四大核心组件
- NameServer:主要负责对于元数据的管理,包括了对于 Topic 和路由信息的管理。(类比:快递中心)
- Broker:消息中转角色,负责存储消息,转发消息。(类比:快递柜)
- Producer:负责生产消息。(类比:寄件人发货)
- Consumer:负责消费消息。(类比:收件人收货)
4. 演进过程
- 在 RocketMQ 的早期版本其实不叫 RocketMQ,而是叫 MetaQ,在 MetaQ1.0 和 MetaQ2.0 实际上是依赖的是 Zookeeper,但是从 MetaQ3.0 的时候更名为 RocketMQ,同时也去掉了 Zookeeper。
- RocketMQ 整体设计思想是来自于 kafka,而 kafka 是使用 Zookeeper 作为注册中心,所以早期版本也是使用的 Zookeeper,到了后期发展,摈弃了 Zookeeper,自己写了一套 NameServer,保证独立性。
- 采用 Zookeeper 需要保持强一致性,还会导致整体架构就会变得复杂,在维护、搭建成本上都会上升,为了保证高性能,低维护成本,那么就开发了 NameServer。
5. 从 NameServer 起点
5.1 RocketMQ 大脑 —— NameServer
NameServer 是一个 Broker 与 Topic 路由的注册中心,支持 Broker 的动态注册与发现,主要功能如下:
Broker管理:接收 Broker 集群的注册信息并且保存下来作为路由信息的基本数据,提供心跳监测机制,检查 Broker 是否还存活。
路由信息管理:每个 NameServer中都保存 Broker 集群的整个路由信息可用于客户端查询队列信息,Producer 和 Consumer 通过 NameServer可以获取整个 Broker 集群路由信息从而进行消息投递和消费。
5.2 本地调试
日志文件存储路径配置和启动
5.3 NamesrvStartup 启动类
启动类: org.apache.rocketmq.namesrv.NamesrvStartup
代码片段如下:
代码语言:java复制public static NamesrvController main0(String[] args) {
try {
NamesrvController controller = createNamesrvController(args);
start(controller);
String tip = "The Name Server boot success. serializeType=" RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
通过 main 方法启动 NameServer,分为两大步,先创建 NamesrvController,而后再初始化并启动 NamesrvController。
5.4 NameServer 启动时序图
6. 路由管理
6.1 路由注册
6.1.1 心跳请求
在 RocketMQ 中,默认情况下,Broker 服务器会每间隔 30 秒向集群中所有的 NameServer 发送心跳信息。
org.apache.rocketmq.broker.BrokerController#start
代码语言:java复制// 每隔30s发送心跳
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
NameServer 会接收到 Broker 发来的心跳包,DefaultRequestProcessor(默认请求处理器) 根据请求头中的code 码,执行处理心跳包的操作。
具体会调用 RouteInfoManager.registerBroker() 操作。其实 broker 注册路由过程,主要还是操作RouteInfoManager 中的多个映射表
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest
6.1.2 RouteInfoManager
RouteInfoManager:用于管理心跳信息以及路由管理
RouteInfoManager 管理的路由元数据:(元数据是什么?)
① topicQueueTable:topic路由信息,broker名称,读队列数,写队列数等
② brokerAddrTable:broker信息,broker集群,broker名称,broker地址等
③ clusterAddrTable:broker集群信息,broker名称列表
④ brokerLiveTable:broker状态信息,broker存活最新上报时间
⑤ filterServerTable:filterServer列表,消息过滤信息
路由元信息Map结构示例如下:
6.2 路由删除
org.apache.rocketmq.namesrv.NamesrvController#initialize
代码语言:javascript复制// 每隔 10s 进行扫描判断是否存在失效的Broker,如果存在则移除失效broker
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker
代码语言:javascript复制public int scanNotActiveBroker() {
int removeCount = 0;
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
removeCount ;
}
}
return removeCount;
}
备注:Broker 视角触发路由删除,即 Broker 在正常关闭的情况下,会执行 unregisterBroker 指令这两种方式路由删除的方法都是一样的,都是从相关路由表中删除与该 broker 相关的信息。
6.3 路由发现
RocketMQ 的路由发现采用的是 Pull 模型,当 Topic 路由信息发生变化时,NameServer 不会主动推送给客户端,而是客户端定时拉取主题最新的路由,默认让客户端每 30 秒会拉取一次最新的路由。
(Push、Pull、Long Polling 三种模型的优缺点?)
7. 小结
7.1 回顾
这次源码剖析学到了什么?
本次分享很多不足的地方还请指教。
如下图所示:
7.2 源码学习的方法
本地调试法(启动类,Debug,QuickStart等)
7.3 NameServer 设计的亮点
- 读写锁使用,Map 结构存储路由信息;
- NameServer 之间不通信(AP,最终一致性);
- NameServer 只管理 Broker 集群等信息。
8. 本地源码调试 —— Step by Step【附录】
(1)从 GitHub 上 clone 代码到本地
https://github.com/apache/rocketmq.git
(2)导入 IDEA
(3)执行 mvn clean install -DskipTests
(4)启动 NameServer
(5)启动 Broker
(6)启动 Producer
(7)启动 Consumer
8.1 启动 NameServer
org.apache.rocketmq.namesrv.NamesrvStartup#main
打开 【Edit Configuration】配置启动参数:
ROCKET_HOME 环境变量设置完成如下:
再次启动 NameServer
添加 logback_namesrv.xml 文件到指定文件路径下:
打开 logback_namesrv.xml 文件并替换 RocketMQ 运行路径
再次启动,出现“The Name Server boot success …”字样,表示启动成功。
8.2 启动 Broker
org.apache.rocketmq.broker.BrokerStartup#main
ROCKET_HOME 环境变量设置如下:
再次启动 NameServer
添加 logback_namesrv.xml 文件到指定文件路径下,报错:未找到 broker.conf 文件
打开 broker.conf 文件并添加配置信息
代码语言:javascript复制brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
#追加配置如下:
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#nameServer 地址,分号分割
namesrvAddr=127.0.0.1:9876
#broker 地址
brokerIP1=10.2.67.197
#存储路径
storePathRootDir=D:/xxx/rocketmqserver/store
#commitLog 存储路径
storePathCommitLog=D:/xxx/rocketmqserver/store/commitlog
#消费队列存储路径
storePathConsumeQueue=D:/xxx/rocketmqserver/store/consumequeue
#消息索引存储路径
storePathIndex=D:/xxx/rocketmqserver/store/index
#checkpoint 文件存储路径
storeCheckpoint=D:/xxx/rocketmqserver/store/checkpoint
#abort 文件存储路径
abortFile=D:/xxx/rocketmqserver/store/abort
设置【Programma arguments】项目启动参数如下:
再次启动,出现“The broker[…] boot success …”字样,表示启动成功。
8.3 启动 Producer
org.apache.rocketmq.example.quickstart.Producer#main
8.4 启动 Consumer
org.apache.rocketmq.example.quickstart.Consumer#main
原文链接:https://mp.weixin.qq.com/s/94DaQVGvZwFQlSUvQmrL1A