云原生中间件RocketMQ-生产者消息返回状态,延迟消息,自定义消息发送规则,netty框架部分代码分析

2022-11-28 15:54:05 浏览数 (1)

文章目录

    • 生产者消息返回状态
      • FLUSH_DISK_TIMEOUT
      • FLUSH_SLAVE_TIMEOUT
      • SLAVE_NOT_AVAILABLE
      • SEND_OK
    • 延迟消息
    • 自定义消息发送规则 MessageQueueSelector
    • Netty底层框架解析
      • NettyRemotingServer
        • start() 方法
      • NettyRemotingClient

生产者消息返回状态

FLUSH_DISK_TIMEOUT

如果设置了 FlushDiskType=SYNC_FLUSH (默认是 ASYNC_FLUSH),并且 Broker 没有在 syncFlushTimeout (默认是 5 秒)设置的时间内完成刷盘,就会收到此状态码。

FLUSH_SLAVE_TIMEOUT

如果设置为 SYNC_MASTER,并且 slave Broker 没有在 syncFlushTimeout 设定时间内完成同步,就会收到此状态码。

SLAVE_NOT_AVAILABLE

如果设置为 SYNC_MASTER,并没有配置 slave Broker,就会收到此状态码。

SEND_OK

这个状态可以简单理解为,没有发生上面列出的三个问题状态就是SEND_OK。需要注意的是,SEND_OK 并不意味着可靠,如果想严格确保没有消息丢失,需要开启 SYNC_MASTER or SYNC_FLUSH。 如果收到了 FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT,意味着消息会丢失,有2个选择,一是无所谓,适用于消息不需要保证不丢失的场景,二是重发,但可能产生消息重复,这就需要consumer进行去重控制。如果收到了 SLAVE_NOT_AVAILABLE 就要及时进行处理了。

延迟消息

延迟消息:消息发到Broker后,要特定的时间才会被Consumer消费。 目前只支持固定精度的定时消息,可以在rocketmq-store模块MessageStoreConfig配置类中看到对应的定时消息的配置。

代码语言:javascript复制
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

设置延迟发送消息等级:

代码语言:javascript复制
//	创建消息
//	主题
Message message = new Message("test_quick_topic",
                              //	标签
                              "TagA",
                              // 	用户自定义的key ,唯一的标识
                              "key"   i,
                              //	消息内容实体(byte[])
                              ("Hello RocketMQ"   i).getBytes());	
// 设置延迟等级为3,也就是10s发送一条
message.setDelayTimeLevel(3);

自定义消息发送规则 MessageQueueSelector

如何把消息发送到指定的队列(Message Queue)?

代码语言:javascript复制
// 同步发送消息,直接获取发送结果(指定第二个队列)
SendResult sr = producer.send(message, new MessageQueueSelector() {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer queueNumber = (Integer)arg;
        return mqs.get(queueNumber);
    }
}, 2);
System.err.println(sr);

Netty底层框架解析

NettyRemotingServer实现Netty服务器端功能,接受数据包,在服务器端处理后发送给客户端。 NettyRemotingClient实现Netty客户端功能。

NettyRemotingServer

start() 方法

start方法主要启动Netty服务器,并在绑定端口后阻塞主线程。这里主要看看Netty服务器端装配了哪些ChannelHandler:

代码语言:javascript复制
ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            // SocketChannel添加ChannelHandler
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });

HandshakeHandler:检测传输的包体是否使用TLS协议(传输层安全性协议,Transport Layer Security)传输 ,如果包体使用TSL协议,将会在Pipeline中加入处理TSL协议握手的Handler。

代码语言:javascript复制
ctx.pipeline()
    .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
    .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());

NettyEncoder/NettyDecoder:RocketMQ自定义的编解码Handler,其中编码器将RemotingCommand(RocketMQ的服务器端和客户端交互的数据结构)序列化,其中序列化的方式有json或者二进制,具体编解码方式这里不讨论了。而解码器NettyDecoder继承LengthFieldBasedFrameDecoder,基于长度编解码方式,将二进制反序列化为RemotingCommand。 IdleStateHandler:Netty包中定义的心跳检测包。读写超时时间由NettyServerConfig.serverChannelMaxIdleTimeSeconds变量控制,默认时间120s。 NettyConnectManageHandler:Channel连接的管理handler,当发生channel连接的激活、失效、超时和异常时,NettyRemotingServer会生成一个Netty事件,管理连接的组件相应的会处理事件。

代码语言:javascript复制
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));

NettyServerHandler:处理RemotingCommand消息,并且返回相应的处理结果。具体实现如下:

代码语言:javascript复制
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}

而processMessageReceived()方法在NettyRemotingServer的父类NettyRemotingAbstract中实现:

代码语言:javascript复制
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

在看processRequestCommand和processResponseCommand这两个方法之前,先了解一下RemotingCommand这个对象:

代码语言:javascript复制
public class RemotingCommand {
    private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON;
    static {
        // 获取配置的序列化方式
        final String protocol = System.getProperty(SERIALIZE_TYPE_PROPERTY, System.getenv(SERIALIZE_TYPE_ENV));
        if (!isBlank(protocol)) {
            try {
                serializeTypeConfigInThisServer = SerializeType.valueOf(protocol);
            } catch (IllegalArgumentException e) {
                throw new RuntimeException("parser specified protocol error. protocol="   protocol, e);
            }
        }
    }
 
    private int code;  // 请求类型
    private LanguageCode language = LanguageCode.JAVA;
    private int version = 0; // RocketMQ版本编号
    private int opaque = requestId.getAndIncrement(); // 请求序号
    private int flag = 0;  // 标记请求是普通请求,还是无回应的请求
    private String remark; // 失败提示
    private HashMap<String, String> extFields;  // 参数字段的数值
    private transient CommandCustomHeader customHeader;  // 参数的类型
 
    private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
 
    private transient byte[] body;  // 解码时缓存的字节流
}

NettyRemotingClient

NettyRemotingClient的start()方法与NettyRemotingServer类似,在添加ChannelHandler处理包的handler是NettyClientHandler,其功能与NettyServerHandler一样。

代码语言:javascript复制
 public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        long beginStartTime = System.currentTimeMillis();
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                doBeforeRpcHooks(addr, request);
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTimeoutException("invokeSync call timeout");
                }
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                return response;
            } catch (Exception e) {
                // .....
                this.closeChannel(addr, channel);
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

invokeSync方法根据发送命令的地址取出或创建一个Channel,然后调用父类的invokeSyncImpl()方法,阻塞等待返回结果。 当addr为null时,取出连接到namesrv的channel。也就是说,当调用NettyRemotingClient.invokeSync(null, request, 3000)时,请求会发送到namesrv。getAndCreateNameserverChannel()方法,从定时检测刷新的namesrvAddrList中按序取出自己绑定的nameserver的地址。然后从缓存中取出channel,如果缓存中的channel不存在或失活,那么重新连接。 类似的,createChannel()方法,主要逻辑实现连接到目标服务器,并将生成的channel放入到channelTables缓存中,下一次发送命令时,如果channel依然存活,那么从缓存中取出channel使用。

0 人点赞