Java 线程池原理
线程复用
通过 Override Thread类中的start方法,不断循环调用传递过来的Runnable对象
线程池的组成
线程池主要由4部分组成:
- 线程池管理器: 创建和管理线程池
- 工作线程: 线程池中的线程
- 任务接口: 每个任务必须实现的接口,用于工作线程调度其运行
- 任务队列: 存放待处理的任务,一种缓存机制
线程池 使用 Executor,Executors,ExecutorService,ThreadPoolExecutor,Callable,Future,FutureTask
代码语言:javascript复制// ThreadPoolExecutor 内部构造
public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime
, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
- corePoolSize. 线程池线程数量
- maximumPoolSize. 线程池最大线程数量
- keepAliveTime 多余空闲线程存活时间
- unit keepAliveTime 的 单位
- workQueue 任务队列,被提交但未被执行的任务
- threadFactory 线程工厂, 用于创建线程
- handler 拒绝策略,拒绝任务的策略
拒绝策略
- AbortPolicy 直接抛出异常,阻止系统正常运行
- CallerRunsPolicy 线程池未关闭时运行当前被丢弃任务
- DiscardOldestPolicy 丢弃即将被执行的任务,并尝试重新提交当前任务
- DiscardPolicy 丢弃无法处理的任务 不予处理
Java 线程池工作过程
- 线程池创建,无线程,任务队列通过参数传入
- 调用 execute() 添加任务;小于corePoolSize 立马执行,大于等于放入队列;当队列装满且小于maximumPoolSize,立刻执行;当队列装满且大于等于maximumPoolSize,线程池抛出 RejectExecutionException
- 线程完成当前任务,从队列中拿取下一个任务执行
- 当线程不执行任务且超过keepAliveTime,当线程数大于corePoolSize,线程会被停掉,当所有任务完成,收缩到corePoolSize大小
Java 阻塞队列原理
线程阻塞的两种情况
- 队列没有数据,Cusomer端所有线程被自动阻塞,直到有数据放入队列
- 队列塞满数据,Producer端所有线程被自动阻塞,知道队列中有空的位置
阻塞队列的主要方法
- 插入 -- add(e)抛出异常 -- offer(e)特殊值 -- put(e)阻塞 -- offer(e,time,unit)超时
- 移除 -- remove()抛出异常 -- poll()特殊值 -- take()阻塞 -- pool(time,unit)超时
- 检查 -- element()抛出异常 -- peek()特殊值 -- 不可用 -- 不可用
Java中的阻塞队列
- ArrayBlockingQueue: 由数组组成的有界阻塞队列 (公平,非公平)
- LinkedBlockingQueue: 由链表组成的有界阻塞队列 (两个独立锁提高并发)
- PriorityBlockingQueue: 支持优先级排序的无界阻塞队列 (compareTo 排序实现优先)
- DelayQueue: 使用优先级队列的无界阻塞队列 (缓存失效,定时任务)
- SynchronousQueue: 不存储元素的阻塞队列 (不存储数据,可用于传递数据)
- LinkedTransferQueue: 由链表组成的无界阻塞队列
- LinkedBlockingDeque: 由链表组成的双向阻塞队列
CyclicBarrier、CountDownLatch、Semaphore 用法
CountDownLatch (线程计数器)
代码语言:javascript复制final CountDownLatch latch = new CountDownLatch(2);
new Thread(){
public void run() {
System.out.println("子线程" Thread.currentThread().getName() "正在执行");
Thread.sleep(3000);
System.out.println("子线程" Thread.currentThread().getName() "执行完毕");
latch.countDown();
};
}.start();
new Thread(){
public void run() {
System.out.println("子线程" Thread.currentThread().getName() "正在执行");
Thread.sleep(3000);
System.out.println("子线程" Thread.currentThread().getName() "执行完毕");
latch.countDown();
};
}.start();
System.out.println("等待 2 个子线程执行完毕..."); latch.await();
System.out.println("2 个子线程已经执行完毕"); System.out.println("继续执行主线程");
CyclicBarrier(回环栅栏-等待至Barrier状态再全部同时执行)
让一组线程等待到某个状态后再全部同时执行;调用 await()方法后 线程就处于barrier了
代码语言:javascript复制public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
for(int i=0;i<N;i )
new Writer(barrier).start();
}
static class Writer extends Thread {
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
Thread.sleep(5000); //以睡眠来模拟线程需要预定写入数据操作
System.out.println(" 线 程 " Thread.currentThread().getName() "写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch(BrokenBarrierException e){
e.printStackTrace();
}
System.out.println("所有线程写入完毕,继续处理其他任务,比如数据操作");
}
}
Semaphore(信号量-控制同时访问的线程个数)
例如:若一个工厂有5台机器,但有8个工人,一台机器同时只能被一个工人使用,只有使用完,其他工人才能继续使用
代码语言:javascript复制public static void main (String[] args) {
int N = 8; //工人数
Semaphore semaphore = new Semaphore(5); //机器数目
for(int i=0;i<N;i )
new Worker(i,semaphore).start();
}
static class Worker extends Thread {
private int num;
private Semaphore semaphore;
public Worker(int num,Semaphore semaphore){
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("工人" this.num "占用一个机器在生产...");
Thread.sleep(2000);
System.out.println("工人" this.num "释放出机器");
semaphore.release();
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Volatile关键字使用
变量可见性
保证volatile的变量被所有线程可见
禁止重排序
volatile是比synchronized更轻量的同步所,一个变量被多个线程共享
Volatile 使用场景
- 对该变量的写操作不依赖当前值,或是单纯的变量赋值
- 不同的volatile变量之间,不能相互依赖
如何在两个线程中共享数据
Class上使用Synchronized 方法 (同步锁)
代码语言:javascript复制public class MyData {
private int j = 0;
public synchronized void add() {
j ;
System.out.println("线程" Thread.currentThread().getName() "j为: " j);
}
public synchronized void dec() {
j--;
System.out.println("线程" Thread.currentThread().getName() "j为: " j);
public int getData() {
return j;
}
}
}
public class AddRunnable implements Runnable {
MyData data;
public AddRunnable(MyData data) {
this.data = data;
}
public void run() {
data.add();
}
}
public class DecRunnable implements Runnable {
MyData data;
public DecRunnable(MyData data) {
this.data = data;
}
public void run() {
data.dec();s
}
}
public static void main (String[] args) {
MyData data = new MyData();
Runnable add = new AddRunnable(data);
Runnable dec = new DecRunnable(data);
for (int i=0; i < 2; i ) {
new Thread(add).start();
new Thread(dec).start();
}
}
Runnable上使用Synchronized方法 (同步锁)
代码语言:javascript复制public class MyData {
private int j=0;
public synchronized void add() {
j ;
System.out.println("线程" Thread.currentThread().getName() "j为: " j);
}
public synchronized void dec() {
j--;
System.out.println("线程" Thread.currentThread().getName() "j为: " j);
}
public int getData() {
return j;
}
}
public class TestThread {
public static void main (String[] args) {
final MyData data = new MyData();
for (int i=0; i<2; i ) {
new Thread(new Runnable(){
public void run() {
data.add();
}
}).start();
new Thread(new Runnable(){
public void run() {
data.dec();
}
}).start();
}
}
}
ThreadLocal作用 (线程本地 存储)
threadlocal 只在thread的生命周期内起作用
ThreadLocalMap
每一个线程都有自己独有的ThreadLocalMap,将 ThreadLocal 的静态实例作为key;有 set(v), get(), remove() 方法使用
ThreadLocal使用场景
ThreadLocal 被用来解决 数据库连接,Session管理等
代码语言:javascript复制private static final ThreadLocal threadSession = new ThreadLocal();
public static Session getSession() throws InfrastructureException {
Session s = (Session) threadSession.get();
try {
if (null == s) {
s = getSessionFactory().openSession;
threadSession.set(s);
}
} catch (HibernateException ex) {
throw new InfrastructureException(ex);
}
return s;
}
Synchronized 和 ReentrantLock 的相同与不同
相同
- 都是可重入锁,同一个线程可以多次获得同一个锁
- 都保证了可见性和互斥性
- 都用来协调多线程共享的对象和变量
不同
- ReentrantLock 显式获得、释放Lock,Synchronized 隐式获得、释放Lock
- ReentrantLock 可响应中断、可轮回;Synchronized 不可
- ReentrantLock 是API级别,Sychronized 是 JVM级别
- ReentrantLock 是可实现公平锁
- ReentrantLock 是同步非阻塞,采用乐观并发策略;Synchronized 是同步阻塞,使用悲观并发策略
- ReentrantLock 是 接口,Synchronized 是关键字
- ReentrantLock 需要通过 unlock() 释放锁,Synchronized自动释放锁
- ReentrantLock 可以响应中断,Synchronized 不能响应中断
- ReentrantLock 可以知道是否成功获取锁,Synchronized 不能
- ReentrantLock 可以实现读写锁,Synchronized 不能
ConcurrentHashMap 与并发
减小Lock颗粒度
缩小锁定Object的范围
ConcurrentHashMap 分段锁 (Segment Lock)
ConcurrentHashMap 内部被分为若干个小的HashMap,成为 Segment;
默认情况下一个ConcurrentHashmap 被细分为16个Segment
只对相应的Segment 加锁, Segment 和 Segment 之间是并行的
Java 中的线程调度
抢占式调度
线程执行、切换都由系统控制,这种调度机制不会让一个thread的堵塞导致整个process堵塞
协同式调度
某一线程执行完主动通知系统切换另一个线程; 不存在线程同步问题; 线程切换可以预知
一个thread阻塞会导致整个process堵塞
Java 线程调度 (抢占式调度)
JVM线程采用抢占式调度;优先级越高越先执行;优先级高不代表可以独自占用CPU时间片
线程让出CPU情况
- 线程主动放弃CPU
- 线程因为某些原因被阻塞
- 线程运行结束
进程调度算法
优先调度算法
- First Come First Service 先来先服务
- Short Job First 短作业(进程)优先调度
- Short Process First 短进程优先
高优先权优先调度算法
- 非抢占式优先权算法
- 抢占式优先权调度算法
- 高响应比优先调度算法
基于时间片的轮转调度算法
- Round Robin 时间片轮转算法
- 多级反馈队列调度算法
CAS (Compare And Swap/Set)
CAS(V,E,N) V代表更新的变量,E表示旧值,N表示新值;当V=E,V才被设为N;如果V!=E,表示其他线程更新;当前不做操作
CAS 是 乐观锁
原子包 java.util.concurrent.atomic 锁自旋
代码语言:javascript复制public class AtomicInteger extends Number implements java.io.Serializable {
private volatile int value;
public final int get() {
return value;
}
public final int getAndIncrement() {
for(;;) {// CAS 自旋,一直尝试直到成功
int current = get();
int next = current 1;
if (compareAndSet(current, next))
return current;
}
}
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
}
ABA问题
数据在并发过程中被其他thread影响形成阶段性脏数据
部分CAS采用version number (版本号解决ABA问题)
AQS (抽象队列同步器)
AQS = AbstractQueuedSynchronizer; AQS 定义了一套多线程访问共享资源的同步器
ReentrantLock,Semaphore,CountDownLatch 都依赖于他
Exclusive 独享资源 (比如 ReentrantLock)
独占,只有一个thread能执行
Share 共享资源 (比如 Semaphore, CountDownLatch)
共享,多个threads可以执行
AQS主要实现方法
- isHeldExclusivelu() 线程是否独占资源
- tryAcquire(int) 独占方式 尝试获取资源
- tryRelease(int) 独占方式 尝试释放资源
- tryAcquireShared(int) 共享方式,尝试获取资源
- tryReleaseShared(int) 共享方式,尝试释放资源