线程池详解与异步任务编排使用案例

2022-10-28 16:42:06 浏览数 (1)

线程池详解与异步任务编排使用案例

1.初始化线程的4种方式

代码语言:javascript复制
1)、继承Thread
2)、实现 Runnable接口
3)、实现 Callable接口 FutureTask(可以拿到返回结果,可以处理异常)
4)、线程池

区别:
	1、2不能得到返回值。3可以获取返回值
	1、2、3都不能控制资源(无法控制线程数【高并发时线程数耗尽资源】)
	4可以控制资源,性能稳定,不会一下子所有线程一起运行

结论:
	实际开发中,只用线程池【高并发状态开启了n个线程,会耗尽资源】

2.创建线程池的方式

创建固定线程数的线程池ExecutorService

代码语言:javascript复制
固定线程数的线程池
Executors.newFixedThreadPool(10);
execute和submit区别
代码语言:javascript复制
作用:都是提交异步任务的

execute:只能提交Runnable任务,没有返回值
submit:可以提交Runnable、Callable,返回值是FutureTask

创建原生线程池ThreadPoolExecutor

代码语言:javascript复制
new ThreadPoolExecutor(5,
        200,
        10,
        TimeUnit.SECONDS,
        new LinkedBlockingDeque<>(100000),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy());

7个参数:
    corePoolSize:	核心线程数,不会被回收,接收异步任务时才会创建
    maximumPoolSize:最大线程数量,控制资源
	keepAliveime: 	maximumPoolSize-corePoolSize 无任务存活超过空闲时间则线程被释放
	TimeUnitunit:	时间单位
	workQueue:		阻塞队列,任务被执行之前保存在任务队列中,只要有线程空闲,就会从队列取出任务执行
	threadFactory:	线程的创建工厂【可以自定义】
	RejectedExecutionHandler handler:队列满后执行的拒绝策略
    
线程池任务执行流程
    当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
    当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
    当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
    当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理(默认策略抛出异常)
    当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,释放空闲线程
    当设置allowCoreThreadTimeOut(true)时,该参数默认false,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
拒绝策略
代码语言:javascript复制
DiscardOldestPolicy:丢弃最老的任务
AbortPolicy:丢弃当前任务,抛出异常【默认策略】
CallerRunsPolicy:同步执行run方法
DiscardPolicy:丢弃当前任务,不抛出异常
阻塞队列
代码语言:javascript复制
1.new LinkedBlockingDeque<>();// 默认大小是Integer.Max会导致内存不足,所以要做压力测试给出适当的队列大小

线程池

1.常见的4种默认线程池

代码语言:javascript复制
注意:
    回收线程 = maximumPoolSize - corePoolSize

可缓冲线程池【CachedThreadPool】:corePoolSize=0, maximumPoolSize=Integer.MAX_VALUE
定长线程池【FixedThreadPool】:corePoolSize=maximumPoolSize
周期线程池【ScheduledThreadPool】:指定核心线程数,maximumPoolSize=Integer.MAX_VALUE,支持定时及周期性任务执行(一段时间之后再执行)
单任务线程池【SingleThreadPool】:corePoolSize=maximumPoolSize=1,从队列中获取任务(一个核心线程)
  
Executors.newCachedThreadPool();
Executors.newFixedThreadPool(10);
Executors.newScheduledThreadPool(10);
Executors.newSingleThreadExecutor();

2.为什么使用线程池?

代码语言:javascript复制
1.降低资源的消耗【减少创建销毁操作】
	通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
	高并发状态下过多创建线程可能将资源耗尽
2.提高响应速度【控制线程个数】
	因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行(线程个数过多导致CPU调度慢)
3、提高线程的可管理性【例如系统中可以创建两个线程池,核心线程池、非核心线程池【例如发送短信】,显存告警时关闭非核心线程池释放内存资源】
	线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配

异步编排CompletableFuture

代码语言:javascript复制
1.runXXX都是没有返回结果的,supplyXXX可以获取返回结果
2.可以传入自定义线程池,否则使用默认线程池

1.业务场景

代码语言:javascript复制
4、5、6依赖1,得先知道sku是哪个spu下的

