Concurrent包之ExecutorService(执行器服务)

2022-10-27 16:34:33 浏览数 (1)

概述

线程池的意义:为了减少服务器端大量线程的创建和销毁,做到线程的复用。

  1. 线程池创建之后,线程池为空,没有任何线程
  2. 当有请求的时候,会在线程池中创建一个线程(核心线程)去处理这个请求
  3. 核心线程使用完毕之后不会被销毁,而是继续等待下一个请求
  4. 核心线程没有达到上限是,新来的请求会继续创建线程池的核心线程并使用
  5. 当核心线程被全部占用,新来的请求会被放入工作队列(阻塞队列)中
  6. 当阻塞队列也满了的时候新的请求会开启线程池的临时线程
  7. 临时线程被归还之后并不会立即销毁,而是存活指定的时间
  8. 临时线程即使处于空闲状态,也不会去处理工作队列里的请求,工作队列的里请求只能被核心线程处理
  9. 线程池中线程全部被占用之后,新来的请求会被拒绝执行处理器拒绝。

自定义线程池

代码语言:javascript复制
package com.jmy.thredpool;

import java.util.concurrent.*;

public class MyExecutorService {
    public static void main(String[] args) {
        // 创建执行器服务
        /*
                              int corePoolSize,  核心线程数
                              int maximumPoolSize,  最大线程数 = 核心线程   临时线程
                              long keepAliveTime,    临时线程存活时间
                              TimeUnit unit,          时间单位
                              BlockingQueue<Runnable> workQueue  工作队列类型
         */
        ExecutorService es = new ThreadPoolExecutor(
                ,
                ,
                ,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        // 自定义拒绝逻辑
                        System.out.println("请求被拒绝");
                    }
                }
        );

        /*
        7个请求被核心线程执行 8个请求放入工作队列 8个请求被临时线程执行 最后2个被拒绝
         */
        for (int i = ; i < ; i  ) {
            es.execute(new MyThread());
        }

    }
}

class MyThread implements Runnable {

    @Override
    public void run() {
        System.out.println("Start...");

        try {
            Thread.sleep();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("end...");
    }
}

JDK提供的线程池

代码语言:javascript复制
package com.jmy.thredpool;

import java.util.concurrent.*;

/*
jdk提供的两个线程池
 */
public class ExecutorServiceByJdk {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
        public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
        }
        源码分析:
            核心线程数为0
            临时线程数为2的31次方减1 可认为无限
            临时线程存活时间60秒
            工作队列为同步队列
        ------------------------------
        大池子小队列:适用于短连接,高并发的短任务场景
         */
        ExecutorService ex = Executors.newCachedThreadPool();

        /*
        public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
        }
        源码分析:
            核心线程数自定义
            没有临时线程
            工作队列为阻塞式链式队列可认为容量无限
        ------------------------------------------
        小池子大队列:适用于长连接任务,例如百度网盘
         */
        ExecutorService executorService = Executors.newFixedThreadPool();

        // Callable 只能使用线程池执行
        Future<String> future = ex.submit(new CThread());
        System.out.println(future.get());
    }
}

// 实现Callable接口实现run方法可以得到返回值,泛型指定返回值类型
class CThread implements Callable<String>{

    @Override
    public String call() throws Exception {
        return "执行成功";
    }
}

定时执行器服务

代码语言:javascript复制
package com.jmy.thredpool;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/*
定时调度执行器服务
 */
public class ScheduleExecutorServiceDemo01 {
    public static void main(String[] args) {
        // 创建定时调度执行器服务 核心线程数为5
        ScheduledExecutorService ses = Executors.newScheduledThreadPool();

        // 延时任务 5秒后执行一次ScThread线程 且只执行一次
        ses.schedule(new ScThread(), , TimeUnit.SECONDS);
        /*
                public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, Runable 线程
                                                  long initialDelay, 延时shijian
                                                  long period,  间隔时间
                                                  TimeUnit unit);
                从上一个线程开始时计时 每隔5秒执行一次
         */
        ses.scheduleAtFixedRate(new ScThread(),,,TimeUnit.SECONDS);
        
        /*
         public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
                从上一个线程结束开始计时,每5秒执行一次
         */
        ses.scheduleWithFixedDelay(new ScThread(),,,TimeUnit.SECONDS);
    }
}

class ScThread implements Runnable {

    @Override
    public void run() {
        System.out.println("start...");
    }
}

分叉合并

代码语言:javascript复制
package com.jmy.thredpool;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/*
  分叉合并
 */
public class ForkJoinPoolDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println(pool.submit(new Sum(, 5555555555L)).get());
    }
}

class Sum extends RecursiveTask<Long>{

    private long start;
    private long end;

    public Sum(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // 分叉合并逻辑定义
    @Override
    protected Long compute() {
        if (end - start <= ) {
            long sum = ;
            for (long i = start; i < end; i  ) {
                sum  = i;
            }

            return sum;
        } else {
            long mid = (start - end)/;
            Sum left = new Sum(start,mid);
            Sum right = new Sum(mid ,end);
            // 分叉过程
            left.fork();
            right.fork();
            // 合并
            return left.join()   right.join();
        }

    }
}
代码语言:javascript复制
1. Fork分叉:将一个大的任务拆分成多个小的任务
2. Join合并:将拆分的小的任务的结果进行汇总
3.数据量越大,分叉合并相对循环的效率就越高。如果数据量比较少,循环的效率
反而高于分叉合并
4. 分叉合并将任务拆分之后能够有效的提高CPU的利用率
5.分叉合并考虑到慢任务带来的问题,采取了"work-stealing"(工作窃取)策略。
即当一个核上的所有的任务执行完成之后,这个核并不会空闲下来,而是会随机
扫描一个核,然后从这个核的任务队列尾端"偷"一个任务回来执行

0 人点赞