在Java中,parallelStream 是 Java 8 引入的 Stream API 的一部分,它允许并行处理集合中的元素。默认情况下,parallelStream 共享使用默认的 ForkJoinPool 作为其线程池,可能对你的业务影响性能,而且起不到隔离的作用。所以我们需要自定义其使用的线程池。
下面列出几种方法设置线程池:
一、设置系统属性:java.util.concurrent.ForkJoinPool.common.parallelism,修改默认共享的ForkJoinPool 的并行数
代码语言:javascript复制public static void main(String[] args) throws InterruptedException {
List<Integer> list = IntStream.range(1, 50).boxed().collect(Collectors.toList());
list.parallelStream().forEach(t->{
System.out.println(t ":" Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
TimeUnit.HOURS.sleep(1);
}
并行流使用的默认线程池是:ForkJoinPool.commonPool()
代码语言:javascript复制
代码语言:javascript复制
代码语言:javascript复制 ForkJoinPool common的初始化:
其中并行度的值和系统属性:
java.util.concurrent.ForkJoinPool.common.parallelism
有关,如果没配置,则默认和系统cpu核数相关(
Runtime.getRuntime().availableProcessors()获取)。
本机测试,默认并行数为11。
代码语言:javascript复制
System.out.println("ForkJoinPool并行数" ForkJoinPool.getCommonPoolParallelism());
修改系统属性 ,设置并行数为20:
代码语言:javascript复制 System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","20");
System.out.println("ForkJoinPool并行数: " ForkJoinPool.getCommonPoolParallelism());
执行结果:
注意:虽然可以通过设置系统属性修改默认
代码语言:javascript复制ForkJoinPool common的并行数,提高并行度,但是默认共享使用一个
ForkJoinPool起不到隔离作用,择情况而选择使用。
二、在自定义的ForkJoinPool中运行parallel()操作
通过创建新的ForkJoinPool,设置线程池数目:
代码语言:javascript复制ForkJoinPool forkJoinPool = new ForkJoinPool(20);
然后并行流的执行提交给新建的ForkJoinPool执行:
代码语言:javascript复制forkJoinPool.submit
示例:
代码语言:javascript复制package com.renzhikeji.demo;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.IntStream;
/**
* @author 认知科技技术团队
* 微信公众号:认知科技技术团队
*/
public class JdkDemo {
public static void main(String[] args) throws InterruptedException {
List<Integer> list = IntStream.range(1, 50).boxed().toList();
ForkJoinPool forkJoinPool = new ForkJoinPool(20);
forkJoinPool.submit(() -> list.parallelStream().forEach(t->{
System.out.println(t ":" Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
TimeUnit.HOURS.sleep(1);
}
}
执行结果:
执行原理:
判断当前线程是否ForkJoinWorkerThread,如果时,则使用当前线程绑定的ForkJoinPool即我们自定义创建的去执行任务。
三、小结
java的parallelStream并行流,可能需要开发者自定义线程池,起到提高性能及隔离故障的作用。