RocketMQ(二):揭秘发送消息核心原理(源码与设计思想解析)

2024-09-13 09:15:03 浏览数 (3)

RocketMQ(二):揭秘发送消息核心原理(源码与设计思想解析)

上篇文章主要介绍消息中间件并以RocketMQ架构展开描述其核心组件以及MQ运行流程

本篇文章以Product的视角来看看发送消息的核心原理与设计思想,最后以图文并茂的方式描述出发送消息的核心流程

消息发送方式

RocketMQ中普通消息提供三种发送方式:同步、异步、单向

上篇文章中我们已经使用封装好的API延时过同步发送

在使用三种方式前,我们先来理解它们的理论知识

同步发送:发送完消息后,需要阻塞直到收到Broker的响应,通常用于数据一致性较高的操作,需要确保消息到达Broker并持久化

同步发送收到响应并不一定就是成功,还需要根据响应状态进行判断

SendResult响应状态包括:

  1. SEND_OK:发送成功
  2. FLUSH_DISK_TIMEOUT:刷盘超时
  3. FLUSH_SLAVE_TIMEOUT:同步到备超时
  4. SLAVE_NOT_AVAILABLE:备不可用

(这些状态与设置的刷盘策略有关,后续保证消息可靠的文章再进行详细展开说明,本篇文章还是回归主线探究发送消息)

异步发送:发送完消息后立即响应,不需要阻塞等待,但需要设置监听器,当消息成功或失败时进行业务处理,可以在失败时进行重试等其他逻辑保,通常用于追求响应时间的场景

异步发送相当于同步发送,需要新增SendCallback回调来进行后续成功/失败的处理,并且异步发送没有返回值

代码语言:java复制
@GetMapping("/asyncSend")
public String asyncSend() {
    producer.sendAsyncMsg(topic, "tag", "async hello world!", new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("消息发送成功{}", sendResult);
        }

        @Override
        public void onException(Throwable throwable) {
            log.info("消息发送失败", throwable);
            //记录后续重试
        }
    });
    return "asyncSend ok";
}

原生API封装:

代码语言:java复制
public void sendAsyncMsg(String topic, String tag, String jsonBody, SendCallback sendCallback) {
        Message message = new Message(topic, tag, jsonBody.getBytes(StandardCharsets.UTF_8));
        try {
            producer.send(message, sendCallback);
        } catch (MQClientException | RemotingException | InterruptedException e) {
            throw new RuntimeException(e);
        }
}

单向发送:只要发出消息就响应,性能最好,通常用于追求性能,不追求可靠的场景,如:异步日志收集

由于单向发送的特性,即不需要回调也没有返回结果

代码语言:java复制
@GetMapping("/sendOnewayMsg")
public String onewaySend() {
    producer.sendOnewayMsg(topic, "tag", "oneway hello world!");
    return "sendOnewayMsg ok";
}

原生API封装:

代码语言:java复制
public void sendOnewayMsg(String topic, String tag, String jsonBody) {
    Message message = new Message(topic, tag, jsonBody.getBytes(StandardCharsets.UTF_8));
    try {
        producer.sendOneway(message);
    } catch (MQClientException | RemotingException | InterruptedException e) {
        throw new RuntimeException(e);
    }
}

发送消息原理

在研究发送消息的原理前,不妨来思考下,如果让我们实现,我们要思考下需要哪些步骤?

像我们平时进行业务代码编写前的第一步就是进行参数校验,因为要防止参数“乱填”的意外情况

然后由于需要找到对应的Broker,那肯定要获取Topic路由相关信息

这个路由信息前文说过是从NameServer集群定时获取即时更新的,那么客户端的内存里肯定会进行存储

像这样的数据肯定是类似于多级缓存的,先在本地缓存,如果本地没有或者本地是旧数据,那么就网络通信再去远程(NameServer)获取一份后再更新本地缓存

获取完路由信息后,可以通过设置的Topic获取对应的MessageQueue队列信息,因为Topic下可能有很多队列,因此需要负载均衡算法决定要发送的队列

rocketmq发送消息还提供超时、重试等机制,因此在这个过程中需要计算时间、重试次数

最后发送消息会进行网络通信,我们要选择合适的工具进行RPC

