RocketMQ(三):面对高并发请求,如何高效持久化消息?

2024-09-18 09:09:30 浏览数 (1)

RocketMQ(三):面对高并发请求,如何高效持久化消息?

上篇文章我们分析完RocketMQ发送消息的原理,得到结果客户端会通过RPC组件向Broker进行通信

Broker收到请求后需要将消息进行持久化,一旦涉及到持久化,服务器的性能会急速降低,并且消费者进行消费时还需要读取消息,从磁盘的读取也是会大大降低性能的

本篇文章就要来分析,RocketMQ的Broker在这种频繁读写的场景,是如何进行高效读写的

存储文件

Broker这种频繁读写的场景中,提供三种文件满足持久化消息时的高效读写,分别为CommitLog、ConsumerQueue、IndexFile

CommitLog

为了避免消息持久化时的写入性能开销过大,Broker采用顺序写入,无论消息属于哪个Topic又或者是哪个队列,都顺序写入CommitLog文件

CommitLog文件目录下以起始偏移量进行命名,每个文件固定为1G,从00000000000000000000文件开始,接着是00000000001073741824,然后是00000000002147483648文件,后续以此类推

这些以偏移量命名的文件在源码中被定义为MappedFile,从名字可以看出它使用内存映射mmap,在频繁读写、大文件的场景,使用mmap避免数据拷贝的性能开销

由于消息大小不一,持久化到MappedFile时每条消息偏移量也不是固定的

MappedFile的创建是需要扩容时才创建的,一连串的MappedFile组成CommitLog文件

ConsumerQueue

虽然顺序写能够大大节省写入的开销,但是并不方便查询,因为消息被写顺序写入CommitLog时,并没有区分是哪个Topic、队列的

这样在进行消费时要读取消息,根据topic、队列、队列上的偏移量来获取消息时,遍历查找是不现实的,因此还要生成逻辑文件,便于进行查找

ConsumerQueue(消费队列),以Topic进行一级分类,然后再以Topic下的队列ID进行二级分类,队列下的每个文件(固定大小6,000,000 B)可以存储30W条ConsumerQueue记录(20 B)

并且命名也是以起始偏移量进行命名,比如00000000000000000000,00000000000006000000,00000000000012000000

ConsumerQueue记录固定大小为20B,其中8B记录对应消息在CommitLog上的偏移量、4B记录消息长度、8B记录tag哈希,其中依靠前两个字段可以快速找到CommitLog中的消息

ComsumerQueue与CommitLog文件的关系,类比MySQL数据库中的二级索引与聚簇索引

通过二级索引找到满足条件的记录后,回表快速定位到聚簇索引上的记录(如果不理解MySQL索引的同学可以查看MySQL进阶专栏)

最后一个字段tag哈希用于消息过滤,消费者拉取指定tag消息时,如果哈希不满足就不会进行拉取

在图中ConsumerQueue文件以Topic进行分类,分为Topic A、B两个文件,其中TopicA下根据队列ID存在0、1、2三个文件

文件中存储的记录由commitlog offset消息在commitlog的偏移量、size消息大小和tag哈希值组成

消费者组中的消费者A向队列0、1拉取消息,消费者B向队列2拉取消息

拉取消息时Broker根据ConsumerQueue记录的commitlog offset可以在CommitLog文件中找到需要的消息

需要注意的是,CommitLog与ConsumerQueue文件虽然占用空间不同,但底层都是使用MappedFile的,一个MappedFile相当于一个文件

IndexFile

IndexFile文件为哈希索引,文件分为三个部分:文件头、哈希槽、索引项

文件头用于存储通用数据并固定为40B

代码语言:java复制
public void load() {
    //最早消息存储到commitlog时间 8B 用来计算时间差
    this.beginTimestamp.set(byteBuffer.getLong(beginTimestampIndex));
    //最晚时间 8B
    this.endTimestamp.set(byteBuffer.getLong(endTimestampIndex));
    //消息存储到CommitLog最小偏移量 8B
    this.beginPhyOffset.set(byteBuffer.getLong(beginPhyoffsetIndex));
    //消息存储到CommitLog最大偏移量 8B
    this.endPhyOffset.set(byteBuffer.getLong(endPhyoffsetIndex));
	//最大可以存储的哈希槽数量
    this.hashSlotCount.set(byteBuffer.getInt(hashSlotcountIndex));
    //以使用的索引数量
    this.indexCount.set(byteBuffer.getInt(indexCountIndex));
    //从1开始
    if (this.indexCount.get() <= 0) {
    	this.indexCount.set(1);
    }
}

哈希槽每个占用4B,用来寻址,能够找到索引项

索引项每个占用20B,4B存储哈希值、8B存储消息在CommitLog中的偏移量、4B存储与前一个消息持久化的时间差(单位秒,用来时间范围查询)、4B存储下个索引项的偏移量,用于寻址(哈希冲突时链地址法)

