重学 Java 线程基础之线程池

2023-11-28 15:30:12 浏览数 (1)

1、重学 Java 线程基础之线程池

1.1、什么是线程池

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。

1.2、线程池的优点
  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,使用线程池可以进行统一的分配、调优和监控。
1.3、线程池基本原理

线程池是基于 “池化思想” 的线程管理工具。线程池在启动时会先启动若干个线程,这些线程处于休眠的状态,当有一个新的请求进入,线程池则会唤醒一个线程去处理请求,处理完毕后又会处于休眠状态。

1.4、Executor

线程池的顶级接口,其中只有一个方法:void execute(Runnable command);,作用是代替我们手动创建线程,但是没有返回值,因为 Runnable 的 run 方法就是没有返回值的。

1.5、ExecutorService

Executor的子接口,主要是扩展了父接口,主要是增加了一个 submit 方法,并且这个方法是有返回值的,返回值类型是 Future 类型。

1.6、线程池状态

我们进入到 ThreadPoolExecutor 类中找一下其中定义的线程池状态:

源码给的一些注释:

代码语言:markdown复制
	 *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed


	 * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed

转成图就是这样的:

线程池状态

RUNNING: 接收新的任务和处理队列中的任务。

SHUTDOWN:不接收新的任务,但是处理队列中的任务。

STOP:不接收新任务,也不处理队列中的任务,并且会中断正在运行的任务。

TIDYING:所有的任务都已经完成了,workerCount的数量为0,当线程池状态变成 TIDYING 时,会执行钩子方法 terminated() 。

TERMINATED: 钩子方法 terminated() 执行完了就变成了 TERMINATED 状态了。

我们来实验一下线程池状态改变:

