dubbo学习(四)provider服务发布

2020-10-09 15:28:42 浏览数 (1)

一、服务端provider发布

根据dubbo启动日志,provider的发布动作为以下几个步骤:

(1)暴露本地服务

Export dubbo service com.ywl.dubbo.TestApi to local registry, dubbo version: 2.0.0, current host: 127.0.0.1。

(2)暴露远程服务

Export dubbo service com.ywl.dubbo.TestApi to url dubbo://192.168.24.69:20880/com.ywl.dubbo.TestApi...后面省略。

(3)启动netty

Start NettyClient yuwenlei.local/192.168.24.69 connect to the server /192.168.1.100:20041, dubbo version: 2.0.0, current host: 192.168.24.69。

(4)打开zk

Opening socket connection to server dailyzk.webuy.ai/192.168.49.11:2181。

(5)注册provider服务到zk

Register dubbo service com.ywl.dubbo.TestApi url dubbo://192.168.24.69:20880/com.ywl.dubbo.TestApi? ...中间省略。

to registry registry://dailyzk.webuy.ai:7005/org.apache.dubbo.registry.RegistryService? ...后面省略。

(6)监听zk(订阅与通知)

Subscribe: provider://192.168.24.69:20880/com.ywl.dubbo.TestApi?...后面省略。

Notify urls for subscribe url provider://192.168.24.69:20880/com.ywl.dubbo.TestApi?...后面省略。

· 服务发布的目的

解析dubbo-provider.xml中的接口。将服务提供者向注册中心注册服务,以便服务消费者从注册中心查询并调用服务。

代码语言:javascript复制
<dubbo:service interface="com.ywl.dubbo.TestApi" ref="testApi" retries="0"
               cluster="failfast" timeout="3000"/>

二、provider发布原理探索

上一篇文章说明了dubbo.xml文件中的自定义元素都是通过schema来进行解析。解析service元素后会形成一个ServiceBean。而SerivceBean实现了ApplicationListener<ContextRefreshedEven>接口,该接口的目的为上下文刷新监听(即当TestApi的bean被初始化或刷新时,该事件被激活,执行实现类方法),dubbo也是在该实现方法中暴露服务。

· doExport

org.apache.dubbo.config.spring.ServiceBean#onApplicationEvent—>

org.apache.dubbo.config.ServiceConfig#export—>

org.apache.dubbo.config.ServiceConfig#doExportUrls

暴露服务的代码如下:

代码语言:javascript复制
private void doExportUrls() {    //加载注册中心配置    List<URL> registryURLs = loadRegistries(true);
    //遍历dubbo协议,默认采用的是dubbo协议 即tcp协议    for (ProtocolConfig protocolConfig : protocols) {
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

上述代码在暴露服务时,首先加载注册中心配置,然后根据dubbo协议进行遍历来暴露服务。

代码语言:javascript复制
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//...//获取服务作用域String scope = url.getParameter(Constants.SCOPE_KEY);
    
    if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {
        //暴露本地服务
        if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        //暴露远程服务
        if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
           //...
           Exporter<?> exporter = protocol.export(wrapperInvoker);
           exporters.add(exporter);
        }
    }
    //...
}

上述代码中属于暴露服务中的核心代码,根据配置的scope判断暴露本地服务还是暴露远程服务。如果没有配置,则默认先暴露本地服务,再暴露远程服务。因此doExport方法中主要做的事情就是暴露本地服务和暴露远程服务。

暴露本地服务和远程服务的区别:

(1)暴露本地服务表示在同一个JVM中,不用通过远程通信来调用。即,在同一个服务中,可以自己调用自己的接口。

(2)暴露远程服务表示暴露给远程客户端IP和端口号,需要通过远程通信来调用。

· 暴露本地服务

代码语言:javascript复制
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {        //将远程暴露的URL协议 指定为本地暴露的Url协议
        URL local = URL.valueOf(url.toFullString())
                .setProtocol(Constants.LOCAL_PROTOCOL)
                .setHost(LOCALHOST).setPort(0);        //保存dubbo api-ref的calss,是一个简单的单例实现        ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));        Exporter<?> exporter = protocol.export(                //将TestApi封装成Invoker接口,进行暴露
                proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
        //日志打印        logger.info("Export dubbo service "   interfaceClass.getName()   " to local registry");
    }
}

ProxyFactory:

代码语言:javascript复制
ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

ProxyFactory通过spi机制进行加载,默认编译方式为javassist,在getProxy和getInvoker方法被@Adaptive注解修饰,因此ProxyFactory会新生成一个adaptive动态代理类。

getInvoker,针对服务端,将服务对象,TestApiImpl包装成一个Invoker对象。

getProxy,针对客户端,将TestApi接口创建成一个动态代理对象。

