【多线程】线程池基本知识

2021-08-10 11:57:18 浏览数 (3)

上篇文章讲了下线程的创建及一些常用的方法,但是在使用的时候,大多数是采用了线程池来管理线程的创建,运行,销毁等过程。本篇将着重讲线程池的基础内容,包括通过线程池创建线程,线程池的基本信息等。

创建线程

前期准备

❝本小节所有代码都是在CreateThreadByPool 类上,该类还有一个内部类MyThread 实现了Runnable 接口。 ❞

首先先把基本的代码给写出来

代码语言:javascript复制
public class CreateThreadByPool {
    public static void main(String[] args) {

    }
}

class MyThread implements Runnable {

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()   " processing");
        process();
        System.out.println(Thread.currentThread().getName()   " end");
    }

    private void process() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return String.format("MyThread{%s}", Thread.currentThread().getName());
    }
}

先来大概回顾一下,当我们想创建10个线程的时候的代码「普通方式」是怎样的

代码语言:javascript复制
private static void createThreadByNormalWay() {
    for (int i = 0; i < 10; i  ) {
        MyThread myThread = new MyThread();
        Thread thread = new Thread(myThread);
        thread.start();
    }
}

「能看到的代码」中,是使用了start() 自己直接开启了线程,但是如果用线程池方式来呢

通过Executors

第一种创建线程池的方法是通过Executors 类的静态方法来构建,通过这种方式总共可以创建「4种线程池」

并且可以发现返回是ExecutorService ,所以还要接受返回值,最后通过execute 来启动线程

代码语言:javascript复制
private static void createThreadByPool() {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 10; i  ) {
        MyThread myThread = new MyThread();
        executorService.execute(myThread);
    }
}

先不管底层是如何实现的,至少代码上是「把线程交给了线程池」来执行,这样能够保证线程能够统一管理。

简单的比喻就是前者是要你自己去找班长签到,后者是班长统一管理这整个班的签到。在main函数中调用看看普通方法和通过线程池创建的线程有什么区别

threadPool1

可以很明显的看到有以下几点区别

  • 线程的名字都不一样
  • 并且普通方式是创建了10个线程,而后者只是创建了5个线程(「是由我们自己设定的」
  • 前者基本上是10个线程都是同时处理,后者是最多只能处理5个线程,需要等线程执行完有空闲才能处理其它线程。

通过ThreadPoolExecutor

除了使用Executors.newFixedThreadPool() 创建线程池,还可以通过new ThreadPoolExecutor() ,这里可能有的小伙伴会迷糊了,怎么上面返回的类是ExecutorService ,现在返回的又是ThreadPoolExecutor ,其实两者是同一个东西。

可以看到ThreadExecutorPool 是继承了 AbstractExecutorService ,而后者是实现了ExecutorService 。通过该方法创建的线程池的代码如下

❝可以先这样运行体验下,至于说构造函数里面不同参数的含义,在后面的篇幅中会说到,到时候再返回来看即可。 ❞

代码语言:javascript复制
private static void createThreadByThreadPoolExecutor() {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5,5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    for (int i = 0; i < 10; i  ) {
        MyThread myThread = new MyThread();
        executor.execute(myThread);
    }
}

看下运行结果

输出结果没啥好讲的,但是如果细心的小伙伴在上一个gif就会发现,通过线程池来启动线程的方式,程序并没有退出,会一直运行。这是因为我们没有shutdown 线程池。

两者区别

回过头来看看Executors.静态方法 这种方法来创建线程池的源码

可以看到其实更深一层还是使用了new ThreadPoolExecutor() ,只不过我们自己能定制的构造函数的参数变得极其少,这时候肯定有小伙伴疑问了,「那为什么不直接都用」new ThreadPoolExecutor() 「呢?」

「《阿里java开发手册》 嵩山版明确规定了两点,一是线程资源必须通过线程池提供,不允许自行显式创建线程;二是线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式去创建。」

着重看第二点强制通过ThreadPoolExecutor的方式来创建线程,原因在下面也有,「来看看FixedThreadPool和SingleThreadPool的源码」

image-20210629161632466

其它的不管,可以看到两者调用构造函数中的队列都是LinkedBlockingQueue ,这个队列是无边界的,所以有了允许请求长度为Integer.MAX_VALUE ,会堆积大量的「请求」 ,从而导致OOM。

「再来看看CachedThreadPool的源码」

注意这里构造函数的第二个参数是线程池最大线程数,它设置成了Integer.MAX_VALUE ,这就可能会创建大量的线程,从而导致OOM。

线程池信息

ThreadPoolExecutor

上面也可以看到,创建线程池最重要也是最应该使用的方法的是new ThreadPoolExecutor() ,接下来把重点放在ThreadPoolExecutor这个类上面

这个是类中的所有的属性,接下来再看看构造函数

有4种,但是归根结底只有以下这一种构造函数,讲下这些参数的意义,然后大家就可以回头看下上一小节的例子。

代码语言:javascript复制
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
 //省略实现
}
  • corePoolSize :核心线程数,大白话就是能够工作的线程数量
  • maximumPoolSize :最大线程数,就是这个线程池能容纳线程的数量
  • keepAliveTime :存活时间,当线程池中的线程数量大于核心线程数的时候,如果时候没有任务提交,核心线程池外的线程不会立即被销毁,而是会等待,直到等待的时间超过了这个字段才会被回收销毁
  • unit :存活时间的单位
  • workQueue :工作队列,就是在线程开始被调用前,就是存在这个队列中
  • threadFactory :线程工厂,执行程序创建新线程时使用的工厂
  • handler :拒绝策略,当达到线程边界和队列容量而采取的拒绝策略

