并发编程

2022-10-25 16:13:31 浏览数 (3)

多线程

线程的应用

  1. 实现callable接口
  2. 继承Thread类

Request请求类:

代码语言:javascript复制
public class Request {

    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Request{"  
                "name='"   name   '''  
                '}';
    }
}

RequestProcessor接口:

代码语言:javascript复制
/**
 * 请求处理接口
 */
public interface RequestProcessor {
    void processRequest(Request request);
}

PrintProcessor打印处理:

代码语言:javascript复制
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 打印处理
 */
public class PrintProcessor extends Thread implements RequestProcessor {

    //存放请求的队列
    LinkedBlockingQueue<Request> linkedBlockingQueue = new LinkedBlockingQueue<Request>();

    private final RequestProcessor nextProcessor;

    public PrintProcessor(RequestProcessor nextProcessor) {
        this.nextProcessor = nextProcessor;
    }

    public void processRequest(Request request) {
        linkedBlockingQueue.add(request);
    }

    @Override
    public void run() {
        while (true) {
            try {
                Request request = linkedBlockingQueue.take();
                System.out.println("print data:" request);
                nextProcessor.processRequest(request);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

SaveProcessor保存处理类:

代码语言:javascript复制
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 保存处理
 */
public class SaveProcessor extends Thread implements RequestProcessor {

    LinkedBlockingQueue<Request> linkedBlockingQueue = new LinkedBlockingQueue<Request>();

    @Override
    public void run() {
        while (true) {
            try {
                Request request = linkedBlockingQueue.take();
                System.out.println("save data:" request);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void processRequest(Request request) {
        linkedBlockingQueue.add(request);
    }
}

测试类:

代码语言:javascript复制
public class Demo {

    PrintProcessor printProcessor;

    public Demo () {
        //责任链模式 先输出后保存
        SaveProcessor saveProcessor = new SaveProcessor();
        saveProcessor.start();
        printProcessor = new PrintProcessor(saveProcessor);
        printProcessor.start();
    }
    public static void main(String[] args) {
        Request request = new Request();
        request.setName("mic");
        new Demo().doTest(request);
    }

    private void doTest(Request request) {
        printProcessor.processRequest(request);
    }
}

并发编程的基础

线程状态

6种:

NEW 没有调用start 方法 RUNNABLE 运行状态 BLOCKED 阻塞

  • 等待阻塞 wait
  • 同步阻塞 synchronized
  • 其他阻塞 sleep/join

WAITING 等待 TIMED_WAITING 时间等待 TERMINATED 终止

线程的启动和终止

启动:start

终止: interrupt

线程安全问题

可见性 原子性 有序性

可见性:

代码语言:javascript复制
import java.util.concurrent.TimeUnit;

/**
 *  通过volatile保证可见性 把volatile去掉 线程不会关闭
 * 可见性问题
 */
public class VisableDemo {

    private volatile static boolean stop=false;

    public static void main(String[] args) throws InterruptedException {
        Thread thread=new Thread(()->{
            int i=0;
            while(!stop){
                i  ;
            }
        });
        thread.start();
        TimeUnit.SECONDS.sleep(1);
        stop=true;
    }
}

原子性:

代码语言:javascript复制
/**
 * 原子性例子 按理说count=1000 但结果并不是 因为count  不是原子操作 
 */
public class AtomicDemo {
    private static int count = 0;

    public static void inc() {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        count  ;
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 1000; i  ) {
            new Thread(AtomicDemo::inc).start();
        }
        Thread.sleep(4000);
        System.out.println("运行结果:" count);
    }
}

有序性不好模拟。。

有序性问题:

  1. 编译器的指令重排序
  2. cpu处理器的指令重排序
  3. 内存系统的重排序

cpu高速缓存

总线锁 缓存锁(MIES协议)

JMM内存模型

JMM如何解决原子性 可见性 有序性的问题

限制处理器的优化和使用内存屏障

解决方法:volatile synchronized final juc

原子性:synchronized(monitorenter、monitorexit)

可见性:volatile synchronized final

有序性:volatile synchronized

volatile

volatile

  1. 保证可见性(底层是使用lock指令)和禁止指令重排序
  2. 使用缓存锁(MESI 缓存一致性协议) M是modify I是Invalid E是Exclusive S是Shared
  3. 内存屏障
    • 对每个volatile写操作的前面插入storestore barrier
    • 对每个volatile写操作后面插入storeload barrier
    • 对每个volatile读操作前面插入loadload barrier
    • 对每个volatile读操作后面插入loadstore barrier

内存屏障两个作用:

  1. 保证数据的可见性
  2. 防止指令的重排序

指令重排序:

代码语言:javascript复制
/**
 * 指令重排序
 */
public class VolatileDemo {
    private static int x=0,y=0;
    private static int a=0,b=0;

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(()->{
            a=1;
            x=b;
        });
        Thread thread2 = new Thread(()->{
            b=1;
            y=a;
        });
        thread1.start();
        thread2.start();

        //线程没有执行完阻塞在这里 底层wait notify
        thread1.join();
        thread2.join();
        System.out.println("x=" x "," "y=" y);

        //可能结果 x=0,y=1 x=1,y=0,x=1,y=1
        //也有可能x=0 y=0  t1执行x=b t2执行b=1 t2执行y=a t1执行a=1 ;

    }
}

内存屏障

synchronized 原理分析

  1. synchronized是如何实现锁 锁一共有4种状态,级别从低到高依次是:无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态
  2. 为什么任何一个对象都可以成为锁 对象监视器ObjectMonitor
  3. 锁存在哪个地方 对象头

偏向锁:锁不存在竞争 都是由同一个线程获得

轻量级锁

重量级锁

wait和notify

wait()、notify()方法属于Object中的方法;对于Object中的方法,每个对象都拥有。

wait:

  1. 释放当前的对象锁
  2. 让当前对象锁处于等待

wait notify 实现原理图:

示例:

ThreadWait:

代码语言:javascript复制
package waitandnotify;

public class ThreadWait extends Thread{
    private Object lock;
    public ThreadWait(Object lock) {
        this.lock = lock;
    }

    public void run() {
        synchronized (lock) {
            System.out.println("开始执行 thread wait");
            try {
                lock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("执行结束 thread wait");
        }
    }
}

ThreadNotify:

代码语言:javascript复制
package waitandnotify;

public class ThreadNotify extends Thread{
    private Object lock;
    public ThreadNotify(Object lock) {
        this.lock = lock;
    }

    public void run() {
        synchronized (lock) {
            System.out.println("开始执行 thread notify");
            try {
                lock.notify();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("执行结束 thread notify");
        }
    }
}

测试类:

代码语言:javascript复制
package waitandnotify;

public class Demo {
    public static void main(String[] args) {
        Object lock = new Object();
        //两个线程使用同一个线程
        ThreadWait threadWait = new ThreadWait(lock);
        threadWait.start();
        ThreadNotify threadNotify = new ThreadNotify(lock);
        threadNotify.start();
        
        
    }
}

执行结果:

代码语言:javascript复制
开始执行 thread wait
开始执行 thread wait
执行结束 thread wait
执行结束 thread wait

lock

synchronized和lock区别:

  1. lock是一个类 synchronized是jvm关键字
  2. lock灵活 可以选择什么时候获得锁和释放锁 synchronized是被动的 在同步代码块执行完或者抛出异常时释放锁
  3. lock可以判断锁的状态 synchronized无法判断锁的状态
  4. lock 有公平锁和非公平锁 synchronized是非公平锁

ReentrantLock 重入锁

示例:

代码语言:javascript复制
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReenTrantLockDemo {
    static Lock lock = new ReentrantLock();

    private static int count =0;

    public synchronized static void incr() throws InterruptedException {
        Thread.sleep(1);
        lock.lock();;
        count  ;
        lock.unlock();
    }
}

基于ReentrantLock 的AQS原理分析

调用流程图:

源码分析 ReentrantLock获得锁过程:
代码语言:javascript复制
static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            //加锁
            if (compareAndSetState(0, 1))
                //设置独占状态
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

compareAndSetState 比较并替换 替换的是state的值

state=0表示无锁 state>0 表示有锁

AQS acquire方法

代码语言:javascript复制
public final void acquire(int arg) {
    //EXCLUSIVE 代表独占的状态
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire方法(尝试获取锁):

代码语言:javascript复制
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
    	//重入
        int nextc = c   acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

addWaiter(Node.EXCLUSIVE), arg) :

代码语言:javascript复制
//返回当前节点
private Node addWaiter(Node mode) {
    //把当前线程封装成node
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //把当前节点加入队列中
    enq(node);
    return node;
}

enq方法:

代码语言:javascript复制
private Node enq(final Node node) {
    //自旋方式
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            //保证线程安全
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued方法:

代码语言:javascript复制
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                //获得锁成功
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 获得锁失败判断是否挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

当前节点获得锁成功:

源码分析 ReentrantLock释放锁

unlock方法调用release

代码语言:javascript复制
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease

代码语言:javascript复制
protected final boolean tryRelease(int releases) {
    //每释放一次锁 减一
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

unparkSuccessor方法:

代码语言:javascript复制
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
    	// 唤醒下一个节点来竞争锁
        LockSupport.unpark(s.thread);
}
非公平锁和公平锁差别

ReentrantLock有公平锁和非公平锁 默认是非公平锁 公平锁就是排队等待获取锁 非公平锁就是进来就获取锁 获取不到再进行排队

NonfairSync非公平锁:

代码语言:javascript复制
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

FairSync公平锁:

代码语言:javascript复制
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c   acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

ReentrantReadWriteLock 读写锁

代码语言:javascript复制
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 读写锁
 */
public class RWLockDemo {

    static Map<String,Object> cacheMap = new HashMap<>();

    static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    static Lock read = readWriteLock.readLock();
    static Lock write = readWriteLock.writeLock();

    /**
     * 读操作
     * @param key
     * @return
     */
    public static  final  Object get(String key) {
        read.lock();
        try {
            return cacheMap.get(key);
        }finally {
            read.unlock();
        }
    }

    public static  final  Object set(String key,String value) {
        write.lock();
        try {
            return cacheMap.put(key,value);
        } finally {
            write.unlock();
        }
    }





}

Condition

相当于jdk层面的wat notify

让某个线程在满足某个条件下才能被唤醒

ThreadWait:

代码语言:javascript复制
package condition;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Condition await()
 */
public class ThreadWait extends Thread{
    private Lock lock;
    private Condition condition;
    public ThreadWait(Lock lock,Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }

    public void run() {
            try {
                lock.lock();
                System.out.println("开始执行 thread wait");
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println("执行结束 thread wait");
            }finally {
                lock.unlock();
            }

    }
}

ThreadNotify:

代码语言:javascript复制
package condition;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Condition signal()
 */
public class ThreadNotify extends Thread{
    private Lock lock;
    private Condition condition;
    public ThreadNotify(Lock lock,Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }

    public void run() {
            try {
                lock.lock();
                System.out.println("开始执行 thread notify");
                condition.signal();
                System.out.println("执行结束 thread notify");
            } catch (Exception e) {
                e.printStackTrace();
            }
            finally {
                lock.unlock();
            }
    }
}
代码语言:javascript复制
package condition;


import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Demo {
    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        //两个线程使用同一个线程
        ThreadWait threadWait = new ThreadWait(lock,condition);
        threadWait.start();
        ThreadNotify threadNotify = new ThreadNotify(lock,condition);
        threadNotify.start();
    }
}

运行结果:

代码语言:javascript复制
开始执行 thread wait
开始执行 thread notify
执行结束 thread notify

源码分析

await方法:

代码语言:javascript复制
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //添加队列
    Node node = addConditionWaiter();
    //释放所有锁 重入锁重入次数变成0
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        //让当前线程阻塞
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

signal方法:

代码语言:javascript复制
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

doSignal

代码语言:javascript复制
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

transferForSignal

代码语言:javascript复制
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    //加入AQS队列
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

AQS队列 存放争抢锁的线程

Condition队列 存放挂起的线程

CountdownLatch

倒计时到0时候才执行 countDownLatch.await(); 后面的代码

代码语言:javascript复制
import java.util.concurrent.CountDownLatch;


public class CountDownLatchDemo {



    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch=new CountDownLatch(3);
        new Thread(()->{
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();  //递减

        }).start();

        new Thread(()->{
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();

        }).start();

        new Thread(()->{
            countDownLatch.countDown();
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        countDownLatch.await(); //阻塞
        System.out.println("执行完毕 ");
    }
}

源码分析

countDownLatch.await();
代码语言:javascript复制
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

acquireSharedInterruptibly方法

代码语言:javascript复制
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //尝试获取共享锁
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

tryAcquireShared

代码语言:javascript复制
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

getState==0 表示不需要阻塞

doAcquireSharedInterruptibly方法:

代码语言:javascript复制
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    //计数器变成0
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //是否要挂起线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
countDownLatch.countDown();
代码语言:javascript复制
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
       	//cas 递减操作
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

Semaphore

应用场景:限流

代码语言:javascript复制
import java.util.concurrent.Semaphore;

/**
 */
public class SemaphoreDemo {

    public static void main(String[] args) {
        Semaphore semaphore=new Semaphore(5);
        for(int i=0;i<10;i  ){
            new DoAnything(i,semaphore).start();
        }

    }
    static class DoAnything extends Thread{
        private int num;
        private Semaphore semaphore;

        public DoAnything(int num, Semaphore semaphore) {
            this.num = num;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire(); // 获取一个令牌
                System.out.println("第" num "个线程进入");
                Thread.sleep(2000);
                semaphore.release();//释放令牌
                System.out.println("第" num "释放令牌");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

原子操作 Atomic

12个

  1. 基本类型
  2. 数组类型
  3. 引用类型
  4. 属性类型(字段类型)

线程池原理分析

避免线程的重复创建 限流

代码语言:javascript复制
public ThreadPoolExecutor(int corePoolSize,//核心线程数
                          int maximumPoolSize,//最大线程数
                          long keepAliveTime,//超时时间 超出核心线程数量以外的线程的空余线程的存活时间
                          TimeUnit unit,// 存活时间的单位
                          BlockingQueue<Runnable> workQueue,//阻塞队列
                          RejectedExecutionHandler handler) {//拒绝策略
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

Executors工厂

newFixedThreadPool: 创建一个固定的线程池

代码语言:javascript复制
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

newSingleThreadExecutor : 创建只有一个线程的线程池

代码语言:javascript复制
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

newCachedThreadPool :不限制最大线程数

代码语言:javascript复制
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),//不存数据的队列
                                  threadFactory);
}

newScheduledThreadPool :定时器 延时执行的线程池

代码语言:javascript复制
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

线程池流程图:

ThreadPoolExecutor类的execute方法

代码语言:javascript复制
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        //创建核心线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
代码语言:javascript复制
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

t.start(); 实现runable接口 调用 runWorker:

代码语言:javascript复制
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //getTask 从队列中取
        while (task != null || (task = getTask()) != null) {
        	//独占锁  线程正在运行不应该被阻断
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks  ;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

线程池的监控

代码语言:javascript复制
import java.util.Date;
import java.util.concurrent.*;

/**
 * 线程池监控
 */
public class MyExecutors extends ThreadPoolExecutor {
    //beforeExecutor、afterExecutor、shutdown

    private ConcurrentMap<String,Date> startTime;


    public MyExecutors(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.startTime=new ConcurrentHashMap<>();
    }

    public MyExecutors(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public MyExecutors(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public MyExecutors(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    public void shutdown() {
        System.out.print("已经执行的任务数量:" this.getCompletedTaskCount() "n");
        System.out.print("当前的活动线程数:" this.getActiveCount() "n");
        System.out.print("当前排队的线程数:" this.getQueue().size() "n");
        super.shutdown();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startTime.put(String.valueOf(r.hashCode()),new Date());
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        //只用一次
        Date statDate=startTime.remove(String.valueOf(r.hashCode()));
        Date finishDate=new Date();
        long dif=finishDate.getTime()-statDate.getTime(); //执行间隔时间
        System.out.println("任务耗时:" dif);
        System.out.println("最大允许的线程数:" this.getMaximumPoolSize());
        System.out.println("线程的空闲时间" this.getKeepAliveTime(TimeUnit.MILLISECONDS));
        System.out.println("任务总数:" this.getTaskCount());
        super.afterExecute(r, t);
    }

    public static ExecutorService newMyExecutors(){
        return new MyExecutors(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }
}

ThreadPoolDemo

代码语言:javascript复制
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 线程池监控测试
 */
public class ThreadPoolDemo implements Runnable{

    static ExecutorService executorService=MyExecutors.newMyExecutors();

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor tpe=(ThreadPoolExecutor)executorService;
        for(int i=0;i<100;i  ){
            executorService.execute(new ThreadPoolDemo());
        }
        executorService.shutdown();
    }
}

0 人点赞