总结一下,如果让我们设计起码要有这些流程:参数校验、获取路由信息、根据负载均衡算法选择队列、计算超时,重试次数、选择网络通信RPC工具...

在设计完流程后,如果我们是一位”成熟的设计师“,那么一定会将这些步骤中通用的步骤抽象成模板,模板可以作为三种发送消息通用方式,而那些变动的就是策略,解耦互不影响,并在重要的流程前后留下”钩子“,方便让使用者进行扩展

rocketmq流程与我们设计、思考的流程类似,先准备一张最终的流程图,方便跟着流程图一起阅读源码:

image.pngimage.png
sendDefaultImpl 通用发送消息模板

通过三种发送方式,都会来到DefaultMQProducerImpl.sendDefaultImpl这个就是通用方法的模板

代码块中只展示部分关键代码,流程如下:

  1. 参数校验 Validators.checkMessage
  2. 获取路由信息 tryToFindTopicPublishInfo
  3. 选择一个要发送的MessageQueue selectOneMessageQueue
  4. 发送消息 sendKernelImpl

在3、4步骤中还会进行重试、超时判断

代码语言:java复制
private SendResult sendDefaultImpl(
    //消息
    Message msg,
    //方式
    final CommunicationMode communicationMode,
    //异步的回调
    final SendCallback sendCallback,
    //超时时间
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //参数校验
    Validators.checkMessage(msg, this.defaultMQProducer);
    
    //获取路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        
        //计算重试次数
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1   this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        //已经重发次数
        int times = 0;
        //重试循环
        for (; times < timesTotal; times  ) {
            //上次试过的Broker
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            
            //选择一个要发送的MessageQueue
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    //重发时设置topic
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    //超时退出
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }

                    //发送
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    //记录延时
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    //最后分情况处理
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            //如果响应状态不成功 如果设置重试其他broker则进行重试
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } 
       //...             
}

其中CommunicationMode就是发送的方式,分别为:SYNC同步、ASYNC异步、ONEWAY单向

tryToFindTopicPublishInfo 获取路由信息

rocketmq中使用大量散列表存储数据,其中存储路由信息的是

代码语言:java复制
ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>()

topicPublishInfoTable中Key为topic,Value为路由信息TopicPublishInfo

TopicPublishInfo中主要包括messageQueueList对应的队列列表、sendWhichQueue后续用来选择哪一个队列、topicRouteData路由数据

在topicRouteData路由数据中主要有brokerDatas、queueDatas

brokerDatas包含所有的Broker信息,queueDatas包含每个broker上对应的数据,比如读写队列数量

image.pngimage.png

在获取路由信息的方法中,先尝试从本地获取 this.topicPublishInfoTable.get ,如果本地不存在则从NameServer获取 this.mQClientFactory.updateTopicRouteInfoFromNameServer

(这里的this.mQClientFactory实际上是MQClientInstance,生产者、消费者都会用到,用于客户端远程调用服务端,里面也会存对应相关的组件)

代码语言:java复制
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    //本地获取
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        //远程获取
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        //远程获取
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
selectOneMessageQueue 选择队列

选择队列默认情况下会来到这里,会通过线性轮询选择队列 selectOneMessageQueue,重试的区别为本次选择的broker不和上次的相同

(因为上次失败broker可能会存在问题,这次就换一个broker)

代码语言:java复制
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    //lastBrokerName:上一次的broker
    if (lastBrokerName == null) {
        //线性轮询选择队列 selectOneMessageQueue
        return selectOneMessageQueue();
    } else {
        for (int i = 0; i < this.messageQueueList.size(); i  ) {
            //线性轮询选择队列
            int index = this.sendWhichQueue.incrementAndGet();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            //找到不和上次一样的broker
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}
sendKernelImpl 封装消息

在发送消息前需要对消息进行封装,如:设置唯一ID、尝试压缩消息、封装消息头等

在发送前还有检查禁止发送的钩子和发送前后执行的钩子,方便扩展

代码语言:java复制
private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

    
    //获取broker信息
    String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte[] prevBody = msg.getBody();
        try {
            //for MessageBatch,ID has been set in the generating process
            //不是批量消息就设置唯一ID
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }

            //尝试压缩消息
            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                sysFlag |= compressType.getCompressionFlag();
                msgBodyCompressed = true;
            }

			//尝试执行检查禁止发送消息的钩子
            if (hasCheckForbiddenHook()) {
                //...
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }

            //尝试执行发送消息前的钩子
            if (this.hasSendMessageHook()) {
                //...
                this.executeSendMessageHookBefore(context);
            }

            //封装消息头
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            //set...

            
            //根据不同的发送方式调整
            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    //...
                    //获取MQ客户端发送
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        brokerName,
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    //检查超时
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    //获取MQ客户端发送消息
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        brokerName,
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }

            //尝试执行发送完消息的钩子
            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }

            return sendResult;
        } 
        //...
}
使用Netty进行网络通信RPC