2.测试异步操作

supplyAsync

代码语言:javascript复制
// 5.1.提交任务异步执行(supplyAsync)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "测试使用", executor);
System.out.println(future1.get());

thenRunAsync串行化

代码语言:javascript复制
// 不能获取上一步结果   无返回值

thenAcceptAsync串行化

代码语言:javascript复制
// 能获取上一步结果   无返回值

thenApplyAsync串行化

代码语言:javascript复制
// 能获取上一步结果   有返回值
// 5.2.获取上一步结果并链式异步调用(thenApplyAsync)
CompletableFuture<String> future2 = future1.thenApplyAsync(s -> s   " 链式调用", executor);// 参数s是上一步的返回值
System.out.println(future2.get());

whenCompleteAsync

代码语言:javascript复制
// 5.3.获取上一步执行结果并获取异常信息(whenCompleteAsync)【无法处理异常返回默认值】
CompletableFuture<String> future3 = future2.whenCompleteAsync((result, exception) -> System.out.println("结果是:"   result   "----异常是:"   exception));

exceptionally

代码语言:javascript复制
// 5.4.获取上一步异常,如果出现异常可返回默认值,不出现异常保持原值(exceptionally)
CompletableFuture<Integer> future4 = future3.thenApplyAsync((s -> 1 / 0), executor);
CompletableFuture<Integer> future5 = future4.exceptionally(exception -> {
System.out.println("出现异常:"   exception);
return 10;
});// 出现异常,使用默认返回值
System.out.println("默认值:"   future5.get());

handle

代码语言:javascript复制
// 5.5.方法执行完成后的处理
CompletableFuture<Integer> future6 = future3.thenApplyAsync((s -> 1 / 0), executor).handle((result, exception) -> {
    if (exception == null) {
        return result;
    }
    System.out.println("handle处理异常:"   exception);
    return 1;
});
System.out.println("handle处理返回结果:"   future6.get());

两任务组合-都要完成

runAfterBothAsync
代码语言:javascript复制
// 5.6.1.二者都要完成,组合【不获取前两个任务返回值,且自己无返回值】
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1执行");
    return 10 / 2;
}, executor);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务2执行");
    return "hello";
}, executor);
CompletableFuture<Void> future03 = future01.runAfterBothAsync(future02, () -> {
    System.out.println("任务3执行");
}, executor);
thenAcceptBothAsync
代码语言:javascript复制
// 5.6.2.二者都要完成,组合【获取前两个任务返回值,自己无返回值】
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1执行");
    return 10 / 2;
}, executor);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务2执行");
    return "hello";
}, executor);
CompletableFuture<Void> future03 = future01.thenAcceptBothAsync(future02,
        (result1, result2) -> {
            System.out.println("任务3执行");
            System.out.println("任务1返回值:"   result1);
            System.out.println("任务2返回值:"   result2);
        }, executor);
thenCombineAsync
代码语言:javascript复制
// 5.6.3.二者都要完成,组合【获取前两个任务返回值,自己有返回值】
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1执行");
    return 10 / 2;
}, executor);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务2执行");
    return "hello";
}, executor);
CompletableFuture<String> future03 = future01.thenCombineAsync(future02,
        (result1, result2) -> {
            System.out.println("任务3执行");
            System.out.println("任务1返回值:"   result1);
            System.out.println("任务2返回值:"   result2);
            return "任务3返回值";
        }, executor);
System.out.println(future03.get());

两任务组合-任一完成

runAfterEitherAsync
代码语言:javascript复制
// 不获取前任务返回值,且当前任务无返回值
acceptEitherAsync
代码语言:javascript复制
// 获取前任务返回值,但当前任务无返回值
applyToEitherAsync
代码语言:javascript复制
// 获取前任务返回值,当前任务有返回值

多任务组合

allOf
代码语言:javascript复制
// 等待所有任务完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(future01, future02, future03);
allOf.get();// 阻塞等待所有任务完成
anyOf
代码语言:javascript复制
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future01, future02, future03);
anyOf.get();// 阻塞等待任一任务完成,返回值是执行成功的任务返回值

0 人点赞