代码语言:text复制
public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        for (int i = 0; i < 7; i  ) {
            executorService.execute(()->{
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println(executorService);
    }

# 打印
java.util.concurrent.ThreadPoolExecutor@5910e440[Running, pool size = 5, active threads = 5, queued tasks = 2, completed tasks = 0]
pool-1-thread-1
pool-1-thread-2
pool-1-thread-5
pool-1-thread-3
pool-1-thread-4
pool-1-thread-1
pool-1-thread-2

创建一个固定大小的线程池,然后我们添加超过线程池大小的任务,从打印线程池的信息我们可以拿到这些信息:线程状态是 Running ,线程池大小是 5 ,活动线程大小是 5 ,阻塞队列中任务数是 2 ,已完成的任务是 0 。 为什么阻塞队列中的任务数是2呢?因为线程池在处理任务时,当未完成的任务数超过了当前可用的线程数,线程池就会把这些任务放置到阻塞队列中,当活动线程处理完任务再去阻塞队列中取任务,所以以上状态解释说处理队列中的任务就是这些任务。 而且从打印的结果可以看出线程的重复利用。

而且发现没,我们运行 Java 程序,但是直到所有的任务都结束了,我们的主线程都没有结束。这些因为线程池的生命周期和 JVM 的生命周期一样,JVM 什么时候挂,它就什么时候挂。而 JVM 又会因为线程池还在运行导致自己不能正常的退出( JVM 的其中一个退出条件就是用户线程都退出后结束,而线程池默认创建的就是用户线程),这样程序就一直在运行了。但是为什么线程池没有添加任务的时候就会随着运行到主函数末尾而结束呢?

代码语言:java复制
public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        System.out.println(executorService);
    }

#打印
java.util.concurrent.ThreadPoolExecutor@7229724f[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]

我们会发现线程池中居然一个线程都没有。线程数我们到相应的线程池中研究了。

RUNNING -> SHUTDOWN

代码语言:java复制
public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i  ) {
            executorService.execute(()->{
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println(executorService);
        executorService.shutdown();
        System.out.println(executorService);
        executorService.execute(()->{
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
        });
    }

#打印

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.rookie.stream.download.thread.TestThread$$Lambda$2/1650967483@533ddba rejected from java.util.concurrent.ThreadPoolExecutor@5910e440[Shutting down, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at com.rookie.stream.download.thread.TestThread.main(TestThread.java:23)
java.util.concurrent.ThreadPoolExecutor@5910e440[Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
java.util.concurrent.ThreadPoolExecutor@5910e440[Shutting down, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
pool-1-thread-1
pool-1-thread-5
pool-1-thread-3
pool-1-thread-2
pool-1-thread-4

线程池调用 shutdown() 方法从 Running 变成了 Shutting down状态,然后后面我们再添加任务进线程池就被拒绝了。从打印结果上也没发现后添加任务的打印结果。

RUNNING -> STOP

代码语言:scss复制
 public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 7; i  ) {
            executorService.execute(()->{
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println(executorService);
        executorService.shutdownNow();
        System.out.println(executorService);
    }

# 打印

java.util.concurrent.ThreadPoolExecutor@5910e440[Running, pool size = 5, active threads = 5, queued tasks = 2, completed tasks = 0]
java.util.concurrent.ThreadPoolExecutor@5910e440[Shutting down, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
pool-1-thread-3
pool-1-thread-1
pool-1-thread-4
pool-1-thread-5
pool-1-thread-2
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at com.rookie.stream.download.thread.TestThread.lambda$main$0(TestThread.java:13)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at com.rookie.stream.download.thread.TestThread.lambda$main$0(TestThread.java:13)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at com.rookie.stream.download.thread.TestThread.lambda$main$0(TestThread.java:13)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at com.rookie.stream.download.thread.TestThread.lambda$main$0(TestThread.java:13)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at com.rookie.stream.download.thread.TestThread.lambda$main$0(TestThread.java:13)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

状态从 Running 变成 Shutting down,并且中断了线程,看源码知道它实际上调用的是 t.interrupt(); 方法,因为我们用的是 sleep ,所以线程运行的时候会直接抛出异常,然后打印后面的线程名称,并且在阻塞队列中还有两个任务居然被丢了,看打印的线程池状态(completed tasks = 0)和打印结果发现队列中的任务没有被执行,原来这就是不执行队列任务……

SHUTDOWN -> STOP

代码语言:scss复制
public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 7; i  ) {
            executorService.execute(()->{
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println(executorService);
        executorService.shutdown();
        System.out.println(executorService);
        executorService.shutdownNow();
        System.out.println(executorService);
    }

# 打印
java.util.concurrent.ThreadPoolExecutor@5910e440[Running, pool size = 5, active threads = 5, queued tasks = 2, completed tasks = 0]
java.util.concurrent.ThreadPoolExecutor@5910e440[Shutting down, pool size = 5, active threads = 5, queued tasks = 2, completed tasks = 0]
java.util.concurrent.ThreadPoolExecutor@5910e440[Shutting down, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
pool-1-thread-5
pool-1-thread-4
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at com.rookie.stream.download.thread.TestThread.lambda$main$0(TestThread.java:13)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at com.rookie.stream.download.thread.TestThread.lambda$main$0(TestThread.java:13)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at com.rookie.stream.download.thread.TestThread.lambda$main$0(TestThread.java:13)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at com.rookie.stream.download.thread.TestThread.lambda$main$0(TestThread.java:13)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at com.rookie.stream.download.thread.TestThread.lambda$main$0(TestThread.java:13)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

这个也是一样的。

SHUTDOWN -> TIDYING

代码语言:java复制
public class TestThread extends ThreadPoolExecutor {
    private static TestThread testThread = new TestThread(5, 5,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());

    public TestThread(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void terminated() {
        super.terminated();
        System.out.println("terminated -> " testThread);
    }

    public static void main(String[] args) {

        for (int i = 0; i < 7; i  ) {
            testThread.execute(()->{
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println(testThread);
        testThread.shutdown();
        System.out.println(testThread);
    }
}

# 打印

com.rookie.stream.download.thread.TestThread@5910e440[Running, pool size = 5, active threads = 5, queued tasks = 2, completed tasks = 0]
com.rookie.stream.download.thread.TestThread@5910e440[Shutting down, pool size = 5, active threads = 5, queued tasks = 2, completed tasks = 0]
pool-1-thread-2
pool-1-thread-4
pool-1-thread-5
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
pool-1-thread-4
terminated -> com.rookie.stream.download.thread.TestThread@5910e440[Shutting down, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 7]

可以看到 pool size 和 queued tasks 都为 0 了,并且 terminated 也被执行了。

STOP -> TIDYING

这个就不说了,它都把队列中的任务都丢了,所以只要等 pool 中的任务结束了就ok了。

代码语言:java复制
public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 7; i  ) {
            executorService.execute(()->{
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(executorService);
                System.out.println(Thread.currentThread().getName());
            });
        }
        executorService.shutdownNow();
    }

# 打印
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at com.rookie.stream.download.thread.TestThread.lambda$main$0(TestThread.java:11)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at com.rookie.stream.download.thread.TestThread.lambda$main$0(TestThread.java:11)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at com.rookie.stream.download.thread.TestThread.lambda$main$0(TestThread.java:11)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at com.rookie.stream.download.thread.TestThread.lambda$main$0(TestThread.java:11)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at com.rookie.stream.download.thread.TestThread.lambda$main$0(TestThread.java:11)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.util.concurrent.ThreadPoolExecutor@e55ba13[Shutting down, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
pool-1-thread-2
java.util.concurrent.ThreadPoolExecutor@e55ba13[Shutting down, pool size = 4, active threads = 4, queued tasks = 0, completed tasks = 1]
pool-1-thread-5
java.util.concurrent.ThreadPoolExecutor@e55ba13[Shutting down, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 2]
pool-1-thread-1
java.util.concurrent.ThreadPoolExecutor@e55ba13[Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 3]
pool-1-thread-3
java.util.concurrent.ThreadPoolExecutor@e55ba13[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 4]
pool-1-thread-4

可以发现线程池 stop 后,pool size 不断减少。

1.7、线程池工具类 Executors

Executors 可以快速的帮助我们创建线程池,我们来看一下它有哪些常用的方法吧。

newFixedThreadPool

创建一个固定容量的线程池。其中活动线程数和 pool 线程容量存在限制。所有的线程池都有一个任务队列,使用 BlockingQueue workQueue 作为任务的载体。从上面的演示我们可以发现,如果任务数大于线程池(pool)容量时会将任务放置到任务队列中,当有空闲的线程就会往任务队列中取出任务执行。

代码语言:text复制
ThreadPoolExecutor threadPoolExecutor =(ThreadPoolExecutor) Executors.newFixedThreadPool(1);

可以通过 BlockingQueue<Runnable> getQueue() 取出任务队列,而完成的任务却没有保留在队列中,只能通过 long getCompletedTaskCount() 去获取完成的任务总数。

使用场景:首选推荐 FixedThreadPool ,因为系统与硬件的线程限制,不能让线程无限的创建下去,所以推荐有限制的线程池。

线程池默认容量是 Integer.MAX_VALUE,输出是 2147483647;

用法:

第一种 public static ExecutorService newFixedThreadPool(int nThreads) 我们已经很熟悉了,因为上面使用的都是这种。 我们来看下第二种 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory); 多了个 ThreadFactory 参数,那么这个参数有什么作用呢?

进接口里面看下发现只有一个方法 Thread newThread(Runnable r);

看一下注释:

代码语言:less复制
Constructs a new {@code Thread}.  Implementations may also initialize
priority, name, daemon status, {@code ThreadGroup}, etc.

意思是可以构建一个新的线程,也可以初始化线程的优先级、名称与 daemon 状态。我们看下它的某个实现类里面的实现(CustomizableThreadFactory):

代码语言:text复制
public Thread createThread(Runnable runnable) {
        Thread thread = new Thread(this.getThreadGroup(), runnable, this.nextThreadName());
        thread.setPriority(this.getThreadPriority());
        thread.setDaemon(this.isDaemon());
        return thread;
    }
代码语言:text复制
public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2,
                new CustomizableThreadFactory("线程-"));
        for (int i = 0; i < 4; i  ) {
            threadPoolExecutor.execute(()->{
                System.out.println(Thread.currentThread().getName());
            });
        }
        threadPoolExecutor.shutdown();
    }

# 打印

线程-1
线程-1
线程-1
线程-2

newSingleThreadExecutor

创建单一线程池,里面的线程容量只有 1 ,始终使用的都是那个线程。使用场景是保证任务的执行顺序。

代码语言:text复制
 public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        for (int i = 0; i < 1000; i  ) {
            int index = i;
            executorService.execute(() -> {
                System.out.println(index);
                System.out.println(Thread.currentThread().getName());
            });
        }
        executorService.shutdown();
    }

打印结果就不复制了。

newCachedThreadPool

创建一个缓存线程池,线程池容量为最大的容量 Integer.MAX_VALUE 。线程容量是自动扩容的,直到最大。当线程池中没有空闲的线程时就会自动创建线程去执行新添加的任务,在线程空闲时间达到最大的阀值时会被自动释放,默认是60s。

常用于短时间内需要大量进程的相关场景。测试场景。

测试当线程池没有空闲线程时新添加任务是否会创建新的线程去执行任务:

代码语言:text复制
public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 4; i  ) {
            executorService.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
//        查看线程池的相关信息
        System.out.println(executorService);

        executorService.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
        });
        System.out.println(executorService);

        executorService.shutdown();
    }

# 打印

java.util.concurrent.ThreadPoolExecutor@5910e440[Running, pool size = 4, active threads = 4, queued tasks = 0, completed tasks = 0]
java.util.concurrent.ThreadPoolExecutor@5910e440[Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
pool-1-thread-1
pool-1-thread-3
pool-1-thread-4
pool-1-thread-2
pool-1-thread-5

从打印结果来看,线程池新增了一个线程。

测试当有线程空闲时新添加任务是否会创建线程去执行任务。

代码语言:text复制
public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 4; i  ) {
            executorService.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
//        查看线程池的相关信息
        System.out.println(executorService);

        executorService.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
        });
        System.out.println(executorService);

        executorService.shutdown();
    }

# 打印
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
java.util.concurrent.ThreadPoolExecutor@5910e440[Running, pool size = 4, active threads = 0, queued tasks = 0, completed tasks = 4]
java.util.concurrent.ThreadPoolExecutor@5910e440[Running, pool size = 4, active threads = 0, queued tasks = 0, completed tasks = 4]
pool-1-thread-4

从打印结果来看,线程池的线程数量始终是 4 ,新添加的任务也由空闲线程去执行了。

测试一下线程的自动释放:

代码语言:java复制
 public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            System.out.println(Thread.currentThread().getName());
        });
//        查看线程池的相关信息
        System.out.println(executorService);
        try {
            TimeUnit.SECONDS.sleep(63);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(executorService);

        executorService.shutdown();
    }

# 打印

pool-1-thread-1
java.util.concurrent.ThreadPoolExecutor@728938a9[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
java.util.concurrent.ThreadPoolExecutor@728938a9[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]

从打印结果来看,线程是自动释放了。

newScheduledThreadPool

创建一个计划任务线程池。可以定时的去执行任务的线程池。

ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);

command:需要执行的任务

initialDelay:第一个任务延迟多少秒执行。

period:每个任务间隔多少秒执行。如果上一个任务的执行周期大于间隔时间,则下一个任务会延迟启动。

unit:时间单位。

我们来看一下源码注释:

代码语言:sql复制
 	 * Creates and executes a periodic action that becomes enabled first
     * after the given initial delay, and subsequently with the given
     * period; that is executions will commence after
     * {@code initialDelay} then {@code initialDelay period}, then
     * {@code initialDelay   2 * period}, and so on.
     * If any execution of the task
     * encounters an exception, subsequent executions are suppressed.
     * Otherwise, the task will only terminate via cancellation or
     * termination of the executor.  If any execution of this task
     * takes longer than its period, then subsequent executions
     * may start late, but will not concurrently execute.

第一个任务是 initialDelay 后执行,第二个任务是 initialDelay period 后执行,第三个任务是 initialDelay 2 * period 时间后执行,以此类推。如果执行的任务中出现异常,则下一个任务将不会继续执行下去。如果当前任务执行的时间比 period 时间间隔长,则下一个任务可能会延迟执行,但不会同时执行。下一个任务的执行取决于上一个任务的开始,上一个任务的执行时间可能会影响下一个任务的时间,但是如果时间大于间隔时间,那么下一个任务会立刻执行,而不会等待间隔时间去执行。

代码语言:text复制
 public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
        executorService.scheduleAtFixedRate(()->{
            System.out.println(Thread.currentThread().getName());
        },0,1,TimeUnit.SECONDS);
    }