同步消息最终会调用invokeSync,这种服务间的网络通信又称为远程调用RPC

在RPC前后也有钩子可以进行扩展

最终调用invokeSyncImpl会通过netty的channel进行写数据

代码语言:java复制
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    long beginStartTime = System.currentTimeMillis();
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
            //rpc前的钩子
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call the addr["   addr   "] timeout");
            }
        	//使用netty的channel写数据
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
        	//rpc后的钩子
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            this.updateChannelLastResponseTime(addr);
            return response;
    } 
}

通过netty的channel写请求,并添加监听器,最后使用结果调用waitResponse进行同步等待

代码语言:java复制
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    try {
        //结果
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        //写请求 并添加监听器
        channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
            //...
        });

        //同步调用 等待结果
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}

异步消息RPC类似,只是不需要最后的同步等待

重试机制

走完整体的发送消息流程,我们再回过头来查看重试机制

总共尝试发送消息的次数取决于 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1

如果是同步消息则为 1 retryTimesWhenSendFailed 默认2次 = 3次,其他情况就是1次

也就是说只有同步发送才会重试!异步、单向都不会进行重试?

就在我查找同步最大重试次数 retryTimesWhenSendFailed 时,同时还发现异步的最大重试次数 retryTimesWhenSendAsyncFailed

实际上异步发送重试的代码在异常的catch块中,异常才去执行 onExceptionImpl

如果重试同步发送时,需要去其他broker还要把 retryAnotherBrokerWhenNotStoreOK 设置为true,默认false

发送消息流程总结

至此发送消息的流程算是过了一遍,在查看源码的过程中大部分内容都是见名知意的,这不比公司的”shit mountain“看着舒服?

最后再来总结下流程,便于同学们记忆:

image.pngimage.png
  1. 先校验参数,避免参数出错
  2. 再获取Topic路由信息,如果本地没有就从NameServer获取
  3. 然后通过线性轮询法选择队列,如果retryAnotherBrokerWhenNotStoreOK 开启后,同步失败新的重试会选择其他broker
  4. 紧接着对消息进行封装,设置唯一ID、压缩消息、检查禁止发送钩子、发送前后钩子等
  5. 最后使用Netty写请求进行rpc,期间也会有rpc的钩子,如果是同步则会等待
  6. 在此期间会进行重试、超时检测

总结

消息发送的方式有三种:同步、异步、单向,根据顺序可靠性逐渐下降、性能逐渐提升

同步消息能够通过响应判断是否真正成功,常用于需要消息可靠、数据一致的场景,如同步

异步消息通过实现回调处理成功与失败,常用于响应时间敏感的场景,如异步短信

单向消息不需要进行处理,常用于追求性能的场景,如异步日志

消息发送的过程中会先检查消息参数,确保消息无误,再获取路由信息,如果本地不存在则向NameServer获取

路由信息存储topic对应的broker、队列列表、broker上的队列等相关信息

然后通过线性轮询算法选择要发送消息的队列,如果重试则不会选择相同的broker

接着会设置消息的唯一ID、判断是否压缩消息、尝试执行检查禁止发送、发送消息前后的钩子等

最后使用netty写请求进行rpc调用,同时也会有rpc前后的钩子

在此期间同步、异步会根据参数进行超时检查、重试等操作

最后(点赞、收藏、关注求求啦~)

本篇文章被收入专栏 消息中间件,感兴趣的同学可以持续关注喔

本篇文章笔记以及案例被收入 Gitee-CaiCaiJava、 Github-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~

有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~

关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜

0 人点赞