代码语言:javascript复制
public class JavassistProxyFactory extends AbstractProxyFactory {
    @Override
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {        //创建代理对象        return (T) Proxy.getProxy(interfaces).newInstance                        (new InvokerInvocationHandler(invoker));
    }
    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        //首先将接口封装成wrapper对象
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                //封装成invoker对象                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}

Invoker是一个可执行的对象,能根据方法名、参数得到相应的返回结果。Invoker后面单独写一篇知识点来讲解。

protocol.export:

代码语言:javascript复制
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
代码语言:javascript复制
InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
    super(invoker);
    this.key = key;
    this.exporterMap = exporterMap;    //最终会将本地暴露的invoke接口信息 放到exporterMap缓存中。    exporterMap.put(key, this);
}

以上就是暴露本地服务的全部内容,总结:dubbo的provider-api接口被暴露在本地服务时,会被封装成invoke对象,最终进入injvmExproter类中,将本地需要暴露的invoke接口信息放入到exporterMap中,map的key为接口全路径名。

· 暴露远程服务

在执行export方法之前的原理和本地服务的暴露一样,会将api封装成invoker对象。远程服务的暴露的实现类为RegistryProtocol。

代码语言:javascript复制
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //暴露远程服务
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    //...
代码语言:javascript复制
}
代码语言:javascript复制
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);    //缓存判断    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                //调用dubboProtocol暴露远程服务                exporter = new ExporterChangeableWrapper<T>((Exporter<T>)
                    protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}

步骤一:org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export暴露远程服务

代码语言:javascript复制
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    //key-api路径名 dubbo端口号
代码语言:javascript复制
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);    //和本地远程服务一样,放到exportMap中    exporterMap.put(key, exporter);
    //...
    openServer(url);
    optimizeSerialization(url);
    return exporter;
}

暴露远程服务的原理和暴露本地服务的原理相似,都会将api封装成invoker对象,最终进入dubboProtocol类中,将需要暴露的远程服务invoke接口信息放入到exporterMap中,map的key与本地服务不同的是key为接口全路径名 dubbo端口号。

步骤二org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#openServer启动Server。

暴露远程服务后需要调用openServer方法创建并启动Server。具体源码分析如下:

代码语言:javascript复制
private void openServer(URL url) {
    //从url信息中获取key - ip 端口
    String key = url.getAddress();
    //是否为客户端暴露的服务 默认为true
代码语言:javascript复制
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    if (isServer) {        //从缓存中获取服务
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            synchronized (this) {
                server = serverMap.get(key);
                if (server == null) {                    //创建服务 放到缓存中
                    serverMap.put(key, createServer(url));
                }
            }
        } else {
            // 服务重置
            server.reset(url);
        }
    }
}
代码语言:javascript复制
private ExchangeServer createServer(URL url) {
    //...    ExchangeServer server;
    try {        //服务信息交互 进入HeaderExchanger类
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: "   url   ") "   e.getMessage(), e);
    }
    //...
    return server;
}

org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind

在openServer方法中会调用createServer方法创建一个信息交换层对象ExchangeServer。该对象最终会进入到HeaderExchanger类中进行初始化创建。

代码语言:javascript复制
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url,            new DecodeHandler(new HeaderExchangeHandler(handler))));
}
代码语言:javascript复制
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
//...//指定bindAddress0.0.0.0:20880
代码语言:javascript复制
    bindAddress = new InetSocketAddress(bindIp, bindPort);
    //指定accepts 默认为0    this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
    //指定idleTimeout 默认为600000    this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
    try {        //暴露netty
        doOpen();
        if (logger.isInfoEnabled()) {
            logger.info("Start "   getClass().getSimpleName()   " bind "   getBindAddress()   ", export "   getLocalAddress());
        }
    //...
}

以上述代码为exchanger对象的封装,如url信息、handler信息、连接超时时间(我配置的timeOut为3000毫秒)、bindAddress、accepts、idleTimeout等信息。

exchanger实际是一个信息交互层。主要用于封装请求响应服务,同步转异步。

步骤三:org.apache.dubbo.remoting.Transporters#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler...)暴露netty服务。

在步骤二的openServer中在封装信息交互层exchanger对象时,存在doOpen方法,该方法的目的实际为暴露netty服务。

代码分析如下:

代码语言:javascript复制
protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();
    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();
    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("handler", nettyServerHandler);
                }
            });
    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}

上述代码主要设置了NioSercerSocketChannelFactory、boss的线程池信息、worker的线程池信息,以及设置了编解码handler信息。最后调用bootstrap.bind来暴露netty服务。

因此步骤三的transporter属于网络传输层,用来抽象netty的统一接口,暴露netty。

