IO密集型任务使用Java的parallelStream并行流,提高性能及隔离故障,如何自定义线程池

2024-09-13 13:55:45 浏览数 (1)

在Java中,parallelStream 是 Java 8 引入的 Stream API 的一部分,它允许并行处理集合中的元素。默认情况下,parallelStream 共享使用默认的 ForkJoinPool 作为其线程池,可能对你的业务影响性能,而且起不到隔离的作用。所以我们需要自定义其使用的线程池。

下面列出几种方法设置线程池:

一、设置系统属性:java.util.concurrent.ForkJoinPool.common.parallelism,修改默认共享的ForkJoinPool 的并行数

代码语言:javascript复制
public static void main(String[] args) throws InterruptedException {
        List<Integer> list = IntStream.range(1, 50).boxed().collect(Collectors.toList());
        list.parallelStream().forEach(t->{
            System.out.println(t ":"  Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        TimeUnit.HOURS.sleep(1);
    }

并行流使用的默认线程池是:ForkJoinPool.commonPool()

代码语言:javascript复制
代码语言:javascript复制
代码语言:javascript复制
 ForkJoinPool common的初始化:

其中并行度的值和系统属性:

java.util.concurrent.ForkJoinPool.common.parallelism

有关,如果没配置,则默认和系统cpu核数相关(

Runtime.getRuntime().availableProcessors()获取)。

本机测试,默认并行数为11。

代码语言:javascript复制

System.out.println("ForkJoinPool并行数"   ForkJoinPool.getCommonPoolParallelism());

修改系统属性 ,设置并行数为20:

代码语言:javascript复制
 System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","20");
 System.out.println("ForkJoinPool并行数:  "   ForkJoinPool.getCommonPoolParallelism());

执行结果:

注意:虽然可以通过设置系统属性修改默认

代码语言:javascript复制
ForkJoinPool common的并行数,提高并行度,但是默认共享使用一个

ForkJoinPool起不到隔离作用,择情况而选择使用。

二、在自定义的ForkJoinPool中运行parallel()操作

通过创建新的ForkJoinPool,设置线程池数目:

代码语言:javascript复制
ForkJoinPool forkJoinPool = new ForkJoinPool(20);

然后并行流的执行提交给新建的ForkJoinPool执行:

代码语言:javascript复制
forkJoinPool.submit

示例:

代码语言:javascript复制
package com.renzhikeji.demo;


import java.util.List;
import java.util.concurrent.*;
import java.util.stream.IntStream;

/**
 * @author 认知科技技术团队
 * 微信公众号:认知科技技术团队
 */
public class JdkDemo {
    public static void main(String[] args) throws InterruptedException {

        List<Integer> list = IntStream.range(1, 50).boxed().toList();
        ForkJoinPool forkJoinPool = new ForkJoinPool(20);
        forkJoinPool.submit(() -> list.parallelStream().forEach(t->{
            System.out.println(t ":"  Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));

        TimeUnit.HOURS.sleep(1);
    }

}

执行结果:

执行原理:

判断当前线程是否ForkJoinWorkerThread,如果时,则使用当前线程绑定的ForkJoinPool即我们自定义创建的去执行任务。

三、小结

java的parallelStream并行流,可能需要开发者自定义线程池,起到提高性能及隔离故障的作用。

0 人点赞