文件大小 = 文件头 40B 哈希槽数量 * 哈希槽固定大小 4B 索引数量 * 索引固定大小 4B

代码语言:java复制
int fileTotalSize =  IndexHeader.INDEX_HEADER_SIZE   (hashSlotNum * hashSlotSize)   (indexNum * indexSize);

使用流程与哈希表类似:

  1. 通过topic # key的形式来计算哈希值,哈希值模上哈希槽的数量就找到对应的哈希槽
  2. 通过哈希槽找到对应的索引项,对比哈希值
  3. 哈希值相同则获取偏移量,再去CommitLog寻找
  4. 哈希值不相同则根据联表向后查找下一个索引项

Broker存储消息的流程中除了CommitLog、ConsumerQueue、IndexFile文件外,还会存储其他文件,后文用到了再说,最终Broker存储的架构图如下:

数据刷盘与同步

在分析原理前,再来说下数据刷盘和同步的几种方式:

数据刷盘:将操作系统内核缓冲区中的数据刷入磁盘

刷盘可以分为同步、异步两种方式,默认异步:

  1. 同步:向内核缓冲区写完数据后开始等待,直到对应的脏页刷入磁盘再返回;性能差,但可靠性好
  2. 异步:向内核缓冲区写完数据后立即返回,根据频率、页数等配置将脏页数据刷入磁盘;性能好,但可靠性差

数据同步:主Broker持久化数据后,再将数据同步给从Broker,也叫主从复制

主从复制也分为同步、异步两种方式,默认异步:

  1. 同步:向内核缓冲区写完数据后开始等待,直到从broker也持久化完数据;性能差,但可靠性好
  2. 异步:向内核缓冲区写完数据后直接返回,异步线程对从broker进行数据同步

后文将会从源码的角度进行分析

存储原理

因为上篇分析过生产者发送消息流程的原理,介绍完存储文件后,我们从Broker接收消息的源码开始分析,正好接上原理分析的流程

(Broker的源码去GitHub拉下来查看,如果要跑起来,还要配置Broker的一些参数)

Netty服务器接收请求

上篇文章发送消息RPC时说过RocketMQ网络通信使用的Netty框架,那么Broker作为服务端一定会有Netty的服务器

在这个Netty服务器中pipeline可以配置一些处理器用于处理请求、响应

NettyRemotingServer.start

在启动的流程中,会将NettyServerHandler加入pipeline处理请求

代码语言:java复制
	ServerBootstrap childHandler =
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
        .option(ChannelOption.SO_REUSEADDR, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                    .addLast(defaultEventExecutorGroup,
                        encoder,
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                        connectionManageHandler,
                        //NettyServerHandler 加入 pipeline     
                        serverHandler
                    );
            }
        });

不认识netty的同学也不要慌,就当它是个网络通信的组件

NettyServerHandler处理请求时会调用processMessageReceived方法

代码语言:java复制
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}

processMessageReceived方法会根据类型分请求/响应两种情况进行处理

代码语言:java复制
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

调用processRequestCommand方法,会先通过请求的类型获取Pair

Pair中存在NettyRequestProcessor(处理请求)和ExecutorService(对应的线程池)

然后将任务提交到线程池,任务代码我先省略,待会查看,如果不支持请求类型会直接响应

代码语言:java复制
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    //通过请求code获取Pair,NettyRequestProcessor为处理请求,ExecutorService为对应的线程池
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        //任务..

        try {
            //将任务提交到线程池
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
			//...
        }
    } else {
        //响应 请求类型不支持处理
        String error = " request type "   cmd.getCode()   " not supported";
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
    }
}

任务中主要做几件事:

  1. 解析请求地址
  2. 执行处理前的钩子
  3. 封装异步回调,包括:执行处理后的钩子和写回响应
  4. 调用NettyRequestProcessor处理器的处理方法,如果是异步就携带回调,如果不是则执行完再调用下回调(流程类似)
代码语言:java复制
Runnable run = new Runnable() {
    @Override
    public void run() {
        try {
            //解析请求地址
            String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            //rpc处理前的钩子
            doBeforeRpcHooks(remoteAddr, cmd);
            
            //异步回调 处理后的钩子和写响应
            final RemotingResponseCallback callback = new RemotingResponseCallback() {
                @Override
                public void callback(RemotingCommand response) {
                    //处理后的钩子
                    doAfterRpcHooks(remoteAddr, cmd, response);
                    if (!cmd.isOnewayRPC()) {
                        if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            response.setSerializeTypeCurrentRPC(cmd.getSerializeTypeCurrentRPC());
                            try {
                                //写响应
                                ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                                //...
                            }
                        } 
                    }
                }
            };
            //调用处理器的方法
            if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
                processor.asyncProcessRequest(ctx, cmd, callback);
            } else {
                NettyRequestProcessor processor = pair.getObject1();
                RemotingCommand response = processor.processRequest(ctx, cmd);
                callback.callback(response);
            }
        } catch (Throwable e) {
            //...
        }
    }
};
SendMessageProcessor 处理请求
代码语言:java复制
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
}

