【进阶之路】线程池拓展与CompletionService操作异步任务

2021-04-02 10:54:14 浏览数 (3)

.markdown-body{word-break:break-word;line-height:1.75;font-weight:400;font-size:15px;overflow-x:hidden;color:#333}.markdown-body h1,.markdown-body h2,.markdown-body h3,.markdown-body h4,.markdown-body h5,.markdown-body h6{line-height:1.5;margin-top:35px;margin-bottom:10px;padding-bottom:5px}.markdown-body h1{font-size:30px;margin-bottom:5px}.markdown-body h2{padding-bottom:12px;font-size:24px;border-bottom:1px solid #ececec}.markdown-body h3{font-size:18px;padding-bottom:0}.markdown-body h4{font-size:16px}.markdown-body h5{font-size:15px}.markdown-body h6{margin-top:5px}.markdown-body p{line-height:inherit;margin-top:22px;margin-bottom:22px}.markdown-body img{max-width:100%}.markdown-body hr{border:none;border-top:1px solid #ddd;margin-top:32px;margin-bottom:32px}.markdown-body code{word-break:break-word;border-radius:2px;overflow-x:auto;background-color:#fff5f5;color:#ff502c;font-size:.87em;padding:.065em .4em}.markdown-body code,.markdown-body pre{font-family:Menlo,Monaco,Consolas,Courier New,monospace}.markdown-body pre{overflow:auto;position:relative;line-height:1.75}.markdown-body pre>code{font-size:12px;padding:15px 12px;margin:0;word-break:normal;display:block;overflow-x:auto;color:#333;background:#f8f8f8}.markdown-body a{text-decoration:none;color:#0269c8;border-bottom:1px solid #d1e9ff}.markdown-body a:active,.markdown-body a:hover{color:#275b8c}.markdown-body table{display:inline-block!important;font-size:12px;width:auto;max-width:100%;overflow:auto;border:1px solid #f6f6f6}.markdown-body thead{background:#f6f6f6;color:#000;text-align:left}.markdown-body tr:nth-child(2n){background-color:#fcfcfc}.markdown-body td,.markdown-body th{padding:12px 7px;line-height:24px}.markdown-body td{min-width:120px}.markdown-body blockquote{color:#666;padding:1px 23px;margin:22px 0;border-left:4px solid #cbcbcb;background-color:#f8f8f8}.markdown-body blockquote:after{display:block;content:""}.markdown-body blockquote>p{margin:10px 0}.markdown-body ol,.markdown-body ul{padding-left:28px}.markdown-body ol li,.markdown-body ul li{margin-bottom:0;list-style:inherit}.markdown-body ol li .task-list-item,.markdown-body ul li .task-list-item{list-style:none}.markdown-body ol li .task-list-item ol,.markdown-body ol li .task-list-item ul,.markdown-body ul li .task-list-item ol,.markdown-body ul li .task-list-item ul{margin-top:0}.markdown-body ol ol,.markdown-body ol ul,.markdown-body ul ol,.markdown-body ul ul{margin-top:3px}.markdown-body ol li{padding-left:6px}.markdown-body .contains-task-list{padding-left:0}.markdown-body .task-list-item{list-style:none}@media (max-width:720px){.markdown-body h1{font-size:24px}.markdown-body h2{font-size:20px}.markdown-body h3{font-size:18px}}

大家好,我是练习java两年半时间的南橘,小伙伴可以一起互相交流经验哦。

一、扩展ThreadPoolExecutor

1、扩展方法介绍

ThreadPoolExecutor是可以扩展的,它内部提供了几个可以在子类中改写的方法(红框内)。JDK内的注解上说,这些方法可以用以添加日志,计时、监视或进行统计信息的收集。是不是感觉很熟悉?有没有一种spring aop中 @Around @Before @After三个注解的既视感?

我们来对比一下

ThreadPoolExecutor

spring aop

beforeExecute()(线程执行之前调用)

@Before(在所拦截的方法执行之前执行 )

afterExecute() (线程执行之后调用)

@After (在所拦截的方法执行之后执行)

terminated() (线程池退出时候调用)

@Around(可以同时在所拦截的方法前后执行)

其实他们的效果是一样的,只是一个在线程池里,一个在拦截器中。

对于ThreadPoolExecutor中的这些方法,有这样的一些特点:

  • 1、无论任务时从run中正常返回,还是抛出一个异常而返回,afterExecute都会被调用(但是如果任务在完成后带有一个Error,那么就不会调用afterExecute)
  • 2、同时,如果beforeExecute抛出一个RuntimeExecption,那么任务将不会被执行,连带afterExecute也不会被调用了
  • 3、在线程池完成关闭操作时会调用terminated,类似于try-catch中的finally操作一样。terminated可以用来释放Executor在其生命周期里分配的各种资源,此外也可以用来执行发送通知、记录日志亦或是收集finalize统计信息等操作

2、扩展方法实现

我们先构建一个自定义的线程池,它通过扩展方法来添加日志记录和统计信息的收集。为了测量任务的运行时间,beforeExecute必须记录开始时间并把它保存到一个afterExecute可以访问的地方,于是用ThreadLocal来存储变量,用afterExecute来读取,并通过terminated来输出平均任务和日志消息。

代码语言:javascript复制
public class WeedThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal startTime =new ThreadLocal<>();
    private final Logger log =Logger.getLogger("WeedThreadPool");
    //统计执行次数
    private final AtomicLong numTasks =new AtomicLong();
    //统计总执行时间
    private final AtomicLong totalTime =new AtomicLong();
    /**
     * 这里是实现线程池的构造方法,我随便选了一个,大家可以根据自己的需求找到合适的构造方法
     */
    public WeedThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    //线程执行之前调用
    protected void  beforeExecute(Thread t,Runnable r){
        super.beforeExecute(t,r);
        System.out.println(String.format("Thread %s:start %s",t,r));
        //因为currentTimeMillis返回的是ms,而众所周知ms是很难产生差异的,所以换成了nanoTime用ns来展示
        startTime.set(System.nanoTime());
    }
    //线程执行之后调用
    protected void afterExecute(Runnable r,Throwable t){
        try {
            Long endTime =System.nanoTime();
            Long taskTime =endTime-startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
            System.out.println(String.format("Thread %s:end %s, time=%dns",Thread.currentThread(),r,taskTime));
        }finally {
            super.afterExecute(r,t);
        }
    }
    //线程池退出时候调用
   protected void terminated(){
        try{
            System.out.println(String.format("Terminated: avg time =%dns, ",totalTime.get()/numTasks.get()));
        }finally {
            super.terminated();
        }
   }

}

