一、服务端provider发布
根据dubbo启动日志,provider的发布动作为以下几个步骤:
(1)暴露本地服务
Export dubbo service com.ywl.dubbo.TestApi to local registry, dubbo version: 2.0.0, current host: 127.0.0.1。
(2)暴露远程服务
Export dubbo service com.ywl.dubbo.TestApi to url dubbo://192.168.24.69:20880/com.ywl.dubbo.TestApi...后面省略。
(3)启动netty
Start NettyClient yuwenlei.local/192.168.24.69 connect to the server /192.168.1.100:20041, dubbo version: 2.0.0, current host: 192.168.24.69。
(4)打开zk
Opening socket connection to server dailyzk.webuy.ai/192.168.49.11:2181。
(5)注册provider服务到zk
Register dubbo service com.ywl.dubbo.TestApi url dubbo://192.168.24.69:20880/com.ywl.dubbo.TestApi? ...中间省略。
to registry registry://dailyzk.webuy.ai:7005/org.apache.dubbo.registry.RegistryService? ...后面省略。
(6)监听zk(订阅与通知)
Subscribe: provider://192.168.24.69:20880/com.ywl.dubbo.TestApi?...后面省略。
Notify urls for subscribe url provider://192.168.24.69:20880/com.ywl.dubbo.TestApi?...后面省略。
· 服务发布的目的
解析dubbo-provider.xml中的接口。将服务提供者向注册中心注册服务,以便服务消费者从注册中心查询并调用服务。
代码语言:javascript复制<dubbo:service interface="com.ywl.dubbo.TestApi" ref="testApi" retries="0"
cluster="failfast" timeout="3000"/>
二、provider发布原理探索
上一篇文章说明了dubbo.xml文件中的自定义元素都是通过schema来进行解析。解析service元素后会形成一个ServiceBean。而SerivceBean实现了ApplicationListener<ContextRefreshedEven>接口,该接口的目的为上下文刷新监听(即当TestApi的bean被初始化或刷新时,该事件被激活,执行实现类方法),dubbo也是在该实现方法中暴露服务。
· doExport
org.apache.dubbo.config.spring.ServiceBean#onApplicationEvent—>
org.apache.dubbo.config.ServiceConfig#export—>
org.apache.dubbo.config.ServiceConfig#doExportUrls
暴露服务的代码如下:
代码语言:javascript复制private void doExportUrls() { //加载注册中心配置 List<URL> registryURLs = loadRegistries(true);
//遍历dubbo协议,默认采用的是dubbo协议 即tcp协议 for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
上述代码在暴露服务时,首先加载注册中心配置,然后根据dubbo协议进行遍历来暴露服务。
代码语言:javascript复制private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//...//获取服务作用域String scope = url.getParameter(Constants.SCOPE_KEY);
if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {
//暴露本地服务
if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
//暴露远程服务
if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
//...
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
//...
}
上述代码中属于暴露服务中的核心代码,根据配置的scope判断暴露本地服务还是暴露远程服务。如果没有配置,则默认先暴露本地服务,再暴露远程服务。因此doExport方法中主要做的事情就是暴露本地服务和暴露远程服务。
暴露本地服务和远程服务的区别:
(1)暴露本地服务表示在同一个JVM中,不用通过远程通信来调用。即,在同一个服务中,可以自己调用自己的接口。
(2)暴露远程服务表示暴露给远程客户端IP和端口号,需要通过远程通信来调用。
· 暴露本地服务
代码语言:javascript复制private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { //将远程暴露的URL协议 指定为本地暴露的Url协议
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(LOCALHOST).setPort(0); //保存dubbo api-ref的calss,是一个简单的单例实现 ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref)); Exporter<?> exporter = protocol.export( //将TestApi封装成Invoker接口,进行暴露
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
//日志打印 logger.info("Export dubbo service " interfaceClass.getName() " to local registry");
}
}
ProxyFactory:
代码语言:javascript复制ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
ProxyFactory通过spi机制进行加载,默认编译方式为javassist,在getProxy和getInvoker方法被@Adaptive注解修饰,因此ProxyFactory会新生成一个adaptive动态代理类。
getInvoker,针对服务端,将服务对象,TestApiImpl包装成一个Invoker对象。
getProxy,针对客户端,将TestApi接口创建成一个动态代理对象。
代码语言:javascript复制public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { //创建代理对象 return (T) Proxy.getProxy(interfaces).newInstance (new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
//首先将接口封装成wrapper对象
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
//封装成invoker对象 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
Invoker是一个可执行的对象,能根据方法名、参数得到相应的返回结果。Invoker后面单独写一篇知识点来讲解。
protocol.export:
代码语言:javascript复制public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
代码语言:javascript复制InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
super(invoker);
this.key = key;
this.exporterMap = exporterMap; //最终会将本地暴露的invoke接口信息 放到exporterMap缓存中。 exporterMap.put(key, this);
}
以上就是暴露本地服务的全部内容,总结:dubbo的provider-api接口被暴露在本地服务时,会被封装成invoke对象,最终进入injvmExproter类中,将本地需要暴露的invoke接口信息放入到exporterMap中,map的key为接口全路径名。
· 暴露远程服务
在执行export方法之前的原理和本地服务的暴露一样,会将api封装成invoker对象。远程服务的暴露的实现类为RegistryProtocol。
代码语言:javascript复制public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//暴露远程服务
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//...
代码语言:javascript复制}
代码语言:javascript复制private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); //缓存判断 if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
//调用dubboProtocol暴露远程服务 exporter = new ExporterChangeableWrapper<T>((Exporter<T>)
protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
步骤一:org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export暴露远程服务
代码语言:javascript复制public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
//key-api路径名 dubbo端口号
代码语言:javascript复制 String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); //和本地远程服务一样,放到exportMap中 exporterMap.put(key, exporter);
//...
openServer(url);
optimizeSerialization(url);
return exporter;
}
暴露远程服务的原理和暴露本地服务的原理相似,都会将api封装成invoker对象,最终进入dubboProtocol类中,将需要暴露的远程服务invoke接口信息放入到exporterMap中,map的key与本地服务不同的是key为接口全路径名 dubbo端口号。
步骤二org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#openServer启动Server。
暴露远程服务后需要调用openServer方法创建并启动Server。具体源码分析如下:
代码语言:javascript复制private void openServer(URL url) {
//从url信息中获取key - ip 端口
String key = url.getAddress();
//是否为客户端暴露的服务 默认为true
代码语言:javascript复制 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) { //从缓存中获取服务
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) { //创建服务 放到缓存中
serverMap.put(key, createServer(url));
}
}
} else {
// 服务重置
server.reset(url);
}
}
}
代码语言:javascript复制private ExchangeServer createServer(URL url) {
//... ExchangeServer server;
try { //服务信息交互 进入HeaderExchanger类
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " url ") " e.getMessage(), e);
}
//...
return server;
}
org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind
在openServer方法中会调用createServer方法创建一个信息交换层对象ExchangeServer。该对象最终会进入到HeaderExchanger类中进行初始化创建。
代码语言:javascript复制public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
代码语言:javascript复制public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
//...//指定bindAddress0.0.0.0:20880
代码语言:javascript复制 bindAddress = new InetSocketAddress(bindIp, bindPort);
//指定accepts 默认为0 this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
//指定idleTimeout 默认为600000 this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try { //暴露netty
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " getClass().getSimpleName() " bind " getBindAddress() ", export " getLocalAddress());
}
//...
}
以上述代码为exchanger对象的封装,如url信息、handler信息、连接超时时间(我配置的timeOut为3000毫秒)、bindAddress、accepts、idleTimeout等信息。
exchanger实际是一个信息交互层。主要用于封装请求响应服务,同步转异步。
步骤三:org.apache.dubbo.remoting.Transporters#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler...)暴露netty服务。
在步骤二的openServer中在封装信息交互层exchanger对象时,存在doOpen方法,该方法的目的实际为暴露netty服务。
代码分析如下:
代码语言:javascript复制protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
上述代码主要设置了NioSercerSocketChannelFactory、boss的线程池信息、worker的线程池信息,以及设置了编解码handler信息。最后调用bootstrap.bind来暴露netty服务。
因此步骤三的transporter属于网络传输层,用来抽象netty的统一接口,暴露netty。
步骤四:org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeServer#startHeartbeatTimer dubbo的心跳机制
步骤二的最后会将信息交互层和网络传输层封装的信息构造成一个HeadExhcangeServer返回,并且在初始化HeadExchangeServer对象时,存在心跳机制的启动。具体代码如下:
代码语言:javascript复制private void startHeartbeatTimer() { //先关闭原来的心跳定时器 stopHeartbeatTimer();
if (heartbeat > 0) { //开启一个心跳定时器
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
@Override
public Collection<Channel> getChannels() {
return Collections.unmodifiableCollection(
HeaderExchangeServer.this.getChannels());
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
}
心跳定时器的目的是检测provider和consumer之间的连接是否有效,如果连接断了,需要作出响应的处理。
provider:如上图源码所示heartbeat设置了60s,heatbeatTimeOut为(180s),表示如果在60秒内没接受到消息,就会发送心跳消息,如果连着3次没有收到心跳响应,则会关闭连接channel。
consumer:表示如果在60秒内如果没有接收到消息,就会发送心跳消息,如果连着3次没有收到心跳响应,则尝试重连。
心跳线程池任务原理代码如下:
代码语言:javascript复制public void run() {
try {
long now = System.currentTimeMillis();
for (Channel channel : channelProvider.getChannels()) {
if (channel.isClosed()) {
continue;
}
try { //获取最后一次读操作的时间
Long lastRead = (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_READ_TIMESTAMP);
//获取最后一次写操作的时间 Long lastWrite = (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
//如果在heartbeat时间内没有操作读操作 或 写操作,则发送心跳请求 if ((lastRead != null && now - lastRead > heartbeat)
|| (lastWrite != null && now - lastWrite > heartbeat)) {
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setEvent(Request.HEARTBEAT_EVENT);
channel.send(req);
if (logger.isDebugEnabled()) {
logger.debug("Send heartbeat to remote channel " channel.getRemoteAddress()
", cause: The channel has no data-transmission exceeds a heartbeat period: " heartbeat "ms");
}
} //正常消息和心跳在heartbeatTimeout设置的时间内都没接收到的话,进入if
if (lastRead != null && now - lastRead > heartbeatTimeout) {
logger.warn("Close channel " channel
", because heartbeat read idle time out: " heartbeatTimeout "ms");
if (channel instanceof Client) {
try { //客户端则重连
((Client) channel).reconnect();
} catch (Exception e) {
}
} else { //服务端关闭channel
channel.close();
}
}
} catch (Throwable t) {
logger.warn("Exception when heartbeat to remote channel " channel.getRemoteAddress(), t);
}
}
} catch (Throwable t) {
logger.warn("Unhandled exception when heartbeat, cause: " t.getMessage(), t);
}
}
三、provider发布总结
在ServiceBean初始化后监听到了spring刷新事件开始发布dubbo-provider服务,根据配置开始进行本地服务发布和远程服务发布,两者服务发布的原理有相似处,将api对象封装成invoker对象,本地服务发布的invoker对象会被封装为InjvmExporter对象放到exportMap中key为api的全路径名,远程服务发布的invoker对象会被封装为dubboExporter对象放到exportMap中并且key为api的全路径名 端口号来做区分。
远程服务发布好后,则会封装信息交换层exchanger对象和网络传输层transporter对象,在网络传输层对象的封装时,会调用doOpen方法来暴露netty服务。最后exchanger和transporter对象都会被封装成一个HeaderExchangeServer服务对象,并且初始化中会开启心跳机制的定时器,来管理服务端和客户端的心跳重连。
由于篇幅有限,本章只写了服务发布的暴露服务和暴露netty的原理,后面的打开zk、注册zk、监听zk放到后续的篇幅中。
补充:
在本地服务暴露和远程服务暴露时,都使用了protocol.export来暴露对象。Protocol协议可以在dubbo-provider.xml中被配置,如果没配置则默认使用http协议。
Protocol接口中存在两个关键方法:
(1)export:用于服务端暴露远程服务,实际上是将invoker对象通过协议暴露给外部。
(2)refer:用于客户端引用远程服务。