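Dubbo uses Netty for its underlying network communication.
There are already many good articles on Dubbo's service export process. This article follows the export flow as its main thread (without explaining it in detail) and observes at which point Netty comes into play.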
// Entry point of service export:
// com.alibaba.dubbo.config.spring.ServiceBean#onApplicationEvent
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (isDelay() && !isExported() && !isUnexported()) {
        if (logger.isInfoEnabled()) {
            logger.info("The service ready on spring started. service: " + getInterface());
        }
        // Export the service
        export();
    }
}
The flow then reaches the following code:
private void doExportUrls() {
    List<URL> registryURLs = loadRegistries(true);
    for (ProtocolConfig protocolConfig : protocols) {
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}
A sample value of registryURLs:
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=infuq-dubbo-provider&check=false&dubbo=2.0.2&pid=4916&registry=zookeeper&timestamp=1609052431702
A sample value of protocols:
<dubbo:protocol name="dubbo" threads="200" port="20880" id="dubbo" />
The flow continues to the following code:
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
Through the SPI mechanism, protocol.export resolves to com.alibaba.dubbo.registry.integration.RegistryProtocol#export, whose source is:
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    URL registryUrl = getRegistryUrl(originInvoker);
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
    boolean register = registeredProviderUrl.getParameter("register", true);
    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
    if (register) {
        register(registryUrl, registeredProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }
    ...
}
The first statement calls doLocalExport. Stepping into it:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                // Export
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}
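The double-checked locking in doLocalExport ensures that the export step runs only once per cache key, even when several threads ask for the same key. The sketch below shows the same once-per-key idea with JDK classes only. It swaps the explicit synchronized block for ConcurrentHashMap.computeIfAbsent, and the names ExportCacheSketch and exportOnce are hypothetical, not part of Dubbo.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class ExportCacheSketch {
    // Plays the role of `bounds`: cache key -> exporter (a String stand-in here).
    private final Map<String, String> bounds = new ConcurrentHashMap<>();
    // Counts how many times the expensive export step actually ran.
    final AtomicInteger exportCalls = new AtomicInteger();

    /** Returns the cached exporter for the key, creating it on first use only. */
    String exportOnce(String key) {
        return bounds.computeIfAbsent(key, k -> {
            exportCalls.incrementAndGet();   // stands in for protocol.export(...)
            return "exporter-for-" + k;
        });
    }
}
```

Calling exportOnce twice with the same key returns the same cached value and increments exportCalls only once.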
Through SPI again, the protocol.export(invokerDelegete) call inside doLocalExport resolves to com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#export, whose source is:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    ...
    // Open the server
    openServer(url);
    optimizeSerialization(url);
    return exporter;
}
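Both dispatches seen so far are chosen by the protocol part of the URL: a registry:// URL leads to RegistryProtocol, and a dubbo:// URL leads to DubboProtocol. The sketch below imitates that selection with a plain map lookup. It only illustrates the idea; Dubbo's own mechanism is its extension loader, and every name here (ProtocolDispatchSketch, select) is hypothetical.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

class ProtocolDispatchSketch {
    // Hypothetical registry of extensions: URL scheme -> implementation name.
    private static final Map<String, String> EXTENSIONS = new HashMap<>();
    static {
        EXTENSIONS.put("registry", "RegistryProtocol");
        EXTENSIONS.put("dubbo", "DubboProtocol");
    }

    /** Picks the implementation name that matches the scheme of the given URL. */
    static String select(String url) {
        String scheme = URI.create(url).getScheme();
        String impl = EXTENSIONS.get(scheme);
        if (impl == null) {
            throw new IllegalStateException("No extension for protocol: " + scheme);
        }
        return impl;
    }
}
```

With this lookup, the registry URL sample selects RegistryProtocol, while the provider URL wrapped inside it selects DubboProtocol, which matches the two export calls traced above.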
Next, step into the openServer method:
private void openServer(URL url) {
    String key = url.getAddress();
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            // Create the server
            serverMap.put(key, createServer(url));
        } else {
            server.reset(url);
        }
    }
}
A sample url:
dubbo://192.168.0.102:20880/com.infuq.facade.QueryUserInfoFacade?anyhost=true&application=infuq-dubbo-provider&bean.name=com.infuq.facade.QueryUserInfoFacade&bind.ip=192.168.0.102&bind.port=20880&dubbo=2.0.2&generic=false&interface=com.infuq.facade.QueryUserInfoFacade&methods=update&pid=13528&revision=1.0.0&side=provider&threads=200&timestamp=1609053786543&version=1.0.0
Stepping further in, the following main calls are made in order:
server = Exchangers.bind(url, requestHandler);
getExchanger(url).bind(url, handler);
new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
getTransporter().bind(url, handler);
new NettyServer(url, listener);
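The nested constructor calls above form a wrapper chain: each layer receives a message, does its own part, and then delegates to the handler it wraps. Here is a minimal sketch of that layering. The Handler interface is hypothetical, and the layer names are borrowed from the calls above purely as labels; the real Dubbo classes do far more than record their name.

```java
import java.util.ArrayList;
import java.util.List;

class HandlerChainSketch {
    /** Hypothetical handler contract: process a message and record what happened. */
    interface Handler {
        void received(String message, List<String> trace);
    }

    /** Wraps `next`: records its own label first, then delegates inward. */
    static Handler layer(String label, Handler next) {
        return (message, trace) -> {
            trace.add(label);
            next.received(message, trace);
        };
    }

    /** Builds the chain outermost-first and returns the order in which layers ran. */
    static List<String> dispatch(String message) {
        Handler requestHandler = (m, trace) -> trace.add("requestHandler:" + m);
        Handler chain = layer("DecodeHandler", layer("HeaderExchangeHandler", requestHandler));
        List<String> trace = new ArrayList<>();
        chain.received(message, trace);
        return trace;
    }
}
```

The outermost wrapper runs first and the innermost handler runs last, which is why the business handler is passed in at the deepest nesting level.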
Creating the NettyServer eventually invokes the following code:
@Override
protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();
    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true));
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();
    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("handler", nettyServerHandler);
                }
            });
    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}
The code above is the familiar Netty server bootstrap: it creates the server and starts it.
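For readers less familiar with the boss/worker split in that bootstrap, here is a rough JDK-only analogy. It is not Netty and not how Dubbo is implemented: one "boss" thread accepts a connection and hands it to a "worker" pool, which does the actual I/O. The class BossWorkerSketch, the one-shot echo behaviour, and the use of port 0 (any free port, instead of the 20880 in the sample) are all assumptions made for the illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BossWorkerSketch {

    /** Starts a one-shot echo server on a free loopback port, sends one line, returns the reply. */
    static String roundTrip(String line) {
        ExecutorService boss = Executors.newSingleThreadExecutor();  // accepts connections
        ExecutorService workers = Executors.newFixedThreadPool(2);   // handles accepted connections
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            boss.execute(() -> {
                try {
                    Socket accepted = server.accept();       // "boss": accept only
                    workers.execute(() -> echo(accepted));   // "worker": read and write
                } catch (IOException ignored) {
                    // the server socket was closed before a client connected
                }
            });
            try (Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
                 PrintWriter out = new PrintWriter(
                         new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8), true);
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8))) {
                out.println(line);
                return in.readLine();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            boss.shutdownNow();
            workers.shutdownNow();
        }
    }

    /** Reads one line from the connection and writes it back with an "echo:" prefix. */
    private static void echo(Socket socket) {
        try (Socket s = socket;
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true)) {
            out.println("echo:" + in.readLine());
        } catch (IOException ignored) {
            // the client went away; nothing to reply to
        }
    }
}
```

Netty's event loops multiplex many connections per thread with non-blocking I/O, so this blocking version only mirrors the division of labour, not the performance characteristics.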
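In other words, during service export, the local export step doLocalExport passes through RegistryProtocol#export and then DubboProtocol#export, and finally uses Netty to create a server that listens for incoming interface invocation requests.
Although the service has now been exported locally, it still has to be registered with the registry (for example, ZK).
Before the registration happens, inspecting ZK shows no dubbo node.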
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // Local export
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    URL registryUrl = getRegistryUrl(originInvoker);
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
    boolean register = registeredProviderUrl.getParameter("register", true);
    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
    if (register) {
        // Register the service
        register(registryUrl, registeredProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }
    ...
}
A sample value of registryUrl:
zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=infuq-dubbo-provider&check=false&dubbo=2.0.2&export=dubbo://192.168.0.102:20880/com.infuq.facade.QueryUserInfoFacade?anyhost=true&application=infuq-dubbo-provider&bean.name=com.infuq.facade.QueryUserInfoFacade&bind.ip=192.168.0.102&bind.port=20880&dubbo=2.0.2&generic=false&interface=com.infuq.facade.QueryUserInfoFacade&methods=update&pid=9892&revision=1.0.0&side=provider&threads=200&timestamp=1609055193386&version=1.0.0&pid=9892&timestamp=1609055193386
A sample value of registeredProviderUrl:
dubbo://192.168.0.102:20880/com.infuq.facade.QueryUserInfoFacade?anyhost=true&application=infuq-dubbo-provider&bean.name=com.infuq.facade.QueryUserInfoFacade&dubbo=2.0.2&generic=false&interface=com.infuq.facade.QueryUserInfoFacade&methods=update&pid=9892&revision=1.0.0&side=provider&threads=200&timestamp=1609055193386&version=1.0.0
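Comparing the samples, the registry:// URL seen in doExportUrls has become a zookeeper:// URL by this point: judging from the values, the protocol is replaced with the value of the registry parameter and that parameter is then dropped. A rough string-based sketch of such a rewrite follows. It assumes a simple key=value query string, and the helper toRegistryUrl is hypothetical; it is not Dubbo's URL class.

```java
import java.util.ArrayList;
import java.util.List;

class RegistryUrlSketch {
    /** Rewrites registry://host/path?...&registry=X into X://host/path?... */
    static String toRegistryUrl(String url) {
        String prefix = "registry://";
        int q = url.indexOf('?');
        if (!url.startsWith(prefix) || q < 0) {
            return url;   // not a registry URL with parameters: leave it untouched
        }
        String registry = null;
        List<String> kept = new ArrayList<>();
        for (String pair : url.substring(q + 1).split("&")) {
            if (pair.startsWith("registry=")) {
                registry = pair.substring("registry=".length());
            } else {
                kept.add(pair);   // every other parameter is carried over as-is
            }
        }
        if (registry == null) {
            return url;   // this sketch simply leaves the URL unchanged in that case
        }
        String address = url.substring(prefix.length(), q);
        String query = kept.isEmpty() ? "" : "?" + String.join("&", kept);
        return registry + "://" + address + query;
    }
}
```

Applied to a shortened form of the earlier registry:// sample, this yields a zookeeper:// URL with the same address and the remaining parameters in their original order.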
After the register method has run, inspecting ZK again shows the dubbo node, which means the provider has been registered in ZK.
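With default settings, Dubbo's Zookeeper registry stores each provider as a node named after the URL-encoded provider URL, under /dubbo/<interface>/providers. The sketch below shows how such a path could be assembled with the JDK. The helper providerPath is hypothetical, and the /dubbo root assumes the default group.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class ProviderPathSketch {
    /** Builds /dubbo/<interface>/providers/<url-encoded provider URL>. */
    static String providerPath(String interfaceName, String providerUrl) {
        try {
            return "/dubbo/" + interfaceName + "/providers/" + URLEncoder.encode(providerUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a conforming JVM
            throw new IllegalStateException(e);
        }
    }
}
```

Encoding is needed because the provider URL contains characters such as / that would otherwise be read as path separators in ZK.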
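Summary
When exporting a service, Dubbo first creates and starts a server through Netty to listen for incoming invocation requests, and then registers the service with the registry (for example, Zookeeper).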