测试案例:

代码语言:javascript复制
public class WeedThreadTest {
     BlockingQueue taskQueue;
   final static WeedThreadPool weedThreadPool =new WeedThreadPool(3,10,1, TimeUnit.SECONDS,new ArrayBlockingQueue(100));
    public static void main(String[] args) throws InterruptedException {
        for(int i=0;i<3;i  ) {
            weedThreadPool.execute(WeedThreadTest::run);
        }
        Thread.sleep(2000L);
        weedThreadPool.shutdown();
    }

    private static void run() {
        System.out.println("thread id is: "   Thread.currentThread().getId());
        try {
            Thread.sleep(1024L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

3、使用场景

用到这些方法的地方其实和用到Spring AOP中一些场景比较相似,主要在记录跟踪、优化等方面可以使用,如日志记录和统计信息的收集、测量任务的运行时间,以及一些任务完成之后发送通知、邮件、信息之类的。

二、CompletionService操作异步任务

1、异步方法的原理

如果我们意外收获了一大批待执行的任务(举个例子,比如去调用各大旅游软件的出行机票信息),为了提高任务的执行效率,我们可以使用线程池submit异步计算任务,通过调用Future接口实现类的get方法获取结果。

虽然使用了线程池会提高执行效率,但是调用Future接口实现类的get方法是阻塞的,也就是和当前这个Future关联的任务全部执行完成的时候,get方法才返回结果,如果当前任务没有执行完成,而有其它Future关联的任务已经完成了,就会白白浪费很多等待的时间。

所以,有没有这样一个方法,遍历的时候谁先执行完成就先获取哪个结果?

没错,我们的ExecutorCompletionService就可以实现这样的效果,它的内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果

逻辑图如下:

ExecutorCompletionService实现了CompletionService接口,在CompletionService接口中定义了如下这些方法:

  • Future submit(Callable task):提交一个Callable类型任务,并返回该任务执行结果关联的Future
  • Future submit(Runnable task,V result):提交一个Runnable类型任务,并返回该任务执行结果关联的Future
  • Future take():从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞,直到有任务完成
  • Future poll():从内部阻塞队列中获取并移除第一个执行完成的任务,获取不到则返回null,不阻塞
  • Future poll(long timeout, TimeUnit unit):从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞时间为timeout,获取不到则返回null

2、异步方法的实现

代码语言:javascript复制
public class WeedExecutorServiceDemo {
    /**
     * 继续用之前建好的线程池,只是调整一下池大小
     */
    BlockingQueue taskQueue;
    final static WeedThreadPool weedThreadPool = new WeedThreadPool(1, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue(100));
    public static Random r = new Random();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletionService cs = new ExecutorCompletionService(weedThreadPool);
        for (int i = 0; i < 3; i  ) {
            cs.submit(() -> {
                //获取计算任务
                int init = 0;
                for (int j = 0; j < 100; j  ) {
                    init  = r.nextInt();
                }
                Thread.sleep(1000L);
                return Integer.valueOf(init);
            });
        }
        weedThreadPool.shutdown();
        /**
         * 通过take方法获取,阻塞,直到有任务完成
         */
        for (int i = 0; i < 3; i  ) {
            Future future = cs.take();
            if (future != null) {
                System.out.println(future.get());
            }
        }
    }
}

调用结果如下

我们也可以通过poll方法来获取

代码语言:javascript复制
 		 /**
         * 通过poll方法获取
         */
        for (int i = 0; i < 3; i  ) {
              System.out.println(cs.poll(1200L,TimeUnit.MILLISECONDS).get());

        }

结果自然是一样的


如果把阻塞时间改小一些,目前的代码就会出问题

代码语言:javascript复制
		/**
         * 通过poll方法获取
         */
        for (int i = 0; i < 3; i  ) {
            System.out.println(cs.poll(800L,TimeUnit.MILLISECONDS).get());

        }

同样的,poll方法也可以用来打断超时执行的业务,比如在poll超时的情况下,直接调用线程池的shutdownNow(),残暴地关闭整个线程池。

代码语言:javascript复制
	for (int i = 0; i < 3; i  ) {
            Future poll = cs.poll(800L, TimeUnit.MILLISECONDS);
            if (poll==null){
                System.out.println("执行结束");
                weedThreadPool.shutdownNow();
            }
        }

3、使用场景

选择怎么样的方法来异步执行任务,什么样的方式来接收任务,也是需要根据实际情况来考虑的。

  • 1.、需要批量提交异步任务的时候建议你使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。
  • 2、让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待。
  • 3、线程池隔离。CompletionService支持创建知己的线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。

0 人点赞