public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
}

处理请求的处理器SendMessageProcessor是异步的,因此会调用asyncProcessRequest方法

代码语言:java复制
@Override
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
    asyncProcessRequest(ctx, request).
        thenAcceptAsync(responseCallback::callback, this.brokerController.getPutMessageFutureExecutor());
}

asyncProcessRequest 处理请求分成两种情况:

  1. 消费者发送的,调用 asyncConsumerSendMsgBack
  2. 剩下的情况就是生产者发送的消息,需要解析请求头、执行处理发送消息前的钩子、查看是否批量消息(分情况处理)
代码语言:java复制
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                              RemotingCommand request) throws RemotingCommandException {
    final SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.asyncConsumerSendMsgBack(ctx, request);
        default:
            //解析请求头
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return CompletableFuture.completedFuture(null);
            }
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            //执行处理发送消息前的钩子
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
            //是否批量消息 分情况处理
            if (requestHeader.isBatch()) {
                return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
                return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
            }
    }
}

查看普通消息的情况,调用 asyncSendMessage,才开始真正处理发送消息的请求:

  1. 获取消息体、队列ID、Topic等信息,将数据注入MessageExtBrokerInner
  2. 是否为事务消息,使用MessageExtBrokerInner后续执行
代码语言:java复制
private CompletableFuture<RemotingCommand> asyncSendMessage(
    ChannelHandlerContext ctx, RemotingCommand request,
    SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {
    
    //生成通用响应
    final RemotingCommand response = preSend(ctx, request, requestHeader);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

    if (response.getCode() != -1) {
        return CompletableFuture.completedFuture(response);
    }

    //获取消息体、队列ID、Topic等信息
    final byte[] body = request.getBody();
    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

    //将信息封装到MessageExtBrokerInner
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);
    msgInner.setBody(body);
    //set...
    
    CompletableFuture<PutMessageResult> putMessageResult = null;
    //分情况处理,一个是事务消息,另一个是普通消息
    String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (Boolean.parseBoolean(transFlag)) {
        putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
    } else {
        putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    }
    //写响应
    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}

继续查看普通消息的后续流程,this.brokerController.getMessageStore().asyncPutMessage(msgInner)

getMessageStore会获取消息存储的组件MessageStore进行后续的存储流程

MessageStore 存储消息

MessageStore负责消息的存储,除了写CommitLog文件外,还要去写ConsumerQueue、IndexFile等文件

采用DefaultMessageStore处理消息:

(我们先把写CommitLog的主流程继续走完)

  1. 先检查存储、消息、本地MQ是否正常,这里的本地MQ并不是指RocketMQ,而是本地内存MQ
  2. 调用CommitLog处理消息(历尽千辛万苦终于看到熟悉的CommitLog了~)
  3. 处理完统计耗时、失败次数等
代码语言:java复制
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    //检查存储、消息、本地MQ
    PutMessageStatus checkStoreStatus = this.checkStoreStatus();
    PutMessageStatus msgCheckStatus = this.checkMessage(msg);
    PutMessageStatus lmqMsgCheckStatus = this.checkLmqMessage(msg);
	//...
    
    long beginTime = this.getSystemClock().now();
    //调用CommitLog处理消息
    CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
	
    //处理后要统计下耗时,在哪个时间段,累计失败次数等
    putResultFuture.thenAccept(result -> {
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().add(1);
        }
    });

    return putResultFuture;
}
写CommitLog
CommitLog追加消息

CommitLog.asyncPutMessage

终于到达CommitLog存储消息的流程,这里的流程还是比较重要的:

  1. 写前加锁 (由于会顺序写同一个文件,那一定会出现并发写数据的情况,因此要加锁)
  2. 获取最后一个MappedFile,没有就创建 (MappedFile就是对应以偏移量命名的文件,第一个文件不一定为00000000000000000000,因为消费完会删除,第一个文件为偏移量最小的)
  3. 使用MappedFile追加消息
  4. 处理结果,写后释放锁
  5. 提交刷盘请求 (mmap只是将数据写到page cache,还需要根据同步/异步刷盘策略再进行刷盘)
  6. 提交主从复制的请求 (也是同步/异步的策略进行主从复制)
  7. 刷盘、主从复制请求完成后才返回
