批量处理工具类

2021-12-13 13:56:12 浏览数 (2)

业务开发中,时常会批量执行任务,例如批量同时调用4个http接口或者rpc接口,这类业务代码执行具有通用性,为了提高开发效率、可复用性、可扩展性,简化代码,抽象出通用的工具类,方便开发同学使用。使用者只关心入参、具体任务执行、以及任务执行结果、线程池,并不关心批量处理的过程。

任务处理流程图

代码

代码语言:javascript复制
public class BatchQuery {

    /**
     * 并行且异步处理结果
     *
     * @param tasks    任务列表
     * @param p        参数
     * @param handle   具体业务处理
     * @param complete 完成处理逻辑
     * @param executor 线程池
     * @param <T>
     * @param <P>
     * @param <R>
     */
    public static <T, P, R> void asyncQueryHandleAsync(List<T> tasks, P p, Function<T, P, R> handle,
        BiConsumer<R, Throwable> complete, Executor executor) {

        Objects.requireNonNull(p);

        Optional.ofNullable(tasks).ifPresent(task -> {

            val cfs = task.stream()
                .map(t ->
                    CompletableFuture.supplyAsync(
                        () -> handle.apply(t, p), executor).whenCompleteAsync(complete)
                ).toArray(CompletableFuture[]::new);

            //等待总任务完成
            CompletableFuture.allOf(cfs).join();
        });
    }

    /**
     * 并行且同步处理结果
     *
     * @param tasks    任务列表
     * @param p        参数
     * @param handle   具体业务处理
     * @param complete 完成处理逻辑
     * @param executor 线程池
     * @param <T>
     * @param <P>
     * @param <R>
     */
    public static <T, P, R> void asyncQueryHandleSync(List<T> tasks, P p, Function<T, P, R> handle,
        BiConsumer<R, Throwable> complete, Executor executor) {

        Objects.requireNonNull(p);

        Optional.ofNullable(tasks).ifPresent(task -> {

            val cfs = task.stream()
                .map(t ->
                    CompletableFuture.supplyAsync(
                        () -> handle.apply(t, p), executor).whenComplete(complete)
                ).toArray(CompletableFuture[]::new);

            //等待总任务完成
            CompletableFuture.allOf(cfs).join();
        });
    }
}


@FunctionalInterface
public interface Function<T, P, R> {

    /**
     * Applies this function to the given argument.
     *
     * @param t the function argument
     * @param p
     * @return the function result
     */
    R apply(T t, P p);
}

使用示例

代码语言:javascript复制
 List<OrderDTO> orderDTOS = Lists.newArrayList();
        try {
            BatchQuery.asyncQueryHandleSync(BIZ_TYPE_LIST, onTheWayOrderSwitchProcessCtx.getOnTheWayOrderSwitchParam(),
                (bizTypeEnum, orderSwitchParam) ->
                    standardApiSupport
                        .getDriverOrderApiByBizType(bizTypeEnum.getValue(), CONFIG)
                        .queryOrderList
                            (
                                OrderConverter
                                    .buildDriverOrderReqDTO(currentStartTime, currentEndTime, bizTypeEnum,
                                        orderSwitchParam)
                            ), (r, ex) -> {
                    if (Objects.isNull(ex) && JudgeIsSuccessUtil.judgeDataNotNull(r)) {
                        orderDTOS.addAll(r.getDataList());
                    }
                }, RPC_SEARCH_EXECUTOR_SERVICE);
        } catch (Exception e) {
            throw new ApiException();
        }

使用者需要传入具体的任务,指定线程池,以及共同参数P,P的存在具有合理性,往往任务会使用共同的参数,因此自定义了函数式接口Function,以及具体的处理handle,handle里可以做差异化处理,执行结果会在complete中拿到,做具体的业务处理,可以大大减少重复代码。

0 人点赞