RocketMQ主题的自动创建机制

2023-12-25 19:16:36 浏览数 (1)

问题

在学习RocketMQ的时候,有几个疑问。 如果主题不存在,client把消息发给谁呢? 当发送消息给不存在的主题时,主题是什么时候创建的呢?

猜测

当我执行下面代码时,主题不存在,那么什么时候创建的主题"TopicTest202112151152"呢?

代码语言:javascript复制
  Message msg = new Message("TopicTest202112151152" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ "  i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
            SendResult sendResult = producer.send(msg,1000000000);

其实我当时猜测的是可能发现主题不存在时先给服务器发个消息,让其创建主题,然后再发送消息。 结果是:发送消息的时候创建主题

问题1:client发送消息,主题不存在给谁发?

源码跟踪

以下面一段代码为例,要给“TopicTest202112151154”主题发送消息,发送的内容是时间字符串,跟producer.send方法

代码语言:javascript复制
// Instantiate with a producer group name.
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // Specify name server addresses.
    producer.setNamesrvAddr("localhost:9876");
    // Launch the instance.
    producer.start();
    // Create a message instance, specifying topic, tag and message body.
    Message msg =
        new Message(
            "TopicTest202112151154",
            "TagA",
            (LocalDateTime.now().toString()).getBytes(RemotingHelper.DEFAULT_CHARSET));
    // Call send message to deliver message to one of brokers.
    SendResult sendResult = producer.send(msg, 1000000000);
    System.out.printf("%s%n", sendResult);
    // Shut down once the producer instance is not longer in use.
    producer.shutdown();

跟到DefaultMQProducerImpl###sendDefaultImpl方法

代码语言:javascript复制
private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    	//...
    	TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    	//....
    	//...发送消息
    }

跟到DefaultMQProducerImpl###tryToFindTopicPublishInfo方法

代码语言:javascript复制
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        //首先从本地缓存中获取,因为主题不存在,所以返回null
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            //然后从NameServer获取,因为主题不存在,所以返回一个不Ok的TopicPublishInfo 
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        //因为TopicPublishInfo不Ok
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            //重新获取主题,该方法是重点,跟进去
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

跟到MQClientInstance###updateTopicRouteInfoFromNameServer方法 在该方法中获取默认的主题“TBW102”主题在NameServer的路由信息,把新主题的路由信息参考“TBW102”复制一份,此时在客户端上已经认为新主题已经创建好,不过在服务器端是没有创建好改主题的。

代码语言:javascript复制
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
   
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        //获取默认主题defaultMQProducer.getCreateTopicKey(),即TBW102的路由信息
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                            //省略。。。
                    }
                    //然后按照TBW102的topicRouteData把新主题的topicRouteData创建出来,此时客户端就有了新主题的路由信息(实际是TBW102的路由信息)   
        return false;
    }

此时客户端就有新主题的路由信息了,但是路由信息对应的broker上是没有该主题的信息的,不过客户端此时已经知道把消息发给哪个IP了。

问题回答

客户端如果获取的主题信息不存在,会根据“TBW102”主题的信息创建新主题,然后把该新主题的信息存储到客户端本地,此时客户端知道给哪个IP发数据了,然后客户端就会和那个IP的Netty建立连接,然后发数据,Ok了。

问题2:broker收到消息后发现主题不存在,什么时候创建?

从哪开始打断点

首先你要会Netty,这样按照常理你就能知道逻辑在SimpleChannelInboundHandler里。 那么去哪找SimpleChannelInboundHandler呢,应该先找到NettyServer。NettyServer应该在Broker的启动源码里去找。 BrokerController###start方法里有下面的代码

代码语言:javascript复制
if (this.remotingServer != null) {
            this.remotingServer.start();
        }

remotingServer的实现类选择NettyRemotingServer,里面的start方法里有如下代码

代码语言:javascript复制
 ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });

其中serverHandler就是MQ自定义的方法,顺藤摸瓜,就找到了NettyServerHandler的channelRead0方法 NettyRemotingAbstract###processMessageReceived方法,在processRequestCommand里打条件多线程断,条件是cmd.code == 310(RequestCode.SEND_MESSAGE_V2 = 310)

代码语言:javascript复制
   public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

开始跟源码

当客户端发送消息时,broker的断点会停在下面的processRequestCommand这一行

NettyRemotingAbstract###processMessageReceived方法,在processRequestCommand里打条件多线程断,条件是cmd.code == 310(RequestCode.SEND_MESSAGE_V2 = 310)

代码语言:javascript复制
   public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

NettyRemotingAbstract###processRequestCommand方法 RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd)把任务提交,会到下面代码里的run匿名类里

