业务开发中,时常会批量执行任务,例如批量同时调用4个http接口或者rpc接口,这类业务代码执行具有通用性,为了提高开发效率、可复用性、可扩展性,简化代码,抽象出通用的工具类,方便开发同学使用。使用者只关心入参、具体任务执行、以及任务执行结果、线程池,并不关心批量处理的过程。
任务处理流程图
代码
代码语言:javascript复制public class BatchQuery {
/**
* 并行且异步处理结果
*
* @param tasks 任务列表
* @param p 参数
* @param handle 具体业务处理
* @param complete 完成处理逻辑
* @param executor 线程池
* @param <T>
* @param <P>
* @param <R>
*/
public static <T, P, R> void asyncQueryHandleAsync(List<T> tasks, P p, Function<T, P, R> handle,
BiConsumer<R, Throwable> complete, Executor executor) {
Objects.requireNonNull(p);
Optional.ofNullable(tasks).ifPresent(task -> {
val cfs = task.stream()
.map(t ->
CompletableFuture.supplyAsync(
() -> handle.apply(t, p), executor).whenCompleteAsync(complete)
).toArray(CompletableFuture[]::new);
//等待总任务完成
CompletableFuture.allOf(cfs).join();
});
}
/**
* 并行且同步处理结果
*
* @param tasks 任务列表
* @param p 参数
* @param handle 具体业务处理
* @param complete 完成处理逻辑
* @param executor 线程池
* @param <T>
* @param <P>
* @param <R>
*/
public static <T, P, R> void asyncQueryHandleSync(List<T> tasks, P p, Function<T, P, R> handle,
BiConsumer<R, Throwable> complete, Executor executor) {
Objects.requireNonNull(p);
Optional.ofNullable(tasks).ifPresent(task -> {
val cfs = task.stream()
.map(t ->
CompletableFuture.supplyAsync(
() -> handle.apply(t, p), executor).whenComplete(complete)
).toArray(CompletableFuture[]::new);
//等待总任务完成
CompletableFuture.allOf(cfs).join();
});
}
}
@FunctionalInterface
public interface Function<T, P, R> {
/**
* Applies this function to the given argument.
*
* @param t the function argument
* @param p
* @return the function result
*/
R apply(T t, P p);
}
使用示例
代码语言:javascript复制 List<OrderDTO> orderDTOS = Lists.newArrayList();
try {
BatchQuery.asyncQueryHandleSync(BIZ_TYPE_LIST, onTheWayOrderSwitchProcessCtx.getOnTheWayOrderSwitchParam(),
(bizTypeEnum, orderSwitchParam) ->
standardApiSupport
.getDriverOrderApiByBizType(bizTypeEnum.getValue(), CONFIG)
.queryOrderList
(
OrderConverter
.buildDriverOrderReqDTO(currentStartTime, currentEndTime, bizTypeEnum,
orderSwitchParam)
), (r, ex) -> {
if (Objects.isNull(ex) && JudgeIsSuccessUtil.judgeDataNotNull(r)) {
orderDTOS.addAll(r.getDataList());
}
}, RPC_SEARCH_EXECUTOR_SERVICE);
} catch (Exception e) {
throw new ApiException();
}
使用者需要传入具体的任务,指定线程池,以及共同参数P,P的存在具有合理性,往往任务会使用共同的参数,因此自定义了函数式接口Function,以及具体的处理handle,handle里可以做差异化处理,执行结果会在complete中拿到,做具体的业务处理,可以大大减少重复代码。