(十二)J.U.C-ForkJoin

2022-04-13 14:27:31 浏览数 (1)

ForkJoin

ForkJoin是Java7提供的一个并行执行任务的框架,是把大任务分割成若干个小任务,待小任务完成后将结果汇总成大任务结果的框架。主要采用的是工作窃取算法,工作窃取算法是指某个线程从其他队列里窃取任务来执行。

在窃取过程中两个线程会访问同一个队列,为了减少窃取任务线程和被窃取任务线程之间的竞争,通常我们会使用双端队列来实现工作窃取算法。被窃取任务的线程永远从队列的头部拿取任务,窃取任务的线程从队列尾部拿取任务。

局限性:

1、任务只能使用fork和join作为同步机制,如果使用了其他同步机制,当他们在同步操作时,工作线程就不能执行其他任务了。比如在fork框架使任务进入了睡眠,那么在睡眠期间内在执行这个任务的线程将不会执行其他任务了。 2、我们所拆分的任务不应该去执行IO操作,如读和写数据文件。 3、任务不能抛出检查异常。必须通过必要的代码来处理他们。

框架核心:

核心有两个类:ForkJoinPool | ForkJoinTask ForkJoinPool:负责来做实现,包括工作窃取算法、管理工作线程和提供关于任务的状态以及他们的执行信息。 ForkJoinTask:提供在任务中执行fork和join的机制。

使用方式:(模拟加和运算)

代码语言:javascript复制
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {

    public static final int threshold = 2;//设定不大于两个数相加就直接for循环,不适用框架
    private int start;
    private int end;

    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        //如果任务足够小就计算任务
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i  ) {
                sum  = i;
            }
        } else {
            // 如果任务大于阈值,就分裂成两个子任务计算(分裂算法,可依情况调优)
            int middle = (start   end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle   1, end);

            // 执行子任务
            leftTask.fork();
            rightTask.fork();

            // 等待任务执行结束合并其结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // 合并子任务
            sum = leftResult   rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        //生成一个计算任务,计算1 2 3 4...100
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

        //执行一个任务
        Future<Integer> result = forkjoinPool.submit(task);

        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}

0 人点赞