Dubbo下的多版本并行开发测试解决方案(服务路由)

2021-07-14 10:33:38 浏览数 (1)

在很久之前的文章多版本并行开发测试解决方案 中挖了个坑 今天来给填上; 今天主要讲解实现方案;

主要思路

  1. 给不同版本的dubbo服务打上 标签version上
  2. 在dubbo 提供和消费的出入口上 带上 标签version
  3. 服务消费进行路由的时候 给他找到相同标签version的提供者 进行消费;如果没有就给它稳定版本

是不是很简单,就是打个标签,然后路由的时候找相同服务嘛

简单代码

打标签

写个Register的Wrapper类 将标签注册上去 这里我是将标签绑定到了dubbo的属性application; 放在哪里自己决定能读取到就行;

代码语言:javascript复制
/**
 * @author shirenchuang
 * Registry 的包装类
 * 修改URL 中的Application
 * @date 2019/12/5  8:35 下午
 */

public class DevVersionRegisterWrapper implements Registry {

    private static final Logger logger = LoggerFactory.getLogger("devVersion");


    private Registry registry;

    /**
     * 注入Register
     * @param registry
     */
    public DevVersionRegisterWrapper(Registry registry) {
        this.registry = registry;
    }

    @Override
    public URL getUrl() {
        return DevVersionRegisterFactoryWrapper.changeApplication(registry.getUrl());
    }

    @Override
    public boolean isAvailable() {
        return registry.isAvailable();
    }

    @Override
    public void destroy() {
        registry.destroy();
    }

    @Override
    public void register(URL url) {
        registry.register(DevVersionRegisterFactoryWrapper.changeApplication((url)));
    }

    @Override
    public void unregister(URL url) {
        registry.register(DevVersionRegisterFactoryWrapper.changeApplication((url)));
    }

    @Override
    public void subscribe(URL url, NotifyListener listener) {
        registry.subscribe(DevVersionRegisterFactoryWrapper.changeApplication((url)),listener);
    }

    @Override
    public void unsubscribe(URL url, NotifyListener listener) {
        registry.unsubscribe(DevVersionRegisterFactoryWrapper.changeApplication((url)),listener);
    }

    @Override
    public List<URL> lookup(URL url) {
        return registry.lookup(DevVersionRegisterFactoryWrapper.changeApplication((url)));
    }
}

写一个RegistryFactory的包装类

代码语言:javascript复制
/**
 * @author shirenchuang
 * RegistryFactory 的包装类,在注册的时候 修改一下 Application
 * 如果是 迭代环境则把Appliacation=Application_迭代版本号
 * @date 2019/12/5  8:29 下午
 */

public class DevVersionRegisterFactoryWrapper implements RegistryFactory {

    private static final Logger logger = LoggerFactory.getLogger("devVersion");


    private RegistryFactory registryFactory;
    /**
     * 注入RegisterFactory
     */
    public DevVersionRegisterFactoryWrapper(RegistryFactory registryFactory) {
        this.registryFactory = registryFactory;
    }

    @Override
    public Registry getRegistry(URL url) {
        //获取当前环境的迭代版本号
        if(!StringUtils.isEmpty(MyThreadLocal.localVersion)){
            logger.info("=====启动的服务是迭代版本服务  devVersion:{}=====",MyThreadLocal.localVersion);
            System.out.println("====启动的服务是迭代版本服务  devVersion:" MyThreadLocal.localVersion);
            return new DevVersionRegisterWrapper(registryFactory.getRegistry(changeApplication(url)));
        }
        logger.info("=====启动的服务是稳定版本====");
        System.out.println("=====启动的服务是稳定版本====");
        return registryFactory.getRegistry(url);
    }

