Java避坑指南:多线程批量调用下游接口,如何正确设置总超时时间

2024-09-13 13:54:51 浏览数 (3)

多线程批量调用下游接口,设置总超时时间是一种常见的需求,特别是在需要保证程序在预定时间内必须返回,否则超时设置不合理,导致接口变慢。

设置场景:多线程批量执行三个接口,耗时分别为10s、15s、20s(一般不会设置这么大的超时时间,此值为了模拟),总超时时间为15s。

错误做法

代码语言:javascript复制
package com.renzhikeji.demo;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * @author 认知科技技术团队
 * 微信公众号:认知科技技术团队
 */
public class JdkDemo {
    private static final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 100, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(100), Thread::new,
            new ThreadPoolExecutor.AbortPolicy() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                    System.out.println("rejectedExecution");
                    super.rejectedExecution(r, e);
                }
            });

    public static void main(String[] args) {
        List<Future<Integer>> futures = new ArrayList<>(10);
        Future<Integer> future1 = poolExecutor.submit(() -> {
            System.out.println(Thread.currentThread());
            TimeUnit.SECONDS.sleep(10);
            return 1;
        });

        futures.add(future1);
        Future<Integer> future2 = poolExecutor.submit(() -> {
            System.out.println(Thread.currentThread());
            TimeUnit.SECONDS.sleep(15);
            return 1;
        });

        futures.add(future2);
        Future<Integer> future3 = poolExecutor.submit(() -> {
            System.out.println(Thread.currentThread());
            TimeUnit.SECONDS.sleep(20);
            return 1;
        });

        futures.add(future3);


        long start = System.currentTimeMillis();
        for (Future<Integer> integerFuture : futures) {
            try {
                integerFuture.get(15, TimeUnit.SECONDS);
            } catch (Throwable e) {
                e.printStackTrace();
            }

        }

        long d = System.currentTimeMillis() - start;
        System.out.println(d / 1000);

     }

}

执行结果:总超时时间为20s,大于预设置的15s。

上述错误做法:线程池提交任务后,每个任务的超时时间都设置为一个固定值,从而总任务超时超时延长。

java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)方法是对每个任务的超时时间设置,而不是对总任务设置超时时间。

注意:必须保证所有的任务同时执行,核心线程数必须大于等于3,否则会进入队列等待,超时时间会更长。

线程池实现原理解析 崔认知,公众号:认知科技技术团队【八股文Java】图解Java线程池实现原理(ThreadPoolExecutor)

正确做法:使用线程池invokeAll方法

代码语言:javascript复制
package com.renzhikeji.demo;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

/**
 * @author 认知科技技术团队
 * 微信公众号:认知科技技术团队
 */
public class JdkDemo {
    private static final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 100, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), Thread::new, new ThreadPoolExecutor.AbortPolicy() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            System.out.println("rejectedExecution");
            super.rejectedExecution(r, e);
        }
    });

    public static void main(String[] args) {
        List<Future<Integer>> futures = new ArrayList<>(10);
        Callable<Integer> callable1 = () -> {
            System.out.println(Thread.currentThread());
            TimeUnit.SECONDS.sleep(10);
            return 1;
        };


        Callable<Integer> callable2 = () -> {
            System.out.println(Thread.currentThread());
            TimeUnit.SECONDS.sleep(15);
            return 1;
        };


        Callable<Integer> callable3 = () -> {
            System.out.println(Thread.currentThread());
            TimeUnit.SECONDS.sleep(20);
            return 1;
        };

        long start = System.currentTimeMillis();

        try {
            List<Future<Integer>> invoked = poolExecutor.invokeAll(Arrays.asList(callable1, callable2, callable3),
                    15L, TimeUnit.SECONDS);
            for (Future<Integer> future : invoked) {
                try {
                    Integer a = future.get();
                } catch (Throwable e) {
                    e.printStackTrace();
                }

            }
        } catch (Throwable e) {
            System.out.println("12");
            e.printStackTrace();
        }

        long d = System.currentTimeMillis() - start;
        System.out.println(d / 1000);

    }

}

运行结果:总超时时间为预设值的15s。

线程池invokeAll的原理其实是动态改动了java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)设置的超时时间,每次都会设置为:任务截止时间减去当前时间,如下图源码所示:

正确做法:使用CompletableFuture

使用CompletableFuture.allOf(......).get(15L, TimeUnit.SECONDS),也能设置总任务超时时间。

代码语言:javascript复制
package com.renzhikeji.demo;

import java.util.concurrent.*;
import java.util.function.Supplier;

/**
 * @author 认知科技技术团队
 * 微信公众号:认知科技技术团队
 */
public class JdkDemo {
    private static final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 100, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), Thread::new, new ThreadPoolExecutor.AbortPolicy() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            System.out.println("rejectedExecution");
            super.rejectedExecution(r, e);
        }
    });

    public static void main(String[] args) {


        Supplier<Integer> callable1 = () -> {
            System.out.println(Thread.currentThread());
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        };
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(callable1, poolExecutor);

        Supplier<Integer> callable2 = () -> {
            System.out.println(Thread.currentThread());
            try {
                TimeUnit.SECONDS.sleep(15);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        };
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(callable2, poolExecutor);


        Supplier<Integer> callable3 = () -> {
            System.out.println(Thread.currentThread());
            try {
                TimeUnit.SECONDS.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        };

        CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(callable3, poolExecutor);

        long start = System.currentTimeMillis();


        try {
            CompletableFuture.allOf(future2, future2, future3).get(15L, TimeUnit.SECONDS);
        } catch (Throwable e) {
            e.printStackTrace();
        }


        long d = System.currentTimeMillis() - start;
        System.out.println(d / 1000);

        try {
            Integer integer = future1.get();
            System.out.println("future1");
        } catch (Throwable e) {
            e.printStackTrace();

        }
        try {
            Integer integer = future2.get();
            System.out.println("future2");
        } catch (Throwable e) {
            e.printStackTrace();

        }
        try {
            Integer integer = future3.get();
            System.out.println("future3");
        } catch (Throwable e) {
            e.printStackTrace();

        }

    }

}

执行结果:任务1、任务2执行完了,任务3超时了。

0 人点赞