在上一篇RocketMQ与Dubbo相爱相杀引起的FullGC文章中,我们讲解了由于Dubbo接口调用耗时太久,而消息生产者发送的消息非常快,导致消息消费者不能及时消费消息,造成消息队列堆积,最终导致FullGC.
本篇文章,我们看一下RocketMQ线程和Dubbo线程如何协作的.
我们向MQ消费者发送一个消息,我们分析MQ线程是如何调用Dubbo的线程,以及接收到Dubbo的返回值之后,Dubbo线程又是如何与MQ线程交互的.
Dubbo调用者的配置如下
代码语言:javascript复制<dubbo:application name="infuq-dubbo-consumer" />
<dubbo:registry protocol="zookeeper" address="127.0.0.1:2181" check="false" />
<dubbo:reference id="queryUserInfoFacade" interface="com.infuq.facade.QueryUserInfoFacade"
version="1.0.0" check="false" timeout="600000"/>
作为Dubbo调用者,我们会配置接口调用超时时间.上面我们配置timeout=600000ms,我们要看下这个timeout是给哪个线程使用的.配置成600000ms这么长是为了我们Debug分析的时候,不至于调用太快而受影响.
同时为了让我们通过工具可以观察线程的状态,我们也刻意让Dubbo提供者睡眠60s.Dubbo提供者的具体实现如下
代码语言:javascript复制@Override
public Object update(String address) {
System.out.println("[Dubbo提供者]:" Thread.currentThread().getName() "执行Dubbo接口调用");
try {
// 让线程睡眠60秒
Thread.sleep(1000_60);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "调用成功";
}
先启动Dubbo提供者,再启动MQ消费者(它的内部会调用Dubbo提供者的接口),最后给MQ消费者发送一条消息.
MQ线程在接收到消息之后,最终会回调到我们设置的监听器
可以看到,当前线程是ConsumeMessageThread_1这个线程,它是MQ的线程,接着准备调用Dubbo提供者的接口,继续进入
最终会调用到Dubbo的代码中.线程依然是ConsumeMessageThread_1,同时我们也看到timeout=600000,就是我们在Dubbo消费者配置文件中配置的超时时间.request方法内部会使用Netty将请求发送给Dubbo提供者.最后MQ线程会调用get()方法.
MQ线程最后使用我们配置的超时时间timeout=600000,调用await(timeout)阻塞.
代码语言:javascript复制private final Lock lock = new ReentrantLock();
private final Condition done = this.lock.newCondition();
我们再使用JDK自带的jvisualvm工具将线程堆栈信息dump.
观察线程堆栈信息,ConsumeMessageThread_1线程处于TIMED_WAITING状态.也就是说,MQ线程在调用Dubbo接口的时候,如果一直没有返回结果,那么MQ线程就会一直阻塞,直到超时.
当收到Dubbo提供者返回的结果时
线程DubboClientHandler-192.168.0.102:20880-thread-1(它是Dubbo的线程)会唤醒之前被阻塞的MQ线程.
最后,MQ线程拿到返回结果,继续后面的逻辑处理. MQ线程通过ReentrantLock和Condition与Dubbo线程完成阻塞和唤醒. 同时timeout这个值是一个重要的调优参数,如果Dubbo接口调用耗时很久,而timeout设置的又很大,就会严重阻塞MQ线程.所以,timeout这个值是需要特别关注的.