问题背景
公司内部多个系统共用一套用户体系库,对外(钉钉)我们是两个客户身份(这里是根据系统来的),例如当第三方服务向我们发起用户同步请求:是一个更新用户操作,它会同时发送一个 delete 和 insert 请求,这两个请求几乎是并发进来的,实际上应该是先发起的delete 再 insert, 实际情况可能和网络延迟也有关系,此时在我们系统中就无法保证这两个请求的顺序执行,即先 delete 处理完之后 再进行 insert 的数据处理(正常流程),又或者直接把一定时间内同一个用户的 delete 和 insert 操作合并为一个update操作(本质就是更新操作)。
还有一种情况是:第三方系统中添加或者 删除一个用户时,会以两个客户的身份去发送两个相同的用户同步请求,但同一个用户在我们系统内用户数据只有一份,对应的接口肯定也都是同一个,即相同的添加接口会在一瞬间被调用两次,删除即使执行两次的话也没什么问题,问题是添加 即使在添加前判断了用户账号是否存在 并发过来的情况下还是避免不了一些脏数据的产生,加锁的话对整体影响又特别大。
处理思路
根据userId
(账号)为每个请求分配一个房间(单独的线程),如果是第一次进来那么就new
一个房间(也就是类,里边会有一个单独的线程处理这个用户的行为),后边一定时间内相同的 userId 进来会找到对应已存在的房间,当设置的时间窗口到了之后,判断当前userId的同步行为有哪些,如果有 insert 和 delete,那么直接转为 update 操作。如果是两个insert行为,那么最后就只调用一次insert服务,如果是两个delete行为,那么就只调用一个delete服务。
注意事项
时间窗口的设定,如果时间设置过短,属于同一个操作的请求因为网络波动 请求到接口的时间会有一定间隔,如果你设置的时间间隔小于等待的时间,还是会把本就属于同一批次的操作 <font color=blue>多次处理</font>
测试过程:刚开始时间设置的1500ms,也就是当第一个userId进来后,等待1.5秒后根据这段时间内收集到的用户行为再去真正的处理,后来在测试中发现有些本就属于同一批次的请求还是会被处理多次,也就是时间调小了,改成2000ms,测试还是发现同样的问题。最后:采取的是根据最近一个的userId请求的时间 等待1500ms,即相同的userId的请求进来后 在当前时间再重新计算等待1500ms,时间到了之后没有发现新的用户行为即算是一个批次结束
ps:可以创建一个单独的服务来负责对请求进行合理的处理分发,处理之后再去调用对应的业务系统服务
代码实现
定义操作行为枚举
代码语言:java复制public enum OperationEnum {
INSERT("insert"),
DELETE("delete"),
;
private final String value;
OperationEnum(String operation) {
this.value = operation;
}
public String getValue() {
return value;
}
}
定义每个用户所属的房间,房间内存储用户的多个行为(insert、delete):
代码语言:java复制public class ActionRoomBean {
//用于保存有效事件数据 <insert or delete, data>
private Map<String, JSONObject> actionDataMap = new HashMap<>();
private String userId;
//真正负责处理事件的线程
private DispatchTask dispatchTask;
/**
* 定义操作方法 排队接收
* @param action 请求动作:insert 或者 delete
* @param data 请求参数
*/
public void addAction(String action, JSONObject data) {
//有新请求进来后 计数器 1
if(dispatchTask != null){
dispatchTask.getIncrementAndGet();
}
//如果包含直接跳出
if (actionDataMap.containsKey(action)) {
return;
}
actionDataMap.put(action, data);
}
public ActionRoomBean() {
}
/**
* 有参构造
* @param userId 用户账号
* @param actionDataMap 操作类型,请求参数
*/
public ActionRoomBean(String userId, Map<String, JSONObject> actionDataMap) {
this.actionDataMap = actionDataMap;
this.userId = userId;
}
/**
* 创建完这个类的实例后,要先调用startManager方法 启动线程
*/
public void startManager() {
dispatchTask = new DispatchTask(userId, actionDataMap);
new Thread(dispatchTask).start();
}
}
房间内真正的执行者(子线程):
代码语言:java复制public class DispatchTask implements Runnable {
//等待的时间窗口
private static long sleepTime = 1500;
//计数器,用户有新的行为之后 1,用来控制是否继续等待(sleep)
private final AtomicInteger count = new AtomicInteger(0);
//用于保存有效事件数据 <insert or delete, data>,与 ActionRoomBean中的 actionDataMap 指向的是同一个地址
Map<String, JSONObject> actionDataMap;
//用户账号
String userId;
/**
* 有参构造
*/
public DispatchTask(String userId, Map<String, JSONObject> dataLib) {
this.userId = userId;
this.actionDataMap = dataLib;
}
@Override
public void run() {
try {
//线程等待前的数量和休眠后被唤醒的数量做对比,如果不相等说明休眠时间内有新的用户行为,则进入循环继续sleep
int afterCount = 0;
while (afterCount == 0 || afterCount != count.get()){
//每休眠一次 1,如果下次循环的值与 1之后的afterCount相等,说明时间窗口内没有新的行为,则不循环
afterCount = count.incrementAndGet();
Thread.sleep(sleepTime);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
String url = RestTemplateUtil.DD_READING_API_URL;
JSONObject param = null;
// 只有添加操作
if (actionDataMap.containsKey(OperationEnum.INSERT.getValue()) && actionDataMap.size() == 1) {
url = "/nc/eduUserInsert";
param = actionDataMap.get(OperationEnum.INSERT.getValue());
if (param != null) {
RestTemplateUtil.postForObject(url, param.toJSONString());
}
} else if (actionDataMap.containsKey(OperationEnum.DELETE.getValue()) && actionDataMap.size() == 1) {
//只有删除操作
url = "/nc/eduUserDelete";
param = actionDataMap.get(OperationEnum.DELETE.getValue());
if (param != null) {
RestTemplateUtil.postForObject(url, param.toJSONString());
}
} else if (actionDataMap.containsKey(OperationEnum.INSERT.getValue()) && actionDataMap.containsKey(OperationEnum.DELETE.getValue())) {
//既有添加又有删除,就是更新处理
url = "/nc/eduUserUpdate";
param = actionDataMap.get(OperationEnum.INSERT.getValue());
if (param != null) {
RestTemplateUtil.postForObject(url, param.toJSONString());
}
}
} finally {
//最后从全局变量中删除userId
DispatchController.closeRoom(userId);
}
}
/**
* 计数器 1
*/
public Integer getIncrementAndGet() {
return count.incrementAndGet();
}
}
控制器:
代码语言:java复制@RestController
@RequestMapping(value = "/api/dd")
public class DispatchController {
//全局map,记录当前有多少个用户正在被处理中
private final static Map<String, ActionRoomBean> allMap = new ConcurrentHashMap<>();
//简单的配置的密钥,用于接口的身份校验
@Value("${url.secret}")
private String secret;
/**
* insert和 delete 操作都会进入这个接口,用 operation 区分当前是什么操作
*/
@PostMapping(value = "/dispatch")
public Result dispatch(@RequestBody JSONObject jsonObject,
@RequestHeader(value = "secret") String secret){
//进行简单的接口身份校验
if(!Objects.equals(secret, this.secret)){
return Result.generateError("secret eroor");
}
String userId = jsonObject.getString("userId");
//operation = insert 或者 delete
String operation = jsonObject.getString("operation");
if(EmptyUtil.isNotEmpty(userId) && EmptyUtil.isNotEmpty(operation)){
//调用进入房间的方法
unboltRoom(userId, operation, jsonObject);
}
return Result.generateSuccess();
}
/**
* 为每个userId创建一个实例(房间)
* 这里决定了是创建一个新的房间还是进入到已有的房间中
*/
private void unboltRoom(String userId, String operation, JSONObject jsonObject) {
//加锁处理,由于真正的执行是在子线程中 所以加锁对整体性能影响也不是很大
//主要是避免:同一个userId创建了多个实例,即使map中key不可重复,也会造成请求丢失
//例如:同一个userId进来insert和delete请求各一个,并发不加锁的情况下就有可能创建了两个实例
synchronized (this) {
ActionRoomBean room = allMap.get(userId);
//如果全局map中没有,说明是这个userId是第一个进来
if (room == null) {
Map<String, JSONObject> actionMap = new HashMap<>(4);
actionMap.put(operation, jsonObject);
room = new ActionRoomBean(userId, actionMap);
//开启计时
room.startManager();
//放入到全局map中
allMap.put(userId, room);
}
//如果有,直接调用 addAction方法
room.addAction(operation, jsonObject);
}
}
/**
* 当前批次处理完之后,从集合中删除用户实例
*/
public static void closeRoom(String userId) {
allMap.remove(userId);
}
}
整体核心代码就是上边这些,以上还可以通过线程池去优化一下。
如果涉及到批量导入,同时有大量用户同步数据过来,就需要在测试环境进行反复测试 看是否会丢数据(因为每个用户都是一个独立的子线程),对线程的数量进行优化。