代码语言:javascript复制
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();

        if (pair != null) {
      Runnable run =
          new Runnable() {
            @Override
            public void run() {
              try {
                doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                final RemotingResponseCallback callback =
                    new RemotingResponseCallback() {
                      @Override
                      public void callback(RemotingCommand response) {
                        doAfterRpcHooks(
                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                        if (!cmd.isOnewayRPC()) {
                          if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                              System.out.println(response);
                              ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                              log.error("process request over, but response failed", e);
                              log.error(cmd.toString());
                              log.error(response.toString());
                            }
                          } else {
                          }
                        }
                      }
                    };
                if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                  AsyncNettyRequestProcessor processor =
                      (AsyncNettyRequestProcessor) pair.getObject1();
                  processor.asyncProcessRequest(ctx, cmd, callback);
                } else {
                  NettyRequestProcessor processor = pair.getObject1();
                  RemotingCommand response = processor.processRequest(ctx, cmd);
                  callback.callback(response);
                }
              } catch (Throwable e) {
                log.error("process request exception", e);
                log.error(cmd.toString());

                if (!cmd.isOnewayRPC()) {
                  final RemotingCommand response =
                      RemotingCommand.createResponseCommand(
                          RemotingSysResponseCode.SYSTEM_ERROR,
                          RemotingHelper.exceptionSimpleDesc(e));
                  response.setOpaque(opaque);
                  ctx.writeAndFlush(response);
                }
              }
            }
          };

            if (pair.getObject1().rejectRequest()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }

            try {
                 //使用线程池把任务提交
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
               
    }

然后跟SendMessageProcessor###asyncProcessRequest方法

代码语言:javascript复制
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
        asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor());
    }

然后跟SendMessageProcessor###asyncProcessRequest方法

代码语言:javascript复制
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                                  RemotingCommand request) throws RemotingCommandException {
        final SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.asyncConsumerSendMsgBack(ctx, request);
            default:
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return CompletableFuture.completedFuture(null);
                }
                mqtraceContext = buildMsgContext(ctx, requestHeader);
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
                if (requestHeader.isBatch()) {
                    return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    //走这个分支
                    return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
                }
        }
    }

然后跟SendMessageProcessor###asyncSendMessage方法 方法里有一个preSend方法

代码语言:javascript复制
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
                                                                
        final RemotingCommand response = preSend(ctx, request, requestHeader);
        //省略
    }

然后跟SendMessageProcessor###asyncSendMessage方法

代码语言:javascript复制
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,
                                    SendMessageRequestHeader requestHeader) {
       
       //省略

        //检查主题的问题
        super.msgCheck(ctx, requestHeader, response);
        
        //省略
    }

跟进AbstractSendMessageProcessor###msgCheck方法

代码语言:javascript复制
 protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
        final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
            
            //省略
            
            //broker上创建主题,跟进去
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
                requestHeader.getTopic(),
                requestHeader.getDefaultTopic(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                requestHeader.getDefaultTopicQueueNums(), topicSysFlag);

           //省略

    }

TopicConfigManager###createTopicInSendMessageMethod 该方法会创建主题并且持久化,此时主题在broker中存在但是NameServer不存在

代码语言:javascript复制
public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
        final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
      
                    
                        if (PermName.isInherited(defaultTopicConfig.getPerm())) {
                            //创建新主题的topic信息
                            topicConfig = new TopicConfig(topic);

                            int queueNums = Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums());

                            if (queueNums < 0) {
                                queueNums = 0;
                            }

                            topicConfig.setReadQueueNums(queueNums);
                            topicConfig.setWriteQueueNums(queueNums);
                            int perm = defaultTopicConfig.getPerm();
                            perm &= ~PermName.PERM_INHERIT;
                            topicConfig.setPerm(perm);
                            topicConfig.setTopicSysFlag(topicSysFlag);
                            topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
                        } 
                    } 
                    if (topicConfig != null) {
                       
                        //持久化
                        this.persist();
                    }
                } 

        return topicConfig;
    }

###ConfigManager###persist方法

代码语言:javascript复制
public synchronized void persist() {
        String jsonString = this.encode(true);
        if (jsonString != null) {
            //我的值是C:Users25682storeconfigtopics.json
            String fileName = this.configFilePath();
            try {
                MixAll.string2File(jsonString, fileName);
            } catch (IOException e) {
                log.error("persist file "   fileName   " exception", e);
            }
        }
    }

MixAll###string2File

代码语言:javascript复制
//str为最新的全部topic信息
public static void string2File(final String str, final String fileName) throws IOException {
        //先把str存到topics.json.tmp里
        String tmpFile = fileName   ".tmp";
        string2FileNotSafe(str, tmpFile);
        //把topics.json里的数据存储到topics.json.bk里
        String bakFile = fileName   ".bak";
        String prevContent = file2String(fileName);
        if (prevContent != null) {
            string2FileNotSafe(prevContent, bakFile);
        }
        //删除topics.json
        File file = new File(fileName);
        file.delete();
        //把topics.json.tmp重命名为topics.json
        file = new File(tmpFile);
        file.renameTo(new File(fileName));
    }

TBW102主题的作用

Producer 在发送消息时,默认情况下,不需要提前创建好 Topic,如果 Topic 不存在,Broker 会自动创建 Topic。但是新创建的 Topic 它的权限是什么?读写队列数是多少呢?这个时候就需要用到TBW102 了,RocketMQ 会基于该 Topic 的配置创建新的 Topic。

参考

深度解析RocketMQ 主题的创建机制,为何生产建议关掉自动创建Topic

https://blog.csdn.net/a1036645146/article/details/109581499

TBW102主题的作用

https://www.modb.pro/db/130866

0 人点赞