背景
公司的一个ETL项目,主要是从Blob上的CSV文件和HDFS平台下载数据并解析后入到业务的Mysql,数据量大概一个小时20个文件左右(基本集中到每个小时的50分左右),每个文件8~20万条数据量,分别入到不同的表, 我们在入库的时候是把文件解析后分成1000条一批批量插入(篇幅有限,这里只聊入库的场景)。用的是jdk1.8的Stream.parallel()的方式并发入库。
问题
运行一段时间后发现随着文件量的增加,入库时间越来越长,分析发现入库线程每个实例入库线程大概8个左右,线程占用满了就相互等待。
问题排查
入库工具代码如下
代码语言:javascript复制//由于插入数据量太大,这里做分段批量插入处理 1000个一批
List<List<T>> dayList = ...
long start = System.currentTimeMillis();
AtomicInteger atomicInteger = new AtomicInteger(0);
//这里异步入库
dayList.parallelStream().forEach(list->{
Integer integer;
try {
//具体的插入库的方法
integer = function.apply(list);
} catch (Exception e) {
log.error("[ListPageHelper.saveBatchOrSize]-------->{}",e.toString());
//线程池连接超时再试一次
if (e instanceof CannotAcquireLockException) {
log.error("[CannotAcquireLockException]-------->{}",e.toString());
integer = function.apply(it);
} else {
throw e;
}
}
atomicInteger.addAndGet(integer);
});
long end = System.currentTimeMillis();
log.info("数据量--------->{},耗时-------->{}",list.size(),(end - start));
之前了解过ForkJoin,实际开发中也没怎么用过,也知道java的Stream.parallel()底层用的是ForkJoin, 但是具体没有看过,刚好借此机会了解一下,就跟着源码看了一下 发现了几个问题
- ForkJoinPool 总的工作线程个数是 (cpu*2-1) main 个线程
- ForkJoinPool变量是全局的,也就是说如果不自己创建整个项目就用这 cpu*2个线程来处理。
- forkjoin 的过程是先fork完后再一一执行 ,其他的需要等待所有线程内的数据遍历后才会被分配到。这句话的意思是,比如:我有两个list:list1(1~1000)、list2(1001-2000)要遍历, 如果list1先进行forEach, 那么list2 会等到list1 放出空闲线程的时候才会开始执行。
「基于以上导致我们的问题如下:」
- 同一个时间只能有限个线程入库(这个量有点少)
- 虽然同一个文件的数据是并发入库的,但是不同文件之前并不是并发入库的(这样不符合我们的业务逻辑,我们业务是所有维度的数据入库完成才对业务有用)
「改进思路(这里只看入库逻辑)」
- 短期应急策略(改动量最小): 依然用这个模式,但是需要不同文件之间,能并行处理。
- 长期策略需要重构: 重构成生产者消费者模式,能自由控制并发下载和并发消费。
问题解决
「方法一:」 提高全局ForkJoinPool的线程数量,这样虽然会提高入库速度依然存在文件之间不能并发执行的问题。
代码语言:javascript复制//这个可以给ForkJoinPool 全局设置20个线程
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
「方法二:」
- 每次执行的时候新建个ForkJoinPool。
- 把Stream.parallel() 里面的任务Join到新建的ForkJoinPool里面(源码角度后面分析)。代码如下 代码如下:
//由于插入数据量太大,这里做分段批量插入处理 1000个一批
List<List<T>> dayList = ......
long start = System.currentTimeMillis();
AtomicInteger atomicInteger = new AtomicInteger(0);
//这里异步入库
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
forkJoinPool.submit(()->{
dayList.parallelStream().filter(CollectionUtils::isNotEmpty).forEach(list->{
Integer integer;
try {
//具体的插入库的方法
integer = function.apply(list);
} catch (Exception e) {
log.error("[ListPageHelper.saveBatchOrSize]-------->{}",e.toString());
//线程池连接超时再试一次
if (e instanceof CannotAcquireLockException) {
log.error("[CannotAcquireLockException]-------->{}",e.toString());
integer = function.apply(it);
} else {
throw e;
}
}
atomicInteger.addAndGet(integer);
});
}).join();
forkJoinPool.shutdown();
long end = System.currentTimeMillis();
log.info("数据量--------->{},耗时-------->{}",list.size(),(end - start));
疑问
为什么在ForkJoinPool的submit 方法里面用parallelStream开启线程会占用ForkJoinPool的线程池里的线程数量?parallelStream的原理这里就不展开说明,这里只分析线程内部是怎么共用ForkJoinPool里面的线程池的。
Fork/Join框架中几个重要的类介绍
「ForkJoinPool」: 实现自ExecutorService
是线程的执行器,其他的一些线程池也都是ExecutorService
的子类。
「ForkJoinTask」: 实现自Future
,可以看成是任务本身。
「ForkJoinWorkerThread」: 是Thread的子类,主要负责执行Runnable
任务。
❝严格意义上ForkJoinTask并不是任务本身,由于他没有实现
Runnable
接口,但是他的子类AdaptedRunnableAction
实现了Runnable
,这里是「适配器模式」赋予ForkJoinFask任务的执行逻辑Runnable
. ❞
ForkJoinPool主要是由WorkQueue[]组成,WorkQueue队列里面存的是ForkJoinTask[]和ForkJoinWorkerThread。而ForkJoinWorkerThread持有ForkJoinPool和WorkQueue的引用。
关系图如下
源码调用逻辑图
说明
submit流程
ForkJoinPool#submit()
通过适配类ForkJoinTask.AdaptedRunnableAction
创建任务并提交。ForkJoinPool#externalPush()
这里判断workQueues是否存在,如果不存在创建WorkQueue[]
。ForkJoinPool#signalWork()
如果活跃线程数量少创建工作线程。ForkJoinPool#createWorker()
创建工作线程。ForkJoinWorkerThread(ForkJoinPool pool)
- 创建
ForkJoinWorkerThread
对象并持有ForkJoinPool
的引用。 - 调用
pool.registerWorker(this)
获得workQueue
对象。
- 创建
registerWorker(ForkJoinWorkerThread wt)
把线程对象注册到WorkQueue,并且把workQueue添加到ForkJoinPool
的WorkQueue[]
中。WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner)
WorkQueue对象持有ForkJoinPool、ForkJoinWorkerThread
对象。
fork流程
ForkJoinTask<V> fork()
调用fork()方法。- 判断调用fork()方法的线程是否是
ForkJoinWorkerThread
,- 如果是直接调用当前线程的workQueue.push方法 「(这里就是为啥,parallelStream开启线程会占用ForkJoinPool线程池的数量)」
- 如果不是调用全局的
ForkJoinPool.common .externalPush(this)
WorkQueue#push(ForkJoinTask<?> task)
这个push方法并不是把任务加入到当前线程的WorkQueue,而是调用ForkJoinPool#signalWork()
方法添加到ForkJoinPool中重新分配到工作线程中的WorkQueue。
join流程
ForkJoinTask#join()
调用doJoin().doJoin()
调用exec()
中真正的执行分片任务的逻辑(这里就不展开细说了)getRawResult()
获取执行的结果
以上就是ForkJoin的大概源码的逻辑,解决上面疑问的逻辑主要是在fork流程的第二步中,判断当前线程是否是ForkJoinWorkerThread
类型的,如果是就把当前线程加入到线程池中,而并发流parallelStream()
中的创建线程提交任务逻辑就是调用fork()方法。