代码语言:java复制
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        //处理事务消息 省略...

        //...
    
    	//加锁 自旋锁或可重入锁
        putMessageLock.lock(); 
        try {
            //获取mappedFileQueue队列中最后一个MappedFile
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
            
            //mappedFile为空要创建
            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            
			//mappedFile追加消息
            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
            
            //处理结果
            switch (result.getStatus()) {
                //...
            }

            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        } finally {
            beginTimeInLock = 0;
            //释放锁
            putMessageLock.unlock();
        }

    	//...
    
        //提交刷盘请求
        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
    	//提交传输从节点数据请求
        CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
    
    	//都提交后返回
        return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
            if (flushStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(flushStatus);
            }
            if (replicaStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(replicaStatus);
            }
            return putMessageResult;
        });
    }

mappedFile追加消息就是计算偏移量,再将数据写入缓冲区

创建MappedFile

CommitLog追加消息的流程中,如果MappedFile不存在,则会创建,最终调用doCreateMappedFile

代码语言:java复制
protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
    MappedFile mappedFile = null;

    //采用allocateMappedFileService创建
    if (this.allocateMappedFileService != null) {
        mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                nextNextFilePath, this.mappedFileSize);
    } else {
        //否则自己创建
        try {
            mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
        } catch (IOException e) {
            log.error("create mappedFile exception", e);
        }
    }

    return mappedFile;
}

AllocateMappedFileService是负责创建MappedFile的组件,会将请求放入内存队列this.requestQueue.offer后续由它的线程取出请求进行创建

刷盘

追加完消息后会提交两个请求:

submitFlushRequest提交刷盘请求,主要分两种情况:同步、异步

submitReplicaRequest提交主从复制请求,也是分为同步情况与刷盘类似,只是增加HA的组件

代码语言:java复制
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
    // Synchronization flush 同步
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset()   result.getWroteBytes(),
                    this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            flushDiskWatcher.add(request);
            //GroupCommitService添加请求
            service.putRequest(request);
            return request.future();
        } else {
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else  {
            //异步唤醒
            commitLogService.wakeup();
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}

如果是同步,则会使用GroupCommitService进行同步刷盘,将请求放入它的队列

如果是异步,则唤醒FlushRealTimeService进行刷盘

默认情况下,刷盘策略为异步

返回后,由外层的 DefaultMessageStorewaitForPutResult 进行阻塞等待结果

代码语言:java复制
@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    return waitForPutResult(asyncPutMessage(msg));
}
同步刷盘

CommitLog 在构造时根据字段 flushDiskType 判断刷盘策略是同步还是异步(默认异步)

同步使用GroupCommitService,异步使用FlushRealTimeService

代码语言:java复制
public CommitLog(final DefaultMessageStore defaultMessageStore) {
    //...
    
    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        this.flushCommitLogService = new GroupCommitService();
    } else {
        this.flushCommitLogService = new FlushRealTimeService();
    }
}

GroupCommitService负责同步刷盘,提供两个队列

requestsWrite负责写,提交刷盘任务时会被放入队列中

requestsRead负责读,每次刷盘时取出请求进行刷盘

代码语言:java复制
private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();

GroupCommitService 会轮询取出请求进行刷盘

代码语言:java复制
public void run() {

    while (!this.isStopped()) {
        try {
            //等待 其中会交换队列
            this.waitForRunning(10);
            //取出请求 刷盘
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName()   " service has exception. ", e);
        }
    }

    // Under normal circumstances shutdown, wait for the arrival of the
    // request, and then flush
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        CommitLog.log.warn(this.getServiceName()   " Exception, ", e);
    }

    //结束前再刷盘兜底
    synchronized (this) {
        this.swapRequests();
    }
    this.doCommit();
}

waitForRunning 等待期间会调用交互请求队列,将读写队列互换

代码语言:java复制
private void swapRequests() {
    lock.lock();
    try {
        LinkedList<GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    } finally {
        lock.unlock();
    }
}

doCommit 会遍历读队列进行刷盘,通过比较偏移量判断是否刷盘成功,刷盘可能涉及两个mappedFile,因此可能循环两次

代码语言:java复制
private void doCommit() {
    if (!this.requestsRead.isEmpty()) {
        for (GroupCommitRequest req : this.requestsRead) {
            // There may be a message in the next file, so a maximum of
            // two times the flush
            //通过比较偏移量判断是否刷盘成功
            boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
            //刷盘可能涉及两个mappedFile因此可能要循环两次
            for (int i = 0; i < 2 && !flushOK; i  ) {
                CommitLog.this.mappedFileQueue.flush(0);
                flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
            }

            //填充结果:成功 或 刷盘超时
            req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
        }

        long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
        if (storeTimestamp > 0) {
            CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
        }
        this.requestsRead = new LinkedList<>();
    } else {
        // Because of individual messages is set to not sync flush, it
        // will come to this process
        // 读队列没数据的情况下直接刷
        CommitLog.this.mappedFileQueue.flush(0);
    }
}

其中 flush(0) 表示刷盘没有最低页数的要求,会尽力刷盘

