Netty中首先会经过OP_ACCEPT操作,再经过OP_READ事件,此时的操作是在processSelectionKeys中进行处理的,此时首先select出事件,然后执行处理操作。此时的read方法会会执行先后会执行两个事件,一个是连接事件16和一个读事件1。而在OP_WRITE则是在缓冲区写满的时候,才会去注册,等待通知去写。
一个普通的BIO的操作:
代码语言:javascript复制 public class BIOServer {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(6666);
ExecutorService executorService = Executors.newCachedThreadPool();
while (true) {
System.out.println("等待客户端连接。。。。");
Socket socket = serverSocket.accept(); //阻塞
executorService.execute(() -> {
try {
InputStream inputStream = socket.getInputStream(); //阻塞
byte[] bytes = new byte[1024];
while (true){
int length = inputStream.read(bytes);
if(length == -1){
break;
}
System.out.println(new String(bytes, 0, length, "UTF-
8"));
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}
从上面我们可以看到首先绑定端口,然后进行连接,然后执行read操作。
NIO操作:首先需要打开selector,同时打开serverSocketChannel,设置成非阻塞方式,此时进行通道获取,然后绑定bind端口,注册感兴趣的事件。接下来是在死循环中,进行轮询获取selectionKeys,然后对selectionKeys中的事件进行处理,处理完,进行移除。可以看到在处理的过程中,会涉及到连接事件和读事件的操作。
代码语言:javascript复制public class SelectorDemo {
/**
* 注册事件
*
* @return
*/
private Selector getSelector() throws Exception {
//获取selector对象
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); //非阻塞
//获取通道并且绑定端⼝
ServerSocket socket = serverSocketChannel.socket();
socket.bind(new InetSocketAddress(6677));
//注册感兴趣的事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
return selector;
}
public void listen() throws Exception {
Selector selector = this.getSelector();
while (true) {
selector.select(); //该方法会阻塞,直到至少有一个事件的发生
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
process(selectionKey, selector);
iterator.remove();
}
}
}
private void process(SelectionKey key, Selector selector) throws Exception {
if(key.isAcceptable()){ //新连接请求
ServerSocketChannel server = (ServerSocketChannel)key.channel();
SocketChannel channel = server.accept();
channel.configureBlocking(false); //阻塞
channel.register(selector, SelectionKey.OP_READ);
}else if(key.isReadable()){ //读数据
SocketChannel channel = (SocketChannel)key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
channel.read(byteBuffer);
System.out.println("form 客户端 " new String(byteBuffer.array(),
0, byteBuffer.position()));
}
}
public static void main(String[] args) throws Exception {
new SelectorDemo().listen();
}
}
而Netty,正是围绕NIO进行优化封装的。
也即Netty中,我们首先会启动服务,此时会将连接事件注册到NioEventLoop中,而这个过程首先是注册0,然后注册16这个事件,也即连接事件,接着注册读事件,如果不能写的时候,注册写事件,等待写通知。
在Dubbo中进行的封装:
dubbo暴露服务的流程如下
在dubbo中,经过invoker操作后,会调用协议进行dubbo协议适配:
代码语言:javascript复制Exporter<?> exporter = PROTOCOL.export(wrapperInvoker)
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
//进行网络通信操作,启动netty服务器
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" url.getParameter(INTERFACE_KEY)
"], has set stubproxy support event ,but no stub methods founded."));
}
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
openServer(url):
代码语言:javascript复制private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
可以看到这里使用了单例模式。
同时执行bind操作:
代码语言:javascript复制private ProtocolServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " str ", url: " url);
}
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " url ") " e.getMessage(), e);
}
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " str);
}
}
return new DubboProtocolServer(server);
}
可以看到绑定操作的实质:初始化和启动Netty服务器
代码语言:javascript复制@Override
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
return new NettyServer(url, handler);
}
/**
* Init and start netty server
*
* @throws Throwable
*/
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
//boss线程组这里设置为1个
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
//worker线程组
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
"NettyServerWorker");
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation",
SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
}
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
同时可以看到dubbo中Netty的设置:将boss线程组设置为1个,避免过多的线程浪费。
禁用Nagle算法=> .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE),来减少网络卡顿。
设置编解码器,采用适配器的方式,适配协议对应的编解码,方便协议适配。
设置空闲处理handler参数:IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit)
中的读闲置时间为0,同时写闲置时间为0,充分保证性能。
代码语言:javascript复制 ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
设置闲置超时时间:idleTimeout = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartBeat * 3)
代码语言:javascript复制public static int getIdleTimeout(URL url) {
int heartBeat = getHeartbeat(url);
// idleTimeout should be at least more than twice heartBeat because possible retries of client.
int idleTimeout = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartBeat * 3);
if (idleTimeout < heartBeat * 2) {
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
}
return idleTimeout;
}
连接完成之后,不能无所事事,此时应该会执行业务处理。也即此时可以看到上面的NettyServerHandler。因此可以看到dubbo的线程模型:
配置 Dubbo 中的线程模型
如果事件处理的逻辑能迅速完成,并且不会发起新的 IO 请求,比如只是在内存中记个标识,则直接在 IO 线程上处理更快,因为减少了线程池调度。
但如果事件处理逻辑较慢,或者需要发起新的 IO 请求,比如需要查询数据库,则必须派发到线程池,否则 IO 线程阻塞,将导致不能接收其它请求。
如果用 IO 线程处理事件,又在事件处理过程中发起新的 IO 请求,比如在连接事件中发起登录请求,会报“可能引发死锁”异常,但不会真死锁。
因此,需要通过不同的派发策略和不同的线程池配置的组合来应对不同的场景:
代码语言:javascript复制<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />
Dispatcher
all
所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。direct
所有消息都不派发到线程池,全部在 IO 线程上直接执行。message
只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。execution
只有请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。connection
在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。
ThreadPool
fixed
固定大小线程池,启动时建立线程,不关闭,一直持有。(缺省)cached
缓存线程池,空闲一分钟自动删除,需要时重建。limited
可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。eager
优先创建Worker
线程池。在任务数量大于corePoolSize
但是小于maximumPoolSize
时,优先创建Worker
来处理任务。当任务数量大于maximumPoolSize
时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException
。(相比于cached
:cached
在任务数量超过maximumPoolSize
时直接抛出异常而不是将任务放入阻塞队列)
下面来看NettyServerHandler中:执行的操作包括:连接操作connect、断开连接disconnect、received操作、写操作write和发送操作sent、关闭操作
代码语言:javascript复制/**
* NettyServerHandler.
*/
@io.netty.channel.ChannelHandler.Sharable
public class NettyServerHandler extends ChannelDuplexHandler {
//执行连接操作
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
if (channel != null) {
channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
}
handler.connected(channel);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
channels.remove(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()));
handler.disconnected(channel);
} finally {
NettyChannel.removeChannel(ctx.channel());
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
handler.received(channel, msg);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
super.write(ctx, msg, promise);
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
handler.sent(channel, msg);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// server will close channel when server don't receive any heartbeat from client util timeout.
if (evt instanceof IdleStateEvent) {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
logger.info("IdleStateEvent triggered, close channel " channel);
channel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
super.userEventTriggered(ctx, evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.caught(channel, cause);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
public void handshakeCompleted(HandshakeCompletionEvent evt) {
// TODO
}
}
同时NettyServerClient里面也有这几个事件。