对于这个拒绝策略,简单说下,有四种实现。

❝实现RejectedExecutionHandler 接口就能实现自己的拒绝策略 ❞

监控线程

下面就来简单实现一个自己的拒绝策略,并且来看下上述类中属性的信息

首先需要一个「监控线程类」

代码语言:javascript复制
class MonitorThread implements Runnable {
 
    //注入一个线程池
    private ThreadPoolExecutor executor;

    public MonitorThread(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    private boolean monitor = true;

    public void stopMonitor() {
        monitor = false;
    }

    @Override
    public void run() {
        //监控一直运行,每3s输出一次状态
        while (monitor) {
            //主要逻辑是监控线程池的状态
            System.out.println(
                    String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s, rejectedExecutionHandler: %s",
                            this.executor.getPoolSize(),
                            this.executor.getCorePoolSize(),
                            this.executor.getActiveCount(),
                            this.executor.getCompletedTaskCount(),
                            this.executor.getTaskCount(),
                            this.executor.isShutdown(),
                            this.executor.isTerminated(),
                            this.executor.getRejectedExecutionHandler()));

            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

同时实现「自定义的拒绝策略」

❝其实这还是没有对r处理,拒绝了就拒绝了,只是打印出来,但是并没有实质性地处理 ❞

代码语言:javascript复制
class MyRejectedExecutionHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("task is rejected");
    }
}

接下来就是「public类」TheradPoolInfo「注意工作线程采用的是上一小节的MyThread类」

代码语言:javascript复制
public class ThreadPoolInfo {
    public static void main(String[] args) throws InterruptedException {
        //新建了一个线程池,核心线程数是3,最大线程数是5,30s
        //队列是ArrayBlockingQueue,并且大小边界是3,拒绝策略自定义输出一句话
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3,5, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), new MyRejectedExecutionHandler());
        
        //开启监控线程
        MonitorThread monitorThread = new MonitorThread(executor);
        new Thread(monitorThread).start();
        
        //开启工作线程
        for (int i = 0; i < 10; i  ) {
            executor.execute(new MyThread());
        }
  
        //关闭线程池和监控线程
        Thread.sleep(12000);
        executor.shutdown();
        Thread.sleep(3000);
        monitorThread.stopMonitor();
    }
}

「预期结果:」 通过构造函数可以知道,预期是有3个核心线程执行任务,会拒绝2个线程,完成8个任务(最大线程数是5,队列长度是3,具体会在下一篇文章中讲)。

可以看到结果和预期的一样

「创作不易,如果对你有帮助,欢迎点赞,收藏和分享啦!」

0 人点赞