ThreadPool介绍

2021-10-18 09:07:52 浏览数 (1)

简介

线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。

合理利用线程池能够带来三个好处

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

第二:提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。

第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。

线程池参数

代码语言:javascript复制
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
  • corePoolSize(核心池的大小):在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当向线程池提交一个任务时,若线程池已创建的线程数小于corePoolSize,即便此时存在空闲线程,也会通过创建一个新线程来执行该任务,直到已创建的线程数大于或等于corePoolSize时,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。
  • workQueue(任务队列):用于传输和保存等待执行任务的阻塞队列。
  • maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
  • keepAliveTime(线程存活保持时间):表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0。
  • unit:参数keepAliveTime的时间单位。
  • threadFactory(线程工厂):用于创建新线程。threadFactory创建的线程也是采用new Thread()方式,threadFactory创建的线程名都具有统一的风格:pool-m-thread-n(m为线程池的编号,n为线程池内的线程编号)。
  • handler(线程饱和策略):当线程池和队列都满了,再加入线程会执行此策略。

线程池流程

  1. 首先线程池判断基本线程池是否已满?没满,创建一个工作线程来执行任务。满了,则进入下个流程。
  2. 其次线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。
  3. 最后线程池判断整个线程池是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。

ThreadPool

  • Executors.newCacheThreadPool():可缓存线程池,先查看池中有没有以前建立的线程,如果有,就直接使用。如果没有,就建一个新的线程加入池中,缓存型池子通常用于执行一些生存期很短的异步型任务。线程处理任务速度比较慢,且还有新任务不停进来,会导致CPU可能处于100%。
代码语言:javascript复制
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
说明:(没有正式工,都是临时工)-卖保险-效率高
如果当前workA工作完之后,发现还有Taks后,会重新回到线程池继续工作。

SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。 使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界(Integer.MAX_VALUE),避免线程拒绝执行操作。

  • Executors.newFixedThreadPool(int n):创建一个可重用固定个数的线程池,以共享的无界队列方式来运行这些线程。一批n个线程执行完之后,再次返回线程池继续下一波任务。线程处理任务速度比较慢,且还有新任务不停进来,会导致内存溢出。
代码语言:javascript复制
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
说明:(都是正式工,没有临时工)国企-效率稍慢

LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。LinkedBlockingQueue采用可重入锁(ReentrantLock)来保证在并发情况下的线程安全。且LinkedBlockingQueue队列大小是Integer.MAX_VALUE。

  • Executors.newScheduledThreadPool(int n):创建一个定长线程池,支持定时及周期性任务执行。
代码语言:javascript复制
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
  • Executors.newSingleThreadExecutor():创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。线程处理任务速度比较慢,且还有新任务不停进来,会导致内存溢出。
代码语言:javascript复制
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
说明:效率很慢

newCacheThreadPool

描述:线程池为无限大,当执行当前任务时上一个任务已经完成,会复用执行上一个任务的线程,而不用每次新建线程。

代码语言:javascript复制
public class ThreadPoolTest {

    public static void main(String[] args) {
        // 创建一个可缓存线程池
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i  ) {
            try {
                // sleep可明显看到使用的是线程池里面以前的线程,没有创建新的线程
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cachedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    // 打印正在执行的缓存线程信息
                    System.out.println(Thread.currentThread().getName()
                              "正在被执行");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

执行结果:
pool-1-thread-1正在被执行
pool-1-thread-2正在被执行
pool-1-thread-2正在被执行
pool-1-thread-1正在被执行
pool-1-thread-1正在被执行
pool-1-thread-2正在被执行
pool-1-thread-1正在被执行
pool-1-thread-2正在被执行
pool-1-thread-1正在被执行
pool-1-thread-1正在被执行

newFixedThreadPool

描述:因为线程池大小为3,每个任务输出打印结果后sleep 2秒,所以每两秒打印3个结果。定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()。等第一批线程执行完之后,再次回到线程池继续下一批任务执行。

代码语言:javascript复制
public class ThreadPoolTest2 {

    public static void main(String[] args) {
        // 创建一个可重用固定个数的线程池
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i  ) {
            fixedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 打印正在执行的缓存线程信息
                        System.out.println(Thread.currentThread().getName()   "正在被执行");
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

newScheduledThreadPool

代码语言:javascript复制
public class ThreadPoolTest3 {

    public static void main(String[] args) {
        //创建一个定长线程池,支持定时及周期性任务执行——延迟执行
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        //延迟1秒执行
//         scheduledThreadPool.schedule(new Runnable() {
//             public void run() {
//                System.out.println("延迟1秒执行");
//             }
//         }, 1, TimeUnit.SECONDS);


        // 延迟1秒后每3秒执行一次
        scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("延迟1秒后每3秒执行一次");
            }
        }, 1, 3, TimeUnit.SECONDS);

    }
}

newSingleThreadExecutor

代码语言:javascript复制
public class ThreadPoolTest4 {

    public static void main(String[] args) {
        //创建一个单线程化的线程池
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i  ) {
            final int index = i;
            singleThreadExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //结果依次输出,相当于顺序执行各个任务
                        System.out.println(Thread.currentThread().getName() "正在被执行,打印的值是:" index);
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

自定义线程池

代码语言:javascript复制
public class ThreadPoolTest5 {

    public static void main(String[] args) {
        // 创建一个可重用固定个数的线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
        for (int i = 0; i < 100; i  ) {
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 打印正在执行的缓存线程信息
                        System.out.println(Thread.currentThread().getName()   "正在被执行");
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

说明:在执行到底31个任务的时候抛出异常。

原因:自定义线程池核心线程为10、队列为10、最大线程数为20。所以再提交优先级下,前10个任务会在核心线程下,10到20会放在队列中,再20-30会放到最大线程数中(最大线程数是包含核心线程数的),所以第31个任务到来时因为没有存储的地方,导致异常。

代码语言:javascript复制
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.java.master.高并发.ThreadPool.ThreadPoolTest5$1@10f87f48 rejected from java.util.concurrent.ThreadPoolExecutor@b4c966a[Running, pool size = 20, active threads = 20, queued tasks = 10, completed tasks = 0]
代码语言:javascript复制
java.util.concurrent.ThreadPoolExecutor#runWorker

执行优先级:
while (task != null || (task = getTask()) != null)

0 人点赞