RocketMQ-生产者核心解析、主从同步机制解析,生产者同步异步消息发送
- 生产者核心参数
- Master - Slave主从同步机制解析
- 数据内容同步
- 元数据信息同步
- 通信协议
- 生产者消息发送
- 生产者同步消息发送
- 生产者异步消息发送
生产者核心参数
producerGroup: 组名 createTopicKey:创建topic,实际生产实践不允许生产者创建top。 defaultTopicQueueNums(默认为4):默认的topic关联的队列数量 sendMsgTimeout(单位:ms):发送消息连接broker超时时间。 compressMsgBodyOverHowmuch(默认压缩字节4096):消息体达到多少压缩。 retryTimesWhenSendFailed (可配置):发送失败重试次数 retryAnotherBrokerWhenNotStoreOK(默认false):发送broker存储失败换个broker发送。 maxMessageSize(默认128K):消息最大可以设置多大。 heartbeatBrokerInterval:与broker的心跳间隔(以微秒为单位,默认为30毫秒)
Master - Slave主从同步机制解析
同步的信息主要是数据内容和元数据信息
数据内容同步
实时进行同步,同步的是commitlog中的数据
,对实时性要求高,并且丢失数据就无法恢复。使用原生socket
进行同步。
源码分析:
数据内容同步主要是在rocketmq-store中。主要涉及 HAConnection
、HAService
、WaitNotifyObject
三个类。并没有使用netty而是原生nio,是为了更加高效。
- 对于Master节点
HAService AcceptSocketService(内部类):接受slave节点连接。
HAConnection ReadSocketService(内部类):读来自Slave节点的数据。 WriteSocketService(内部类):写往到Slave节点的数据。
- 对于Slave节点
HAService HAClient(内部类):对Master节点连接、读写数据。
元数据信息同步
broker判断如果是slave节点,那么会启动定时任务不断同步,如果丢失也可以从其他地方重试获取。包含topic信息和offset等。使用netty同步。
源码分析:
元数据同步主要发生在broker,所以这部分代码主要在rocketmq-broker的模块中。是在handleSlaveSynchronize
方法中通过定义了一个固定时间的定时任务,时间是10秒钟执行一次,当然前提条件是broker节点的角色是slave,而broker节点是master时,如果有定时任务会取消,因为master是不用同步元数据信息。
这个方法会有三个地方调用:
- broker刚刚启动
- master切换成slave
- slave切换成master。
定时任务的逻辑是写在syncAll
方法中。主要是需要同步4部分内容:
- 同步topic配置信息
- 同步消费者偏移量
- 同步延时偏移量
- 同步订阅组配置信息。
4个方法的内容实际上就是封装了netty做rpc的调用,对不同的操作都会对应到一个code。
通信协议
Slave => Master:上报CommitLog已经同步到的物理位置。使用maxPhyOffset
字段,代表CommitLog接受到的最大物理位置。
Master => Slave:传输新的CommitLog数据。使用fromPhyOffset
字段,代表CommitLog开始传输的物理位置。
生产者消息发送
生产者同步消息发送
消息的同步发送:producer.send(msg) 同步发送消息核心实现:DefaultMQProducerlmpl 同步发送消息可以直接获取返回值:
代码语言:javascript复制// 同步发送消息,直接获取发送结果
SendResult sr = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer queueNumber = (Integer)arg;
return mqs.get(queueNumber);
}
}, 2);
System.err.println(sr);
生产者异步消息发送
producer.send(Message msg, SendCallback sendCallback) 异步发送消息核心实现:DefaultMQProducerlmpl 异步发送消息需要通过回调函数获取返回值:
代码语言:javascript复制// 异步发送消息,回调函数获取结果
producer.send(message, new SendCallback() {
// 可靠性消息投递
@Override
public void onSuccess(SendResult sendResult) {
System.err.println("msgId: " sendResult.getMsgId() ", status: " sendResult.getSendStatus());
}
@Override
public void onException(Throwable e) {
// 发送失败做异常处理
e.printStackTrace();
System.err.println("------发送失败");
}
});
本文内容到此结束了, 如有收获欢迎点赞