最终调用 force 进行刷盘,fileChannel 是文件的channel,mappedByteBuffer 是它的直接内存缓冲区

代码语言:java复制
if (writeBuffer != null || this.fileChannel.position() != 0) {
    this.fileChannel.force(false);
} else {
    this.mappedByteBuffer.force();
}

值得思考的是读写队列都采用线程不安全的LinkedList,在并发提交刷盘任务的情况下需要加锁同步,那为啥不使用JUC下的队列呢?

也许是因为刷盘读队列消费时为单线程并不需要使用同步手段,最终才选择LinkedList

异步刷盘

FlushRealTimeService 负责异步刷盘

当使用异步刷盘时,也是通过轮询刷盘,可以通过配置参数调整刷盘频率、每次刷盘最少的页数等

代码语言:java复制
public void run() {
    CommitLog.log.info(this.getServiceName()   " service started");

    while (!this.isStopped()) {
        //是否使用Thread.sleep 默认true
        boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
		//刷盘频率 默认500ms
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
        //每次最少刷屏页数 默认4页
        int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
		
        //...

        try {
            //等待 刷盘频率的时间
            if (flushCommitLogTimed) {
                Thread.sleep(interval);
            } else {
                this.waitForRunning(interval);
            }
            //刷盘
            CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
            //...
        } catch (Throwable e) {
            CommitLog.log.warn(this.getServiceName()   " service has exception. ", e);
            this.printFlushProgress();
        }
    }

    // Normal shutdown, to ensure that all the flush before exit
    //结束前 兜底刷盘
    boolean result = false;
    for (int i = 0; i < RETRY_TIMES_OVER && !result; i  ) {
        result = CommitLog.this.mappedFileQueue.flush(0);
        CommitLog.log.info(this.getServiceName()   " service shutdown, retry "   (i   1)   " times "   (result ? "OK" : "Not OK"));
    }

    this.printFlushProgress();

    CommitLog.log.info(this.getServiceName()   " service end");
}

与同步刷盘不同:刷盘频率很少(等待时间久)、每次刷盘并不是尽力刷而是根据最少页数进行刷盘

小结

至此CommitLog文件被刷入磁盘,主从复制的“支线”后续文章再来分析

由于消息写入CommitLog流程漫长、复杂,我们总结核心流程,简化流程:

  1. Netty服务器接收请求
  2. SendMessageProcessor将请求转化为MessageExtBrokerInner (Broker内部处理的消息)
  3. MessageStore 检查消息、存储状态,调用CommitLog持久化
  4. CommitLog 写前加锁防并发写
  5. CommitLog 没有MappedFile或者已满,将请求放入内存队列,通过AllocateMappedFileService异步创建
  6. CommitLog 使用最后一个MappedFile,追加数据,最终通过fileChannel或它的缓冲区mappedByteBuffer进行force刷盘
  7. CommitLog 写完释放锁
  8. CommitLog 提交刷盘请求:默认异步刷盘根据配置的频率和每次刷盘页数进行刷盘,同步刷盘会将请求提交到写队列,循环消费,消费前置换读写队列,取出请求尽量刷盘
  9. CommitLog 提交主从复制请求
  10. MessageStore 刷盘、主从复制请求完成后返回

再精简下流程:Netty -> SendMessageProcessor -> MessageStore -> CommitLog -> MappedFile -> fileChannel/mappedByteBuffer force

当消费者获取消息后,CommitLog中存储的消息就不需要保存了,因此会有清理线程来进行定时清理

实现定时清理的组件是MessageStore下的CleanCommitLogService,实际上就是操作MappedFile,销毁关闭资源,这里就不过多叙述

写ConsumerQueue

整理完“主线”,消息持久化并没有结束,还有部分的“支线”要挖掘

(就像黑神话悟空一样,要把图舔干净才有美妙的探索感~)

与CommitLog文件对标的ConsumerQueue、IndexFile文件似乎没有出现主流程的源码中,它们是啥时候生成的?接下来让我们继续分析:

ReputMessageService 重投消息

DefaultMessageStore下的**ReputMessageService用于重投消息,它会根据CommitLog上的偏移量封装成请求,重投给其他CommitLogDispatcher进行后续处理**

常见的CommitLogDispatcher有写ConsumerQueue的CommitLogDispatcherBuildConsumeQueue、写IndexFile的CommitLogDispatcherBuildIndex

线程循环执行,每次循环等待1ms,然后调用doReput,其中reputFromOffset为它记录的偏移量

  1. 根据重投偏移量获取CommitLog(封装为SelectMappedBufferResult)
  2. 然后每次循环检查、解析每条消息,封装为DispatchRequest
  3. 如果成功就使用对应处理器进行调用 doDispatch
