大家好,又见面了,我是你们的朋友全栈君。
声明:本篇博客是在阅读了引用博客的两篇文章后做了简短的概括与归纳,只作为自己笔记
文章目录- 一、思想
- 二、工作窃取算法
- 三、demo用例
- 四、关键组件
- ForkJoinPool
- ForkJoinTask
- ForkJoinWorkerThread
- WorkQueue
- 五、Fork/Join运行流程图
- 任务提交
- 创建线程signalWork方法
- 任务执行
- 六、引用博客
- ForkJoinPool
- ForkJoinTask
- ForkJoinWorkerThread
- WorkQueue
- 任务提交
- 创建线程signalWork方法
- 任务执行
一、思想
Fork/Join是Java7提供的并行执行任务的框架,是一个把大人物分割成若干小任务,最终汇总小任务的结果得到大任务结果的框架
小任务可以继续拆分为更小的任务
二、工作窃取算法
1、工作窃取会选择双端队列作为存储任务的数据结构,默认正常线程会选择LIFO(栈获取)的方式,从当前双端队列的尾部获取任务;窃取线程会选择FIFO(队列获取)方式,从当前双端队列的头部获取任务
默认添加元素是从双端队列的尾部添加元素
三、demo用例
通过Fork/Join并行计算1 2 3 4
代码语言:javascript复制import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class ForkjoinDemo extends RecursiveTask<Long> {
private long start;
private long end;
public static final int THRESHOLD = 2;
public ForkjoinDemo(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
boolean flag = (end - start) <= THRESHOLD;
if (flag) {
for (long i = start; i <= end; i ) {
sum = i;
}
} else {
long middle = (end start) / 2;
// divide task
ForkjoinDemo leftTask = new ForkjoinDemo(start, middle);
ForkjoinDemo rightTask = new ForkjoinDemo(middle 1 , end);
// execute sub task
leftTask.fork();
rightTask.fork();
// get result sub task
long left = leftTask.join();
long right = rightTask.join();
sum = left right;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
ForkjoinDemo task = new ForkjoinDemo(1, 4);
Future<Long> result = pool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
四、关键组件
ForkJoinPool
(一)线程池的作用可能是用来合理分配线程资源: 1、接受外部任务的提交(外部调用ForkJoinPool的invoke/execute/submit方法提交任务); 2、接受ForkJoinTask自身fork出的子任务的提交; 3、任务队列数组(WorkQueue[])的初始化和管理; 4、工作线程(Worker)的创建/管理 (二)提交方式的差异: invoke:同步提交,有返回值,任务执行完成后返回 submit:异步提交,有返回值,调用线程立即返回 execute:异步提交,无返回值,调用线程立即返回 (三)初始化线程池方式 1、FIFO_QUEUE:先进先出,异步模式 2、LIFO_QUEUE:(默认)先进后出,同步模式
ForkJoinTask
(一)子类 1、RecursiveAction:用于没有返回结果的任务 2、RecursiveTask:用于有返回结果的任务 (二)函数 1、fork()拆分为子任务 2、join()合并子任务
ForkJoinWorkerThread
任务处理原则:首先根据同步/异步模式从任务队列选择任务,如果完成自身任务,通过窃取算法获取其他线程的任务
WorkQueue
底层是通过数组实现的双端队列,容量为2的幂次,任务队列在首次调用线程池外部方法提交任务之后初始化任务队列,通过ThreadLocalRandom.probe来计算出任务队列在数组中的索引位置(外部方法调用产生的索引一定是偶数),没有绑定工作线程
1、有工作线程(Worker)绑定的任务队列:数组下标始终是奇数,称为task queue,该队列中的任务均由工作线程调用产生(工作线程调用FutureTask.fork方法); 2、没有工作线程(Worker)绑定的任务队列:数组下标始终是偶数,称为submissions queue,该队列中的任务全部由其它线程提交(也就是非工作线程调用execute/submit/invoke或者FutureTask.fork方法)。
五、Fork/Join运行流程图
任务提交
(一)外部提交任务 1、提交任务命中,externalPush 2、提交任务未命中,externalSubmit CASE1:线程池已经关闭,则执行终止操作,并拒绝该任务的提交; CASE2:线程池未初始化,则进行初始化,主要就是初始化任务队列数组; CASE3:命中了任务队列,则将任务入队,并尝试创建/唤醒一个工作线程(Worker); CASE4:未命中任务队列,则在偶数索引处创建一个任务队列 (二)工作线程fork任务:直接push进当前任务队列
创建线程signalWork方法
1、工作线程数不足: 创建一个工作线程: (1)通过createWorker方法通过线程池工厂创建线程,在线程创建的过程中,registerWorker会创建一个工作队列与工作线程绑定,创建成功,则start线程; (2)创建失败:则deregisterWorker注销该工作线程,清除已关闭的工作线程或回滚创建线程之前的操作,并把传入的异常抛给 ForkJoinTask 来处理。 2、工作线程数足够:唤醒一个空闲(阻塞)的工作线程。
任务执行
1、runWorker执行任务 (1)scan()窃取任务:随机选择一个任务队列,获取底部任务,任务数大于1,则signalWork创建或唤醒其他工作线程。 (2)成功窃取runTask: a.执行窃取任务:调用FutureTask.deExec()执行任务,其内部会调用FutureTask.exec()方法,该方法为抽象方法,由子类实现。 b.工作线程还会执行自己队列中的任务,即WorkQueue.execLocalTasks (3)未成功窃取awaitWork:没有任务则阻塞
六、引用博客
极力推荐并发系列博客:https://segmentfault.com/a/1190000016781127
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/191329.html原文链接:https://javaforall.cn