    public static URL changeApplication(URL url){
        if(!StringUtils.isEmpty(MyThreadLocal.localVersion)){
            String applicationKey = url.getParameter(Constants.APPLICATION_KEY) MyThreadLocal.spiltString MyThreadLocal.localVersion;
            URL url2 = url.addParameter(Constants.APPLICATION_KEY,
                    applicationKey);

            logger.info("=====迭代版本服务修改 Application key:{} =====",applicationKey);
            return url2;
        }
        return url;
    }
}

服务路由

Invoker 包装类

代码语言:javascript复制
/**
 * @author shirenchuang
 * 2019/12/10
 * 集群扩展包装器
 * 参照 {@link com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker}
 */
public class DevVersionClusterInvoker<T> implements Invoker<T> {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");

    private final Directory<T> directory;

    private final Invoker<T> invoker;

    public DevVersionClusterInvoker(Directory<T> directory, Invoker<T> invoker) {
        this.directory = directory;
        this.invoker = invoker;
    }

    @Override
    public URL getUrl() {
        return directory.getUrl();
    }

    @Override
    public boolean isAvailable() {
        return directory.isAvailable();
    }

    @Override
    public void destroy() {
        this.invoker.destroy();
    }

    @Override
    public Class<T> getInterface() {
        return directory.getInterface();
    }

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        // 找到迭代版本号
        return doDevVersionInvoke(invocation, null);
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    private Result doDevVersionInvoke(Invocation invocation, RpcException e) {
        Result result ;
        Invoker<T> minvoker;

        List<Invoker<T>> devVersionInvokers = selectDevVersionInvoker(invocation);
        if (devVersionInvokers==null||devVersionInvokers.size()==0) {
            logger.error("没有找到服务啊~~~~ ");
            throw new RpcException("没有找到服务啊~~~~");
        } else {
            minvoker = devVersionInvokers.get(0);
        }
        try {
            result = minvoker.invoke(invocation);
        } catch (RpcException me) {
            if (me.isBiz()) {
                result = new RpcResult(me.getCause());
            } else {
                throw new RpcException(me.getCode(), getDevVersionExceptionMessage(e, me), me.getCause());
            }
        } catch (Throwable me) {
            throw new RpcException(getDevVersionExceptionMessage(e, me), me.getCause());
        }
        return result;
    }

    private String getDevVersionExceptionMessage(Throwable t, Throwable mt) {
        String msg = "devVersion error : "   mt.getMessage();
        if (t != null) {
            msg = msg   ", invoke error is :"   StringUtils.toString(t);
        }
        return msg;
    }


