dubbo自定义泛化调用接口

2023-07-17 19:55:22 浏览数 (2)

在做公司业务时,有需求是让server端调用特定的client端 我是根据dubbo.application.name来判断的

代码语言:javascript复制
package com.jinw.utils.rpc;

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.jinw.constant.ApiConstant;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @Author @liuxin
 * @Description dubbo rpc调用缓存类
 * @Date 2023/4/20 16:47
 */
public class DubboReferenceCache {

    private static final Map<String, CacheEntry> referenceConfigMap = new LinkedHashMap<>(16, 0.75f, true);
    private static final int MAX_CACHE_SIZE = 1000;
    private static final int CACHE_EXPIRE_TIME = 1800;
    private static final ScheduledExecutorService CLEANUP_EXECUTOR = Executors.newSingleThreadScheduledExecutor();

    static {
        CLEANUP_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                synchronized (referenceConfigMap) {
                    long now = System.currentTimeMillis();
                    for (Iterator<Map.Entry<String, CacheEntry>> iter = referenceConfigMap.entrySet().iterator(); iter.hasNext(); ) {
                        Map.Entry<String, CacheEntry> entry = iter.next();
                        if (referenceConfigMap.size() > MAX_CACHE_SIZE || (now - entry.getValue().getCreatedTimestamp()) > (CACHE_EXPIRE_TIME * 1000)) {
                            iter.remove();
                        } else {
                            break;
                        }
                    }
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }

    public static <T> T getReference(Class<T> clazz, RegistryConfig registry, String group, String version, Integer timeout) {
        //加后缀用于区分不同端
        String cacheKey = clazz.getName()   "_"   registry.getId();

        synchronized (referenceConfigMap) {
            // 获取缓存中的服务引用实例
            CacheEntry cacheEntry = referenceConfigMap.get(cacheKey);
            if (cacheEntry != null) {
                // 如果缓存中存在,则更新时间戳并返回实际服务引用
                cacheEntry.setCreatedTimestamp(System.currentTimeMillis());
                return ((ReferenceConfig<T>) cacheEntry.getReferenceConfig()).get();
            }

            // 如果缓存中不存在,则创建一个新的 ReferenceConfig 对象并加入到缓存中
            ReferenceConfig<T> referenceConfig = new ReferenceConfig<>();

            referenceConfig.setInterface(clazz);
            // 设置重试次数
            referenceConfig.setRetries(ApiConstant.MAX_REGISTRY_RETRIES);

            //不需要检查服务提供者是否存在
            referenceConfig.setCheck(false);
            referenceConfig.setRegistry(registry);
            // 延迟初始化服务引用实例
            referenceConfig.setLazy(true);
            // 声明为泛化接口
            referenceConfig.setGeneric("true");
            if (StrUtil.isNotBlank(group)) {
                referenceConfig.setGroup(group);
            }
            if (StrUtil.isNotBlank(version)) {
                referenceConfig.setVersion(version);
            }
            if (ObjectUtil.isNotNull(timeout)) {
                // 超时时间
                referenceConfig.setTimeout(timeout);
            }

            cacheEntry = new CacheEntry(System.currentTimeMillis(), referenceConfig);
            referenceConfigMap.put(cacheKey, cacheEntry);

            if (referenceConfigMap.size() > MAX_CACHE_SIZE) {
                // 如果缓存数量超过限制,则清理最老的一个元素
                String oldestKey = referenceConfigMap.keySet().iterator().next();
                referenceConfigMap.remove(oldestKey);
            }

            return referenceConfig.get();
        }
    }

    public static <T> T getReference(Class<T> clazz, RegistryConfig registry, String group, String version, Integer timeout, String url) {
        //加后缀用于区分不同端
        String cacheKey = clazz.getName()   "_"   registry.getId();

        synchronized (referenceConfigMap) {
            // 获取缓存中的服务引用实例
            CacheEntry cacheEntry = referenceConfigMap.get(cacheKey);
            if (cacheEntry != null) {
                // 如果缓存中存在,则更新时间戳并返回实际服务引用
                cacheEntry.setCreatedTimestamp(System.currentTimeMillis());
                return ((ReferenceConfig<T>) cacheEntry.getReferenceConfig()).get();
            }

            // 如果缓存中不存在,则创建一个新的 ReferenceConfig 对象并加入到缓存中
            ReferenceConfig<T> referenceConfig = new ReferenceConfig<>();

            referenceConfig.setInterface(clazz);
            // 设置重试次数
            referenceConfig.setRetries(ApiConstant.MAX_REGISTRY_RETRIES);

            //不需要检查服务提供者是否存在
            referenceConfig.setCheck(false);

            // 延迟初始化服务引用实例
            referenceConfig.setLazy(true);
            // 声明为泛化接口
            referenceConfig.setGeneric("true");
            if (StrUtil.isNotBlank(group)) {
                referenceConfig.setGroup(group);
            }

            if (ObjectUtil.isNotNull(timeout)) {
                // 超时时间
                referenceConfig.setTimeout(timeout);
            }

            referenceConfig.setUrl(url);
            cacheEntry = new CacheEntry(System.currentTimeMillis(), referenceConfig);
            referenceConfigMap.put(cacheKey, cacheEntry);

            if (referenceConfigMap.size() > MAX_CACHE_SIZE) {
                // 如果缓存数量超过限制,则清理最老的一个元素
                String oldestKey = referenceConfigMap.keySet().iterator().next();
                referenceConfigMap.remove(oldestKey);
            }

            return referenceConfig.get();
        }
    }

    private static class CacheEntry {
        private long createdTimestamp;
        private ReferenceConfig<?> referenceConfig;

        public CacheEntry(long createdTimestamp, ReferenceConfig<?> referenceConfig) {
            this.createdTimestamp = createdTimestamp;
            this.referenceConfig = referenceConfig;
        }

        public long getCreatedTimestamp() {
            return createdTimestamp;
        }

        public void setCreatedTimestamp(long createdTimestamp) {
            this.createdTimestamp = createdTimestamp;
        }

        public ReferenceConfig<?> getReferenceConfig() {
            return referenceConfig;
        }
    }
}
代码语言:javascript复制
package com.jinw.utils.rpc;

import com.jinw.constant.ApiConstant;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.service.GenericService;

import javax.annotation.Nullable;

/**
 * 功能描述: 基于 Dubbo 泛化调用特性的远程调用
 *
 * @author 20024968@cnsuning.com
 * @version 1.0.0
 */
public class DubboRpcClient {