代码语言:java复制
	private void doReput() {
        	//如果reputFromOffset偏移量比commitLog最小偏移量还小,就从最小偏移量开始重投
            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
            }
        
        	//偏移量没超过commitLog最大就一直循环重投
            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

				//根据偏移量获取MappedFile以及缓冲池
                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                if (result != null) {
                    try {
                        this.reputFromOffset = result.getStartOffset();
						//每次获取一条消息进行调用
                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                            //检查并解析每条消息
                            DispatchRequest dispatchRequest =
                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                            int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                            if (dispatchRequest.isSuccess()) {
                                if (size > 0) {
                                    //成功进行调用
                                    DefaultMessageStore.this.doDispatch(dispatchRequest);
                                    
                                    //如果不止从节点且 开启长轮询 且 消息到达监听器不为空 会调用消息到达监听器 用于消费的长轮询(下篇文章再说消息消费)
									if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                            && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
                                            && DefaultMessageStore.this.messageArrivingListener != null) {
                                        //调用消息到达监听器
                                        DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                            dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset()   1,
                                            dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                            dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                        notifyMessageArrive4MultiQueue(dispatchRequest);
                                    }
                                    //...
                                } else if (size == 0) {
                                    this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                    readSize = result.getSize();
                                }
                            } 
                    } finally {
                        result.release();
                    }
                } else {
                    doNext = false;
                }
            }
        }

doDispatch方法会遍历 CommitLogDispatcher 进行处理

代码语言:java复制
public void doDispatch(DispatchRequest req) {
    for (CommitLogDispatcher dispatcher : this.dispatcherList) {
        dispatcher.dispatch(req);
    }
}
CommitLogDispatcherBuildConsumeQueue 追加数据

CommitLogDispatcherBuildConsumeQueue 会调用putMessagePositionInfo

代码语言:java复制
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    cq.putMessagePositionInfoWrapper(dispatchRequest, checkMultiDispatchQueue(dispatchRequest));
}

先通过topic、队列id获取对应的ConsumerQueue,然后调用putMessagePositionInfo

先在ConsumerQueue的缓冲区上写ConsumerQueue记录(消息偏移量、消息大小、tag哈希),然后获取映射文件MappedFile,最后再往文件里追加数据

代码语言:java复制
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {

    //先在自己的缓冲区上写数据
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    //8B 消息在CommitLog偏移量
    this.byteBufferIndex.putLong(offset);
    //4B 消息大小
    this.byteBufferIndex.putInt(size);
    //8B tag哈希
    this.byteBufferIndex.putLong(tagsCode);

    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

    //获取MappedFile
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {

        //...
        
        //往文件里追加消息
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}

注意MappedFile是映射文件,虽然ConsumerQueue与CommitLog都是往MappedFile追加数据,但它们的映射的文件不同

其实流程是类似的,这里只追加了数据,剩下的刷盘还要其他组件异步去做

FlushConsumeQueueService 异步刷盘

FlushConsumeQueueService 负责ConsumerQueue文件的异步刷盘

根据配置flushIntervalConsumeQueue刷盘频率默认1000ms,也就是循环中的等待时间,等待后调用doFlush进行刷盘

流程与异步刷盘类型,也是读取刷盘页数、频率、间隔等参数进行刷盘

不同的是ConsumerQueue文件要从Topic、队列ID目录一层一层遍历刷盘

代码语言:java复制
private void doFlush(int retryTimes) {
    //刷盘页数 默认2页
    int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();

    if (retryTimes == RETRY_TIMES_OVER) {
        flushConsumeQueueLeastPages = 0;
    }

    long logicsMsgTimestamp = 0;
	//两次刷盘间隔 默认60s
    int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
    long currentTimeMillis = System.currentTimeMillis();
    if (currentTimeMillis >= (this.lastFlushTimestamp   flushConsumeQueueThoroughInterval)) {
        //如果超过间隔时间 则尽量刷盘
        this.lastFlushTimestamp = currentTimeMillis;
        flushConsumeQueueLeastPages = 0;
        logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
    }

    //Key为Topic  Value中的Key则是Topic下的队列ID,ConsumerQueue则是队列ID目录下的ConsumerQueue文件
    ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;

    //遍历Topic
    for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
        //遍历队列
        for (ConsumeQueue cq : maps.values()) {
            //遍历队列下的ConsumerQueue文件
            boolean result = false;
            for (int i = 0; i < retryTimes && !result; i  ) {
                //ConsumerQueue文件刷盘
                result = cq.flush(flushConsumeQueueLeastPages);
            }
        }
    }

    //...
}

ConsumerQueue刷盘优先对自己进行刷盘,如果开启扩展文件(consumerqueue_ext)也会对其进行刷盘

代码语言:java复制
public boolean flush(final int flushLeastPages) {
    boolean result = this.mappedFileQueue.flush(flushLeastPages);
    if (isExtReadEnable()) {
        result = result & this.consumeQueueExt.flush(flushLeastPages);
    }

    return result;
}

