Java多线程 (Part3: 线程、进程原理、阻塞队列)

2023-06-16 19:13:27 浏览数 (1)

Java 线程池原理

线程复用

通过 Override Thread类中的start方法,不断循环调用传递过来的Runnable对象

线程池的组成

线程池主要由4部分组成:

  • 线程池管理器: 创建和管理线程池
  • 工作线程: 线程池中的线程
  • 任务接口: 每个任务必须实现的接口,用于工作线程调度其运行
  • 任务队列: 存放待处理的任务,一种缓存机制

线程池 使用 Executor,Executors,ExecutorService,ThreadPoolExecutor,Callable,Future,FutureTask

代码语言:javascript复制
// ThreadPoolExecutor 内部构造
public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime
                                , TimeUnit unit, BlockingQueue<Runnable> workQueue) {
         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
  1. corePoolSize. 线程池线程数量
  2. maximumPoolSize. 线程池最大线程数量
  3. keepAliveTime 多余空闲线程存活时间
  4. unit keepAliveTime 的 单位
  5. workQueue 任务队列,被提交但未被执行的任务
  6. threadFactory 线程工厂, 用于创建线程
  7. handler 拒绝策略,拒绝任务的策略

拒绝策略

  1. AbortPolicy 直接抛出异常,阻止系统正常运行
  2. CallerRunsPolicy 线程池未关闭时运行当前被丢弃任务
  3. DiscardOldestPolicy 丢弃即将被执行的任务,并尝试重新提交当前任务
  4. DiscardPolicy 丢弃无法处理的任务 不予处理

Java 线程池工作过程

  1. 线程池创建,无线程,任务队列通过参数传入
  2. 调用 execute() 添加任务;小于corePoolSize 立马执行,大于等于放入队列;当队列装满且小于maximumPoolSize,立刻执行;当队列装满且大于等于maximumPoolSize,线程池抛出 RejectExecutionException
  3. 线程完成当前任务,从队列中拿取下一个任务执行
  4. 当线程不执行任务且超过keepAliveTime,当线程数大于corePoolSize,线程会被停掉,当所有任务完成,收缩到corePoolSize大小

Java 阻塞队列原理

线程阻塞的两种情况

  • 队列没有数据,Cusomer端所有线程被自动阻塞,直到有数据放入队列
  • 队列塞满数据,Producer端所有线程被自动阻塞,知道队列中有空的位置

阻塞队列的主要方法

  • 插入 -- add(e)抛出异常 -- offer(e)特殊值 -- put(e)阻塞 -- offer(e,time,unit)超时
  • 移除 -- remove()抛出异常 -- poll()特殊值 -- take()阻塞 -- pool(time,unit)超时
  • 检查 -- element()抛出异常 -- peek()特殊值 -- 不可用 -- 不可用

Java中的阻塞队列

  1. ArrayBlockingQueue: 由数组组成的有界阻塞队列 (公平,非公平)
  2. LinkedBlockingQueue: 由链表组成的有界阻塞队列 (两个独立锁提高并发)
  3. PriorityBlockingQueue: 支持优先级排序的无界阻塞队列 (compareTo 排序实现优先)
  4. DelayQueue: 使用优先级队列的无界阻塞队列 (缓存失效,定时任务)
  5. SynchronousQueue: 不存储元素的阻塞队列 (不存储数据,可用于传递数据)
  6. LinkedTransferQueue: 由链表组成的无界阻塞队列
  7. LinkedBlockingDeque: 由链表组成的双向阻塞队列

CyclicBarrier、CountDownLatch、Semaphore 用法

CountDownLatch (线程计数器)

代码语言:javascript复制
final CountDownLatch latch = new CountDownLatch(2);
new Thread(){
    public void run() { 
        System.out.println("子线程" Thread.currentThread().getName() "正在执行");
        Thread.sleep(3000); 
        System.out.println("子线程" Thread.currentThread().getName() "执行完毕"); 
        latch.countDown();
    };
}.start();
new Thread(){ 
    public void run() {
        System.out.println("子线程" Thread.currentThread().getName() "正在执行"); 
        Thread.sleep(3000); 
        System.out.println("子线程" Thread.currentThread().getName() "执行完毕"); 
        latch.countDown();
    };
}.start();
System.out.println("等待 2 个子线程执行完毕..."); latch.await();
System.out.println("2 个子线程已经执行完毕"); System.out.println("继续执行主线程");

CyclicBarrier(回环栅栏-等待至Barrier状态再全部同时执行)

让一组线程等待到某个状态后再全部同时执行;调用 await()方法后 线程就处于barrier了

代码语言:javascript复制
public static void main(String[] args) {
    int N = 4;
    CyclicBarrier barrier = new CyclicBarrier(N);
    for(int i=0;i<N;i  )
        new Writer(barrier).start();    
}

static class Writer extends Thread { 
    private CyclicBarrier cyclicBarrier;
    public Writer(CyclicBarrier cyclicBarrier) { 
        this.cyclicBarrier = cyclicBarrier;
    }
    @Override
    public void run() { 
        try {
                Thread.sleep(5000); //以睡眠来模拟线程需要预定写入数据操作
                System.out.println(" 线 程 " Thread.currentThread().getName() "写入数据完毕,等待其他线程写入完毕");
                cyclicBarrier.await();
            } catch (InterruptedException e) { 
                e.printStackTrace();
            } catch(BrokenBarrierException e){ 
                e.printStackTrace();
            }
            System.out.println("所有线程写入完毕,继续处理其他任务,比如数据操作");
    } 
}

Semaphore(信号量-控制同时访问的线程个数)

例如:若一个工厂有5台机器,但有8个工人,一台机器同时只能被一个工人使用,只有使用完,其他工人才能继续使用

代码语言:javascript复制
public static void main (String[] args) {
    int N = 8; //工人数
    Semaphore semaphore = new Semaphore(5); //机器数目
    for(int i=0;i<N;i  )
        new Worker(i,semaphore).start();
} 

static class Worker extends Thread { 
    private int num;
    private Semaphore semaphore;
    public Worker(int num,Semaphore semaphore){
        this.num = num;
        this.semaphore = semaphore; 
    }

    @Override
    public void run() {
        try {
            semaphore.acquire();
            System.out.println("工人" this.num "占用一个机器在生产..."); 
            Thread.sleep(2000); 
            System.out.println("工人" this.num "释放出机器"); 
            semaphore.release();
        }catch (InterruptedException e) {
            e.printStackTrace();
        }
    } 
}

Volatile关键字使用

变量可见性

保证volatile的变量被所有线程可见

禁止重排序

volatile是比synchronized更轻量的同步所,一个变量被多个线程共享

Volatile 使用场景

  1. 对该变量的写操作不依赖当前值,或是单纯的变量赋值
  2. 不同的volatile变量之间,不能相互依赖

如何在两个线程中共享数据

Class上使用Synchronized 方法 (同步锁)

代码语言:javascript复制
public class MyData {
    private int j = 0;
    public synchronized void add() {
        j  ;
        System.out.println("线程"   Thread.currentThread().getName() "j为: " j);
    }
    public synchronized void dec() {
        j--;
        System.out.println("线程"   Thread.currentThread().getName() "j为: " j);
        public int getData() {
            return j;
        }
    }
}

public class AddRunnable implements Runnable {
    MyData data;
    public AddRunnable(MyData data) {
        this.data = data;
    }
    public void run() {
        data.add();
    }
}

public class DecRunnable implements Runnable {
    MyData data;
    public DecRunnable(MyData data) {
        this.data = data;
    }
    public void run() {
        data.dec();s
    }
}

public static void main (String[] args) {
    MyData data = new MyData();
    Runnable add = new AddRunnable(data);
    Runnable dec = new DecRunnable(data);
    for (int i=0; i < 2; i  ) {
        new Thread(add).start();
        new Thread(dec).start();
    }
}

Runnable上使用Synchronized方法 (同步锁)

代码语言:javascript复制
public class MyData {
    private int j=0;
    public synchronized void add() {
        j  ;
        System.out.println("线程"   Thread.currentThread().getName()   "j为: "   j);
    }
    public synchronized void dec() {
        j--;
        System.out.println("线程"   Thread.currentThread().getName()   "j为: "   j);
    }
    public int getData() {
        return j;
    }
}

public class TestThread {
    public static void main (String[] args) {
        final MyData data = new MyData();
        for (int i=0; i<2; i  ) {
            new Thread(new Runnable(){
                public void run() {
                    data.add();
                }
            }).start();
            new Thread(new Runnable(){
                public void run() {
                    data.dec();
                }
            }).start();
        }
    }
}

ThreadLocal作用 (线程本地 存储)

threadlocal 只在thread的生命周期内起作用

ThreadLocalMap

每一个线程都有自己独有的ThreadLocalMap,将 ThreadLocal 的静态实例作为key;有 set(v), get(), remove() 方法使用

ThreadLocal使用场景

ThreadLocal 被用来解决 数据库连接Session管理

代码语言:javascript复制
private static final ThreadLocal threadSession = new ThreadLocal();
public static Session getSession() throws InfrastructureException {
    Session s = (Session) threadSession.get();
    try {
        if (null == s) {
            s = getSessionFactory().openSession;
            threadSession.set(s);
        }
    } catch (HibernateException ex) {
        throw new InfrastructureException(ex);
    }
    return s;
}

Synchronized 和 ReentrantLock 的相同与不同

相同

  • 都是可重入锁,同一个线程可以多次获得同一个锁
  • 都保证了可见性和互斥性
  • 都用来协调多线程共享的对象和变量

不同

  • ReentrantLock 显式获得、释放Lock,Synchronized 隐式获得、释放Lock
  • ReentrantLock 可响应中断、可轮回;Synchronized 不可
  • ReentrantLock 是API级别,Sychronized 是 JVM级别
  • ReentrantLock 是可实现公平锁
  • ReentrantLock 是同步非阻塞,采用乐观并发策略;Synchronized 是同步阻塞,使用悲观并发策略
  • ReentrantLock 是 接口,Synchronized 是关键字
  • ReentrantLock 需要通过 unlock() 释放锁,Synchronized自动释放锁
  • ReentrantLock 可以响应中断,Synchronized 不能响应中断
  • ReentrantLock 可以知道是否成功获取锁,Synchronized 不能
  • ReentrantLock 可以实现读写锁,Synchronized 不能

ConcurrentHashMap 与并发

减小Lock颗粒度

缩小锁定Object的范围

ConcurrentHashMap 分段锁 (Segment Lock)

ConcurrentHashMap 内部被分为若干个小的HashMap,成为 Segment;

默认情况下一个ConcurrentHashmap 被细分为16个Segment

只对相应的Segment 加锁, Segment 和 Segment 之间是并行的

Java 中的线程调度

抢占式调度

线程执行、切换都由系统控制,这种调度机制不会让一个thread的堵塞导致整个process堵塞

协同式调度

某一线程执行完主动通知系统切换另一个线程; 不存在线程同步问题; 线程切换可以预知

一个thread阻塞会导致整个process堵塞

Java 线程调度 (抢占式调度)

JVM线程采用抢占式调度;优先级越高越先执行;优先级高不代表可以独自占用CPU时间片

线程让出CPU情况

  1. 线程主动放弃CPU
  2. 线程因为某些原因被阻塞
  3. 线程运行结束

进程调度算法

优先调度算法

  • First Come First Service 先来先服务
  • Short Job First 短作业(进程)优先调度
  • Short Process First 短进程优先

高优先权优先调度算法

  • 非抢占式优先权算法
  • 抢占式优先权调度算法
  • 高响应比优先调度算法

基于时间片的轮转调度算法

  • Round Robin 时间片轮转算法
  • 多级反馈队列调度算法

CAS (Compare And Swap/Set)

CAS(V,E,N) V代表更新的变量,E表示旧值,N表示新值;当V=E,V才被设为N;如果V!=E,表示其他线程更新;当前不做操作

CAS 是 乐观锁

原子包 java.util.concurrent.atomic 锁自旋

代码语言:javascript复制
public class AtomicInteger extends Number implements java.io.Serializable {
    private volatile int value;
    public final int get() {
        return value;
    }
    public final int getAndIncrement() {
        for(;;) {// CAS 自旋,一直尝试直到成功
            int current = get();
            int next = current   1;
            if (compareAndSet(current, next))
                return current;
        }
    }
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
}

ABA问题

数据在并发过程中被其他thread影响形成阶段性脏数据

部分CAS采用version number (版本号解决ABA问题)

AQS (抽象队列同步器)

AQS = AbstractQueuedSynchronizer; AQS 定义了一套多线程访问共享资源的同步器

ReentrantLock,Semaphore,CountDownLatch 都依赖于他

Exclusive 独享资源 (比如 ReentrantLock)

独占,只有一个thread能执行

Share 共享资源 (比如 Semaphore, CountDownLatch)

共享,多个threads可以执行

AQS主要实现方法

  1. isHeldExclusivelu() 线程是否独占资源
  2. tryAcquire(int) 独占方式 尝试获取资源
  3. tryRelease(int) 独占方式 尝试释放资源
  4. tryAcquireShared(int) 共享方式,尝试获取资源
  5. tryReleaseShared(int) 共享方式,尝试释放资源

0 人点赞