ThreadPoolExecutor的submit正确的使用方式

2023-10-25 15:53:00 浏览数 (3)

项目场景:

线程池的地方用的还是挺多的,一般来说用的多的还是execute方法,submit方法还是用的挺少的,一般ThreadPoolExecutorsubmit 方法通常用于将一个任务提交到线程池中执行。这个方法会返回一个 Future 对象,可以用来检查任务的执行状态,获取任务的返回值或者取消任务的执行。

使用 submit 方法可以将任务提交到线程池中,由线程池中的线程来执行任务,从而避免了为每个任务创建线程的开销。同时,线程池可以限制同时执行的任务数量,避免资源被过度占用。


问题描述

提示:部分代码

某台服务器上配置了一个agent服务用来做命令执行,发现队列老是堆积。消费不过来明明用了线程池也发现任务队列没有满,奇怪。 项目日志:

代码语言:javascript复制
 Add task [com.timelinecapital.util.agent.commands.ops.CommandSHLoginExecuteRunner@7751c119] ThreadPool status: FixedThreadPool:ActiveCount:0,CompletedTaskCount:1680,TaskCount:1681,PoolSize:6,CorePoolSize:6,QueueSize:1

项目代码:

代码语言:javascript复制
ThreadPoolExecutor service = null;
    @Test
    public void testPools() throws ExecutionException, InterruptedException, TimeoutException {
        final int corePoolSize = Runtime.getRuntime().availableProcessors();
        log.info("corePoolSize:{}", corePoolSize);
        final int maximumPoolSize = corePoolSize * 2;
        final long keepAliveTime = 20;
        final TimeUnit unit = TimeUnit.SECONDS;
        final BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque<>();
        final ThreadFactory threadFactory = new ThreadFactory() {
            private final AtomicInteger mThreadNum = new AtomicInteger(1);

            @Override
            public Thread newThread(final Runnable r) {
                return new Thread(r, "command-thread-"   mThreadNum.getAndIncrement());
            }
        };
        service = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
                unit, workQueue, threadFactory);
        for (int i = 0; i < maximumPoolSize; i  ) {
            log.info("pool========{}", showStatus());
            final Future<?> future =  service.submit((Callable) () -> {
                log.info("thread name start:{}========", Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(3);
                    log.info("thread name end:{}========", Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return null;
            });
            Object o = future.get(10, TimeUnit.SECONDS);
        }
        log.info("complate");
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    String showStatus() {
        final ThreadPoolExecutor executor = (ThreadPoolExecutor) service;
        final StringBuilder info = new StringBuilder();
        info.append("FixedThreadPool:");
        info.append("ActiveCount:").append(executor.getActiveCount()).append(",");
        info.append("CompletedTaskCount:").append(executor.getCompletedTaskCount()).append(",");
        info.append("TaskCount:").append(executor.getTaskCount()).append(",");
        info.append("PoolSize:").append(executor.getPoolSize()).append(",");
        info.append("CorePoolSize:").append(executor.getCorePoolSize()).append(",");
        final BlockingQueue<Runnable> queue = executor.getQueue();
        if (queue != null) {
            info.append("QueueSize:").append(queue.size());
        }
        return info.toString();
    }

原因分析:

提示:跑了一次看到日志按照单线程的方式执行,瞬间顿悟。

代码语言:javascript复制
2023-08-04 11:10:25 INFO  UtilsTest                       :105 - corePoolSize:12
2023-08-04 11:10:25 INFO  UtilsTest                       :121 - pool========FixedThreadPool:ActiveCount:0,CompletedTaskCount:0,TaskCount:0,PoolSize:0,CorePoolSize:12,QueueSize:0
2023-08-04 11:10:25 INFO  UtilsTest                       :123 - thread name start:command-thread-1========
2023-08-04 11:10:28 INFO  UtilsTest                       :126 - thread name end:command-thread-1========
2023-08-04 11:10:28 INFO  UtilsTest                       :121 - pool========FixedThreadPool:ActiveCount:0,CompletedTaskCount:1,TaskCount:1,PoolSize:1,CorePoolSize:12,QueueSize:0
2023-08-04 11:10:28 INFO  UtilsTest                       :123 - thread name start:command-thread-2========
2023-08-04 11:10:31 INFO  UtilsTest                       :126 - thread name end:command-thread-2========
2023-08-04 11:10:31 INFO  UtilsTest                       :121 - pool========FixedThreadPool:ActiveCount:0,CompletedTaskCount:2,TaskCount:2,PoolSize:2,CorePoolSize:12,QueueSize:0
2023-08-04 11:10:31 INFO  UtilsTest                       :123 - thread name start:command-thread-3========
2023-08-04 11:10:34 INFO  UtilsTest                       :126 - thread name end:command-thread-3========
2023-08-04 11:10:34 INFO  UtilsTest                       :121 - pool========FixedThreadPool:ActiveCount:0,CompletedTaskCount:3,TaskCount:3,PoolSize:3,CorePoolSize:12,QueueSize:0
2023-08-04 11:10:34 INFO  UtilsTest                       :123 - thread name start:command-thread-4========

原来submit的方式用错了,不应该直接这么get的,这样就跟没有开线程池一样,因为future.get(10, TimeUnit.SECONDS)会阻塞线程继续执行,线程池的最大使用效率没有返回出来,只用到一个单线程在执行,结果等于没有用。 从查看submit的源码来看,其实也是调用了java.util.concurrent.Executor#execute方法,只是换了线程实现而已,又让我想起那句话,之前不懂代码的时候看代码是代码,后面懂代码了,看代码就是看方法,现在深入代码底层看代码还是代码,惯性是个恐怖的事情。

代码语言:javascript复制
    /**
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

解决方案:

提示:取消立马获 future.get(10, TimeUnit.SECONDS)方式

最后只能修改业务逻辑,因为对执行结果不是特别需求,所有可以改成execute方式,当然如果逻辑对返回值的需求特别的可以解耦,使用生产者消费者模式,一边计算一边处理,实现逻辑可以这样,在submit返回的Future对象存储在一个集合里面,在另一边可以批次处理也可以单次处理,批次处理就判断所有的submit执行完之后处理,单次处理就使用队列集合,一次取一个值理论情况下不会阻塞太久。

总结

习惯了用execute就忘记了submit的正确使用方式,惯性是很恐怖的,还是得多多跑跑单元测试。

0 人点赞