文章目录
- 生产者消息返回状态
- FLUSH_DISK_TIMEOUT
- FLUSH_SLAVE_TIMEOUT
- SLAVE_NOT_AVAILABLE
- SEND_OK
- 延迟消息
- 自定义消息发送规则 MessageQueueSelector
- Netty底层框架解析
- NettyRemotingServer
- start() 方法
- NettyRemotingClient
生产者消息返回状态
FLUSH_DISK_TIMEOUT
如果设置了 FlushDiskType=SYNC_FLUSH (默认是 ASYNC_FLUSH),并且 Broker 没有在 syncFlushTimeout (默认是 5 秒)设置的时间内完成刷盘,就会收到此状态码。
FLUSH_SLAVE_TIMEOUT
如果设置为 SYNC_MASTER,并且 slave Broker 没有在 syncFlushTimeout 设定时间内完成同步,就会收到此状态码。
SLAVE_NOT_AVAILABLE
如果设置为 SYNC_MASTER,并没有配置 slave Broker,就会收到此状态码。
SEND_OK
这个状态可以简单理解为,没有发生上面列出的三个问题状态就是SEND_OK。需要注意的是,SEND_OK 并不意味着可靠,如果想严格确保没有消息丢失,需要开启 SYNC_MASTER or SYNC_FLUSH。 如果收到了 FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT,意味着消息会丢失,有2个选择,一是无所谓,适用于消息不需要保证不丢失的场景,二是重发,但可能产生消息重复,这就需要consumer进行去重控制。如果收到了 SLAVE_NOT_AVAILABLE 就要及时进行处理了。
延迟消息
延迟消息:消息发到Broker后,要特定的时间才会被Consumer消费。 目前只支持固定精度的定时消息,可以在rocketmq-store模块MessageStoreConfig配置类中看到对应的定时消息的配置。
代码语言:javascript复制private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
设置延迟发送消息等级:
代码语言:javascript复制// 创建消息
// 主题
Message message = new Message("test_quick_topic",
// 标签
"TagA",
// 用户自定义的key ,唯一的标识
"key" i,
// 消息内容实体(byte[])
("Hello RocketMQ" i).getBytes());
// 设置延迟等级为3,也就是10s发送一条
message.setDelayTimeLevel(3);
自定义消息发送规则 MessageQueueSelector
如何把消息发送到指定的队列(Message Queue)?
代码语言:javascript复制// 同步发送消息,直接获取发送结果(指定第二个队列)
SendResult sr = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer queueNumber = (Integer)arg;
return mqs.get(queueNumber);
}
}, 2);
System.err.println(sr);
Netty底层框架解析
NettyRemotingServer
实现Netty服务器端功能,接受数据包,在服务器端处理后发送给客户端。
NettyRemotingClient
实现Netty客户端功能。
NettyRemotingServer
start() 方法
start方法主要启动Netty服务器,并在绑定端口后阻塞主线程。这里主要看看Netty服务器端装配了哪些ChannelHandler:
代码语言:javascript复制ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// SocketChannel添加ChannelHandler
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
HandshakeHandler:检测传输的包体是否使用TLS协议(传输层安全性协议,Transport Layer Security)传输 ,如果包体使用TSL协议,将会在Pipeline中加入处理TSL协议握手的Handler。
代码语言:javascript复制ctx.pipeline()
.addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
.addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
NettyEncoder/NettyDecoder
:RocketMQ自定义的编解码Handler,其中编码器将RemotingCommand(RocketMQ的服务器端和客户端交互的数据结构)序列化,其中序列化的方式有json或者二进制,具体编解码方式这里不讨论了。而解码器NettyDecoder继承LengthFieldBasedFrameDecoder,基于长度编解码方式,将二进制反序列化为RemotingCommand。
IdleStateHandler
:Netty包中定义的心跳检测包。读写超时时间由NettyServerConfig.serverChannelMaxIdleTimeSeconds变量控制,默认时间120s。
NettyConnectManageHandler
:Channel连接的管理handler,当发生channel连接的激活、失效、超时和异常时,NettyRemotingServer会生成一个Netty事件,管理连接的组件相应的会处理事件。
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
NettyServerHandler
:处理RemotingCommand消息,并且返回相应的处理结果。具体实现如下:
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
而processMessageReceived()方法在NettyRemotingServer的父类NettyRemotingAbstract中实现:
代码语言:javascript复制public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
在看processRequestCommand和processResponseCommand这两个方法之前,先了解一下RemotingCommand这个对象:
代码语言:javascript复制public class RemotingCommand {
private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON;
static {
// 获取配置的序列化方式
final String protocol = System.getProperty(SERIALIZE_TYPE_PROPERTY, System.getenv(SERIALIZE_TYPE_ENV));
if (!isBlank(protocol)) {
try {
serializeTypeConfigInThisServer = SerializeType.valueOf(protocol);
} catch (IllegalArgumentException e) {
throw new RuntimeException("parser specified protocol error. protocol=" protocol, e);
}
}
}
private int code; // 请求类型
private LanguageCode language = LanguageCode.JAVA;
private int version = 0; // RocketMQ版本编号
private int opaque = requestId.getAndIncrement(); // 请求序号
private int flag = 0; // 标记请求是普通请求,还是无回应的请求
private String remark; // 失败提示
private HashMap<String, String> extFields; // 参数字段的数值
private transient CommandCustomHeader customHeader; // 参数的类型
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
private transient byte[] body; // 解码时缓存的字节流
}
NettyRemotingClient
NettyRemotingClient的start()方法与NettyRemotingServer类似,在添加ChannelHandler处理包的handler是NettyClientHandler,其功能与NettyServerHandler一样。
代码语言:javascript复制 public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
} catch (Exception e) {
// .....
this.closeChannel(addr, channel);
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
invokeSync方法根据发送命令的地址取出或创建一个Channel,然后调用父类的invokeSyncImpl()方法,阻塞等待返回结果。 当addr为null时,取出连接到namesrv的channel。也就是说,当调用NettyRemotingClient.invokeSync(null, request, 3000)时,请求会发送到namesrv。getAndCreateNameserverChannel()方法,从定时检测刷新的namesrvAddrList中按序取出自己绑定的nameserver的地址。然后从缓存中取出channel,如果缓存中的channel不存在或失活,那么重新连接。 类似的,createChannel()方法,主要逻辑实现连接到目标服务器,并将生成的channel放入到channelTables缓存中,下一次发送命令时,如果channel依然存活,那么从缓存中取出channel使用。