问题
在学习RocketMQ的时候,有几个疑问。 如果主题不存在,client把消息发给谁呢? 当发送消息给不存在的主题时,主题是什么时候创建的呢?
猜测
当我执行下面代码时,主题不存在,那么什么时候创建的主题"TopicTest202112151152"呢?
代码语言:javascript复制 Message msg = new Message("TopicTest202112151152" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
SendResult sendResult = producer.send(msg,1000000000);
其实我当时猜测的是可能发现主题不存在时先给服务器发个消息,让其创建主题,然后再发送消息。 结果是:发送消息的时候创建主题
问题1:client发送消息,主题不存在给谁发?
源码跟踪
以下面一段代码为例,要给“TopicTest202112151154”主题发送消息,发送的内容是时间字符串,跟producer.send方法
代码语言:javascript复制// Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
// Launch the instance.
producer.start();
// Create a message instance, specifying topic, tag and message body.
Message msg =
new Message(
"TopicTest202112151154",
"TagA",
(LocalDateTime.now().toString()).getBytes(RemotingHelper.DEFAULT_CHARSET));
// Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg, 1000000000);
System.out.printf("%s%n", sendResult);
// Shut down once the producer instance is not longer in use.
producer.shutdown();
跟到DefaultMQProducerImpl###sendDefaultImpl方法
代码语言:javascript复制private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//...
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
//....
//...发送消息
}
跟到DefaultMQProducerImpl###tryToFindTopicPublishInfo方法
代码语言:javascript复制private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
//首先从本地缓存中获取,因为主题不存在,所以返回null
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
//然后从NameServer获取,因为主题不存在,所以返回一个不Ok的TopicPublishInfo
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
//因为TopicPublishInfo不Ok
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
//重新获取主题,该方法是重点,跟进去
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
跟到MQClientInstance###updateTopicRouteInfoFromNameServer方法 在该方法中获取默认的主题“TBW102”主题在NameServer的路由信息,把新主题的路由信息参考“TBW102”复制一份,此时在客户端上已经认为新主题已经创建好,不过在服务器端是没有创建好改主题的。
代码语言:javascript复制 public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
//获取默认主题defaultMQProducer.getCreateTopicKey(),即TBW102的路由信息
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
//省略。。。
}
//然后按照TBW102的topicRouteData把新主题的topicRouteData创建出来,此时客户端就有了新主题的路由信息(实际是TBW102的路由信息)
return false;
}
此时客户端就有新主题的路由信息了,但是路由信息对应的broker上是没有该主题的信息的,不过客户端此时已经知道把消息发给哪个IP了。
问题回答
客户端如果获取的主题信息不存在,会根据“TBW102”主题的信息创建新主题,然后把该新主题的信息存储到客户端本地,此时客户端知道给哪个IP发数据了,然后客户端就会和那个IP的Netty建立连接,然后发数据,Ok了。
问题2:broker收到消息后发现主题不存在,什么时候创建?
从哪开始打断点
首先你要会Netty,这样按照常理你就能知道逻辑在SimpleChannelInboundHandler里。 那么去哪找SimpleChannelInboundHandler呢,应该先找到NettyServer。NettyServer应该在Broker的启动源码里去找。 BrokerController###start方法里有下面的代码
代码语言:javascript复制if (this.remotingServer != null) {
this.remotingServer.start();
}
remotingServer的实现类选择NettyRemotingServer,里面的start方法里有如下代码
代码语言:javascript复制 ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
其中serverHandler就是MQ自定义的方法,顺藤摸瓜,就找到了NettyServerHandler的channelRead0方法 NettyRemotingAbstract###processMessageReceived方法,在processRequestCommand里打条件多线程断,条件是cmd.code == 310(RequestCode.SEND_MESSAGE_V2 = 310)
代码语言:javascript复制 public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
开始跟源码
当客户端发送消息时,broker的断点会停在下面的processRequestCommand这一行
NettyRemotingAbstract###processMessageReceived方法,在processRequestCommand里打条件多线程断,条件是cmd.code == 310(RequestCode.SEND_MESSAGE_V2 = 310)
代码语言:javascript复制 public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
NettyRemotingAbstract###processRequestCommand方法 RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd)把任务提交,会到下面代码里的run匿名类里
代码语言:javascript复制public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run =
new Runnable() {
@Override
public void run() {
try {
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingResponseCallback callback =
new RemotingResponseCallback() {
@Override
public void callback(RemotingCommand response) {
doAfterRpcHooks(
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
System.out.println(response);
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
}
};
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
AsyncNettyRequestProcessor processor =
(AsyncNettyRequestProcessor) pair.getObject1();
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
NettyRequestProcessor processor = pair.getObject1();
RemotingCommand response = processor.processRequest(ctx, cmd);
callback.callback(response);
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
if (!cmd.isOnewayRPC()) {
final RemotingCommand response =
RemotingCommand.createResponseCommand(
RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
//使用线程池把任务提交
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
}
然后跟SendMessageProcessor###asyncProcessRequest方法
代码语言:javascript复制public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor());
}
然后跟SendMessageProcessor###asyncProcessRequest方法
代码语言:javascript复制public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.asyncConsumerSendMsgBack(ctx, request);
default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return CompletableFuture.completedFuture(null);
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
if (requestHeader.isBatch()) {
return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
//走这个分支
return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
}
}
}
然后跟SendMessageProcessor###asyncSendMessage方法 方法里有一个preSend方法
代码语言:javascript复制private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
final RemotingCommand response = preSend(ctx, request, requestHeader);
//省略
}
然后跟SendMessageProcessor###asyncSendMessage方法
代码语言:javascript复制private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageRequestHeader requestHeader) {
//省略
//检查主题的问题
super.msgCheck(ctx, requestHeader, response);
//省略
}
跟进AbstractSendMessageProcessor###msgCheck方法
代码语言:javascript复制 protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
//省略
//broker上创建主题,跟进去
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
requestHeader.getTopic(),
requestHeader.getDefaultTopic(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
//省略
}
TopicConfigManager###createTopicInSendMessageMethod 该方法会创建主题并且持久化,此时主题在broker中存在但是NameServer不存在
代码语言:javascript复制public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
if (PermName.isInherited(defaultTopicConfig.getPerm())) {
//创建新主题的topic信息
topicConfig = new TopicConfig(topic);
int queueNums = Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums());
if (queueNums < 0) {
queueNums = 0;
}
topicConfig.setReadQueueNums(queueNums);
topicConfig.setWriteQueueNums(queueNums);
int perm = defaultTopicConfig.getPerm();
perm &= ~PermName.PERM_INHERIT;
topicConfig.setPerm(perm);
topicConfig.setTopicSysFlag(topicSysFlag);
topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
}
}
if (topicConfig != null) {
//持久化
this.persist();
}
}
return topicConfig;
}
###ConfigManager###persist方法
代码语言:javascript复制public synchronized void persist() {
String jsonString = this.encode(true);
if (jsonString != null) {
//我的值是C:Users25682storeconfigtopics.json
String fileName = this.configFilePath();
try {
MixAll.string2File(jsonString, fileName);
} catch (IOException e) {
log.error("persist file " fileName " exception", e);
}
}
}
MixAll###string2File
代码语言:javascript复制//str为最新的全部topic信息
public static void string2File(final String str, final String fileName) throws IOException {
//先把str存到topics.json.tmp里
String tmpFile = fileName ".tmp";
string2FileNotSafe(str, tmpFile);
//把topics.json里的数据存储到topics.json.bk里
String bakFile = fileName ".bak";
String prevContent = file2String(fileName);
if (prevContent != null) {
string2FileNotSafe(prevContent, bakFile);
}
//删除topics.json
File file = new File(fileName);
file.delete();
//把topics.json.tmp重命名为topics.json
file = new File(tmpFile);
file.renameTo(new File(fileName));
}
TBW102主题的作用
Producer 在发送消息时,默认情况下,不需要提前创建好 Topic,如果 Topic 不存在,Broker 会自动创建 Topic。但是新创建的 Topic 它的权限是什么?读写队列数是多少呢?这个时候就需要用到TBW102 了,RocketMQ 会基于该 Topic 的配置创建新的 Topic。
参考
深度解析RocketMQ 主题的创建机制,为何生产建议关掉自动创建Topic
https://blog.csdn.net/a1036645146/article/details/109581499
TBW102主题的作用
https://www.modb.pro/db/130866