概述
线程池的意义:为了减少服务器端大量线程的创建和销毁,做到线程的复用。
- 线程池创建之后,线程池为空,没有任何线程
- 当有请求的时候,会在线程池中创建一个线程(核心线程)去处理这个请求
- 核心线程使用完毕之后不会被销毁,而是继续等待下一个请求
- 核心线程没有达到上限是,新来的请求会继续创建线程池的核心线程并使用
- 当核心线程被全部占用,新来的请求会被放入工作队列(阻塞队列)中
- 当阻塞队列也满了的时候新的请求会开启线程池的临时线程
- 临时线程被归还之后并不会立即销毁,而是存活指定的时间
- 临时线程即使处于空闲状态,也不会去处理工作队列里的请求,工作队列的里请求只能被核心线程处理
- 线程池中线程全部被占用之后,新来的请求会被拒绝执行处理器拒绝。
自定义线程池
代码语言:javascript复制package com.jmy.thredpool;
import java.util.concurrent.*;
public class MyExecutorService {
public static void main(String[] args) {
// 创建执行器服务
/*
int corePoolSize, 核心线程数
int maximumPoolSize, 最大线程数 = 核心线程 临时线程
long keepAliveTime, 临时线程存活时间
TimeUnit unit, 时间单位
BlockingQueue<Runnable> workQueue 工作队列类型
*/
ExecutorService es = new ThreadPoolExecutor(
,
,
,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 自定义拒绝逻辑
System.out.println("请求被拒绝");
}
}
);
/*
7个请求被核心线程执行 8个请求放入工作队列 8个请求被临时线程执行 最后2个被拒绝
*/
for (int i = ; i < ; i ) {
es.execute(new MyThread());
}
}
}
class MyThread implements Runnable {
@Override
public void run() {
System.out.println("Start...");
try {
Thread.sleep();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end...");
}
}
JDK提供的线程池
代码语言:javascript复制package com.jmy.thredpool;
import java.util.concurrent.*;
/*
jdk提供的两个线程池
*/
public class ExecutorServiceByJdk {
public static void main(String[] args) throws ExecutionException, InterruptedException {
/*
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
源码分析:
核心线程数为0
临时线程数为2的31次方减1 可认为无限
临时线程存活时间60秒
工作队列为同步队列
------------------------------
大池子小队列:适用于短连接,高并发的短任务场景
*/
ExecutorService ex = Executors.newCachedThreadPool();
/*
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
源码分析:
核心线程数自定义
没有临时线程
工作队列为阻塞式链式队列可认为容量无限
------------------------------------------
小池子大队列:适用于长连接任务,例如百度网盘
*/
ExecutorService executorService = Executors.newFixedThreadPool();
// Callable 只能使用线程池执行
Future<String> future = ex.submit(new CThread());
System.out.println(future.get());
}
}
// 实现Callable接口实现run方法可以得到返回值,泛型指定返回值类型
class CThread implements Callable<String>{
@Override
public String call() throws Exception {
return "执行成功";
}
}
定时执行器服务
代码语言:javascript复制package com.jmy.thredpool;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/*
定时调度执行器服务
*/
public class ScheduleExecutorServiceDemo01 {
public static void main(String[] args) {
// 创建定时调度执行器服务 核心线程数为5
ScheduledExecutorService ses = Executors.newScheduledThreadPool();
// 延时任务 5秒后执行一次ScThread线程 且只执行一次
ses.schedule(new ScThread(), , TimeUnit.SECONDS);
/*
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, Runable 线程
long initialDelay, 延时shijian
long period, 间隔时间
TimeUnit unit);
从上一个线程开始时计时 每隔5秒执行一次
*/
ses.scheduleAtFixedRate(new ScThread(),,,TimeUnit.SECONDS);
/*
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
从上一个线程结束开始计时,每5秒执行一次
*/
ses.scheduleWithFixedDelay(new ScThread(),,,TimeUnit.SECONDS);
}
}
class ScThread implements Runnable {
@Override
public void run() {
System.out.println("start...");
}
}
分叉合并
代码语言:javascript复制package com.jmy.thredpool;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/*
分叉合并
*/
public class ForkJoinPoolDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool pool = new ForkJoinPool();
System.out.println(pool.submit(new Sum(, 5555555555L)).get());
}
}
class Sum extends RecursiveTask<Long>{
private long start;
private long end;
public Sum(long start, long end) {
this.start = start;
this.end = end;
}
// 分叉合并逻辑定义
@Override
protected Long compute() {
if (end - start <= ) {
long sum = ;
for (long i = start; i < end; i ) {
sum = i;
}
return sum;
} else {
long mid = (start - end)/;
Sum left = new Sum(start,mid);
Sum right = new Sum(mid ,end);
// 分叉过程
left.fork();
right.fork();
// 合并
return left.join() right.join();
}
}
}
代码语言:javascript复制1. Fork分叉:将一个大的任务拆分成多个小的任务
2. Join合并:将拆分的小的任务的结果进行汇总
3.数据量越大,分叉合并相对循环的效率就越高。如果数据量比较少,循环的效率
反而高于分叉合并
4. 分叉合并将任务拆分之后能够有效的提高CPU的利用率
5.分叉合并考虑到慢任务带来的问题,采取了"work-stealing"(工作窃取)策略。
即当一个核上的所有的任务执行完成之后,这个核并不会空闲下来,而是会随机
扫描一个核,然后从这个核的任务队列尾端"偷"一个任务回来执行