    /**
     * 获取对应迭代版本服务
     * @param invocation
     * @return
     */
    private List<Invoker<T>> selectDevVersionInvoker(Invocation invocation) {
        List<Invoker<T>> invokers = null;
        if (invocation instanceof RpcInvocation) {
            try {
                /**其实我们也可以给directory生生一个代理类,来做帅选操作**/
                invokers = directory.list(invocation);
                //经过了dubbo的栓选之后,我们来找自己需要的Invokes
                String devVersion = MyThreadLocal.getDevVersion();
                List<Invoker<T>> newInvokers = new ArrayList<>();
                List<Invoker<T>> stableInvokers = new ArrayList<>();

                for (Invoker invoker : invokers){
                    URL providerUrl ;
                    //获取应用名称
                    Method getProviderUrl = invoker.getClass().getDeclaredMethod("getProviderUrl");
                    getProviderUrl.setAccessible(true);
                    providerUrl = (URL)getProviderUrl.invoke(invoker);
                    String application  = providerUrl.getParameter(Constants.APPLICATION_KEY);

                    if(StringUtils.isEmpty(devVersion)){
                        if(application.indexOf(MyThreadLocal.spiltString)==-1){
                            //不是迭代过来或者本身不是迭代的请求    只能访问非迭代版本
                            newInvokers.add(invoker);
                        }
                    }else {
                        //是迭代的请求  就需要找对应的迭代服务
                        if(application.indexOf(MyThreadLocal.spiltString)!=-1){
                            String version = application.substring(application.indexOf(MyThreadLocal.spiltString) 5);
                            if(version.equals(devVersion)){
                                newInvokers.add(invoker);
                            }
                        }
                    }

                    //找到稳定环境
                    if(application.indexOf(MyThreadLocal.spiltString)==-1){
                        stableInvokers.add(invoker);
                    }



                }
                if(newInvokers==null||newInvokers.size()==0){
                    String serviceName = directory.getInterface().getName();
                    if(StringUtils.isEmpty(devVersion)){
                        String error = "=====当前消费者自身版本和迭代传递版本均为稳定版本~ ,但是没有找到将要消费的服务=>" serviceName " 的稳定版本!!";
                        logger.error(error);
                        throw new RuntimeException(error);
                    }else {
                        // 请求的是迭代服务, 但是迭代服务没有找到,退而求其次调用稳定环境  )
                        if(stableInvokers!=null&&stableInvokers.size()>0){
                            StringBuffer sb = new StringBuffer();
                            sb.append("=======当前cap请求的版本为:").append(devVersion)
                                    .append(";往后传递版本").append(devVersion).append("; 将要消费的服务:").append(serviceName)
                                    .append("没有找到与之对应的迭代版本;将会调用稳定版本");
                            logger.info(sb.toString());
                            return stableInvokers;
                        }else {
                            //可能有其他的迭代服务,但是不调用
                            logger.error("当前请求迭代版本:{},但是不存在迭代服务,也没有找到稳定服务;{},{},{}",devVersion,serviceName);
                            throw new RuntimeException("当前请求迭代版本:" devVersion ",但是不存在迭代服务,也没有找到稳定服务;" serviceName);
                        }
                    }
                }else {
                    return newInvokers;
                }

            } catch (RpcException e) {

                logger.error("获取 迭代版本 的服务时 发生错误~~:"  directory.getUrl().getServiceInterface()   ", method:"   invocation.getMethodName()
                        , e);
            } catch (NoSuchMethodException e) {
                e.printStackTrace();
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            } catch (InvocationTargetException e) {
                e.printStackTrace();
            }
        }
        return invokers;
    }



    @Override
    public String toString() {
        return "invoker :"   this.invoker   ",directory: "   this.directory;
    }


    public static void main(String[] args) {

        String application = "application" MyThreadLocal.spiltString "1.0.1";
        boolean b = application.indexOf(MyThreadLocal.spiltString)==-1;
        application = application.substring(application.indexOf(MyThreadLocal.spiltString) 5);
        System.out.print(application);

    }
}
代码语言:javascript复制
/**
 * @author shirenchuang
 * 2019/12/10
 * 集群扩展包装器
 * 参照 {@link com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker}
 */
public class DevVersionClusterWrapper implements Cluster {

    private Cluster cluster;

    public DevVersionClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }

    @Override
    public  Invoker join(Directory directory) throws RpcException {
        //如果自己是迭代环境,则使用包装


        return new DevVersionClusterInvoker(directory,
                this.cluster.join(directory));
    }

}

拦截器 带入 标签Version

消费者拦截器

代码语言:javascript复制
/**
 * @Description 消费别人服务的时候会走到这里
 *              要把 迭代版本号 放到参数里面传到 服务提供者
 * @Author shirenchuang
 * @Date 2019/12/1 10:20 PM
 **/
@Activate(group = {Constants.CONSUMER})
public class DevVersionConsumerFilter implements Filter {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

        String traceId = TraceUtil.getTraceId();
        RpcContext.getContext().setAttachment("myTraceId",traceId);

