如何对第三方相同请求进行筛选过滤

2023-06-01 22:52:01 浏览数 (1)

问题背景

公司内部多个系统共用一套用户体系库,对外(钉钉)我们是两个客户身份(这里是根据系统来的),例如当第三方服务向我们发起用户同步请求:是一个更新用户操作,它会同时发送一个 delete 和 insert 请求,这两个请求几乎是并发进来的,实际上应该是先发起的delete 再 insert, 实际情况可能和网络延迟也有关系,此时在我们系统中就无法保证这两个请求的顺序执行,即先 delete 处理完之后 再进行 insert 的数据处理(正常流程),又或者直接把一定时间内同一个用户的 delete 和 insert 操作合并为一个update操作(本质就是更新操作)。

还有一种情况是:第三方系统中添加或者 删除一个用户时,会以两个客户的身份去发送两个相同的用户同步请求,但同一个用户在我们系统内用户数据只有一份,对应的接口肯定也都是同一个,即相同的添加接口会在一瞬间被调用两次,删除即使执行两次的话也没什么问题,问题是添加 即使在添加前判断了用户账号是否存在 并发过来的情况下还是避免不了一些脏数据的产生,加锁的话对整体影响又特别大。

处理思路

根据userId(账号)为每个请求分配一个房间(单独的线程),如果是第一次进来那么就new一个房间(也就是类,里边会有一个单独的线程处理这个用户的行为),后边一定时间内相同的 userId 进来会找到对应已存在的房间,当设置的时间窗口到了之后,判断当前userId的同步行为有哪些,如果有 insert 和 delete,那么直接转为 update 操作。如果是两个insert行为,那么最后就只调用一次insert服务,如果是两个delete行为,那么就只调用一个delete服务。

注意事项

时间窗口的设定,如果时间设置过短,属于同一个操作的请求因为网络波动 请求到接口的时间会有一定间隔,如果你设置的时间间隔小于等待的时间,还是会把本就属于同一批次的操作 <font color=blue>多次处理</font>

测试过程:刚开始时间设置的1500ms,也就是当第一个userId进来后,等待1.5秒后根据这段时间内收集到的用户行为再去真正的处理,后来在测试中发现有些本就属于同一批次的请求还是会被处理多次,也就是时间调小了,改成2000ms,测试还是发现同样的问题。最后:采取的是根据最近一个的userId请求的时间 等待1500ms,即相同的userId的请求进来后 在当前时间再重新计算等待1500ms,时间到了之后没有发现新的用户行为即算是一个批次结束

ps:可以创建一个单独的服务来负责对请求进行合理的处理分发,处理之后再去调用对应的业务系统服务

代码实现

定义操作行为枚举

代码语言:java复制
public enum OperationEnum {

    INSERT("insert"),
    DELETE("delete"),
    ;
    private final String value;

    OperationEnum(String operation) {
        this.value = operation;
    }

    public String getValue() {
        return value;
    }
}

定义每个用户所属的房间,房间内存储用户的多个行为(insert、delete):

代码语言:java复制
public class ActionRoomBean {

    //用于保存有效事件数据 <insert or delete, data>
    private Map<String, JSONObject> actionDataMap = new HashMap<>();
    private String userId;
    //真正负责处理事件的线程
    private DispatchTask dispatchTask;

    /**
     * 定义操作方法 排队接收
     * @param action 请求动作:insert 或者 delete
     * @param data 请求参数
     */
    public void addAction(String action, JSONObject data) {
        //有新请求进来后 计数器   1
        if(dispatchTask != null){
            dispatchTask.getIncrementAndGet();
        }
        //如果包含直接跳出
        if (actionDataMap.containsKey(action)) {
            return;
        }
        actionDataMap.put(action, data);
    }

    public ActionRoomBean() {
    }
    /**
     * 有参构造
     * @param userId 用户账号
     * @param actionDataMap 操作类型,请求参数
     */
    public ActionRoomBean(String userId, Map<String, JSONObject> actionDataMap) {
        this.actionDataMap = actionDataMap;
        this.userId = userId;
    }
    /**
     * 创建完这个类的实例后,要先调用startManager方法 启动线程
     */
    public void startManager() {
        dispatchTask = new DispatchTask(userId, actionDataMap);
        new Thread(dispatchTask).start();
    }
}

房间内真正的执行者(子线程):

代码语言:java复制
public class DispatchTask implements Runnable {
    //等待的时间窗口
    private static long sleepTime = 1500;
    //计数器,用户有新的行为之后  1,用来控制是否继续等待(sleep)
    private final AtomicInteger count = new AtomicInteger(0);
    //用于保存有效事件数据 <insert or delete, data>,与 ActionRoomBean中的 actionDataMap 指向的是同一个地址
    Map<String, JSONObject> actionDataMap;
    //用户账号
    String userId;
    /**
     * 有参构造
     */
    public DispatchTask(String userId, Map<String, JSONObject> dataLib) {
        this.userId = userId;
        this.actionDataMap = dataLib;
    }

