在很久之前的文章多版本并行开发测试解决方案 中挖了个坑 今天来给填上; 今天主要讲解实现方案;
主要思路
- 给不同版本的dubbo服务打上 标签version上
- 在dubbo 提供和消费的出入口上 带上 标签version
- 服务消费进行路由的时候 给他找到相同标签version的提供者 进行消费;如果没有就给它稳定版本
是不是很简单,就是打个标签,然后路由的时候找相同服务嘛
简单代码
打标签
写个Register的Wrapper类 将标签注册上去 这里我是将标签绑定到了dubbo的属性application
; 放在哪里自己决定能读取到就行;
/**
* @author shirenchuang
* Registry 的包装类
* 修改URL 中的Application
* @date 2019/12/5 8:35 下午
*/
public class DevVersionRegisterWrapper implements Registry {
private static final Logger logger = LoggerFactory.getLogger("devVersion");
private Registry registry;
/**
* 注入Register
* @param registry
*/
public DevVersionRegisterWrapper(Registry registry) {
this.registry = registry;
}
@Override
public URL getUrl() {
return DevVersionRegisterFactoryWrapper.changeApplication(registry.getUrl());
}
@Override
public boolean isAvailable() {
return registry.isAvailable();
}
@Override
public void destroy() {
registry.destroy();
}
@Override
public void register(URL url) {
registry.register(DevVersionRegisterFactoryWrapper.changeApplication((url)));
}
@Override
public void unregister(URL url) {
registry.register(DevVersionRegisterFactoryWrapper.changeApplication((url)));
}
@Override
public void subscribe(URL url, NotifyListener listener) {
registry.subscribe(DevVersionRegisterFactoryWrapper.changeApplication((url)),listener);
}
@Override
public void unsubscribe(URL url, NotifyListener listener) {
registry.unsubscribe(DevVersionRegisterFactoryWrapper.changeApplication((url)),listener);
}
@Override
public List<URL> lookup(URL url) {
return registry.lookup(DevVersionRegisterFactoryWrapper.changeApplication((url)));
}
}
写一个RegistryFactory
的包装类
/**
* @author shirenchuang
* RegistryFactory 的包装类,在注册的时候 修改一下 Application
* 如果是 迭代环境则把Appliacation=Application_迭代版本号
* @date 2019/12/5 8:29 下午
*/
public class DevVersionRegisterFactoryWrapper implements RegistryFactory {
private static final Logger logger = LoggerFactory.getLogger("devVersion");
private RegistryFactory registryFactory;
/**
* 注入RegisterFactory
*/
public DevVersionRegisterFactoryWrapper(RegistryFactory registryFactory) {
this.registryFactory = registryFactory;
}
@Override
public Registry getRegistry(URL url) {
//获取当前环境的迭代版本号
if(!StringUtils.isEmpty(MyThreadLocal.localVersion)){
logger.info("=====启动的服务是迭代版本服务 devVersion:{}=====",MyThreadLocal.localVersion);
System.out.println("====启动的服务是迭代版本服务 devVersion:" MyThreadLocal.localVersion);
return new DevVersionRegisterWrapper(registryFactory.getRegistry(changeApplication(url)));
}
logger.info("=====启动的服务是稳定版本====");
System.out.println("=====启动的服务是稳定版本====");
return registryFactory.getRegistry(url);
}
public static URL changeApplication(URL url){
if(!StringUtils.isEmpty(MyThreadLocal.localVersion)){
String applicationKey = url.getParameter(Constants.APPLICATION_KEY) MyThreadLocal.spiltString MyThreadLocal.localVersion;
URL url2 = url.addParameter(Constants.APPLICATION_KEY,
applicationKey);
logger.info("=====迭代版本服务修改 Application key:{} =====",applicationKey);
return url2;
}
return url;
}
}
服务路由
Invoker 包装类
代码语言:javascript复制/**
* @author shirenchuang
* 2019/12/10
* 集群扩展包装器
* 参照 {@link com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker}
*/
public class DevVersionClusterInvoker<T> implements Invoker<T> {
private static final Logger logger = LoggerFactory.getLogger("devVersion");
private final Directory<T> directory;
private final Invoker<T> invoker;
public DevVersionClusterInvoker(Directory<T> directory, Invoker<T> invoker) {
this.directory = directory;
this.invoker = invoker;
}
@Override
public URL getUrl() {
return directory.getUrl();
}
@Override
public boolean isAvailable() {
return directory.isAvailable();
}
@Override
public void destroy() {
this.invoker.destroy();
}
@Override
public Class<T> getInterface() {
return directory.getInterface();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
// 找到迭代版本号
return doDevVersionInvoke(invocation, null);
}
@SuppressWarnings({"unchecked", "rawtypes"})
private Result doDevVersionInvoke(Invocation invocation, RpcException e) {
Result result ;
Invoker<T> minvoker;
List<Invoker<T>> devVersionInvokers = selectDevVersionInvoker(invocation);
if (devVersionInvokers==null||devVersionInvokers.size()==0) {
logger.error("没有找到服务啊~~~~ ");
throw new RpcException("没有找到服务啊~~~~");
} else {
minvoker = devVersionInvokers.get(0);
}
try {
result = minvoker.invoke(invocation);
} catch (RpcException me) {
if (me.isBiz()) {
result = new RpcResult(me.getCause());
} else {
throw new RpcException(me.getCode(), getDevVersionExceptionMessage(e, me), me.getCause());
}
} catch (Throwable me) {
throw new RpcException(getDevVersionExceptionMessage(e, me), me.getCause());
}
return result;
}
private String getDevVersionExceptionMessage(Throwable t, Throwable mt) {
String msg = "devVersion error : " mt.getMessage();
if (t != null) {
msg = msg ", invoke error is :" StringUtils.toString(t);
}
return msg;
}
/**
* 获取对应迭代版本服务
* @param invocation
* @return
*/
private List<Invoker<T>> selectDevVersionInvoker(Invocation invocation) {
List<Invoker<T>> invokers = null;
if (invocation instanceof RpcInvocation) {
try {
/**其实我们也可以给directory生生一个代理类,来做帅选操作**/
invokers = directory.list(invocation);
//经过了dubbo的栓选之后,我们来找自己需要的Invokes
String devVersion = MyThreadLocal.getDevVersion();
List<Invoker<T>> newInvokers = new ArrayList<>();
List<Invoker<T>> stableInvokers = new ArrayList<>();
for (Invoker invoker : invokers){
URL providerUrl ;
//获取应用名称
Method getProviderUrl = invoker.getClass().getDeclaredMethod("getProviderUrl");
getProviderUrl.setAccessible(true);
providerUrl = (URL)getProviderUrl.invoke(invoker);
String application = providerUrl.getParameter(Constants.APPLICATION_KEY);
if(StringUtils.isEmpty(devVersion)){
if(application.indexOf(MyThreadLocal.spiltString)==-1){
//不是迭代过来或者本身不是迭代的请求 只能访问非迭代版本
newInvokers.add(invoker);
}
}else {
//是迭代的请求 就需要找对应的迭代服务
if(application.indexOf(MyThreadLocal.spiltString)!=-1){
String version = application.substring(application.indexOf(MyThreadLocal.spiltString) 5);
if(version.equals(devVersion)){
newInvokers.add(invoker);
}
}
}
//找到稳定环境
if(application.indexOf(MyThreadLocal.spiltString)==-1){
stableInvokers.add(invoker);
}
}
if(newInvokers==null||newInvokers.size()==0){
String serviceName = directory.getInterface().getName();
if(StringUtils.isEmpty(devVersion)){
String error = "=====当前消费者自身版本和迭代传递版本均为稳定版本~ ,但是没有找到将要消费的服务=>" serviceName " 的稳定版本!!";
logger.error(error);
throw new RuntimeException(error);
}else {
// 请求的是迭代服务, 但是迭代服务没有找到,退而求其次调用稳定环境 )
if(stableInvokers!=null&&stableInvokers.size()>0){
StringBuffer sb = new StringBuffer();
sb.append("=======当前cap请求的版本为:").append(devVersion)
.append(";往后传递版本").append(devVersion).append("; 将要消费的服务:").append(serviceName)
.append("没有找到与之对应的迭代版本;将会调用稳定版本");
logger.info(sb.toString());
return stableInvokers;
}else {
//可能有其他的迭代服务,但是不调用
logger.error("当前请求迭代版本:{},但是不存在迭代服务,也没有找到稳定服务;{},{},{}",devVersion,serviceName);
throw new RuntimeException("当前请求迭代版本:" devVersion ",但是不存在迭代服务,也没有找到稳定服务;" serviceName);
}
}
}else {
return newInvokers;
}
} catch (RpcException e) {
logger.error("获取 迭代版本 的服务时 发生错误~~:" directory.getUrl().getServiceInterface() ", method:" invocation.getMethodName()
, e);
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
return invokers;
}
@Override
public String toString() {
return "invoker :" this.invoker ",directory: " this.directory;
}
public static void main(String[] args) {
String application = "application" MyThreadLocal.spiltString "1.0.1";
boolean b = application.indexOf(MyThreadLocal.spiltString)==-1;
application = application.substring(application.indexOf(MyThreadLocal.spiltString) 5);
System.out.print(application);
}
}
代码语言:javascript复制/**
* @author shirenchuang
* 2019/12/10
* 集群扩展包装器
* 参照 {@link com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker}
*/
public class DevVersionClusterWrapper implements Cluster {
private Cluster cluster;
public DevVersionClusterWrapper(Cluster cluster) {
this.cluster = cluster;
}
@Override
public Invoker join(Directory directory) throws RpcException {
//如果自己是迭代环境,则使用包装
return new DevVersionClusterInvoker(directory,
this.cluster.join(directory));
}
}
拦截器 带入 标签Version
消费者拦截器
代码语言:javascript复制/**
* @Description 消费别人服务的时候会走到这里
* 要把 迭代版本号 放到参数里面传到 服务提供者
* @Author shirenchuang
* @Date 2019/12/1 10:20 PM
**/
@Activate(group = {Constants.CONSUMER})
public class DevVersionConsumerFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger("devVersion");
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String traceId = TraceUtil.getTraceId();
RpcContext.getContext().setAttachment("myTraceId",traceId);
String toDevVersion = MyThreadLocal.getDevVersion();
RpcContext.getContext().setAttachment("devVersion",toDevVersion);
doLog(invoker,invocation,traceId);
return invoker.invoke(invocation);
}
private void doLog(Invoker<?> invoker, Invocation invocation,String traceId){
String interfaceName = invoker.getInterface().getCanonicalName();
String method = invocation.getMethodName();
String methodFullName = interfaceName "." method;
StringBuffer sb = new StringBuffer();
sb.append("==TraceId:").append(traceId)
.append("=== ConsumerFilter:当前自身版本:").append(MyThreadLocal.localVersion)
.append("; 接收传递版本:").append(MyThreadLocal.getFromVersion())
.append("; 往后传递版本:").append(MyThreadLocal.getDevVersion())
.append(" ;调用服务=> ").append(methodFullName);
logger.info(sb.toString());
}
}
提供者拦截器
代码语言:javascript复制/**
* @Description 当前服务提供者在被真正调用之前获取 消费者过来的迭代版本号
* 然后保存在本地线程变量中,在调用其他dubbo服务的时候 要带上版本号
* @Author shirenchuang
* @Date 2019/12/1 10:20 PM
**/
@Activate(group = {Constants.PROVIDER})
public class DevVersionProviderFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger("devVersion");
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String fromTraceId = RpcContext.getContext().getAttachment("myTraceId");
TraceUtil.traceLocal.set(fromTraceId);
String myTraceId = TraceUtil.getTraceId();
String fromDevVersion = RpcContext.getContext().getAttachment("devVersion");
//放入到本地线程存放
MyThreadLocal.devVersion.set(fromDevVersion);
doLog(invoker,invocation,myTraceId);
return invoker.invoke(invocation);
}
private void doLog(Invoker<?> invoker, Invocation invocation,String traceId){
String interfaceName = invoker.getInterface().getCanonicalName();
String method = invocation.getMethodName();
String methodFullName = interfaceName "." method;
StringBuffer sb = new StringBuffer();
sb.append("==TraceId:").append(traceId)
.append(" ProviderFilter:当前自身版本:").append(MyThreadLocal.localVersion)
.append("; 接收传递版本:").append(RpcContext.getContext().getAttachment("devVersion"))
.append("; 往后传递版本:").append(RpcContext.getContext().getAttachment("devVersion"))
.append(" ;将被调用服务=> ").append(methodFullName);
logger.info(sb.toString());
}
}
标签的存取
在dubbo服务启动的时候 通过Jvm参数传入; 透传的标签Version通过ThreadLocal保存;
代码语言:javascript复制public class MyThreadLocal {
private static final Logger logger = LoggerFactory.getLogger("devVersion");
public static ThreadLocal<String> devVersion = new TransmittableThreadLocal();
//public static ThreadLocal devVersion = new ThreadLocal();
/**用户Application评价的固定字符;**/
public static String spiltString = "_dev_";
public static String localVersion ;
static {
localVersion = System.getProperty("localVersion");
logger.info("====devVersion:{} ========",devVersion);
System.out.println("s====devVersion: ========" devVersion);
}
public static String getFromVersion(){
return devVersion.get();
}
/**
* 如果本地变量没有 则可能是第一个发起方;
* 则去当前服务的版本号,然后一直传递下去;
* @return
*/
public static String getDevVersion(){
String fromVersion = getFromVersion();
if(!StringUtils.isEmpty(fromVersion)){
return fromVersion;
}
return localVersion;
}
}
dubbo spi的配置
将上面的DevVersionRegisterFactoryWrapper
DevVersionClusterWrapper
DevVersionProviderFilter
DevVersionConsumerFilter
配置一下使其生效
重点问题说明
- 上面的只是一个扩展Jar包; 要做的无侵入性; 不能让具体业务修改代码和依赖 参考我的解决方案: 我写的dubbo扩展jar包如何无侵入的给别人使用
- ThreadLocal在线程池的情况下 值传递会有问题; 使用阿里开源的 TTL解决;