分析Dubbo当然要从注册开始,2.7的注册加入了非常多的方式,已经不限于Zookeeper.
基本上现在主流的注册模式都有了。
这种一看就是模版方法模式,其中Registry接口继承与两个接口Node,RegistryService,它自己本身没有任何方法。
代码语言:javascript复制/**
* Node. (API/SPI, Prototype, ThreadSafe)
*/
public interface Node {
/**
* get url.
*
* @return url.
*/
URL getUrl();
/**
* is available.
*
* @return available.
*/
boolean isAvailable();
/**
* destroy.
*/
void destroy();
}
代码语言:javascript复制/**
* RegistryService. (SPI, Prototype, ThreadSafe)
*
* @see org.apache.dubbo.registry.Registry
* @see org.apache.dubbo.registry.RegistryFactory#getRegistry(URL)
*/
public interface RegistryService {
/**
* 注册
*/
void register(URL url);
/**
* 反注册
*/
void unregister(URL url);
/**
* 订阅
*/
void subscribe(URL url, NotifyListener listener);
/**
* 取消订阅
*/
void unsubscribe(URL url, NotifyListener listener);
/**
* 查找
*/
List<URL> lookup(URL url);
}
AbstractRegistry是一个抽象类,实现了Registry接口的注册,订阅,查找等方法,我们来看一下注册方法。
代码语言:javascript复制private final Set<URL> registered = new ConcurrentHashSet<>();
这个URL是Dubbo自己封装的一个类,其中包含了例如协议、用户名、密码、主机、端口号、路径、参数等的属性。
代码语言:javascript复制@Override
public void register(URL url) {
if (url == null) {
throw new IllegalArgumentException("register url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Register: " url);
}
registered.add(url);
}
这个是Apache孵化后的写法。意思就是把提供者或者消费者各自的资源放到该集合中,此处是一个线程安全的集合。
取消注册
代码语言:javascript复制@Override
public void unregister(URL url) {
if (url == null) {
throw new IllegalArgumentException("unregister url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unregister: " url);
}
registered.remove(url);
}
订阅
代码语言:javascript复制private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>();
代码语言:javascript复制public interface NotifyListener {
/**
* 给一组URL发通知
*/
void notify(List<URL> urls);
}
NotifyListener是一个专门用监听通知的接口,它的作用就是专门用来发通知的,至于怎么发通知需要到Registry最终实现里面去具体实现。
此处可以看到订阅对象是一个URL对应一组监听器
代码语言:javascript复制@Override
public void subscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("subscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("subscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Subscribe: " url);
}
//如果订阅对象中没有该url的键,则创建该键对应的集合,集合添加监听器listener
Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>());
listeners.add(listener);
}
取消订阅
代码语言:javascript复制@Override
public void unsubscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("unsubscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("unsubscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unsubscribe: " url);
}
//找出订阅到所有监听器,在这些监听器集合中删除对应多监听器
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners != null) {
listeners.remove(listener);
}
}
查找
代码语言:javascript复制private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();
第一个URL为消费者的URL,Map的String为类型,基本为生产者,List<URL>为消费者订阅生产者的URL集合
lookup即为查找一个消费者订阅了哪些生产者URL
代码语言:javascript复制@Override
public List<URL> lookup(URL url) {
List<URL> result = new ArrayList<>();
//获取几组已通知的生产者url,根据String类型的不同,可能会有几组url
Map<String, List<URL>> notifiedUrls = getNotified().get(url);
//如果存在这么几组已通知的生产者url
if (notifiedUrls != null && notifiedUrls.size() > 0) {
//遍历所有的组,获取每一组的url集合
for (List<URL> urls : notifiedUrls.values()) {
//遍历集合每一个url
for (URL u : urls) {
if (!EMPTY_PROTOCOL.equals(u.getProtocol())) {
//将所有协议不为空的url添加到结果集合中
result.add(u);
}
}
}
} else {
//如果不存在任何已通知的url
final AtomicReference<List<URL>> reference = new AtomicReference<>();
//这段代码的意思等同与new NotifyListener() {
// void notify(List<URL> urls) {
// reference.set(urls);
// }
//}
//这里就是实现了一个匿名类订阅监听器对象
NotifyListener listener = reference::set;
//将该匿名类订阅监听器对象放入订阅对象中,这里是订阅了,有没有通知不知道,看多线程的
subscribe(url, listener); // Subscribe logic guarantees the first notify to return
//如果有通知的url就取出
List<URL> urls = reference.get();
if (CollectionUtils.isNotEmpty(urls)) {
for (URL u : urls) {
if (!EMPTY_PROTOCOL.equals(u.getProtocol())) {
//遍历所有有通知的url,且协议不为空协议的,添加到结果集
result.add(u);
}
}
}
}
return result;
}
代码语言:javascript复制public Map<URL, Map<String, List<URL>>> getNotified() {
//返回一个不可变的映射,该映射不可修改
return Collections.unmodifiableMap(notified);
}