RocketMQ与Dubbo之间线程之间如何阻塞和唤醒

2022-06-02 14:03:04 浏览数 (1)

在上一篇RocketMQ与Dubbo相爱相杀引起的FullGC文章中,我们讲解了由于Dubbo接口调用耗时太久,而消息生产者发送的消息非常快,导致消息消费者不能及时消费消息,造成消息队列堆积,最终导致FullGC.

本篇文章,我们看一下RocketMQ线程和Dubbo线程如何协作的.

我们向MQ消费者发送一个消息,我们分析MQ线程是如何调用Dubbo的线程,以及接收到Dubbo的返回值之后,Dubbo线程又是如何与MQ线程交互的.

Dubbo调用者的配置如下

代码语言:javascript复制
<dubbo:application name="infuq-dubbo-consumer" />
<dubbo:registry protocol="zookeeper" address="127.0.0.1:2181" check="false" />

<dubbo:reference id="queryUserInfoFacade" interface="com.infuq.facade.QueryUserInfoFacade" 
                 version="1.0.0" check="false" timeout="600000"/>

作为Dubbo调用者,我们会配置接口调用超时时间.上面我们配置timeout=600000ms,我们要看下这个timeout是给哪个线程使用的.配置成600000ms这么长是为了我们Debug分析的时候,不至于调用太快而受影响.

同时为了让我们通过工具可以观察线程的状态,我们也刻意让Dubbo提供者睡眠60s.Dubbo提供者的具体实现如下

代码语言:javascript复制
@Override
public Object update(String address) {

    System.out.println("[Dubbo提供者]:"   Thread.currentThread().getName()   "执行Dubbo接口调用");
    
    try {
        // 让线程睡眠60秒
        Thread.sleep(1000_60);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    return "调用成功";
}

先启动Dubbo提供者,再启动MQ消费者(它的内部会调用Dubbo提供者的接口),最后给MQ消费者发送一条消息.

MQ线程在接收到消息之后,最终会回调到我们设置的监听器

可以看到,当前线程是ConsumeMessageThread_1这个线程,它是MQ的线程,接着准备调用Dubbo提供者的接口,继续进入

最终会调用到Dubbo的代码中.线程依然是ConsumeMessageThread_1,同时我们也看到timeout=600000,就是我们在Dubbo消费者配置文件中配置的超时时间.request方法内部会使用Netty将请求发送给Dubbo提供者.最后MQ线程会调用get()方法.

MQ线程最后使用我们配置的超时时间timeout=600000,调用await(timeout)阻塞.

代码语言:javascript复制
private final Lock lock = new ReentrantLock();
private final Condition done = this.lock.newCondition();

我们再使用JDK自带的jvisualvm工具将线程堆栈信息dump.

观察线程堆栈信息,ConsumeMessageThread_1线程处于TIMED_WAITING状态.也就是说,MQ线程在调用Dubbo接口的时候,如果一直没有返回结果,那么MQ线程就会一直阻塞,直到超时.

当收到Dubbo提供者返回的结果时

线程DubboClientHandler-192.168.0.102:20880-thread-1(它是Dubbo的线程)会唤醒之前被阻塞的MQ线程.

最后,MQ线程拿到返回结果,继续后面的逻辑处理. MQ线程通过ReentrantLock和Condition与Dubbo线程完成阻塞和唤醒. 同时timeout这个值是一个重要的调优参数,如果Dubbo接口调用耗时很久,而timeout设置的又很大,就会严重阻塞MQ线程.所以,timeout这个值是需要特别关注的.

0 人点赞