工具包 java-dataloader 异步批处理装载数据

2022-09-19 17:55:17 浏览数 (1)

一个小巧简单的工具包,主要解决应用程序组装数据,提供一个简单的通用的API,通过异步批处理方式,减少通信问题。

该工具包是从graphql工具中抽出,在graphql里查询数据,很容易导致"n 1"获取问题。比如用户列表需要查询对应的部门,有些用户的部门相同,会被多次查询。

Github:源码地址

特点

  • 使用简单直观
  • 使用队列临时存储查询健,之后进行批量查询,减少查询开销
  • 请求返回CompleteableFuture<V>可以做后续处理
  • 支持缓存,数据可以只获取一次
  • 可自定义缓存

原理

使用CompletableFuture异步加载实现。通过API将每组结果使用CompletableFuture包装,并将关键Key存储到队列中, 当Key的数量达到一定量或一定时间时后,再调用异步批量查询方法,然后再把结果回调给每组结果的CompletableFuture。

如何使用

代码语言:javascript复制
BatchLoader<Long, Department> departmentBatchLoader = new BatchLoader<Long, Department>() {
	@Override
	public CompletionStage<List<Department>> load(List<Long> departmentIds) {
		return CompletableFuture.supplyAsync(() -> {
			return departmentManager.loadDepartmentById(departmentIds);
		});
	}
};

DataLoader<Long, Department> departmentLoader = DataLoaderFactory.newDataLoader(departmentBatchLoader);
List<User> userList = userManager.getAllUser();
userList.foreach(user ->{
		departmentLoader.loader(user.getDepartmentId).thenAccept(department ->{
			user.setDepartment(department);
		});
});

避坑

批量处理BatchLoader

该方式是以List<key>形式入参,返回结果顺序,需要与入参时的Key顺序形同,否则会因顺序不同或返回数量不同,导致错误。 如对List结果无法确定,推荐MappedBatchLoader实现,该方式通过key进行匹配,无需担心顺序与结果。

使用List形式如传入key为:[2, 9, 6, 1]

正确例子

代码语言:javascript复制
  [
     { id: 2, name: '张三' },
     { id: 9, name: '李四' },
     { id: 6, name: '王五' },
     { id: 1, name: '赵六' }
  ]
  或
  [
     { id: 2, name: '张三' },
     null,
     { id: 6, name: '王五' },
     { id: 1, name: '赵六' }
  ]

错误例子

代码语言:javascript复制
//传入key与结果数量不符,直接报错
  [
     { id: 2, name: '张三' },
     { id: 6, name: '王五' },
     { id: 1, name: '赵六' }
  ]

//传入key与结果顺序不同,会导致数据关联错误,如第一个2实际关联的是1
 [
    { id: 1, name: '赵六' },
    { id: 2, name: '张三' },
    { id: 6, name: '王五' },
    { id: 9, name: '李四' }
 ]

缓存

使用dataloader缓存尽量保证所查数据是无状态化,避免因状态数据实时要求高导致业务异常。

Dataloader中使用了两个缓存

代码语言:javascript复制
//结果缓存成功执行批处理后返回的CompletableFuture结果。
//如果你在设计中DataLoader是单例模式,该缓存因在内存常驻无法清除,会导致永远使用缓存数据,建议不使用CacheMap,只使用ValueCache
private final CacheMap<Object, V> futureCache;

//是批处理结果值的缓存,在执行批处理之前,会先在valueCache中查找,减少IO请求
private final ValueCache<K, V> valueCache;

0 人点赞