多线程
线程的应用
- 实现callable接口
- 继承Thread类
Request请求类:
/**
 * Simple request payload that is passed along the processor chain.
 */
public class Request {
    /** Name of the request; {@code null} until {@link #setName(String)} is called. */
    private String name;

    /** Returns the request name (may be {@code null}). */
    public String getName() {
        return name;
    }

    /** Sets the request name. */
    public void setName(String name) {
        this.name = name;
    }

    /** Renders the request as {@code Request{name='<name>'}}. */
    @Override
    public String toString() {
        return "Request{"
                + "name='" + name + '\''
                + '}';
    }
}
RequestProcessor接口:
代码语言:javascript复制/**
* 请求处理接口
*/
public interface RequestProcessor {
void processRequest(Request request);
}
PrintProcessor打印处理:
代码语言:javascript复制import java.util.concurrent.LinkedBlockingQueue;
/**
* 打印处理
*/
public class PrintProcessor extends Thread implements RequestProcessor {
//存放请求的队列
LinkedBlockingQueue<Request> linkedBlockingQueue = new LinkedBlockingQueue<Request>();
private final RequestProcessor nextProcessor;
public PrintProcessor(RequestProcessor nextProcessor) {
this.nextProcessor = nextProcessor;
}
public void processRequest(Request request) {
linkedBlockingQueue.add(request);
}
@Override
public void run() {
while (true) {
try {
Request request = linkedBlockingQueue.take();
System.out.println("print data:" request);
nextProcessor.processRequest(request);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
SaveProcessor保存处理类:
代码语言:javascript复制import java.util.concurrent.LinkedBlockingQueue;
/**
* 保存处理
*/
public class SaveProcessor extends Thread implements RequestProcessor {
LinkedBlockingQueue<Request> linkedBlockingQueue = new LinkedBlockingQueue<Request>();
@Override
public void run() {
while (true) {
try {
Request request = linkedBlockingQueue.take();
System.out.println("save data:" request);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void processRequest(Request request) {
linkedBlockingQueue.add(request);
}
}
测试类:
代码语言:javascript复制public class Demo {
PrintProcessor printProcessor;
public Demo () {
//责任链模式 先输出后保存
SaveProcessor saveProcessor = new SaveProcessor();
saveProcessor.start();
printProcessor = new PrintProcessor(saveProcessor);
printProcessor.start();
}
public static void main(String[] args) {
Request request = new Request();
request.setName("mic");
new Demo().doTest(request);
}
private void doTest(Request request) {
printProcessor.processRequest(request);
}
}
并发编程的基础
线程状态
6种:
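NEW 尚未调用 start 方法;RUNNABLE 运行状态;BLOCKED 阻塞(在 Thread.State 中,BLOCKED 仅指等待进入 synchronized 的监视器锁)
- 等待阻塞 wait(对应 WAITING / TIMED_WAITING 状态)
- 同步阻塞 synchronized(对应 BLOCKED 状态)
- 其他阻塞 sleep/join(sleep 对应 TIMED_WAITING,join 对应 WAITING / TIMED_WAITING)
WAITING 等待;TIMED_WAITING 超时等待;TERMINATED 终止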
线程的启动和终止
启动:start
终止: interrupt
线程安全问题
可见性 原子性 有序性
可见性:
代码语言:javascript复制import java.util.concurrent.TimeUnit;
/**
 * Visibility demo: {@code volatile} makes the write to {@code stop} visible to the
 * worker thread. Per the original note, if {@code volatile} is removed the worker
 * thread does not terminate.
 */
public class VisableDemo {
    private volatile static boolean stop = false;

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            int i = 0;
            // Spin until the main thread's write to stop becomes visible.
            while (!stop) {
                i++;
            }
        });
        thread.start();
        TimeUnit.SECONDS.sleep(1);
        stop = true;
    }
}
原子性:
/**
 * Atomicity demo: 1000 threads each increment {@code count} once. One would expect
 * 1000, but the printed result is usually lower because {@code count++} is not an
 * atomic operation. The race is intentional.
 */
public class AtomicDemo {
    private static int count = 0;

    /** Sleeps briefly, then performs an unsynchronized increment of {@code count}. */
    public static void inc() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        count++;
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 1000; i++) {
            new Thread(AtomicDemo::inc).start();
        }
        // Give the worker threads time to finish before reading the counter.
        Thread.sleep(4000);
        System.out.println("运行结果:" + count);
    }
}
有序性不好模拟。。
有序性问题:
- 编译器的指令重排序
- cpu处理器的指令重排序
- 内存系统的重排序
cpu高速缓存
总线锁 缓存锁(MESI协议)
JMM内存模型
JMM如何解决原子性 可见性 有序性的问题
限制处理器的优化和使用内存屏障
解决方法:volatile synchronized final juc
原子性:synchronized(monitorenter、monitorexit)
可见性:volatile synchronized final
有序性:volatile synchronized
volatile
volatile
- 保证可见性(底层是使用lock指令)和禁止指令重排序
- 使用缓存锁(MESI 缓存一致性协议) M是modify I是Invalid E是Exclusive S是Shared
- 内存屏障
- 对每个volatile写操作的前面插入storestore barrier
- 对每个volatile写操作后面插入storeload barrier
- 对每个volatile读操作后面插入loadload barrier
- 对每个volatile读操作后面插入loadstore barrier
内存屏障两个作用:
- 保证数据的可见性
- 防止指令的重排序
指令重排序:
/**
 * Instruction-reordering demo: two threads each write one field and then read the
 * field written by the other thread.
 */
public class VolatileDemo {
    private static int x = 0, y = 0;
    private static int a = 0, b = 0;

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(() -> {
            a = 1;
            x = b;
        });
        Thread thread2 = new Thread(() -> {
            b = 1;
            y = a;
        });
        thread1.start();
        thread2.start();
        // join() blocks here until the thread has finished (built on wait/notify underneath).
        thread1.join();
        thread2.join();
        System.out.println("x=" + x + "," + "y=" + y);
        // Possible results: x=0,y=1   x=1,y=0   x=1,y=1
        // x=0,y=0 is also possible under reordering:
        // t1 runs x=b, t2 runs b=1, t2 runs y=a, t1 runs a=1.
    }
}
内存屏障
synchronized 原理分析
- synchronized是如何实现锁 锁一共有4种状态,级别从低到高依次是:无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态
- 为什么任何一个对象都可以成为锁 对象监视器ObjectMonitor
- 锁存在哪个地方 对象头
偏向锁:锁不存在竞争 都是由同一个线程获得
轻量级锁
重量级锁
wait和notify
wait()、notify()方法属于Object中的方法;对于Object中的方法,每个对象都拥有。
wait:
- 释放当前的对象锁
- 让当前对象锁处于等待
wait notify 实现原理图:
示例:
ThreadWait:
代码语言:javascript复制package waitandnotify;
/**
 * Thread that takes the shared monitor, announces itself, waits on the monitor
 * once, and announces completion after it has been woken up.
 */
public class ThreadWait extends Thread {
    private Object lock;

    public ThreadWait(Object lock) {
        this.lock = lock;
    }

    public void run() {
        final Object monitor = lock;
        synchronized (monitor) {
            System.out.println("开始执行 thread wait");
            waitOnce(monitor);
            System.out.println("执行结束 thread wait");
        }
    }

    // Performs a single wait() on the monitor; an interrupt is only reported.
    private static void waitOnce(Object monitor) {
        try {
            monitor.wait();
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
ThreadNotify:
代码语言:javascript复制package waitandnotify;
/**
 * Thread that takes the shared monitor, announces itself, issues a single
 * notify() on the monitor, and announces completion.
 */
public class ThreadNotify extends Thread {
    private Object lock;

    public ThreadNotify(Object lock) {
        this.lock = lock;
    }

    public void run() {
        final Object monitor = lock;
        synchronized (monitor) {
            System.out.println("开始执行 thread notify");
            notifyOnce(monitor);
            System.out.println("执行结束 thread notify");
        }
    }

    // Wakes one waiter on the monitor; any exception is only reported.
    private static void notifyOnce(Object monitor) {
        try {
            monitor.notify();
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }
}
测试类:
代码语言:javascript复制package waitandnotify;
/**
 * Runs the wait/notify example: one thread waits on a shared lock object and a
 * second thread notifies it.
 */
public class Demo {
    public static void main(String[] args) {
        Object lock = new Object();
        // Both threads use the same lock object.
        ThreadWait threadWait = new ThreadWait(lock);
        threadWait.start();
        // Start the notifier only once the waiter is parked in lock.wait();
        // a notify() issued earlier would be lost and the waiter would hang forever.
        while (threadWait.isAlive() && threadWait.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
        ThreadNotify threadNotify = new ThreadNotify(lock);
        threadNotify.start();
    }
}
执行结果:
代码语言:javascript复制开始执行 thread wait
开始执行 thread notify
执行结束 thread notify
执行结束 thread wait
lock
synchronized和lock区别:
- lock是一个类 synchronized是jvm关键字
- lock灵活 可以选择什么时候获得锁和释放锁 synchronized是被动的 在同步代码块执行完或者抛出异常时释放锁
- lock可以判断锁的状态 synchronized无法判断锁的状态
- lock 有公平锁和非公平锁 synchronized是非公平锁
ReentrantLock 重入锁
示例:
代码语言:javascript复制import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Counter guarded by a {@link ReentrantLock}.
 */
public class ReenTrantLockDemo {
    static Lock lock = new ReentrantLock();
    private static int count = 0;

    /**
     * Sleeps for 1 ms and then increments the counter while holding {@code lock}.
     *
     * <p>The method is no longer {@code synchronized}: the explicit lock already
     * guards {@code count}, and holding the class monitor across the sleep would
     * serialize every caller.
     *
     * @throws InterruptedException if the sleep is interrupted
     */
    public static void incr() throws InterruptedException {
        Thread.sleep(1);
        lock.lock();
        try {
            count++;
        } finally {
            // Always release, even if the guarded section throws.
            lock.unlock();
        }
    }
}
基于ReentrantLock 的AQS原理分析
调用流程图:
源码分析 ReentrantLock获得锁过程:
代码语言:javascript复制static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//加锁
if (compareAndSetState(0, 1))
//设置独占状态
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
compareAndSetState 比较并替换 替换的是state的值
state=0表示无锁 state>0 表示有锁
AQS acquire方法
代码语言:javascript复制public final void acquire(int arg) {
//EXCLUSIVE 代表独占的状态
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire方法(尝试获取锁):
代码语言:javascript复制final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
//重入
int nextc = c acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter(Node.EXCLUSIVE) 方法:
代码语言:javascript复制//返回当前节点
private Node addWaiter(Node mode) {
//把当前线程封装成node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//把当前节点加入队列中
enq(node);
return node;
}
enq方法:
代码语言:javascript复制private Node enq(final Node node) {
//自旋方式
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//保证线程安全
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued方法:
代码语言:javascript复制final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//获得锁成功
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 获得锁失败判断是否挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
当前节点获得锁成功:
源码分析 ReentrantLock释放锁
unlock方法调用release
代码语言:javascript复制public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
代码语言:javascript复制protected final boolean tryRelease(int releases) {
//每释放一次锁 减一
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor方法:
代码语言:javascript复制private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒下一个节点来竞争锁
LockSupport.unpark(s.thread);
}
非公平锁和公平锁差别
ReentrantLock有公平锁和非公平锁 默认是非公平锁 公平锁就是排队等待获取锁 非公平锁就是进来就获取锁 获取不到再进行排队
NonfairSync非公平锁:
代码语言:javascript复制static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
FairSync公平锁:
代码语言:javascript复制static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
ReentrantReadWriteLock 读写锁
代码语言:javascript复制import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * Read/write-lock example: a static map whose reads take the read lock and
 * whose writes take the write lock.
 */
public class RWLockDemo {
    static Map<String,Object> cacheMap = new HashMap<>();
    static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    static Lock read = readWriteLock.readLock();
    static Lock write = readWriteLock.writeLock();

    /**
     * Looks up a key while holding the read lock.
     *
     * @param key key to look up
     * @return the mapped value, or {@code null} if there is none
     */
    public static final Object get(String key) {
        Object cached;
        read.lock();
        try {
            cached = cacheMap.get(key);
        } finally {
            read.unlock();
        }
        return cached;
    }

    /**
     * Stores a value while holding the write lock.
     *
     * @param key key to store under
     * @param value value to store
     * @return the value previously mapped to the key, or {@code null}
     */
    public static final Object set(String key,String value) {
        Object previous;
        write.lock();
        try {
            previous = cacheMap.put(key, value);
        } finally {
            write.unlock();
        }
        return previous;
    }
}
Condition
相当于jdk层面的wait/notify
让某个线程在满足某个条件下才能被唤醒
ThreadWait:
代码语言:javascript复制package condition;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
 * Thread that acquires the lock, waits on the condition via
 * {@link Condition#await()}, and prints a completion message once it resumes.
 */
public class ThreadWait extends Thread {
    private Lock lock;
    private Condition condition;

    public ThreadWait(Lock lock, Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }

    public void run() {
        // Acquire outside the try block so the finally clause only unlocks a lock
        // that is actually held.
        lock.lock();
        try {
            System.out.println("开始执行 thread wait");
            condition.await();
            // Printed after waking up; previously this sat in the catch block and
            // was only reached when the wait was interrupted.
            System.out.println("执行结束 thread wait");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
ThreadNotify:
代码语言:javascript复制package condition;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
 * Thread that acquires the lock and wakes one waiter via
 * {@link Condition#signal()}.
 */
public class ThreadNotify extends Thread {
    private Lock lock;
    private Condition condition;

    public ThreadNotify(Lock lock, Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }

    public void run() {
        // Acquire outside the try block so the finally clause only unlocks a lock
        // that is actually held.
        lock.lock();
        try {
            System.out.println("开始执行 thread notify");
            condition.signal();
            System.out.println("执行结束 thread notify");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
代码语言:javascript复制package condition;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Runs the Condition example: one thread awaits on a condition of a shared lock
 * and a second thread signals it.
 */
public class Demo {
    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        // Both threads use the same lock and condition.
        ThreadWait threadWait = new ThreadWait(lock, condition);
        threadWait.start();
        // Start the signaller only once the waiter is parked in await();
        // a signal() issued earlier would be lost and the waiter would hang forever.
        while (threadWait.isAlive() && threadWait.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
        ThreadNotify threadNotify = new ThreadNotify(lock, condition);
        threadNotify.start();
    }
}
运行结果:
代码语言:javascript复制开始执行 thread wait
开始执行 thread notify
执行结束 thread notify
源码分析
await方法:
代码语言:javascript复制public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//添加队列
Node node = addConditionWaiter();
//释放所有锁 重入锁重入次数变成0
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//让当前线程阻塞
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
signal方法:
代码语言:javascript复制public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
doSignal
代码语言:javascript复制private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
transferForSignal
代码语言:javascript复制final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//加入AQS队列
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
AQS队列 存放争抢锁的线程
Condition队列 存放挂起的线程
CountDownLatch
倒计时到0时候才执行 countDownLatch.await(); 后面的代码
代码语言:javascript复制import java.util.concurrent.CountDownLatch;
/**
 * CountDownLatch example: the main thread blocks in {@code await()} until three
 * worker threads have each counted down once.
 */
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(3);

        // Workers 1 and 2 pause first and count down afterwards.
        Runnable pauseThenCountDown = () -> {
            pause();
            latch.countDown(); // decrement
        };
        // Worker 3 counts down immediately and pauses afterwards.
        Runnable countDownThenPause = () -> {
            latch.countDown();
            pause();
        };

        new Thread(pauseThenCountDown).start();
        new Thread(pauseThenCountDown).start();
        new Thread(countDownThenPause).start();

        latch.await(); // blocks until the count reaches zero
        System.out.println("执行完毕 ");
    }

    // Sleeps for two seconds; an interrupt is only reported.
    private static void pause() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
源码分析
countDownLatch.await();
代码语言:javascript复制public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly方法
代码语言:javascript复制public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//尝试获取共享锁
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared
代码语言:javascript复制protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
getState==0 表示不需要阻塞
doAcquireSharedInterruptibly方法:
代码语言:javascript复制private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
//计数器变成0
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//是否要挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
countDownLatch.countDown();
代码语言:javascript复制protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
//cas 递减操作
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
Semaphore
应用场景:限流
代码语言:javascript复制import java.util.concurrent.Semaphore;
/**
 * Semaphore example (rate limiting): ten threads compete for five permits.
 */
public class SemaphoreDemo {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(5);
        for (int i = 0; i < 10; i++) {
            new DoAnything(i, semaphore).start();
        }
    }

    /** Worker that holds one permit for about two seconds. */
    static class DoAnything extends Thread {
        private int num;
        private Semaphore semaphore;

        public DoAnything(int num, Semaphore semaphore) {
            this.num = num;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire(); // take one permit
            } catch (InterruptedException e) {
                // No permit was obtained, so there is nothing to release.
                e.printStackTrace();
                return;
            }
            try {
                System.out.println("第" + num + "个线程进入");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // Always hand the permit back, even if the sleep was interrupted.
                semaphore.release();
                System.out.println("第" + num + "释放令牌");
            }
        }
    }
}
原子操作 Atomic
12个
- 基本类型
- 数组类型
- 引用类型
- 属性类型(字段类型)
线程池原理分析
避免线程的重复创建 限流
/**
 * Creates a pool using {@code Executors.defaultThreadFactory()} and delegates to
 * the constructor that also takes a thread factory.
 *
 * @param corePoolSize number of core threads
 * @param maximumPoolSize maximum number of threads
 * @param keepAliveTime how long idle threads beyond the core size stay alive
 * @param unit time unit of {@code keepAliveTime}
 * @param workQueue blocking queue that holds tasks
 * @param handler rejection policy
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}
Executors工厂
newFixedThreadPool: 创建一个固定的线程池
代码语言:javascript复制public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newSingleThreadExecutor : 创建只有一个线程的线程池
代码语言:javascript复制public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newCachedThreadPool :不限制最大线程数
代码语言:javascript复制public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),//不存数据的队列
threadFactory);
}
newScheduledThreadPool :定时器 延时执行的线程池
代码语言:javascript复制public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
线程池流程图:
ThreadPoolExecutor类的execute方法
代码语言:javascript复制public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//创建核心线程
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
代码语言:javascript复制private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
t.start() 启动线程;Worker 实现了 Runnable 接口,其 run 方法会调用 runWorker:
代码语言:javascript复制final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//getTask 从队列中取
while (task != null || (task = getTask()) != null) {
//独占锁 线程正在运行不应该被阻断
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks ;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
线程池的监控
代码语言:javascript复制import java.util.Date;
import java.util.concurrent.*;
/**
 * Thread pool that prints monitoring information from the
 * {@code beforeExecute}, {@code afterExecute} and {@code shutdown} hooks.
 */
public class MyExecutors extends ThreadPoolExecutor {
    // Start time of each running task, keyed by the task's hashCode rendered as a string.
    // Initialised at the declaration so that every constructor gets a usable map
    // (previously only the first constructor assigned it, causing an NPE in beforeExecute).
    private final ConcurrentMap<String, Date> startTime = new ConcurrentHashMap<>();

    public MyExecutors(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public MyExecutors(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public MyExecutors(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public MyExecutors(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /** Prints completed-task, active-thread and queue-size figures, then shuts down. */
    @Override
    public void shutdown() {
        System.out.print("已经执行的任务数量:" + this.getCompletedTaskCount() + "\n");
        System.out.print("当前的活动线程数:" + this.getActiveCount() + "\n");
        System.out.print("当前排队的线程数:" + this.getQueue().size() + "\n");
        super.shutdown();
    }

    /** Records the start time of the task. */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startTime.put(String.valueOf(r.hashCode()), new Date());
        super.beforeExecute(t, r);
    }

    /** Prints the task's elapsed time in milliseconds and some pool figures. */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // The entry is used once, so remove it while reading.
        Date startDate = startTime.remove(String.valueOf(r.hashCode()));
        // The entry can be missing when two tasks share a hashCode; skip the timing then.
        if (startDate != null) {
            Date finishDate = new Date();
            long dif = finishDate.getTime() - startDate.getTime(); // elapsed time
            System.out.println("任务耗时:" + dif);
        }
        System.out.println("最大允许的线程数:" + this.getMaximumPoolSize());
        System.out.println("线程的空闲时间" + this.getKeepAliveTime(TimeUnit.MILLISECONDS));
        System.out.println("任务总数:" + this.getTaskCount());
        super.afterExecute(r, t);
    }

    /** Creates a monitored pool: 0 core threads, unbounded maximum, 60 s keep-alive, SynchronousQueue. */
    public static ExecutorService newMyExecutors() {
        return new MyExecutors(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }
}
ThreadPoolDemo
代码语言:javascript复制import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线程池监控测试
*/
public class ThreadPoolDemo implements Runnable{
static ExecutorService executorService=MyExecutors.newMyExecutors();
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
public static void main(String[] args) {
ThreadPoolExecutor tpe=(ThreadPoolExecutor)executorService;
for(int i=0;i<100;i ){
executorService.execute(new ThreadPoolDemo());
}
executorService.shutdown();
}
}