this.mappedFileQueue.flush刷盘上文以及说过,就不再分析了

ConsumerQueue记录对应的消息被消费完就不需要进行存储,因此也会有清理文件的组件

清理文件的组件是MessageStore下的CleanConsumeQueueService,同时它也会去清理InfexFile文件(流程类似,这里就不多说了)

至此ConsumerQueue文件也进行了持久化,IndexFile文件追加数据的原理类似

写IndexFile

写IndexFile也是由ReputMessageService重投的,由CommitLogDispatcherBuildIndex进行处理

会先获取IndexFile,最终调用getAndCreateLastIndexFile,主要流程为:

  1. 先从列表中获取最后一个IndexFile文件 (期间加读锁,因为列表是线程不安全的)
  2. 如果没获取到或者已满,则创建IndexFile后,加写锁放入列表再解锁
  3. 如果创建IndexFile要启动刷盘的线程持久化上一个IndexFile
代码语言:java复制
public IndexFile getAndCreateLastIndexFile() {
    IndexFile indexFile = null;
    IndexFile prevIndexFile = null;
    long lastUpdateEndPhyOffset = 0;
    long lastUpdateIndexTimestamp = 0;

    {
        //如果列表中有IndexFile则取最后一个IndexFile 期间加读锁
        this.readWriteLock.readLock().lock();
        if (!this.indexFileList.isEmpty()) {
            IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
            if (!tmp.isWriteFull()) {
                indexFile = tmp;
            } else {
                //已满也要创建
                lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
                lastUpdateIndexTimestamp = tmp.getEndTimestamp();
                prevIndexFile = tmp;
            }
        }

        this.readWriteLock.readLock().unlock();
    }

    if (indexFile == null) {
        try {
            //如果没获取到则创建IndexFile
            String fileName =
                this.storePath   File.separator
                      UtilAll.timeMillisToHumanString(System.currentTimeMillis());
            indexFile =
                new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
                    lastUpdateIndexTimestamp);
            
            //并把它放入列表中 期间加写锁
            this.readWriteLock.writeLock().lock();
            this.indexFileList.add(indexFile);
        } catch (Exception e) {
            log.error("getLastIndexFile exception ", e);
        } finally {
            this.readWriteLock.writeLock().unlock();
        }

        if (indexFile != null) {
            //开启负责刷盘的线程 将上一个文件进行刷盘
            final IndexFile flushThisFile = prevIndexFile;
            Thread flushThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    IndexService.this.flush(flushThisFile);
                }
            }, "FlushIndexFileThread");

            flushThread.setDaemon(true);
            flushThread.start();
        }
    }

    return indexFile;
}

获取IndexFile后,就会调用putKey构建哈希索引

上篇文章说过,在默认消息发送的实现中,如果消息不是批量消息则会设置唯一标识

这里的Key是通过topic、#、唯一标识拼接而成的

代码语言:java复制
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        int keyHash = indexKeyHashMethod(key);
        //哈希槽的位置
        int slotPos = keyHash % this.hashSlotNum;
        //哈希槽偏移量
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE   slotPos * hashSlotSize;

        try {
			//哈希槽的值 可以找到
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                slotValue = invalidIndex;
            }

            //计算时间差 单位秒
            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
            timeDiff = timeDiff / 1000;
            if (this.indexHeader.getBeginTimestamp() <= 0) {
                timeDiff = 0;
            } else if (timeDiff > Integer.MAX_VALUE) {
                timeDiff = Integer.MAX_VALUE;
            } else if (timeDiff < 0) {
                timeDiff = 0;
            }

            //计算要写入索引项的偏移量位置
            int absIndexPos =
                IndexHeader.INDEX_HEADER_SIZE   this.hashSlotNum * hashSlotSize
                      this.indexHeader.getIndexCount() * indexSize;

            //写入索引项记录 
            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            this.mappedByteBuffer.putLong(absIndexPos   4, phyOffset);
            this.mappedByteBuffer.putInt(absIndexPos   4   8, (int) timeDiff);
            this.mappedByteBuffer.putInt(absIndexPos   4   8   4, slotValue);

            //覆盖哈希槽的值 下次再通过哈希槽找到的是这个新的索引项 头插法
            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

            //...
        } catch (Exception e) {
            log.error("putKey exception, Key: "   key   " KeyHashCode: "   key.hashCode(), e);
        }
    } 
    //...
}
小结

至此已经描述完从接收消息到消息持久化CommitLog的主流程,以及异步持久化ConsumerQueue与IndexFile文件的流程

为了达到高性能,在这个持久化的过程中并不是同步的,也不是原子操作,这种持久化设计采用的是数据最终一致性,一旦节点宕机,文件漏写就会导致数据不一致