步骤四:org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeServer#startHeartbeatTimer dubbo的心跳机制

步骤二的最后会将信息交互层和网络传输层封装的信息构造成一个HeadExhcangeServer返回,并且在初始化HeadExchangeServer对象时,存在心跳机制的启动。具体代码如下:

代码语言:javascript复制
private void startHeartbeatTimer() {    //先关闭原来的心跳定时器    stopHeartbeatTimer();
    if (heartbeat > 0) {        //开启一个心跳定时器
        heartbeatTimer = scheduled.scheduleWithFixedDelay(
                new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                    @Override
                    public Collection<Channel> getChannels() {
                        return Collections.unmodifiableCollection(
                                HeaderExchangeServer.this.getChannels());
                    }
                }, heartbeat, heartbeatTimeout),
                heartbeat, heartbeat, TimeUnit.MILLISECONDS);
    }
}

心跳定时器的目的是检测provider和consumer之间的连接是否有效,如果连接断了,需要作出响应的处理。

provider:如上图源码所示heartbeat设置了60s,heatbeatTimeOut为(180s),表示如果在60秒内没接受到消息,就会发送心跳消息,如果连着3次没有收到心跳响应,则会关闭连接channel。

consumer:表示如果在60秒内如果没有接收到消息,就会发送心跳消息,如果连着3次没有收到心跳响应,则尝试重连。

心跳线程池任务原理代码如下:

代码语言:javascript复制
public void run() {
    try {
        long now = System.currentTimeMillis();
        for (Channel channel : channelProvider.getChannels()) {
            if (channel.isClosed()) {
                continue;
            }
try {                //获取最后一次读操作的时间
                Long lastRead = (Long) channel.getAttribute(
                HeaderExchangeHandler.KEY_READ_TIMESTAMP);
                //获取最后一次写操作的时间                Long lastWrite = (Long) channel.getAttribute(
                HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
                //如果在heartbeat时间内没有操作读操作 或 写操作,则发送心跳请求                if ((lastRead != null && now - lastRead > heartbeat)
                        || (lastWrite != null && now - lastWrite > heartbeat)) {
                    Request req = new Request();
                    req.setVersion(Version.getProtocolVersion());
                    req.setTwoWay(true);
                    req.setEvent(Request.HEARTBEAT_EVENT);
                    channel.send(req);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Send heartbeat to remote channel "   channel.getRemoteAddress()
                                  ", cause: The channel has no data-transmission exceeds a heartbeat period: "   heartbeat   "ms");
                    }
}                //正常消息和心跳在heartbeatTimeout设置的时间内都没接收到的话,进入if
                if (lastRead != null && now - lastRead > heartbeatTimeout) {
                    logger.warn("Close channel "   channel
                              ", because heartbeat read idle time out: "   heartbeatTimeout   "ms");
                    if (channel instanceof Client) {
                    try {                            //客户端则重连
                            ((Client) channel).reconnect();
                        } catch (Exception e) {
                            }
                        } else {                        //服务端关闭channel
                        channel.close();
                    }
                }
            } catch (Throwable t) {
                logger.warn("Exception when heartbeat to remote channel "   channel.getRemoteAddress(), t);
            }
        }
    } catch (Throwable t) {
        logger.warn("Unhandled exception when heartbeat, cause: "   t.getMessage(), t);
    }
}

三、provider发布总结

在ServiceBean初始化后监听到了spring刷新事件开始发布dubbo-provider服务,根据配置开始进行本地服务发布和远程服务发布,两者服务发布的原理有相似处,将api对象封装成invoker对象,本地服务发布的invoker对象会被封装为InjvmExporter对象放到exportMap中key为api的全路径名,远程服务发布的invoker对象会被封装为dubboExporter对象放到exportMap中并且key为api的全路径名 端口号来做区分。

远程服务发布好后,则会封装信息交换层exchanger对象和网络传输层transporter对象,在网络传输层对象的封装时,会调用doOpen方法来暴露netty服务。最后exchanger和transporter对象都会被封装成一个HeaderExchangeServer服务对象,并且初始化中会开启心跳机制的定时器,来管理服务端和客户端的心跳重连。

由于篇幅有限,本章只写了服务发布的暴露服务和暴露netty的原理,后面的打开zk、注册zk、监听zk放到后续的篇幅中。

补充:

在本地服务暴露和远程服务暴露时,都使用了protocol.export来暴露对象。Protocol协议可以在dubbo-provider.xml中被配置,如果没配置则默认使用http协议。

Protocol接口中存在两个关键方法:

(1)export:用于服务端暴露远程服务,实际上是将invoker对象通过协议暴露给外部。

(2)refer:用于客户端引用远程服务。

0 人点赞