    @Override
    public void run() {
        try {
            //线程等待前的数量和休眠后被唤醒的数量做对比,如果不相等说明休眠时间内有新的用户行为,则进入循环继续sleep
            int afterCount = 0;
            while (afterCount == 0 || afterCount != count.get()){
                //每休眠一次  1,如果下次循环的值与  1之后的afterCount相等,说明时间窗口内没有新的行为,则不循环
                afterCount = count.incrementAndGet();
                Thread.sleep(sleepTime);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        try {
            String url = RestTemplateUtil.DD_READING_API_URL;
            JSONObject param = null;
            // 只有添加操作
            if (actionDataMap.containsKey(OperationEnum.INSERT.getValue()) && actionDataMap.size() == 1) {
                url  = "/nc/eduUserInsert";
                param = actionDataMap.get(OperationEnum.INSERT.getValue());
                if (param != null) {
                    RestTemplateUtil.postForObject(url, param.toJSONString());
                }
            } else if (actionDataMap.containsKey(OperationEnum.DELETE.getValue()) && actionDataMap.size() == 1) {
                //只有删除操作
                url  = "/nc/eduUserDelete";
                param = actionDataMap.get(OperationEnum.DELETE.getValue());
                if (param != null) {
                    RestTemplateUtil.postForObject(url, param.toJSONString());
                }
            } else if (actionDataMap.containsKey(OperationEnum.INSERT.getValue()) && actionDataMap.containsKey(OperationEnum.DELETE.getValue())) {
                //既有添加又有删除,就是更新处理
                url  = "/nc/eduUserUpdate";
                param = actionDataMap.get(OperationEnum.INSERT.getValue());
                if (param != null) {
                    RestTemplateUtil.postForObject(url, param.toJSONString());
                }
            }
        } finally {
            //最后从全局变量中删除userId
            DispatchController.closeRoom(userId);
        }
    }
    /**
     * 计数器   1
     */
    public Integer getIncrementAndGet() {
        return count.incrementAndGet();
    }
}

控制器:

代码语言:java复制
@RestController
@RequestMapping(value = "/api/dd")
public class DispatchController {
    //全局map,记录当前有多少个用户正在被处理中
    private final static Map<String, ActionRoomBean> allMap = new ConcurrentHashMap<>();
    //简单的配置的密钥,用于接口的身份校验
    @Value("${url.secret}")
    private String secret;

    /**
     * insert和 delete 操作都会进入这个接口,用 operation 区分当前是什么操作
     */
    @PostMapping(value = "/dispatch")
    public Result dispatch(@RequestBody JSONObject jsonObject,
                           @RequestHeader(value = "secret") String secret){
        //进行简单的接口身份校验
        if(!Objects.equals(secret, this.secret)){
            return Result.generateError("secret eroor");
        }
        String userId = jsonObject.getString("userId");
        //operation = insert 或者 delete
        String operation = jsonObject.getString("operation");
        if(EmptyUtil.isNotEmpty(userId) && EmptyUtil.isNotEmpty(operation)){
            //调用进入房间的方法
            unboltRoom(userId, operation, jsonObject);
        }
        return Result.generateSuccess();
    }

    /**
     * 为每个userId创建一个实例(房间)
     * 这里决定了是创建一个新的房间还是进入到已有的房间中
     */
    private void unboltRoom(String userId, String operation, JSONObject jsonObject) {
        //加锁处理,由于真正的执行是在子线程中 所以加锁对整体性能影响也不是很大
        //主要是避免:同一个userId创建了多个实例,即使map中key不可重复,也会造成请求丢失
        //例如:同一个userId进来insert和delete请求各一个,并发不加锁的情况下就有可能创建了两个实例
        synchronized (this) {
            ActionRoomBean room = allMap.get(userId);
            //如果全局map中没有,说明是这个userId是第一个进来
            if (room == null) {
                Map<String, JSONObject> actionMap = new HashMap<>(4);
                actionMap.put(operation, jsonObject);
                room = new ActionRoomBean(userId, actionMap);
                //开启计时
                room.startManager();
                //放入到全局map中
                allMap.put(userId, room);
            }
            //如果有,直接调用 addAction方法
            room.addAction(operation, jsonObject);
        }
    }
    /**
     * 当前批次处理完之后,从集合中删除用户实例
     */
    public static void closeRoom(String userId) {
        allMap.remove(userId);
    }
}

整体核心代码就是上边这些,以上还可以通过线程池去优化一下。

如果涉及到批量导入,同时有大量用户同步数据过来,就需要在测试环境进行反复测试 看是否会丢数据(因为每个用户都是一个独立的子线程),对线程的数量进行优化。

0 人点赞