为了解决这个问题,需要在broker启动时判断上次是否为异常宕机,持久化文件是否写齐,如果不齐需要有对应的数据恢复过程(本文就不分析这部分的源码了,后续文章再说)

因此还会涉及到消息的重投,从而引起重复消费,这也是RocketMQ在保证消息不丢的同时,不能保证消息不重复

最终将相关操作绘制成一张流程图,如下:

总结

Broker为了持久化消息会写很多文件,其中主要为CommitLog、ConsumerQueue、IndexFile文件

为了实现高性能的写入,写入文件通常都是使用mmap(内存映射)对应源码中的MappedFile

CommitLog为消息顺序写入的文件,通过顺序写的方式提高写入性能,文件名以起始偏移量命名,固定1G,消息被消费后会删除

ConsumerQueue文件为消息的逻辑文件,会以Topic、队列ID、偏移量进行多级目录分类,每个也是以起始偏移量命名,固定6MB可以存储30W条ConsumerQueue记录

ConsumerQueue记录固定为20B,8B记录消息在CommitLog上的偏移量,4B记录消息大小,8B记录tag哈希值,通过前两个字段可以快速在CommitLog中找到对应消息,tag哈希值用于消息过滤,要拉取指定tag消息时,如果tag哈希值不对则过滤

IndexFile索引文件,可以看成消息Key和消息在CommitLog上偏移量的哈希索引文件,key由topic和消息唯一标识组成,通过key的哈希值模上哈希槽数量得到对应的哈希槽,根据哈希槽找到对应索引项,索引项上存储消息偏移量,能够快速找到消息(如果冲突则根据指针寻找下一个索引项)

默认刷盘和主从复制的方式都是异步,性能好但可靠性不好,同步虽然性能差但可靠性较好

Broker会使用Netty接收请求,再通过各种Processor对各种请求进行处理,如果是发送消息的请求最终会使用MessageStore进行存储

MessageStore复制消息的存储,读、写、清理、管理各种消息持久化相关文件

MessageStore会调用CommitLog进行写消息,CommitLog则是通过它的MappedFile进行写数据,在此期间可能多个线程需要写同一个MappedFile,因此需要加锁

如果没有或需要扩容,就要创建MappedFile,CommitLog将创建MappedFile的操作异步交给AllocateMappedFileService初始化MappedFile

MappedFile写完数据,数据就被映射在内核缓冲区,但此时还没有刷入磁盘,写完数据,还需要提交刷盘请求和同步请求

如果使用的是异步刷盘,则唤醒FlushRealTimeService的线程,根据配置的频率、页数、间隔等参数进行异步刷盘

如果使用的是同步刷盘,则将请求放入GroupCommitService的写队列中(需要使用同步手段防止并发),后续如果没返回值会阻塞

GroupCommitService会每隔10ms循环消费读队列中的请求进行刷盘,消费前等待时会将读写队列转换(因为写队列可能多线程写,而读队列只有一个线程读)

同步请求会交给HAService进行处理,流程类型刷盘,只不过过程是网络通信

最后主流程会等待刷盘、同步请求执行完,同步的话会阻塞,如果期间超时则会返回对应的响应状态,标识消息持久化可能失败(CommitLog持久化流程结束)

消费被消费后则不需要再存储,MessageStore会使用CleanCommitLogService定时清理

写完主要的CommitLog文件后,MessageStore会异步使用ReputMessageService重投消息,根据它的偏移量获取CommitLog文件,然后封装每一条消息交给CommitLogDispatcher调度器执行

写ConsumerQueue文件由CommitLogDispatcherBuildConsumeQueue进行调度,通过消息的Topic和队列ID找到对应的ConsumerQueue,再往对应MappedFile上追加数据

对于ConsumerQueue文件的刷盘,MessageStore会使用FlushConsumeQueueService读取配置异步进行刷盘,由于ConsumerQueue根据Topic、队列ID进行多级分类,因此刷盘时也要多层遍历刷盘

同时MessageStore会使用CleanConsumeQueueService定时清理过期的ConsumerQueue、IndexFile文件

写IndexFile文件由CommitLogDispatcherBuildIndex调度,它会调用IndexFileService对IndexFile文件进行追加数据,期间创建新的IndexFile才对上一个文件进行异步刷盘

为了高性能,在消息持久化的过程中使用顺序写、MMap、最终一致性等特点,节点宕机可能导致文件数据未写入,因此启动时会检查文件是否写入成功,如果写入文件不齐全,还会涉及到恢复文件,消息重投,可能导致消息重复消费

最后(点赞、收藏、关注求求啦~)

本篇文章被收入专栏 消息中间件,感兴趣的同学可以持续关注喔

本篇文章笔记以及案例被收入 Gitee-CaiCaiJava、 Github-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~

有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~

关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜

0 人点赞