        String toDevVersion = MyThreadLocal.getDevVersion();
        RpcContext.getContext().setAttachment("devVersion",toDevVersion);
        doLog(invoker,invocation,traceId);
        return invoker.invoke(invocation);

    }

    private void doLog(Invoker<?> invoker, Invocation invocation,String traceId){
        String interfaceName = invoker.getInterface().getCanonicalName();
        String method = invocation.getMethodName();
        String methodFullName = interfaceName   "."   method;
        StringBuffer sb = new StringBuffer();
        sb.append("==TraceId:").append(traceId)
        .append("=== ConsumerFilter:当前自身版本:").append(MyThreadLocal.localVersion)
                .append("; 接收传递版本:").append(MyThreadLocal.getFromVersion())
                .append("; 往后传递版本:").append(MyThreadLocal.getDevVersion())
        .append(" ;调用服务=> ").append(methodFullName);
        logger.info(sb.toString());
    }
}

提供者拦截器

代码语言:javascript复制
/**
 * @Description 当前服务提供者在被真正调用之前获取 消费者过来的迭代版本号
 *              然后保存在本地线程变量中,在调用其他dubbo服务的时候 要带上版本号
 * @Author shirenchuang
 * @Date 2019/12/1 10:20 PM
 **/
@Activate(group = {Constants.PROVIDER})
public class DevVersionProviderFilter implements Filter {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

        String fromTraceId = RpcContext.getContext().getAttachment("myTraceId");
        TraceUtil.traceLocal.set(fromTraceId);
        String myTraceId = TraceUtil.getTraceId();

        String fromDevVersion = RpcContext.getContext().getAttachment("devVersion");
        //放入到本地线程存放
        MyThreadLocal.devVersion.set(fromDevVersion);
        doLog(invoker,invocation,myTraceId);
        return invoker.invoke(invocation);
    }


    private void doLog(Invoker<?> invoker, Invocation invocation,String traceId){
        String interfaceName = invoker.getInterface().getCanonicalName();
        String method = invocation.getMethodName();
        String methodFullName = interfaceName   "."   method;
        StringBuffer sb = new StringBuffer();
        sb.append("==TraceId:").append(traceId)
        .append(" ProviderFilter:当前自身版本:").append(MyThreadLocal.localVersion)
                .append("; 接收传递版本:").append(RpcContext.getContext().getAttachment("devVersion"))
                .append("; 往后传递版本:").append(RpcContext.getContext().getAttachment("devVersion"))
                .append(" ;将被调用服务=> ").append(methodFullName);
        logger.info(sb.toString());
    }



}

标签的存取

在dubbo服务启动的时候 通过Jvm参数传入; 透传的标签Version通过ThreadLocal保存;

代码语言:javascript复制
public class MyThreadLocal {
    private static final Logger logger = LoggerFactory.getLogger("devVersion");


    public static ThreadLocal<String> devVersion = new TransmittableThreadLocal();
    //public static ThreadLocal devVersion = new ThreadLocal();

    /**用户Application评价的固定字符;**/
    public static String spiltString = "_dev_";

    public static String localVersion  ;

    static {
        localVersion = System.getProperty("localVersion");
        logger.info("====devVersion:{}   ========",devVersion);
        System.out.println("s====devVersion:   ========" devVersion);
    }


    public static String getFromVersion(){
        return devVersion.get();
    }

    /**
     * 如果本地变量没有  则可能是第一个发起方;
     * 则去当前服务的版本号,然后一直传递下去;
     * @return
     */
    public static String getDevVersion(){
        String fromVersion = getFromVersion();
        if(!StringUtils.isEmpty(fromVersion)){
            return fromVersion;
        }
        return localVersion;
    }

}

dubbo spi的配置

将上面的DevVersionRegisterFactoryWrapper DevVersionClusterWrapper DevVersionProviderFilter DevVersionConsumerFilter配置一下使其生效

重点问题说明

  1. 上面的只是一个扩展Jar包; 要做的无侵入性; 不能让具体业务修改代码和依赖 参考我的解决方案: 我写的dubbo扩展jar包如何无侵入的给别人使用
  2. ThreadLocal在线程池的情况下 值传递会有问题; 使用阿里开源的 TTL解决;

0 人点赞