多线程进阶——JUC并发编程之Executors框架设计思想一探究竟

2021-12-28 10:53:37 浏览数 (4)

1、学习切入点

Executors 框架是整个JUC 包中类/接口关系中最为复杂的框架,真正理解Executors框架的前提是理清楚各个模块之间的关系,高屋建瓴,从整体到局部才能透彻理解各个模块的功能和背后设计的思路!

本文将从 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory和Callable这些核心模块展开分析。

2、从Executor谈起

Executor 是JDK 1.5 时,随着JUC引入的一个接口,引入该接口的主要目的是 解耦任务本身和任务执行。我们之前通过线程执行一个任务时,往往需要先创建一个线程.satrt()去执行任务,

而Executor 接口解耦了任务和任务的执行,该接口只有一个方法,入参为待执行的任务。

代码语言:javascript复制
public interface Executor {
    /**
     * 执行给定的Runable任务
     * 根据Executor接口的实现类不同,具体执行方式也不同
     */
    void execute(Runnable command);
}

我们可以像下面这样执行任务,而不必心线程的创建

代码语言:javascript复制
Executor executor = anExecutor
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

由于Executor 仅仅是一个接口,所有根据其实现不同,执行具体的任务也不同,参看源码注释,我们可以知道比如:

2.1、同步执行任务

代码语言:javascript复制
class DirectExecutor implements Executor {
    public void execute(Runnable r) {
    r.run();
  }
}}

2.2、异步执行任务

代码语言:javascript复制
class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
      new Thread(r).start();
    }
  }}

2.3、对任务进行排队执行

代码语言:javascript复制
 class SerialExecutor implements Executor {
    final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
    final Executor executor;
    Runnable active;
 
    SerialExecutor(Executor executor) {
      this.executor = executor;
    }
    public synchronized void execute(final Runnable r) {
      tasks.offer(new Runnable() {
        public void run() {
          try {
            r.run();
          } finally {
            scheduleNext();
          }
        }
      });
      if (active == null) {
        scheduleNext();
      }
    }
    protected synchronized void scheduleNext() {
      if ((active = tasks.poll()) != null) {
        executor.execute(active);
      }
    }
  }}

以上这些源码给出的示例,仅仅是给出了一些可能的 Executor实现,JUC包中提供了很多 Executor的具体实现类,这里关键是理解Executor的设计思想——对任务和任务的执行解耦

3、增强的ExecutorService

因为Executor 接口的功能很简单,为了对它进行增强,JUC又提供了一个名为 ExecutorService 接口,那它到达增强了哪些东西呢?

可以看到,ExecutorService 在Executor 的基础上增加了对任务的控制,同时也包括对自己生命周期的管理,主要有四类:

1、关闭执行器,禁止任务提交

2、监视执行器状态

3、提供对异步任务支持

4、提供对处理任务的支持

4、周期任务调度——ScheduledExecutorService

在开发环境中,我们可能需要提交给执行器某些任务能够定时执行或者周期性执行,我们可以自己去实现Executor 接口来创建符合我们需要的类,Doug Lea 已经考虑好了这类需求,所有在ExecutorService 的基础之上,又提供了一个接口 ScheduledExecutorService。

案例演示

代码语言:javascript复制
public class ScheduleExecutorTest {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler=Executors.newScheduledThreadPool(4);
        //开始执行时间1秒后,每隔1秒执行一次任务
        final ScheduledFuture<?>scheduledFuture=scheduler.scheduleAtFixedRate(new BeepTask(),1,1,TimeUnit.SECONDS);

        //5秒后,取消任务,关闭线程池
        scheduler.schedule(()->{
            scheduledFuture.cancel(true);
            System.out.println("over");
            scheduler.shutdownNow();
        },5,TimeUnit.SECONDS);
    }
    private static class BeepTask implements Runnable{
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() "beep!");
        }
    }
}

注意:scheduleAtFixedRate 方法返回一个ScheduledFuture 对象,ScheduledFuture其实就是在Future 的基础上增加了延迟功能。通过ScheduledFuture 可以取消一个任务的执行。

ScheduledExecutorService 完整接口说明如下:

代码语言:javascript复制
public interface ScheduledExecutorService extends ExecutorService {

