Apache Dubbo 2.7孵化版整理 顶

2019-10-30 11:48:46 浏览数 (1)

分析Dubbo当然要从注册开始,2.7的注册加入了非常多的方式,已经不限于Zookeeper.

基本上现在主流的注册模式都有了。

这种一看就是模版方法模式,其中Registry接口继承与两个接口Node,RegistryService,它自己本身没有任何方法。

代码语言:javascript复制
/**
 * Node. (API/SPI, Prototype, ThreadSafe)
 */
public interface Node {

    /**
     * get url.
     *
     * @return url.
     */
    URL getUrl();

    /**
     * is available.
     *
     * @return available.
     */
    boolean isAvailable();

    /**
     * destroy.
     */
    void destroy();

}
代码语言:javascript复制
/**
 * RegistryService. (SPI, Prototype, ThreadSafe)
 *
 * @see org.apache.dubbo.registry.Registry
 * @see org.apache.dubbo.registry.RegistryFactory#getRegistry(URL)
 */
public interface RegistryService {

    /**
     * 注册
     */
    void register(URL url);

    /**
     * 反注册
     */
    void unregister(URL url);

    /**
     * 订阅
     */
    void subscribe(URL url, NotifyListener listener);

    /**
     * 取消订阅
     */
    void unsubscribe(URL url, NotifyListener listener);

    /**
     * 查找
     */
    List<URL> lookup(URL url);

}

AbstractRegistry是一个抽象类,实现了Registry接口的注册,订阅,查找等方法,我们来看一下注册方法。

代码语言:javascript复制
private final Set<URL> registered = new ConcurrentHashSet<>();

这个URL是Dubbo自己封装的一个类,其中包含了例如协议、用户名、密码、主机、端口号、路径、参数等的属性。

代码语言:javascript复制
@Override
public void register(URL url) {
    if (url == null) {
        throw new IllegalArgumentException("register url == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Register: "   url);
    }
    registered.add(url);
}

这个是Apache孵化后的写法。意思就是把提供者或者消费者各自的资源放到该集合中,此处是一个线程安全的集合。

取消注册

代码语言:javascript复制
@Override
public void unregister(URL url) {
    if (url == null) {
        throw new IllegalArgumentException("unregister url == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Unregister: "   url);
    }
    registered.remove(url);
}

订阅

代码语言:javascript复制
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>();
代码语言:javascript复制
public interface NotifyListener {

    /**
     * 给一组URL发通知
     */
    void notify(List<URL> urls);

}

NotifyListener是一个专门用监听通知的接口,它的作用就是专门用来发通知的,至于怎么发通知需要到Registry最终实现里面去具体实现。

此处可以看到订阅对象是一个URL对应一组监听器

代码语言:javascript复制
@Override
public void subscribe(URL url, NotifyListener listener) {
    if (url == null) {
        throw new IllegalArgumentException("subscribe url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("subscribe listener == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Subscribe: "   url);
    }
    //如果订阅对象中没有该url的键,则创建该键对应的集合,集合添加监听器listener
    Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>());
    listeners.add(listener);
}

取消订阅

代码语言:javascript复制
@Override
public void unsubscribe(URL url, NotifyListener listener) {
    if (url == null) {
        throw new IllegalArgumentException("unsubscribe url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("unsubscribe listener == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Unsubscribe: "   url);
    }
    //找出订阅到所有监听器,在这些监听器集合中删除对应多监听器
    Set<NotifyListener> listeners = subscribed.get(url);
    if (listeners != null) {
        listeners.remove(listener);
    }
}

查找

代码语言:javascript复制
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();

第一个URL为消费者的URL,Map的String为类型,基本为生产者,List<URL>为消费者订阅生产者的URL集合

lookup即为查找一个消费者订阅了哪些生产者URL

代码语言:javascript复制
@Override
public List<URL> lookup(URL url) {
    List<URL> result = new ArrayList<>();
    //获取几组已通知的生产者url,根据String类型的不同,可能会有几组url
    Map<String, List<URL>> notifiedUrls = getNotified().get(url);
    //如果存在这么几组已通知的生产者url
    if (notifiedUrls != null && notifiedUrls.size() > 0) {
        //遍历所有的组,获取每一组的url集合
        for (List<URL> urls : notifiedUrls.values()) {
            //遍历集合每一个url
            for (URL u : urls) {
                if (!EMPTY_PROTOCOL.equals(u.getProtocol())) {
                    //将所有协议不为空的url添加到结果集合中
                    result.add(u);
                }
            }
        }
    } else {
        //如果不存在任何已通知的url
        final AtomicReference<List<URL>> reference = new AtomicReference<>();
        //这段代码的意思等同与new NotifyListener() {
        //    void notify(List<URL> urls) {
        //        reference.set(urls);
        //    }
        //}
        //这里就是实现了一个匿名类订阅监听器对象
        NotifyListener listener = reference::set;
        //将该匿名类订阅监听器对象放入订阅对象中,这里是订阅了,有没有通知不知道,看多线程的
        subscribe(url, listener); // Subscribe logic guarantees the first notify to return
        //如果有通知的url就取出
        List<URL> urls = reference.get();
        if (CollectionUtils.isNotEmpty(urls)) {
            for (URL u : urls) {
                if (!EMPTY_PROTOCOL.equals(u.getProtocol())) {
                    //遍历所有有通知的url,且协议不为空协议的,添加到结果集
                    result.add(u);
                }
            }
        }
    }
    return result;
}
代码语言:javascript复制
public Map<URL, Map<String, List<URL>>> getNotified() {
    //返回一个不可变的映射,该映射不可修改
    return Collections.unmodifiableMap(notified);
}

0 人点赞