    private RegistryConfig registry;


    public DubboRpcClient(ApplicationConfig application, RegistryConfig registry) {
        this.registry = registry;
        ApplicationModel.getConfigManager().setApplication(application);
    }

    public Object genericInvoke(Class<?> interfaceClass, String methodName, String[] parameterTypes, Object[] args) {
        return genericInvoke(interfaceClass, methodName, parameterTypes, args, null, null, null);
    }

    public Object genericInvoke(Class<?> interfaceClass, String methodName, String[] parameterTypes, Object[] args, String version) {
        return genericInvoke(interfaceClass, methodName, parameterTypes, args, null, version, ApiConstant.MAX_REFERENCE_TIMEOUT);
    }

    /**
     * @return java.lang.Object
     * @Author @liuxin
     * @Description dubbo泛化 rpc调用
     * @Date 2023/4/23 16:42
     */
    public Object genericInvoke(Class<?> interfaceClass, String methodName, String[] parameterTypes, Object[] args, @Nullable String group, @Nullable String version, @Nullable Integer timeout) {
        GenericService genericService = (GenericService) DubboReferenceCache.getReference(interfaceClass, registry, group, version, timeout);
        Object $invoke = genericService.$invoke(methodName, parameterTypes, args);
        // 重置RpcContext对象,避免其他的调用受到影响
        RpcContext.getContext().clearAttachments();
        return $invoke;
    }