    /**
     * 提交一个待执行的任务, 并在给定的延迟后执行该任务.
     * 
     * @param command 待执行的任务
     * @param delay 延迟时间
     * @param unit 延迟时间的单位
     */
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    /**
     * 提交一个待执行的任务(具有返回值), 并在给定的延迟后执行该任务
     *
     * @param callable 待执行的任务
     * @param delay 延迟时间
     * @param unit 延迟时间的单位
     * @param <V>  返回值类型
     */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    /**
     * 提交一个待执行的任务
     * 该任务在 initialDelay 后开始执行, 然后在initialDelay period 后执行, 接 
     * 着在 initialDelay   2 * period 后执行, 依此类推
     * 
     * @param command 待执行的任务
     * @param initialDelay 首次执行的延迟时间
     * @param period 连续执行之间的周期
     * @param unit 延迟时间的单位
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    /**
     * 提交一个待执行的任务
     * 该任务在 initialDelay 后开始执行, 随后在每一次执行终止和下一次执行开始之间 
     * 都存在给定的延迟
     * 如果任务的任一执行遇到异常, 就会取消后续执行. 否则, 只能通过执行程序的取消或 
     * 终止方法来终止该任务
     *
     * @param command 待执行的任务
     * @param initialDelay 首次执行的延迟时间
     * @param delay 一次执行终止和下一次执行开始之间的延迟
     * @param unit  延迟时间的单位
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

至此,Executors 框架中的三个最核心的接口介绍完毕,三个核心接口关系如下

5、生产Executor的工厂

通过前面的介绍,读者应该对Executors框架有了一个初步的认识,Executors 框架就是用来解耦任务本身与任务的执行,并提供了三个核心(Executor、ExecutorService、ScheduledExecutorService)接口来满足使用者的需求!

1、Executor:提交普通的可执行任务 2、ExecutorService:提供对线程池生命周期的管理、异步任务的支持 3、ScheduledExecutorService:提供对任务周期性执行的支持

既然有了接口,那么就有相应的实现类,JUC 提供了许多默认的接口实现,用户如果自己去创建这些类的实例,就需要了解这些类的细节,我们可不可以仅仅根据一些参数就能创建这些实例呢?

因此Executors类就是专门用于创建上述接口的实现类对象,Executors其实就是一个简单工厂,它的所有方法都是static的,Executors 一共提供了五类可供创建的Executors执行器实例。

1、固定线程数的线程池

代码语言:javascript复制
    /**
     * 创建一个具有固定线程数的Executor
     *
     * @param nThreads 核心线程数(银行有五个窗口,平时开三个窗口(核心线程数))
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    /**
     * 创建一个具有固定线程数的Executor.
     * 在需要时使用提供的 ThreadFactory 创建新线程.
     *
     * @param nThreads 核心线程数
     * @param threadFactory 创建线程的工厂
     */
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

public interface ThreadFactory {
    //创建了一个线程
    Thread newThread(Runnable r);
}

ThreadFactory 作为一个线程工厂,我们可以由外部指定ThreadFactory实例,以决定线程具体的创建方式。

下面就以 DefaultThreadFactory为例

代码语言:javascript复制
//默认的线程工厂
static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);//原子性
        private final ThreadGroup group;//线程组
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-"  
                          poolNumber.getAndIncrement()  
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix   threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

可以看到,DefaultThreadFactory 初始化的时候定义了线程组、线程名称等信息,每创建一个线程,都给线程统一分配了这些信息,避免了手动new的方式创建线程,又可以进行工厂复用

2、单个线程的线程池

代码语言:javascript复制
 /**
     * 创建一个使用单个 worker 线程的 Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    /**
     * 创建一个使用单个 worker 线程的 Executor
     * 在需要时使用提供的 ThreadFactory 创建新线程
     *
     * @param threadFactory 线程工厂
     */
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
    //对返回的Executor实例进行一个包装
    static class FinalizableDelegatedExecutorService
        extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        protected void finalize() {
            super.shutdown();
        }
    }

我们可以看到,只有单个线程上文线程池其实就是指定好了线程数为1的固定线程池,主要区别就是返回的Executor实例用了一个 FinalizableDelegatedExecutorService对象 进行包装,它核心继承了 DelegatedExecutorService ,这是个包装类,实现了 ExecutorService的所有方法,但是内部实现其实都委托给了传入的 ExecutorService 实例。

为什么要多此一举呢?

因为返回的 ThreadPoolExecutor 包含一些设置线程池大小的方法,对于只有单个线程的线程池来说,我们是不希望用户通过强转的方式使用这些方法的,所以需要这么一个包装类,只暴露ExecutorService 本身的方法。

3、可缓存的线程池

代码语言:javascript复制
/**
 * 创建一个可缓存线程的Execotor
 * 如果线程池中没有线程可用, 则创建一个新线程并添加到池中
 * 如果有线程长时间未被使用(默认60s,可通过threadFactory配置), 则从缓存中移除
 */
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

/**
 * 创建一个可缓存线程的Execotor.
 * 如果线程池中没有线程可用, 则创建一个新线程并添加到池中;
 * 如果有线程长时间未被使用(默认60s, 可通过threadFactory配置), 则从缓存中移除.
 * 在需要时使用提供的 ThreadFactory 创建新线程.
 */
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

可以看到,返回的还是ThreadPoolExecutor对象,只是指定了超时时间,另外线程池中线程的数量在[0, Integer.MAX_VALUE]之间。

4、可延时/周期性调度的线程池

代码语言:javascript复制
/**
 * 创建一个具有固定线程数的 可调度Executor.
 * 它可安排任务在指定延迟后或周期性地执行.
 */
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
/**
 * 创建一个具有固定线程数的 可调度Executor.
 * 它可安排任务在指定延迟后或周期性地执行.
 * 在需要时使用提供的 ThreadFactory 创建新线程.
 */
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }

5、Fork/Join线程池

代码语言:javascript复制
/**
 * 创建具有指定并行级别的ForkJoin线程池.
 */
public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

/**
 * 创建并行级别等于CPU核心数的ForkJoin线程池.
 */
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

3、总结

至此,Executors框架就基本介绍完了,我们看下他的类图

0 人点赞