问题的由来:
如果事件处理的逻辑能迅速完成,并且不会发起新的 IO 请求,比如只是在内存中记个标识, 则直接在 IO 线程上处理更快,因为减少了线程池调度。
但如果事件处理逻辑较慢,或者需要发起新的 IO 请求,比如需要查询数据库,则必须派发到 线程池,否则 IO 线程阻塞,将导致不能接收其它请求。
如果用 IO 线程处理事件,又在事件处理过程中发起新的 IO 请求,比如在连接事件中发起登 录请求,会报“可能引发死锁”异常,但不会真死锁。
因此,需要通过不同的派发策略和不同的线程池配置的组合来应对不同的场景。
这里说的IO线程(以netty为例)是netty启动服务时指定的boss/worker执行器中的woker线程。
具体配置方式如下两种:
代码语言:javascript复制<dubbo:provider dispatcher="xxx" />
或者
<dubbo:protocol name="dubbo" dispatcher="xxx" />
目前dubbo提供的Dispatcher扩展实现有如下5种实现,默认派发方式是all
代码语言:javascript复制all=com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher
direct=com.alibaba.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
message=com.alibaba.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
execution=com.alibaba.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
connection=com.alibaba.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher
在分析源码之前,这里再温习下装饰模式,因为dubbo从交换层到传输层通过装饰模式完成了多消息的接收处理,心跳,线程派发,消息解码,请求响应消息的处理逻辑。最外层装饰总优于里层的方法的调用。 本文虽说是要分析线程派发模型,但会从连接接处理基本handler开始,层层分析包裹在它外层的装饰类。
装饰模式类关系图如下
如图装饰模式主要包含以下几种类: 业务接口类,定义要装饰的业务操作 业务实现类,也就是要被装饰的类 装饰类父类,它同样实现了被装饰的业务接口,同时它通过构造函数,内部持有一个装饰接口类型的对象,一般这个对象提供接口方法默认实现。 具体装饰类,要继承装饰类父类,不同的装饰类,可以重写父类方式完成具体的装饰操作。 有时也可以没有装饰类父类,直接有装饰类实现接口完成装饰。
接下来就从装饰模式的角度分析源码
由于dispatcher配置是服务端的,这里从服务暴露流程中分析dubbo的实现。
具体可以看下DubboProtocol类里的私有变量
requestHandler,它就是被装饰的类。
定义如下:
代码语言:javascript复制 private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
//调用服务端实现方法,返回结果。
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
//获取暴露的服务代理
Invoker<?> invoker = getInvoker(channel, inv);
//如果是callback 需要处理高版本调用低版本的问题
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " inv.getMethodName() " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" invoker.getUrl()) " ,invocation is :" inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
//通过代理服务,执行方法
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " message == null ? null : (message.getClass().getName() ": " message) ", channel: consumer: " channel.getRemoteAddress() " --> provider: " channel.getLocalAddress());
}
@Override
//接受消息处理方法
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
//调用消息,调用replay
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
@Override
//客户端连接后处理方法
public void connected(Channel channel) throws RemotingException {
invoke(channel, Constants.ON_CONNECT_KEY);
}
@Override
//断开连接后处理方法
public void disconnected(Channel channel) throws RemotingException {
if (logger.isInfoEnabled()) {
logger.info("disconected from " channel.getRemoteAddress() ",url:" channel.getUrl());
}
invoke(channel, Constants.ON_DISCONNECT_KEY);
}
//调用过程
private void invoke(Channel channel, String methodKey) {
Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
if (invocation != null) {
try {
received(channel, invocation);
} catch (Throwable t) {
logger.warn("Failed to invoke event method " invocation.getMethodName() "(), cause: " t.getMessage(), t);
}
}
}
//从url中创建调用对象
private Invocation createInvocation(Channel channel, URL url, String methodKey) {
String method = url.getParameter(methodKey);
if (method == null || method.length() == 0) {
return null;
}
RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
invocation.setAttachment(Constants.PATH_KEY, url.getPath());
invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
}
return invocation;
}
};
它到是个匿名类,通过实现抽象类ExchangeHandlerAdapter定义来实例化得到,ExchangeHandlerAdapter继承关系如下
可以看到它和它的祖先类,实现了ChannelHandler接口5个关键方法,连接,断开连接,发送消息,接受消息和异常处理方法。也是rpc调用的常用处理方法。 同时也是线程派发处理关注的方法。
所以ChannelHandler就是,装饰模式里的业务接口类。
接下来,就是找装饰类的过程了。 可以找到requestHandler对象第一被使用是在DubboProtocol的createServer方法中
代码语言:javascript复制try {
//Exchangers是门面类,里面封装了具体交换层实现
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " url ") " e.getMessage(), e);
}
代码语言:javascript复制//跟到Exchangers.bind方法
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
//通过spi会走HeaderExchanger的bind逻辑
return getExchanger(url).bind(url, handler);
}
//HeaderExchanger的bind方法
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
//可以看到这时原始handler第一被装饰
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
HeaderExchangeHandler装饰类
可以看下类的继承关系
可以看到HeaderExchangeHandler实现了ChannelHandler接口,符合装饰模式要求。
看下它的构造函数:
代码语言:javascript复制 public HeaderExchangeHandler(ExchangeHandler handler) {
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.handler = handler;
}
这里的ExchangeHandler是ChannelHandler子接口,符合装饰模式通过构造函数持有接口类型对象引用。 下面看下它对主要几个rpc方法的装饰实现:
代码语言:javascript复制//连接处理逻辑
public void connected(Channel channel) throws RemotingException {
//添加一些心跳时间参数
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
//通过被包装类对应方法处理
handler.connected(exchangeChannel);
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
//断开逻辑,
public void disconnected(Channel channel) throws RemotingException {
//添加一些心跳时间参数
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
//通过被包装类对应方法处理
handler.disconnected(exchangeChannel);
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
//发送数据
public void sent(Channel channel, Object message) throws RemotingException {
Throwable exception = null;
try {
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
//调用被包装类对应方法处理。
handler.sent(exchangeChannel, message);
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
} catch (Throwable t) {
exception = t;
}
//发送消息,若是请求消息,有个异步发送重试逻辑
if (message instanceof Request) {
Request request = (Request) message;
DefaultFuture.sent(channel, request);
}
if (exception != null) {
if (exception instanceof RuntimeException) {
throw (RuntimeException) exception;
} else if (exception instanceof RemotingException) {
throw (RemotingException) exception;
} else {
throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(),
exception.getMessage(), exception);
}
}
}
/***
* 接受请求数据,通过handleRequest方法处理后得到处理结果。
* @param channel channel.
* @param message message.
* @throws RemotingException
*/
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {//是往返消息,调用私有方法handleRequest处理得到结果
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {//不需要回复的消息调用
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
//处理响应消息的逻辑
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " message " in channel: " channel ", url: " channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
//异常处理
public void caught(Channel channel, Throwable exception) throws RemotingException {
if (exception instanceof ExecutionException) {
ExecutionException e = (ExecutionException) exception;
Object msg = e.getRequest();
if (msg instanceof Request) {
Request req = (Request) msg;
if (req.isTwoWay() && !req.isHeartbeat()) {//有往返要求,就回复消息
Response res = new Response(req.getId(), req.getVersion());
res.setStatus(Response.SERVER_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
return;
}
}
}
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
//调用对应被装饰类方法
handler.caught(exchangeChannel, exception);
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
HeaderExchangeHandler装饰类,在Request/Response层面定义了请求响应消息的处理逻辑。
第二个装饰类DecodeHandler
通过源码可以看到DecodeHandler它和它的父类AbstractChannelHandlerDelegate共同完成了对ChannelHandler接口方法的装饰,看下DecodeHandler具体装饰的received方法:
代码语言:javascript复制 public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
handler.received(channel, message);
}
DecodeHandler类如它的名称,主要通过对received方法的装饰处理,完成完成消息解码的处理。
接着
代码语言:javascript复制return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
这句继续跟踪方法
代码语言:javascript复制 //Transporters.bind方法
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
//根据spi 这里具体走NettyTransporter.bind方法
return getTransporter().bind(url, handler);
}
//NettyTransporter的bind方法
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
//这里是创建NettyServer实例
return new NettyServer(url, listener);
}
//NettyServer构造器
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
//这里看下ChannelHandlers.wrap方法
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
//ChannelHandlers.wrap方法
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
//调用内部wrapInternal方法
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
//wrapInternal方法
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
//这里终于看到通过spi获取Dispatcher实现的代码
//还能看到通过Dispatcher.dispatch方法返回的handler后又经过了两层装饰,HeartbeatHandler然后MultiMessageHandler类
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
这里再分析下ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension()的代码实现:
代码语言:javascript复制public class Dispatcher$Adaptive implements com.alibaba.dubbo.remoting.Dispatcher {
public com.alibaba.dubbo.remoting.ChannelHandler dispatch(com.alibaba.dubbo.remoting.ChannelHandler arg0, com.alibaba.dubbo.common.URL arg1) {
if (arg1 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
//默认是all实现方案
String extName = url.getParameter("dispatcher", url.getParameter("dispather", url.getParameter("channel.handler", "all")));
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Dispatcher) name from url(" url.toString() ") use keys([dispatcher, dispather, channel.handler])");
com.alibaba.dubbo.remoting.Dispatcher extension = (com.alibaba.dubbo.remoting.Dispatcher)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Dispatcher.class).getExtension(extName);
//调用接口Dispatcher实现的dispatch方法返回ChannelHandler对象
return extension.dispatch(arg0, arg1);
}
}
用户手册说明:
下面就具体对照用户手册上关于派发实现的说明,分别对照源码分析下:
1, all实现,用户手册说,所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。
看下实现类AllDispatcher
代码语言:javascript复制public class AllDispatcher implements Dispatcher {
public static final String NAME = "all";
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
//使用AllChannelHandler类实现
return new AllChannelHandler(handler, url);
}
}
AllChannelHandler类,通过类结构可以看到它也是ChannelHandler的装饰类。
装饰类结构清晰。通过代码可知,其他几种线程分派模型实现装饰类,都遵循同样的继承机构,都会继承
代码语言:javascript复制WrappedChannelHandler
看下它对装饰方法的实现
代码语言:javascript复制//连接事件放入线程池
public void connected(Channel channel) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() " error when process connected event .", t);
}
}
//断开事件连接放入线程池
public void disconnected(Channel channel) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnect event", channel, getClass() " error when process disconnected event .", t);
}
}
//接受请求(包含回复消息处理)消息放入线程池
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//TODO 临时解决线程池满后异常信息无法发送到对端的问题。待重构
//fix 线程池满了拒绝调用不返回,导致消费者一直等待超时
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" url.getIp() "," url.getPort() ") threadpool is exhausted ,detail msg:" t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() " error when process received event .", t);
}
}
//异常处理线程池
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() " error when process caught event .", t);
}
}
通过实现看到,它把所有操作都放入了线程池中执行。但是心跳消息的接受和发送没有进入线程池。
2,direct 分配实现,文档上说,所有消息都不派发到线程池,全部在 IO 线程上直接执行。
实现类DirectDispatcher
代码语言:javascript复制public class DirectDispatcher implements Dispatcher {
public static final String NAME = "direct";
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
//直接返回原生的handler不进行另外装饰
return handler;
}
}
如它文档所说一样,所有消息处理不派发线程池。
3,message 手册上说,只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
实现类MessageOnlyDispatcher
代码语言:javascript复制public class MessageOnlyDispatcher implements Dispatcher {
public static final String NAME = "message";
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
//通过MessageOnlyChannelHandler装饰类处理
return new MessageOnlyChannelHandler(handler, url);
}
}
这里在贴下它的继承图:
具体装饰实现
代码语言:javascript复制public class MessageOnlyChannelHandler extends WrappedChannelHandler {
public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
//接收请求(包括响应)消息放在线程池。
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = executor;
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() " error when process received event .", t);
}
}
}
如文档所说,只有请求(响应发送)消息放入线程池执行。
4,execution 手册上说,只请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
实现类ExecutionDispatcher
代码语言:javascript复制public class ExecutionDispatcher implements Dispatcher {
public static final String NAME = "execution";
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
//通过装饰类ExecutionChannelHandler实现
return new ExecutionChannelHandler(handler, url);
}
}
实现如下:
代码语言:javascript复制public class ExecutionChannelHandler extends WrappedChannelHandler {
public ExecutionChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
//连接事件放入线程池
public void connected(Channel channel) throws RemotingException {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
}
//断开事件连接放入线程池
public void disconnected(Channel channel) throws RemotingException {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
}
//消息接受(响应消息发送)放入线程池
public void received(Channel channel, Object message) throws RemotingException {
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//TODO 临时解决线程池满后异常信息无法发送到对端的问题。待重构
//fix 线程池满了拒绝调用不返回,导致消费者一直等待超时
if(message instanceof Request &&
t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" url.getIp() "," url.getPort() ") threadpool is exhausted ,detail msg:" t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() " error when process received event .", t);
}
}
//异常消息放入线程池
public void caught(Channel channel, Throwable exception) throws RemotingException {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
}
}
通过实现可以看到,它同all派发实现一样,并不是只有请求放入线程池。这个手册上说的不一样,手册有误,还是没有实现!!
5,connection实现,手册说,在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。
实现类ConnectionOrderedDispatcher
代码语言:javascript复制public class ConnectionOrderedDispatcher implements Dispatcher {
public static final String NAME = "connection";
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
//装饰类ConnectionOrderedChannelHandler实现
return new ConnectionOrderedChannelHandler(handler, url);
}
}
ConnectionOrderedChannelHandler实现:
代码语言:javascript复制public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {
protected final ThreadPoolExecutor connectionExecutor;
private final int queuewarninglimit;
public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
//通过定义只有一个线程的线程池,保证执行的顺序
//用LinkedBlockingQueue保存待处理的任务
connectionExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
new NamedThreadFactory(threadName, true),
new AbortPolicyWithReport(threadName, url)
); // FIXME 没有地方释放connectExecutor!
//这是等待队列报警大小
queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
}
//连接事件放入队列
public void connected(Channel channel) throws RemotingException {
try {
checkQueueLength();
connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() " error when process connected event .", t);
}
}
//断开事件放入队列
public void disconnected(Channel channel) throws RemotingException {
try {
checkQueueLength();
connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnected event", channel, getClass() " error when process disconnected event .", t);
}
}
//放入线程池
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = executor;
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//fix 线程池满了拒绝调用不返回,导致消费者一直等待超时
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" url.getIp() "," url.getPort() ") threadpool is exhausted ,detail msg:" t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() " error when process received event .", t);
}
}
//放入线程池
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService cexecutor = executor;
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() " error when process caught event .", t);
}
}
private void checkQueueLength() {
if (connectionExecutor.getQueue().size() > queuewarninglimit) {
logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " connectionExecutor.getQueue().size() " exceed the warning limit number :" queuewarninglimit));
}
}
}
通过代码分析,可以看到本实现如文档说的一样把连接断事件处理放入队列,有序执行,其他放入线程池。
以上就是具体线程派发模型的分析。
最后两个装饰类
最后再看下上面提到的最后两个装饰类,
HeartbeatHandler装饰类
代码语言:javascript复制public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class);
public static String KEY_READ_TIMESTAMP = "READ_TIMESTAMP";
public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";
public HeartbeatHandler(ChannelHandler handler) {
super(handler);
}
public void connected(Channel channel) throws RemotingException {
setReadTimestamp(channel);
setWriteTimestamp(channel);
handler.connected(channel);
}
public void disconnected(Channel channel) throws RemotingException {
clearReadTimestamp(channel);
clearWriteTimestamp(channel);
handler.disconnected(channel);
}
public void sent(Channel channel, Object message) throws RemotingException {
setWriteTimestamp(channel);
handler.sent(channel, message);
}
/***
* 心跳消息的接受和发送
*
* @param channel
* @param message
* @throws RemotingException
*/
public void received(Channel channel, Object message) throws RemotingException {
setReadTimestamp(channel);
if (isHeartbeatRequest(message)) {
Request req = (Request) message;
if (req.isTwoWay()) {
Response res = new Response(req.getId(), req.getVersion());
res.setEvent(Response.HEARTBEAT_EVENT);
channel.send(res);
if (logger.isInfoEnabled()) {
int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
if (logger.isDebugEnabled()) {
logger.debug("Received heartbeat from remote channel " channel.getRemoteAddress()
", cause: The channel has no data-transmission exceeds a heartbeat period"
(heartbeat > 0 ? ": " heartbeat "ms" : ""));
}
}
}
return;
}
if (isHeartbeatResponse(message)) {
if (logger.isDebugEnabled()) {
logger.debug(
new StringBuilder(32)
.append("Receive heartbeat response in thread ")
.append(Thread.currentThread().getName())
.toString());
}
return;
}
//非心跳消息的接受,走派发装饰类
handler.received(channel, message);
}
}
可以看到HeartbeatHandler对received方法进行了处理,所以消息的接受和发送是不会派发到线程池的。
MultiMessageHandler装饰类
代码语言:javascript复制public class MultiMessageHandler extends AbstractChannelHandlerDelegate {
public MultiMessageHandler(ChannelHandler handler) {
super(handler);
}
@SuppressWarnings("unchecked")
@Override
public void received(Channel channel, Object message) throws RemotingException {
//多个消息类型时,循环接受
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage) message;
for (Object obj : list) {
handler.received(channel, obj);
}
} else {
handler.received(channel, message);
}
}
}
此装饰类,主要完成多消息类型的循环解析接收。
所以到了NettyServer类,原始的handler已经经过的5层的装饰。 这里在其父类AbstractServer的构造方法中加断点,截图看下handler对象图
可以印证。