第一个任务延迟 0s 开始运行,每个任务间隔 1s。在执行下一个任务时,线程池会随机挑选空闲线程中的一个。

我们来尝试一下,下一个任务是否是以上一个任务开始时间作为间隔标准。

代码语言:scss复制
public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
        executorService.scheduleAtFixedRate(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程结束");
        }, 0, 1, TimeUnit.SECONDS);
    }

从打印的速度中我们可以发现,下一个线程的名称打印是紧挨着 “线程结束” 后打印,而不是间隔一秒后打印。

我们再去研究一下线程池中的其他方法, ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);

这个方法的参数于上面方法的参数一致,只不过当前方法下一个任务执行的时间取决于上一个任务执行结束的时间。

代码语言:scss复制
public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
        executorService.scheduleWithFixedDelay(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程结束");
        }, 0, 1, TimeUnit.SECONDS);
       
    }

从打印结果来看,“线程结束” 打印后,间隔 delay 秒后才打印线程名称。

ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit); 任务延迟 delay 时间后执行,并且任务只执行一次。

代码语言:scss复制
 public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
        executorService.schedule(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程结束");
        },1,TimeUnit.SECONDS);

    }

ForkJoinPool

newWorkStealingPool

1.8、ThreadPoolExecutor

除了 ForkJoinPool 以外,其他线程池使用的都是 ThreadPoolExecutor 来实现的。 ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue)

corePoolSize:线程池中要保留的线程数量。 maximumPoolSize:最大的线程容量。 keepAliveTime:线程的生命周期,0为永久。 unit:时间格式。 workQueue:阻塞队列,queued tasks。

我们来看一下以上线程池是如何调用 ThreadPoolExecutor 的。

newFixedThreadPool

代码语言:text复制
 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    }

newScheduledThreadPool

代码语言:java复制
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
    }

newCachedThreadPool

代码语言:c#复制
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
    }

newSingleThreadExecutor

代码语言:c#复制
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
    }

我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!

0 人点赞