    /**
     * @return java.lang.Object
     * @Author @liuxin
     * @Description 通过dubbo的url进行调用
     * @Date 2023/4/23 16:42
     */
    public Object genericInvoke(Class<?> interfaceClass, String methodName, String[] parameterTypes, Object[] args, @Nullable String group, @Nullable String version, @Nullable Integer timeout, @Nullable String url) {
        GenericService genericService = (GenericService) DubboReferenceCache.getReference(interfaceClass, registry, group, version, timeout, url);
        Object $invoke = genericService.$invoke(methodName, parameterTypes, args);
        // 重置RpcContext对象,避免其他的调用受到影响
        RpcContext.getContext().clearAttachments();
        return $invoke;
    }
}
代码语言:javascript复制
package com.jinw.utils.rpc;

import com.alibaba.fastjson.JSON;
import com.jinw.constant.ApiConstant;
import com.jinw.system.entity.SysUser;
import com.jinw.utils.SecurityUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;

/**
 * @ClassName RpcUserUtils
 * @Description TODO
 * @Author @liuxin
 * @Date 2023/4/4 17:29
 * @Version 1.0
 */
public class RpcUtils {
    public static SysUser getCurrentUser() {
        RpcContext context = RpcContext.getContext();
        String currentUser = context.getAttachment(ApiConstant.CURRENT_USER);
        if (StringUtils.isNotBlank(currentUser)) {
            return JSON.parseObject(currentUser, SysUser.class);
        }

        if (StringUtils.isBlank(currentUser)) {
            try {
                return SecurityUtils.getCurrentUser();
            } catch (Exception e) {
                throw new RpcException("Rpc获取用户信息失败!");
            }
        }
        return null;
    }

}
代码语言:javascript复制
 private Object getGenericInvokeResult(Integer partyId, Class<?> interfaceClass, String methodName, String[] parameterTypes, Object[] args) {
        String[] zookeeperAddressSpilts = zookeeperAddress.replaceAll("zookeeper://", "").split(":");
        RegistryService registryService = ExtensionLoader.getExtensionLoader(RegistryFactory.class)
                .getExtension("zookeeper")
                .getRegistry(new URL("zookeeper",
                        zookeeperAddressSpilts[0],
                        Integer.valueOf(zookeeperAddressSpilts[1])));
        URL serviceUrl = new URL("dubbo", zookeeperAddressSpilts[0], 0,
                interfaceClass.getName());
        serviceUrl = serviceUrl.addParameter("version", ApiConstant.VERSION);  // 在URL中添加版本号参数
        // 获取可用的服务列表
        List<URL> urlList = registryService.lookup(serviceUrl).stream().filter(url -> url.getParameter("application").contains(partyId.toString())).collect(Collectors.toList());
        // 判断服务列表中是否包含指定的服务
        if (ObjectUtil.isNotEmpty(urlList)) {
            DubboRpcClient rpc = createDubboRpc(partyId);
            // 调用提供的方法
            Object result = rpc.genericInvoke(interfaceClass, methodName, parameterTypes, args, null, ApiConstant.VERSION, ApiConstant.MAX_REFERENCE_TIMEOUT, urlList.get(0).toString());
            return result;
        } else {
            throw new RpcException("未找到可用服务!");
        }
    }

