15.11 Callable
第三种创建线程的方式,使用Callable创建的线程可以有一个返回值。Interface Callable<V>它是一个接口和Runnable一样。void run() 这方法是一个无返回值,无参数的方法。Callable 提供了一个call()返回可以返回一个值,这个值的类型由Callable<V>泛型决定。
FutureTask 类 构造方法 new FutureTask(Callable对象)
Future 接口 get方法这个get方法返回线程的计算结果。
Thread类来启动 构造 中基本上都放一个Runnable接口的对象进去,正好FutureTask类也实现了Runnable接口,代表Thread类可以用FutureTask对象来进行构造。
代码语言:javascript复制package com.qf.threadCallable;
import java.util.concurrent.Callable;
public class Calc implements Callable<Integer> { //自定义一个类来实现Callable接口
@Override
public Integer call() throws Exception { //具体实现call()
//计算1-100所有数的和
int sum=0;
for (int i = 1; i <= 100; i ) {
sum =i;
}
return sum;
}
}
代码语言:javascript复制package com.qf.threadCallable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建Callable接口的实现对象,这里有具体要做的事情
Calc calc=new Calc();
//FutureTask 类,这个类可以通过Callable接口来进行构造,同时这个类也实现了Runnable接口
FutureTask<Integer> futureTask=new FutureTask<>(calc);
//线程的启动一定要通过Thread类来实现 Thread的构造只能放一个Runnable接口对象进去,可以把FutureTask对象丢进去
Thread thread=new Thread(futureTask);
thread.start();
Integer integer = futureTask.get();//最后再获取结果的时候程序会阻塞
System.out.println("Main:" integer);
}
}
分段计算-10000中所有数的和,分为十段每一段计算1000个数
代码语言:javascript复制package com.qf.threadCallable;
import java.util.concurrent.Callable;
public class Calc1_10000 implements Callable<Integer> {
private Integer start;
private Integer end;
public Calc1_10000(int start,int end){
this.start=start;
this.end=end;
}
@Override
public Integer call() throws Exception {
int sum=0;
for (int i = start; i <= end ; i ) {
sum =i;
}
return sum;
}
}
package com.qf.threadCallable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class Main1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//定义十个计算类的数组,元素空间中没东西
Calc1_10000[] calcs=new Calc1_10000[10];
//定义十个FutureTask数组,元素空间中没东西
FutureTask<Integer>[] futureTasks=new FutureTask[10];
//通过循环创建十个线程并启动
for (int i = 0; i < calcs.length ; i ) {
//创建计算类对象给定义当前这个对象的计算范围 (1,1000) (1001,2000) (2001,3000)
calcs[i]=new Calc1_10000(i*1000 1,i*1000 1000);
//创建new FutureTask<>(calcs[i])并使用当前这个计算范围的对象进行构造
futureTasks[i]=new FutureTask<>(calcs[i]);
//创建线程开启线程
Thread thread= new Thread(futureTasks[i]);
thread.start();
}
int sum=0;
//从每个FutureTask元素中获取线程计算的结果
for (int i = 0; i < futureTasks.length; i ) {
//这里汇总每个线程计算的结果
sum =futureTasks[i].get();
}
//最终结果
System.out.println(sum);
}
}
再次获取结果,不会重新计算
代码语言:javascript复制public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Calc calc=new Calc();
FutureTask<Integer> futureTask=new FutureTask<>(calc);
Thread thread=new Thread(futureTask);
thread.start();
long start=System.currentTimeMillis();
Integer integer = futureTask.get();
long end=System.currentTimeMillis();
System.out.println("Main:" integer ",计算总时长:" (end-start));
System.out.println("=================================================");
start=System.currentTimeMillis();
Integer sum = futureTask.get();
end=System.currentTimeMillis();
System.out.println("Main:" sum ",计算总时长:" (end-start));
}
}
运行结果:
Main:5050,计算总时长:10643
=================================================
Main:5050,计算总时长:0
调用两次get()获线程中的计算结果,第一次调用会实实在在的去计算,第二次调用时会从缓存直接取出来不再计算
15.12 辅助类
- 1、CountDownLatch
CountDownLatch 倒数计数器,初始的时候给一个初始因子,在线程去对这个因子进行递减操作,当CountDownLatch因子等于0的时候,再向下执行,不然就是阻塞。
能达到这个效果的之前有一个join()的方法可以实现,但是有区别,区别就是十个多线程执行出来的效果成了单线程
代码语言:javascript复制public class Demo1 {
public static void main(String[] args) {
for (int i = 1; i <= 10; i ) {
Thread thread=new Thread(()->{
System.out.println(Thread.currentThread().getName() "---------" "Hello");
},String.valueOf(i));
thread.start();
try {
thread.join();//等待这个线程死亡
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("程序结束");
}
}
CountDownLatch 允许一个或多个线程等待其他线程完成一组操作后再继承执行。
构造方法; CountDownLatch(int count)
await()等待阻塞。
await(timeout,TimeUnit)等待阻塞。
countDown() 用于减少计数。
getCount() 获取当前计数因子具体数据
代码语言:javascript复制public class CountDownLatchDemo2 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch=new CountDownLatch(10);
for (int i = 1; i <= 10; i ) {
Thread thread=new Thread(()->{
System.out.println(Thread.currentThread().getName() "---------" "Hello");
countDownLatch.countDown();//减的代码要放到线程中,放在最后,代表的是当前这个线程已经执行完成
},String.valueOf(i));
thread.start();
}
countDownLatch.await();
System.out.println("程序结束");
}
}
- CyclicBarrier
与CountDownLatch 类似 ,CountDownLatch 做减法CyclickBarrier做加法
CyclickBarrier 给定一个初始因子,这个因子表示的是终点,CyclickBarrier 从0开始计数到达这个设置因子为止,每加 1。然后执行一组操作。
CyclickBarrier(parties,Runnable); parties表示的是终点 Runnable表示的是最后要执行的任务。
await() 等待的同时就将计数器 1
代码语言:javascript复制public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier=new CyclicBarrier(10,()->{
//当线程的awit()执行量到达初始设定值时,开始执行这里面的代码
System.out.println("出发去玩");
});
for (int i = 1; i <= 10; i ) {
new Thread(()->{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() "到了");
try {
cyclicBarrier.await();//每执行一次计数器 1,到了初始设定时,解除阻塞
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
System.out.println("main程序结束");
}
}
- Semaphore 信号灯,信号量 作用是限流,限制线程的运行数量。 Semaphore(int permits) permits 限定值,允许几条线程同时执行 acquire() --> 红绿灯 限定值满时亮红灯,没满时亮绿亮 亮红灯会阻塞 release()--> 释放许可,假定acquire()亮的是红灯,acquire()这绿灯
public class SemaphoreDemo {
public static void main(String[] args) {
List<String> names= Arrays.asList("谢兴灵","张鹏","冯小龙","蔡金恩","李登爽","陶冶","何海洋","浩楠哥","张万达","张帅");
Semaphore semaphore=new Semaphore(2);
for (int i = 0; i < names.size(); i ) {
new Thread(()->{
try {
semaphore.acquire(); //限流 亮红灯 阻塞类似于wait()
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() "关门");
System.out.println(Thread.currentThread().getName() "蹲下,使劲.......");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() "开门走人");
semaphore.release();//放行,类似于notify() 随机唤醒一条线程
},names.get(i)).start();
}
}
}
15.13 读写锁
读锁,写锁,读锁对查询读取的内容进行加锁,但是读锁允许多个资源进行共同访问(共享锁,乐观锁),写锁,与平时讲的锁一样是一种(排他锁,互斥锁)
ReentrantReadWriteLock 就是读写锁
ReentrantReadWriteLock.ReadLock 读锁
ReentrantReadWriteLock.WriteLock 写锁
lock.writeLock().lock(); 上写锁
lock.writeLock().unlock(); 解写锁
lock.readLock().lock(); 上读锁
lock.readLock().unlock(); 解读锁
代码语言:javascript复制package com.qf.threadReadWriteLock;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class MyCache {
private Map<String,String> myCahce=new HashMap<>();//非线程安全的hashMap
private ReentrantReadWriteLock lock=new ReentrantReadWriteLock();
public void put(String key,String value){
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() ":开始写");
myCahce.put(key, value);
System.out.println(Thread.currentThread().getName() ":写结束");
}finally {
lock.writeLock().unlock();
}
}
public String get(String key){
lock.readLock().lock();//这样上读锁
String result=null;
try {
System.out.println(Thread.currentThread().getName() ":开始读");
result= myCahce.get(key);
System.out.println(Thread.currentThread().getName() ":读结束");
}finally {
lock.readLock().unlock(); //读锁解锁
}
return result;
}
}
代码语言:javascript复制public class ReentrantReadWriteLockDemo {
public static void main(String[] args) {
MyCache mc=new MyCache();
for (int i = 0; i < 50; i ) {
final int j=i;
new Thread(()->{
mc.put(String.valueOf(j),String.valueOf(j));
},String.valueOf(i)).start();
}
for (int i = 0; i < 50; i ) {
new Thread(()->{
System.out.println(mc.get(Thread.currentThread().getName()));
},String.valueOf(i)).start();
}
}
}
通过结果分析得出一个结论:
1、上写锁的时候,读锁无法穿插,只是读锁在运行的时候,可以穿插。
2、一个线程写的时候其他线程不能读也不能写。下一次是读还是写看哪个线程快。
多个线程同时读一个资源类没有任何问题,所以为了满足并发量,读取共享资源应该可以同时进行。
但是,如果有一个线程想去修改共享资源,就不应该再有其他线程可以对该资源进行读或写。
- 读-读 可以共存 读线程得到了读锁,如果进来的线程还是读,可以继续获取读锁的,共存
- 读-写 不能共存 读线程得到了锁,如果进来的是写线程,写锁是排他,互斥,所以写要等待
- 写-读 不能共存 写线程得到了锁,如果进来的是读线程 , 不可能拿到锁
- 写-写 不能共存 写线程得到了锁,其他线程就别想得到锁
15.14 阻塞队列
队列有两个口一个口进一个口出,所以队列是先进先出的一种数据结构。
BlockingQueue 阻塞队列,父级接口就是util包中的Queue
根据上图,Thread1调用put方法向列队添加值,队列有6个空间,当put6次以后将队列填满,再调用第7次的时候,添加阻塞。这时候需要Thread2调用Take方法从队列中取值出来,取一个值就是清除一个,这时队列就不满,队列有空余空间的时候第7次添加的阻塞结束。如果Thread2在取值将列队已经取空了的时候,也就调用了6次take,在调用第7次的时候,取值也将进入阻塞。等待thread1添加新值进来。
BlockingQueue 的实现类ArrayBlockingQueue
ArrayBlockingQueue类在创建的时候要指定一个初始容量
BlockingQueue<String> names=new ArrayBlockingQueue<>(3);
第一组方法,抛异常:
代码语言:javascript复制public class Demo1 {
public static void main(String[] args) {
BlockingQueue<String> names=new ArrayBlockingQueue<>(3);
names.add("张鹏");
names.add("谢兴灵");
names.add("冯小龙");
//names.add("蔡金恩"); 列队3个容量添加第四个的时候抛异常
System.out.println(names.remove());
System.out.println(names.remove());
System.out.println(names.remove());
// System.out.println(names.remove()); 只要列队空了,再取值就抛异常
}
}
第二组方法:
添加返回布尔,取值返回null
代码语言:javascript复制public class Demo1 {
public static void main(String[] args) {
BlockingQueue<String> names=new ArrayBlockingQueue<>(3);
System.out.println(names.offer("张鹏"));
System.out.println(names.offer("谢兴灵"));
System.out.println(names.offer("冯小龙"));//正常容量内的数据返回true
System.out.println(names.offer("蔡金恩"));//多添加的数据返回false
System.out.println(names.poll());
System.out.println(names.poll());
System.out.println(names.peek());//获得结果冯小龙,peek虽然能够获取到数据但是不会清除数据,所以只能叫检查类方法
System.out.println(names.poll());//获取正常容量内的数据返回对应的值
System.out.println(names.peek());//获得结果null
System.out.println(names.poll());//队列已经空了的时候再取值返回null
// System.out.println(names.remove());
}
}
第三组方法:
阻塞:
代码语言:javascript复制public class Demo1 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> names=new ArrayBlockingQueue<>(3);
names.put("张鹏");
names.put("谢兴灵");
names.put("冯小龙");
//names.put("蔡金恩"); 程序走到这里的时候阻塞,
System.out.println(names.take());
System.out.println(names.take());
System.out.println(names.take());
System.out.println(names.take());//程序走到这里的时候又阻塞,取值已经把列队取空了。所以程序结束没有打印出来
System.out.println("程序结束");
}
}
代码语言:javascript复制package com.qf.threadQueue;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
public class Demo1 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> names=new ArrayBlockingQueue<>(3);
CountDownLatch countDownLatch=new CountDownLatch(2);
new Thread(()->{
for (int i = 0; i < 4; i ) {
try {
System.out.println("开始存第" (i 1) "个值");
Thread.sleep(3000);//这里暂停的意义再于,取的时候如果列队中没有值要等待
names.put(i);
System.out.println("存值结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
countDownLatch.countDown();
},"put").start();
new Thread(()->{
for (int i = 0; i < 4; i ) {
try {
System.out.println("开始取第" (i 1) "个值");
//取值如果上面存值的线程处于暂停状态时,列队也是空的时候,上面线程停多少,这里就等多久
System.out.println(names.take());
System.out.println("取值结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
countDownLatch.countDown();
},"take").start();
countDownLatch.await();
System.out.println("程序结束");
}
}
//阻塞队列要由多个人去完成,一个人存,一个人取
第四组方法:
当队列满时再添加新的数据可以指定等待的时间,超过时间添加失败,取数据时,当队列空时,等待时间,超过时间取值失败。
代码语言:javascript复制import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class Demo2 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue=new ArrayBlockingQueue<>(3);
Thread thread= new Thread(()->{
for (int i = 0; i < 4; i ) {
try {
//如果子线程先启动,这时的队列中空的状态,这里取值需要等待,如果3秒内,主线程有添加值进来就立即取出
System.out.println(queue.poll(3,TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
queue.offer("张鹏",3, TimeUnit.SECONDS);
queue.offer("谢兴灵",3, TimeUnit.SECONDS);
queue.offer("张万达",3, TimeUnit.SECONDS);
//如果子线程一直没有取,这里等待3秒,如果3秒内子线程取出了一个值,这里都能添加成功
queue.offer("冯小龙",3, TimeUnit.SECONDS);
thread.join();
System.out.println(queue.size());
System.out.println("程序结束");
}
}
SynchronousQueue
SynchronousQueue 没有容量。与其他的 BlockingQueue 不同,SynchronousQueue是一个不存储元素的 BlockingQueue 。 每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
代码语言:javascript复制package com.qf.threadQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue=new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println("开始取值");
System.out.println(queue.take()); //子线程走到这里如果没有元素进来也会进入阻塞状态,如果有元素直接取值
} catch (InterruptedException e) {
e.printStackTrace();
}
},"取值线程").start();
queue.put("张鹏");//main走到这里会进入阻塞状态,当子线程取完值以后,解除阻塞状态
}
}
SynchronousQueue 的测试案例一定要使用两个线程来操作,因为put一个值后当前线程会进入到阻塞状态,等待其他线程取值,取值完成后才执行put之后的代码,同理take()方法取值时如果SynchronousQueue 队列中没有元素的话也会进入等待状态,等其他线程为这个队列添加一个元素进来,因为SynchronousQueue 内部是没有空间来存放添加的元素的。
15.15 线程池
池化技术:线程的开启和Java没有关系,和CPU有关系,所以线程池是CPU中划分出几条线程出来,使用Java技术存储下来。线程池做的工作主要是:控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
它的主要特点为:线程复用,控制最大并发数,管理线程。
第一:降低资源消耗,通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。
第三:提高线程的可管理性,线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系,统的稳定性,使用线程池可以进行统一分配,调优和监控。
线程池技术也是第四种创建线程的方式。
第一种,Executors线程工具类:
ExecutorService executorService = Executors.newFixedThreadPool(5); //创建指定固定数量的线程池 LinkedBlockingQueue
ExecutorService executorService = Executors.newSingleThreadExecutor(); //创建一个线程的线程池 LinkedBlockingQueue
ExecutorService executorService = Executors.newCachedThreadPool(); //创建一个最大值为Integer.MAX_VALUE的线程池 LinkedBlockingQueue
ScheduledExecutorService executorService=Executors.newScheduledThreadPool(3); 创建一个定时任务
ExecutorService 线程池服务类
返回值,Future<T> 如果任务类中有返回结果的话,使用Future获取线程中的计算结果。
submit(Runnable task) 开启线程的执行 参数一个任务类对象
submit(Callable<T> task) 返回一个Future对象用于结果的接收
execute(Runnable run) 只能执行不带返回值的任务
线程池用完以后记得要关闭 ,如果不关闭,线程池会一直运行
shutdown()
第一种 ExecutorService executorService = Executors.newFixedThreadPool(5);
代码语言:javascript复制public class Demo1 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i ) {
executorService.execute(()->{
System.out.println(Thread.currentThread().getName() "hello");
});
}
executorService.shutdown();
}
}
代码语言:javascript复制public class Demo1 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i ) {
executorService.submit(()->{
System.out.println(Thread.currentThread().getName() "hello");
});
}
executorService.shutdown();
}
}
计算1-10000之间所有数的和 ,使用线程池技术来进行优化
代码语言:javascript复制public class Demo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);//定义出5条线程
int sum=0;
for (int i = 0; i < 10; i ) {
Calc1_10000 calc=new Calc1_10000(i*1000 1,i*1000 1000);
//将Callable接口用submit执行返回future ,使用future对象获取计算结果
Future<Integer> future = executorService.submit(calc);
sum = future.get();
}
System.out.println(sum);
executorService.shutdown();
}
}
第二种:ExecutorService executorService = Executors.newSingleThreadExecutor(); 创建只有一个线程的线程池
代码语言:javascript复制public class Demo2 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i ) {
executorService.execute(()->{
System.out.println(Thread.currentThread().getName() "-------------hello");
});
}
executorService.shutdown();
}
}
第三种: ExecutorService executorService = Executors.newCachedThreadPool();
代码语言:javascript复制public class Demo3 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5000; i ) {
executorService.execute(()->{
System.out.println(Thread.currentThread().getName() "-------------hello");
});
}
executorService.shutdown();
}
}
代码语言:javascript复制SpringBoot cron表达式去完成@Scheduled定时任务
scheduleAtFixedRate(Runable,第一次延迟多久执行,每次执行的时间间隔,时间单位) TimeUnit.SECONDS
第二种,ThreadPoolExecutor原生类创建:
代码语言:javascript复制public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);
corePoolSize -- > 正常活动空间
maximumPoolSize--> 最大活动空间
keepAliveTime --> 空闲时间
TimeUnit -->空闲时间对应的单位
BlockingQueue<Runnable> -->阻塞队列 用于存放等待排队的线程
ThreadFactory 线程创建工厂 默认不动Executors.defaultThreadFactory() 创建线程池工厂
RejectedExecutionHandler 拒绝策略
一个线程能够容纳多少线程数是maximumPoolSize workQueue空间来决定的
RejectedExecutionHandler 拒绝策略有四种:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务 (重复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
代码语言:javascript复制public class Demo5 {
public static void main(String[] args) {
ThreadPoolExecutor pool=new ThreadPoolExecutor(
5,
10,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue(20),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy() //多出来的异常
//new ThreadPoolExecutor.DiscardPolicy() //多出来的直接丢弃
//new ThreadPoolExecutor.DiscardOldestPolicy() //丢弃队列中最前面的任务
//new ThreadPoolExecutor.CallerRunsPolicy() //回到调用处的线程上执行
);
for (int i = 0; i < 100; i ) {
final int j=i;
pool.execute(()->{
System.out.println(Thread.currentThread().getName() "-------------hello" j);
});
}
pool.shutdown();
}
}
AbortPolicy()
DiscardPolicy()抛弃掉了后面大部分任务
DiscardOldestPolicy() 抛弃掉前面的一些任务
CallerRunsPolicy()多出来的线程回到调用由调用处的线程执行
1、线程有几种创建方式? 四种 Runnable Thread Callable ThreadPoolExecutor
2、线程池有几种创建方式 Executors工具类 new ThreadPoolExecutor
3、Executors有几种创建方式?每种有什么区别 1.创建指定固定数量的线程池 2.创建一个线程的线程池 LinkedBlocingQueue 3. 创建一个最大值为Integer.MAX_VALUE的线程池 SynchronousQueue
4、为什么不建议用Executors创建? 不灵活
5、创建线程时构造方法有几个参数,什么含义 7个 每一个的含意
6、线程池中拒绝策略有几种
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务 (重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
7、如何去创建一个线程池?对线程数量的定义如何决策。
CPU密集型程序
根据CPU核数决定 CPU的核数是常用活动数,核数*2是最大活动数
代码语言:javascript复制ThreadPoolExecutor pool=new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors()*2,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue(20),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
IO密集型
主要看功能的复杂度和写的频率
CPU核心数/(1-阻塞系数) 阻塞系数一般0.8~0.9
代码语言:javascript复制ThreadPoolExecutor pool=new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors()*5, // 8/(1-0.8)
Runtime.getRuntime().availableProcessors()*10, // 8/(1-0.8)*2
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue(20),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
15.16 分支合并
线程中的分支合并,问题求1-10000以求所有数的和,采用二分的方式去计算,规定最小计算单元在100个数以内。
ForkJoinPool WorkQueue是一个ForkJoinPool中的内部类,它是线程池中线程的工作队列的一个封装,支持任务窃取
ForkJoinTask ForkJoinTask代表运行在ForkJoinPool中的任务。 FutureTask是一兄弟 Future接口
Recursive RecursiveAction 不带返回值的任务 RecursiveAction RecursiveTask 继承了ForkJoinTask
RecursiveTask 带返回值的任务
第一步实现一个任务类去继承RecursiveTask重写里面的compute()方法
第二步 可以使用多态的形式,ForkJoinTask task=new 自定义任务类();
第三步 ForkJoinPool.submit(task) 开始执行线程池
第四步 在子线程中 再创建一个ForkJoinTask 类的对象使用fork()方法进行任务窃取
第五步 使用ForkJoinTask join()来获取返回的结果
代码语言:javascript复制package com.qf.threadForkJoin;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class CalcNum extends RecursiveTask<Integer> {
private int start;
private int end;
public CalcNum(int start,int end){
this.start=start;
this.end=end;
}
@Override
protected Integer compute() {
if(end-start<=100){
//最小计算单元
int sum=0;
for(int i=start;i<=end;i ){
sum =i;
}
return sum;
}
int m=(start end)/2;
ForkJoinTask<Integer> leftFork=new CalcNum(start,m);
ForkJoinTask<Integer> rightFork=new CalcNum(m 1,end);
leftFork.fork();//窃取任务,继承以新的一个线程的形式向下执行compute方法
rightFork.fork();
return leftFork.join() rightFork.join();
}
}
代码语言:javascript复制package com.qf.threadForkJoin;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
public class Demo1 {
public static void main(String[] args) {
CalcNum cn=new CalcNum(0,10000);
ForkJoinPool pool=new ForkJoinPool();
pool.submit(cn);
Integer sum = cn.join();
System.out.println(sum);
pool.shutdown();
}
}
15.17 CompletableFuture
异步回调
CompletableFuture.runAsync(Runable run) 定义出一个没有返回值的任务
代码语言:javascript复制CompletableFuture completableFuture=CompletableFuture.runAsync(()->{
System.out.println("hello");
});
调用:
completableFuture.get() 方法进行任务的执行,get()返回一个Object类型的数据,如果是定义的不带返回值的任务,这里的get()返回的是一个null。
CompletableFuture.supplyAsync() 定义一个有返回结果的任务
下面的两个方法都是回调方法:
回调:
A-B的方法 A又给B传了一个A自己写的对象 由B来调用A写的这个对象中的方法,这就形成了回调。
whenComplete(BiConsumer<? super T,? super Throwable> action) 这里的参数是一个函数式接口,接口中的方法有两个参数,第一个代表执行成功的结果,如果执行不成功这个结果是一个null,第二个参数代表异常对象,如果执行成功第二个参数是一个null
exceptionally(Function<Throwable,? extends T> fn) 执行失败时执行 这个方法必须要有返回值,如果执行的任务是无返回值的任务时,这里的返回就return null; 如果执行的任务是带返回值的,这里返回的数据一定要去根据执行的任务来进行设计一个执行任务不可能出现的结果。
代码语言:javascript复制public class Demo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//链式编程
CompletableFuture completableFuture=CompletableFuture.runAsync(()->{
System.out.println("hello");
}).whenComplete((v,e)->{
if(e==null){
System.out.println("线程任务执行成功");
}
}).exceptionally((e)->{
System.out.println(e.getMessage());
return null;
});
completableFuture.get();
System.out.println("====================================================");
//带返回值的任务定义出来
CompletableFuture<Integer> completableFuture1=CompletableFuture.supplyAsync(()->{
int sum=0;
for (int i = 1; i <= 100 ; i ) {
sum =i;
int j=1/0;
}
return sum;
});
//可以不写它
completableFuture1.whenComplete((t,u)->{
System.out.println("whenComplete:" t); //t结果
System.out.println("whenComplete:" u); //u 异常
});
//也可以不写它
completableFuture1.exceptionally((t)->{
System.out.println("exceptionally:" t);
return -1;
});
System.out.println(completableFuture1.get());
}
}
15.18 JMM执行流程
Java内存模型(Java Memory Model)
JMM (Java Memory Model)是Java内存模型,JMM定义了程序中各个共享变量的访问规则,即在虚拟机中将变量存储到内存和从内存读取变量这样的底层细节。 为什么要设计JMM 屏蔽各种硬件和操作系统的内存访问差异,以实现让Java程序在各种平台下都能达到一致的内存访问效果。
JMM规定了共享变量都存储在主内存中 类的成员变量,类的静态变量
每条线程还有自己的工作内存 局部变量
线程的工作内存保存了主内存的副本拷贝,对变量的操作在工作内存中进行,不能直接操作主内存中的变量.不同线程间无法直接访问对方的工作内存变量,需要通过主内存完成。
JMM八大原子操作是:lock(锁定)、unlock(解锁)、read(读取)、load(载入)、use(使用)、assign(赋值)、store(存储)、write(写入)。
1、lock:作用于主内存的变量,把一个变量标记为一条线程独占状态。
2、unlock:作用于主内存的变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
3、read:作用于主内存的变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load动作使用。
4、load:作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中。
5、use:作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎。
6、assign:作用于工作内存的变量,它把一个从执行引擎接收到的值赋给工作内存的变量。
7、store:作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的write的操作。
8、write:作用于工作内存的变量,它把store操作从工作内存中的一个变量的值传送到主内存的变量中。
volatile关键字:
1、保证可见性
代码语言:javascript复制public class Demo2 {
private volatile boolean status=false;
private volatile int num=0;
public static void main(String[] args) throws InterruptedException {
//对象 变量 局部变量
Demo2 m=new Demo2();
new Thread(()->{
//status false
while (!m.status) {
m.num ;
}
}).start();
TimeUnit.SECONDS.sleep(1);
//status false
m.status = true;
System.out.println(m.num);
System.out.println("程序结束");
}
}
保证最小单元的同步,比synchronized重量锁,Lock 效率都要高
javap -c Main1.class执行以后发现在java内部对 int i=0; boolean status=true;这样的操作实际是是两行代码
只要不是一行代码,就有可能发生线程并发 ,不同步, volatile保证的最小同步也就是变量定义这里。
再比如:
代码语言:javascript复制public class a{
public static void main(String[] args){
int i=1;
//A i=6 B i=6 并发
i=i 5;
System.out.println(i);
}
}
2、不保证原子性
原子性每一步操作都不可分割的。
代码语言:javascript复制public class Demo3 {
private volatile static int num=0;
public static void main(String[] args) {
for (int i = 1; i <= 20; i ) {
new Thread(()->{
for (int j = 1; j <= 1000 ; j ) {
num ;
}
}).start();
}
while (Thread.activeCount()>2){
Thread.yield();
}
System.out.println(num);
}
}
结果:
19920
为了达到最高的效率,放弃使用锁,如果进行同步,
java.util.concurrent.atomic 包 原子包
原子类: AtomicInteger AtocmicLong AtocmicBoolean ......
AtomicInteger
构造 :
AtomicInteger() -- 默认值为0的原子整型
AtomicInteger(value) -- 创建一个指定数据的原子整型
addAndGet(int delta) -- 将给指定的值加到原来的值上
get() -- 获取当前值
set(int newValue) -- 设定一个新值
getAndSet(int newValue) -- 用新值替换旧值,返回旧值
setAndGet(int newValue)-- 用新值替换旧值,返回新值
compareAndSet(expect,update) -- 如果给的预期的值是原子对象中的值,就修改为新的值
decrementAndGet() 减1
incrementAndGet() 加1
底层使用的原子类
private static final Unsafe unsafe = Unsafe.getUnsafe();
UnSafe UnSafe是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问, UnSafe相当于一个后门,基于该类可以直接操作特定内存的数据,Unsafe类存在于 sun.misc包中,其 内部方法操作可以像C的指针一样直接操作内存,因为Java中CAS操作的执行依赖于Unsafe类的方法。 注意:Unsafe类中的所有方法都是Native修饰的,也就是说Unsafe类中的方法都直接调用操作系统底层 资源执行相应任务
CAS 的全称为 Compare-And-Swap,它是一条CPU并发原语。 它的功能是判断内存某个位置的值是否为预期值,如果是则更改为新的值,这个过程是原子的。 CAS并发原语体现在JAVA语言中就是 sun.misc.Unsafe 类中的各个方法。调用UnSafe类中的CAS方法, JVM会帮我们实现出CAS汇编指令。这是一种完全依赖于硬件的功能,通过它实现了原子操作。再次强 调,由于CAS是一种系统原语,原语属于操作系统用于范畴,是由若干条指令组成的,用于完成某个功 能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU 的原子指令,不会造成所谓的数据不一致问题。
compareAndSwapInt() 比较并交换
代码语言:javascript复制//来自于C底层 它能保证原子性 var1 当前的对象 var2 你觉得的当前对象的值 var5是当前对象内存中的值 var5 var4是要更新的值
//通过var1找到内存地址,然后 判断var2和var5是否一样,一样的话将var5 var4的值直接赋值到内存中 改变成功返回true,改变失败返回false
compareAndSwapInt(var1, var2, var5, var5 var4)
代码语言:javascript复制//调用处代码 AtomicInteger对象
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) 1;
}
Unsafe中的源码
public final int getAndAddInt(Object var1, long var2, int var4) {// a 5 2020 b 5 2022
int var5;
do {
//根据这个对象的地址和var2(预估的值),去内存中将对象真正的值拿到手
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 var4));
return var5;
}
代码语言:javascript复制num=0 //a 1 b var2 = 2 c var2 =2 2 d var2=3 3
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
//A B C D
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 var4));
return var5;
}
3、禁止指令重排
javap -c Main1.class 可以查看一个Class类文件在java中真正的代码结构
多线程是交替执行,指令重排后结果是不可预估的,使用volatile 修饰的变量禁止对指令的位置重新排列。
代码语言:javascript复制public class a{
volatile static int i=1;
public static void main(String[] args){
i=i 5;
System.out.println(i);
}
}
volatile 修饰变量前后加上内存屏障,只要出现了这种内存屏障都是按顺序排列,不会进行指令重排。
volatile 实现了禁止指令重排优化,从而避免 多线程环境下程序出现乱序执行的现象。 先了解一个概念,内存屏障(Memory Barrier)又称内存栅栏,是一个CPU 指令,它的作用有两个: 1、保证特定操作的执行顺序。 2、保证某些变量的内存可见性(利用该特性实现volatile的内存可见性)。 由于编译器和处理器都能执行指令重排优化。如果在指令间插入一条 Memory Barrier 则会告诉编译器 和CPU,不管什么指令都不能和这条 Memory Barrier 指令重排序,也就是说,通过插入内存屏障禁止 在内存屏障前后的指令执行重排序优化。内存屏障另外一个作用是强制刷出各种CPU的缓存数据,因此 任何CPU上的线程都能读取到这些数据的最新版本。
15.19 CAS的ABA问题
ABA问题基本数据类型不影响
代码语言:javascript复制public class Main3 {
public static void main(String[] args) {
AtomicInteger atomic=new AtomicInteger(5);
//A A线程先将5改成了2020
System.out.println(atomic.compareAndSet(5, 2020));
//过了一会儿之后,又将2020改回了5
System.out.println(atomic.compareAndSet(2020, 5));
//B 现在进来将5,改成2022 修改成功
System.out.println(atomic.compareAndSet(5,2022));
System.out.println(atomic.get());
}
}
ABA引用数据类型中有问题
代码语言:javascript复制public class Main4 {
public static void main(String[] args) {
Main.Student stu1=new Main.Student(1,"张三");
AtomicReference<Main.Student> atomic=new AtomicReference<>(stu1);
//A线程
Main.Student stu2=new Main.Student(2,"李四");
atomic.compareAndSet(stu1,stu2);
stu1.setId(4);
stu1.setName("王五");
atomic.compareAndSet(stu2,stu1);
System.out.println(atomic.get());
//B线程 在上面A线程实际上已经对原对象的属性进行了修改
atomic.compareAndSet(stu1,new Main.Student(3,"赵六"));
}
}
解决方案加版本号
代码语言:javascript复制public class Main5 {
public static void main(String[] args) {
Main.Student stu1=new Main.Student(1,"张三");
AtomicStampedReference<Main.Student> atomic=new AtomicStampedReference<>(stu1,1);
Main.Student stu2=new Main.Student(2,"李四");
atomic.compareAndSet(stu1,stu2,1,2);
stu1.setId(4);
stu1.setName("王五");
atomic.compareAndSet(stu2,stu1,2,3);
//每一次修改都要修改版本号,版本号不一致时不让改
atomic.compareAndSet(stu1,new Main.Student(3,"朱六"),1,2);
System.out.println(atomic.getReference());
}
}
15.20 锁的种类
乐观锁和悲观锁:
无论是悲观锁还是乐观锁,都是人们定义出来的概念,可以认为是一种思想。其实不仅仅是关系型数据库系统中有乐观锁和悲观锁的概念,像 hibernate、tair、memcache 等都有类似的概念。所以,不应该拿乐观锁、悲观锁和其他的数据库锁等进行对比。乐观锁比较适用于读多写少的情况(多读场景),悲观锁比较适用于写多读少的情况(多写场景)。
悲观锁: 之所以叫做悲观锁,是因为这是一种对数据的修改持有悲观态度的并发控制方式。总是假设最坏的情况,每次读取数据的时候都默认其他线程会更改数据,因此需要进行加锁操作,当其他线程想要访问数据时,都需要阻塞挂起。悲观锁的实现:Java 里面的同步 synchronized 关键字的实现。
乐观锁: 乐观锁是相对悲观锁而言的,乐观锁假设数据一般情况不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果冲突,则返回给用户异常信息,让用户决定如何去做。乐观锁适用于读多写少的场景,这样可以提高程序的吞吐量。
公平锁非公平锁:
公平锁:是指多个线程按照申请锁的顺序来获取锁,类似排队打饭,先来后到。
非公平锁:是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比现申请的线程优先获取锁,在高并发的情况下,有可能会造成优先级反转或者饥饿现象。
互斥锁/共享锁/读写锁:
互斥锁在Java中的具体实现就是ReentrantLock
读写锁在Java中的具体实现就是ReadWriteLock
分段锁: 缓存redis会讲.
分段锁其实是一种锁的设计,并不是具体的一种锁,对于ConcurrentHashMap而言,其并发的实现就是通过分段锁的形式来实现高效的并发操作。
下订单-->扣库存这样的一个功能模型,上万人下订单,支付,扣库存,手机库存10000台,如果有一个人先抢到这个资源根据锁的特点,一定会有9999人处于阻塞状态。这是只有一把锁的情况,这样会降低效率,业绩。可以把这个库存分为100份 phone=10000 phone1=100 phone2=100 phone3=100...phone100=100 分段锁
分段锁的设计目的是细化锁的粒度,当操作不需要更新整个数组的时候,就仅仅针对数组中的一项进行加锁操作。
偏向锁/轻量级锁/重量级锁:
这三种锁是指锁的状态,并且是针对Synchronized。在Java
5通过引入锁升级的机制来实现高效Synchronized。这三种锁的状态是通过对象监视器在对象头中的字段来表明的。
偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。降低获取锁的代价。
轻量级锁是指当锁是偏向锁的时候,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能。
重量级锁是指当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会一直持续下去,当自旋一定次数的时候,还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让其他申请的线程进入阻塞,性能降低。
自旋锁:
在Java中,自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU。
可重入锁
可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。
代码语言:javascript复制public class Main {
public static void main(String[] args) {
Clac c=new Clac();
new Thread(()->{
c.add(3,5);
}).start();
}
}
class Clac{
public synchronized void add(int num1,int num2){
int result=num1 num2;
show(result);
}
public synchronized void show(int num){
System.out.println(num);
}
}
add方法中调用show方法不再上锁
死锁:
死锁是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力干 涉那它们都将无法推进下去,如果系统资源充足,进程的资源请求都能够得到满足,死锁出现的可能性 就很低,否者就会因为争夺有限的资源而陷入死锁。
死锁检测
代码语言:javascript复制public class Main1 {
private static Object lock1=new Object();
private static Object lock2=new Object();
public static void main(String[] args) {
Calc c=new Calc();
new Thread(()->{
//线程1 走到这里对lock1上锁
synchronized (lock1){
try {
System.out.println("线程1开始");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//线程1这里对lock2上锁,因为线程2已经对lock2上锁了,线程1就会在这里阻塞,这里阻塞同步代码块执行不完
//执行不完就无法解锁,就会造成死锁
synchronized (lock2){
System.out.println("线程1结束");
}
}
},"A").start();
new Thread(()->{
//线程1 走到这里对lock2上锁
synchronized (lock2){
try {
System.out.println("线程2开始");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock1){
System.out.println("线程2结束");
}
}
},"B").start();
}
}
如何检测死锁
进入到jdk--> bin目录下 cmd打开命令提示窗口
jps -l 查看当前程序的进程号
jstack 进程号 排查死锁信息
代码语言:javascript复制Java stack information for the threads listed above:
===================================================
"B":
at com.qf.demo19.Main1.lambda$main$1(Main1.java:33)//当前这个类文件中的B线程所在的33行是死锁
- waiting to lock <0x000000076b6b24e0> (a java.lang.Object)
- locked <0x000000076b6b24f0> (a java.lang.Object)
at com.qf.demo19.Main1$$Lambda$2/1831932724.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
"A":
at com.qf.demo19.Main1.lambda$main$0(Main1.java:18)//当前这个类文件中的A线程所在的18行是死锁
- waiting to lock <0x000000076b6b24f0> (a java.lang.Object)
- locked <0x000000076b6b24e0> (a java.lang.Object)
at com.qf.demo19.Main1$$Lambda$1/990368553.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
Found 1 deadlock. //提示就是死锁
上锁之后没有解锁。
----------小珑