    private DubboRpcClient createDubboRpc(Integer partyId) {
        ApplicationConfig application = new ApplicationConfig("kingow-oa-client-"   partyId);
        RegistryConfig registry = new RegistryConfig();
        registry.setProtocol("zookeeper");
        registry.setAddress(zookeeperAddress);
        registry.setTimeout(ApiConstant.MAX_REGISTRY_TIMEOUT);
        registry.setId("kingow-oa-client-"   partyId);
        DubboRpcClient rpc = new DubboRpcClient(application, registry);
        return rpc;
    }
代码语言:javascript复制
dubbo:
  application:
    # 服务名称,保持唯一
    name: kingow-oa-client-9999
    # zookeeper地址,用于向其注册服务
  registry:
    protocal: zookeeper
    address: zookeeper://192.168.2.122:2181
    timeout: 5000  # 如果zookeeper是放在远程服务器上超时时间请设置长一些,不然很容易超时连接失败
  protocol:
    name: dubbo
    port: -1
#  consumer:
#    filter: dubboAccountFilter
#  provider:
#    filter: dubboAccountFilter

client:
 file:
  #本地文件上传路径配置
  uploadDir: D:clientdataupload
  #python文件路径
 python:
   fileDir: D:clientpython
  #shell文件路径
 shell:
    fileDir: D:clientshell

#AT 模式
#seata 管理面板 http://192.168.2.122:7091
seata:
  enabled: true
  application-id: kingow-oa-client-9999
  tx-service-group: kingow-oa-client-9999-group
  enable-auto-data-source-proxy: true
  client:
    support:
      spring:
        datasource-autoproxy: true
  registry:
    type: zk
    zk:
      server-addr: 192.168.2.122:2181
      connect-timeout: 2000
      session-timeout: 6000
      username: ""
      password: ""
  service:
    grouplist:
      default: 192.168.2.122:8091
    vgroup-mapping:
      kingow-oa-client-9999-group: default

dubbo的拦截器

代码语言:javascript复制
package com.jinw.api.filter;

import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.jinw.constant.ApiConstant;
import com.jinw.system.entity.SysUser;
import com.jinw.utils.SecurityUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.*;

/**
 * @Description dubbo拦截
 * @Author ljh
 * @Date 2020:09:27 11:47
 */
//https://blog.csdn.net/qq_36882793/article/details/118111841
//https://saint.blog.csdn.net/article/details/124110926?spm=1001.2101.3001.6650.8&utm_medium=distribute.pc_relevant.none-task-blog-2~default~BlogCommendFromBaidu~Rate-8-124110926-blog-89025640.235^v31^pc_relevant_default_base3&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2~default~BlogCommendFromBaidu~Rate-8-124110926-blog-89025640.235^v31^pc_relevant_default_base3&utm_relevant_index=15
//https://segmentfault.com/a/1190000040164855?utm_source=sf-similar-article
@Slf4j
@Activate(group = {CommonConstants.CONSUMER, CommonConstants.PROVIDER}, order = -30000)
public class DubboAccountFilter implements Filter {
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        RpcContext rpcContext = RpcContext.getContext();
        if (ObjectUtil.isNotEmpty(rpcContext.getObjectAttachments())) {
            if (ObjectUtil.isNotEmpty(rpcContext.getAttachment(ApiConstant.CURRENT_USER))) {
                rpcContext.setAttachment(ApiConstant.CURRENT_USER, rpcContext.getAttachment(ApiConstant.CURRENT_USER));
                log.debug(" DubboAccountFilter {} {} ", rpcContext.getMethodName(), rpcContext.getAttachment(ApiConstant.CURRENT_USER));
            } else {
                SysUser user = null;
                try {
                    user = SecurityUtils.getCurrentUser();
                } catch (Exception e) {
                }
                // 设置参数
                rpcContext.setAttachment(ApiConstant.CURRENT_USER, JSON.toJSONString(user));
                log.debug(" DubboAccountFilter {} {} ", rpcContext.getMethodName(), rpcContext.getAttachment(ApiConstant.CURRENT_USER));
            }
        } else {
            SysUser user = null;
            try {
                user = SecurityUtils.getCurrentUser();
            } catch (Exception e) {
            }
            // 设置参数
            rpcContext.setAttachment(ApiConstant.CURRENT_USER, JSON.toJSONString(user));
            log.debug(" DubboAccountFilter {} {} ", rpcContext.getMethodName(), rpcContext.getAttachment(ApiConstant.CURRENT_USER));
        }
        try {
            return invoker.invoke(invocation);
        } finally {
            // 清空CURRENT_USER参数
            // rpcContext.removeAttachment(ApiConstant.CURRENT_USER);
        }
    }
}

0 人点赞