Java-多线程进阶

2023-10-26 16:34:06 浏览数 (1)

Java多线程进阶

锁的类型

乐观锁vs悲观锁

悲观锁:

总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁。

乐观锁:

假设数据一般情况下不会产生并发冲突,所以在数据进行提交更新的时候,才会正式对数据是否产生并发冲突进行检测,如果发现并发冲突了,则让返回用户错误的信息,让用户决定如何去做。

Synchronized 初始使用乐观锁策略. 当发现锁竞争比较频繁的时候, 就会自动切换成悲观锁策略

乐观锁的一个重要功能就是要检测出数据是否发生访问冲突. 我们可以引入一个 “版本号” 来解决访问冲突以及ABA问题

读写锁

读写锁(readers-writer lock),看英文可以顾名思义,在执行加锁操作时需要额外表明读写意图,复数读者之间并不互斥,而写者则要求与任何人互斥。

读写锁就是把读操作和写操作区分对待:

  1. Java 标准库提供了 ReentrantReadWriteLock 类, 实现了读写锁
  2. ReentrantReadWriteLock.ReadLock 类表示一个读锁. 这个对象提供了 lock / unlock 方法进行加锁解锁
  3. ReentrantReadWriteLock.WriteLock 类表示一个写锁. 这个对象也提供了 lock / unlock 方法进行加锁解锁

读写锁特别适合于 “频繁读, 不频繁写” 的场景中. (这样的场景其实也是非常广泛存在的)

重量级锁vs轻量级锁

锁的核心特性 “原子性”, 这样的机制追根溯源是 CPU 这样的硬件设备提供的

CPU 提供了 “原子操作指令”

操作系统基于 CPU 的原子指令, 实现了 mutex 互斥锁

JVM 基于操作系统提供的互斥锁, 实现了 synchronized 和 ReentrantLock 等关键字和类

注意:synchronized 并不仅仅是对 mutex 进行封装, 在 synchronized 内部还做了很多其他的工作

重量级锁: 加锁机制重度依赖了 OS 提供了 mutex,大量的内核态用户态切换,很容易引发线程的调度

轻量级锁: 加锁机制尽可能不使用 mutex, 而是尽量在用户态代码完成. 实在搞不定了, 再使用 mutex,少量的内核态用户态切换,不太容易引发线程调度

synchronized 开始是一个轻量级锁. 如果锁冲突比较严重, 就会变成重量级锁

自旋锁

自旋锁:获取锁失败, 立即再尝试获取锁, 无限循环, 直到获取到锁为止

自旋锁是一种典型的 轻量级锁 的实现方式

优点: 没有放弃 CPU, 不涉及线程阻塞和调度, 一旦锁被释放, 就能第一时间获取到锁

缺点: 如果锁被其他线程持有的时间比较久, 那么就会持续的消耗 CPU 资源. (而挂起等待的时候是不消耗 CPU 的)

synchronized 中的轻量级锁策略大概率就是通过自旋锁的方式实现的

公平锁vs非公平锁

公平锁: 遵守 “先来后到”. B 比 C 先来的. 当 A 释放锁的之后, B 就能先于 C 获取到锁

非公平锁: 不遵守 “先来后到”. B 和 C 都有可能获取到锁

操作系统内部的线程调度就可以视为是随机的. 如果不做任何额外的限制, 锁就是非公平锁

如果要想实现公平锁, 就需要依赖额外的数据结构, 来记录线程们的先后顺序

公平锁和非公平锁没有好坏之分, 关键还是看适用场景

synchronized 是非公平锁

可重入锁vs不可重入锁

可重入锁的字面意思是“可以重新进入的锁”,即允许同一个线程多次获取同一把锁

Java里只要以Reentrant开头命名的锁都是可重入锁,而且JDK提供的所有现成的Lock实现类,包括synchronized关键字锁都是可重入的。

而 Linux 系统提供的 mutex 是不可重入锁

CAS

CAS介绍和原理

CAS: 全称Compare and swap,字面意思:”比较并交换“

一个 CAS 涉及到以下操作:

  1. 比较 A 与 V 是否相等(比较)
  2. 如果比较相等,将 B 写入 V(交换)
  3. 返回操作是否成功

当多个线程同时对某个资源进行CAS操作,只能有一个线程操作成功,但是并不会阻塞其他线程,其他线程只会收到操作失败的信号。

CAS 可以视为是一种乐观锁. (或者可以理解成 CAS 是乐观锁的一种实现方式)

针对不同的操作系统,JVM 用到了不同的 CAS 实现原理:

  1. java 的 CAS 利用的的是 unsafe 这个类提供的 CAS 操作
  2. unsafe 的 CAS 依赖了的是 jvm 针对不同的操作系统实现的 Atomic::cmpxchg
  3. Atomic::cmpxchg 的实现使用了汇编的 CAS 操作,并使用 cpu 硬件提供的 lock 机制保证其原子性
CAS应用

实现原子类:

标准库中提供了 java.util.concurrent.atomic 包, 里面的类都是基于这种方式来实现的

典型的就是 AtomicInteger 类. 其中的 getAndIncrement 相当于 i 操作

代码语言:javascript复制
AtomicInteger atomicInteger = new AtomicInteger(0);
// 相当于 i  
atomicInteger.getAndIncrement();

伪代码实现:

代码语言:javascript复制
class AtomicInteger {
    private int value;
    public int getAndIncrement() {
        int oldValue = value;
        while ( CAS(value, oldValue, oldValue 1) != true) {
            oldValue = value;
        }
        return oldValue;
    }
}
自旋锁

基于 CAS 实现更灵活的锁, 获取到更多的控制权

自旋锁伪代码:

代码语言:javascript复制
public class SpinLock {
    private Thread owner = null;
    public void lock(){
        // 通过 CAS 看当前锁是否被某个线程持有.
        // 如果这个锁已经被别的线程持有, 那么就自旋等待.
        // 如果这个锁没有被别的线程持有, 那么就把 owner 设为当前尝试加锁的线程.
        while(!CAS(this.owner, null, Thread.currentThread())){
        }
    }
    public void unlock (){
        this.owner = null;
    }
}
CAS 的 ABA 问题

问题描述:

两个线程 t1 和 t2. 有一个共享变量 num, 初始值为 A

线程 t1 想使用 CAS 把 num 值改成 Z,需要读取数据和修改两个操作

在 t1 执行这两个操作之间, t2 线程可能把 num 的值从 A 改成了 B, 又从 B 改成了 A

大部分的情况下, t2 线程这样的一个反复横跳改动, 对于 t1 是否修改 num 是没有影响的. 但是不排除一些特殊情况

解决方案:

给要修改的值, 引入版本号. 在 CAS 比较数据当前值和旧值的同时, 也要比较版本号是否符合预期,CAS 操作在读取旧值的同时, 也要读取版本号

真正修改的时候,如果当前版本号和读到的版本号相同, 则修改数据, 并把版本号 1;如果当前版本号高于读到的版本号. 就操作失败(认为数据已经被修改过了).

在 Java 标准库中提供了 AtomicStampedReference 类. 这个类可以对某个类进行包装, 在内部就提供了版本管理功能

Synchronized 原理

Synchronized 具有以下特性(只考虑 JDK 1.8):

  1. 开始时是乐观锁, 如果锁冲突频繁, 就转换为悲观锁
  2. 开始是轻量级锁实现, 如果锁被持有的时间较长, 就转换成重量级锁
  3. 实现轻量级锁的时候大概率用到的自旋锁策略
  4. 是一种不公平锁
  5. 是一种可重入锁
  6. 不是读写锁

JVM 将 synchronized 锁分为 无锁、偏向锁、轻量级锁、重量级锁 状态。会根据情况,进行依次升级

偏向锁:

  1. 第一个尝试加锁的线程, 优先进入偏向锁状态
  2. 偏向锁不是真的 “加锁”, 只是给对象头中做一个 “偏向锁的标记”, 记录这个锁属于哪个线程
  3. 如果后续没有其他线程来竞争该锁, 那么就不用进行其他同步操作了(避免了加锁解锁的开销)
  4. 如果后续有其他线程来竞争该锁(刚才已经在锁对象中记录了当前锁属于哪个线程了, 很容易识别当前申请锁的线程是不是之前记录的线程), 那就取消原来的偏向锁状态, 进入一般的轻量级锁状态

轻量级锁:

  1. 随着其他线程进入竞争, 偏向锁状态被消除, 进入轻量级锁状态(自适应的自旋锁),此处的轻量级锁就是通过 CAS 来实现
  2. 通过 CAS 检查并更新一块内存 (比如 null => 该线程引用)
  3. 如果更新成功, 则认为加锁成功;如果更新失败, 则认为锁被占用, 继续自旋式的等待(并不放弃 CPU)

重量级锁:

  1. 如果竞争进一步激烈, 自旋不能快速获取到锁状态, 就会膨胀为重量级锁,此处的重量级锁就是指用到内核提供的 mutex
  2. 执行加锁操作, 先进入内核态,在内核态判定当前锁是否已经被占用
  3. 如果该锁没有占用, 则加锁成功, 并切换回用户态;如果该锁被占用, 则加锁失败. 此时线程进入锁的等待队列, 挂起. 等待被操作系统唤醒

其他的优化操作:

锁消除:

编译器 JVM 判断锁是否可消除. 如果可以, 就直接消除

有些应用程序的代码中, 用到了 synchronized, 但其实没有在多线程环境下, 那么这些加锁解锁操作是没有必要的, 白白浪费了一些资源开销

锁粗化:

一段逻辑中如果出现多次加锁解锁, 编译器 JVM 会自动进行锁的粗化

实际开发过程中, 使用细粒度锁, 是期望释放锁的时候其他线程能使用锁,但是实际上可能并没有其他线程来抢占这个锁,这种情况 JVM 就会自动把锁粗化, 避免频繁申请释放锁

Callable 接口

Callable 是一个 interface,相当于把线程封装了一个 “返回值”,方便程序得到多线程的方式计算结果

代码语言:javascript复制
Callable<Integer> callable = new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        int sum = 0;
        for (int i = 1; i <= 1000; i  ) {
            sum  = i;
        }
    	return sum;
    }
};
FutureTask<Integer> futureTask = new FutureTask<>(callable);
Thread t = new Thread(futureTask);
t.start();
int result = futureTask.get();
System.out.println(result);	

Callable 和 Runnable 相对, 都是描述一个 “任务”:Callable 描述的是带有返回值的任务,Runnable 描述的是不带返回值的任务

Callable 通常需要搭配 FutureTask 来使用:FutureTask 用来保存 Callable 的返回结果,Callable 往往是在另一个线程中执行的, 啥时候执行完并不确定,FutureTask 就可以负责这个等待结果出来的工作

JUC常见类

ReentrantLock

可重入互斥锁,和 synchronized 定位类似, 都是用来实现互斥效果, 保证线程安全

ReentrantLock 的用法:

  1. lock(): 加锁, 如果获取不到锁就死等
  2. trylock(超时时间): 加锁, 如果获取不到锁, 等待一定的时间之后就放弃加锁
  3. unlock(): 解锁

ReentrantLock 和 synchronized 的区别:

  1. synchronized 是一个关键字, 是 JVM 内部实现的(大概率是基于 C 实现);ReentrantLock 是标准库的一个类, 在 JVM 外实现的(基于 Java 实现)
  2. synchronized 使用时不需要手动释放锁;ReentrantLock 使用时需要手动释放. 使用起来更灵活,但是也容易遗漏 unlock
  3. synchronized 在申请锁失败时, 会死等;ReentrantLock 可以通过 trylock 的方式等待一段时间就放弃
  4. synchronized 是非公平锁;ReentrantLock 默认是非公平锁. 可以通过构造方法传入一个 true 开启公平锁模式
代码语言:javascript复制
// ReentrantLock 的构造方法
public ReentrantLock(boolean fair) {
	sync = fair ? new FairSync() : new NonfairSync();
}
  1. 更强大的唤醒机制:synchronized 是通过 Object 的 wait / notify 实现等待-唤醒,每次唤醒的是一个随机等待的线程;ReentrantLock 搭配 Condition 类实现等待-唤醒, 可以更精确控制唤醒某个指定的线程

如何选择使用:

  1. 锁竞争不激烈的时候, 使用 synchronized, 效率更高, 自动释放更方便
  2. 锁竞争激烈的时候, 使用 ReentrantLock, 搭配 trylock 更灵活控制加锁的行为, 而不是死等
  3. 如果需要使用公平锁, 使用 ReentrantLock;如果需要精准唤醒指定线程, 使用 ReentrantLock
原子类

原子类内部用的是 CAS 实现,所以性能要比加锁实现 i 高很多。

原子类有以下几个:

代码语言:javascript复制
AtomicBoolean
AtomicInteger
AtomicIntegerArray
AtomicLong
AtomicReference
AtomicStampedReference

AtomicInteger常见方法有:

代码语言:javascript复制
addAndGet(int delta); i  = delta;
decrementAndGet(); --i;
getAndDecrement(); i--;
incrementAndGet();   i;
getAndIncrement(); i  ;
线程池

虽然创建销毁线程比创建销毁进程更轻量, 但是在频繁创建销毁线程的时候还是会比较低效

线程池就是为了解决这个问题. 如果某个线程不再使用了, 并不是真正把线程释放, 而是放到一个 "池子"中, 下次如果需要用到线程就直接从池子中取, 不必通过系统来创建了

ExecutorService 和 Executors:

  1. ExecutorService 表示一个线程池实例
  2. Executors 是一个工厂类, 能够创建出几种不同风格的线程池
  3. ExecutorService 的 submit 方法能够向线程池中提交若干个任务
代码语言:javascript复制
ExecutorService pool = Executors.newFixedThreadPool(10);
pool.submit(new Runnable() {
    @Override
    public void run() {
    	System.out.println("hello");
    }
});

Executors 创建线程池的几种方式:

  1. newFixedThreadPool: 创建固定线程数的线程池
  2. newCachedThreadPool: 创建线程数目动态增长的线程池
  3. newSingleThreadExecutor: 创建只包含单个线程的线程池
  4. newScheduledThreadPool: 设定 延迟时间后执行命令,或者定期执行命令. 是进阶版的 Timer

Executors 本质上是 ThreadPoolExecutor 类的封装

ThreadPoolExecutor:

ThreadPoolExecutor 提供了更多的可选参数, 可以进一步细化线程池行为的设定

理解 ThreadPoolExecutor 构造方法的参数:

把创建一个线程池想象成开个公司. 每个员工相当于一个线程

  1. corePoolSize: 正式员工的数量. (正式员工, 一旦录用, 永不辞退)
  2. maximumPoolSize: 正式员工 临时工的数目. (临时工: 一段时间不干活, 就被辞退)
  3. keepAliveTime: 临时工允许的空闲时间
  4. unit: keepaliveTime 的时间单位, 是秒, 分钟, 还是其他值
  5. workQueue: 传递任务的阻塞队列
  6. threadFactory: 创建线程的工厂, 参与具体的创建线程工作
  7. RejectedExecutionHandler: 拒绝策略, 如果任务量超出公司的负荷了接下来怎么处理.
  8. AbortPolicy(): 超过负荷, 直接抛出异常
  9. CallerRunsPolicy(): 调用者负责处理
  10. DiscardOldestPolicy(): 丢弃队列中最老的任务
  11. DiscardPolicy(): 丢弃新来的任务
代码语言:javascript复制
ExecutorService pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
                    new SynchronousQueue<Runnable>(),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy());
for(int i=0;i<3;i  ) {
    pool.submit(new Runnable() {
        @Override
        void run() {
        	System.out.println("hello");
        }
    });
}
信号量 Semaphore

信号量, 用来表示 “可用资源的个数”. 本质上就是一个计数器

Semaphore 的 PV 操作中的加减计数器操作都是原子的, 可以在多线程环境下直接使用

代码语言:javascript复制
Semaphore semaphore = new Semaphore(4);
Runnable runnable = new Runnable() {
    @Override
    public void run() {
        try {
            System.out.println("申请资源");
            semaphore.acquire();
            System.out.println("我获取到资源了");
            Thread.sleep(1000);
            System.out.println("我释放资源了");
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
};
for (int i = 0; i < 20; i  ) {
    Thread t = new Thread(runnable);
    t.start();
}

创建 Semaphore 示例, 初始化为 4, 表示有 4 个可用资源

acquire 方法表示申请资源(P操作), release 方法表示释放资源(V操作)

创建 20 个线程, 每个线程都尝试申请资源, sleep 1秒之后, 释放资源. 观察程序的执行效果

CountDownLatch

同时等待 N 个任务执行结束

代码语言:javascript复制
public class Demo {
    public static void main(String[] args) throws Exception {
        CountDownLatch latch = new CountDownLatch(10);
        Runnable r = new Runable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(Math.random() * 10000);
                    latch.countDown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        for (int i = 0; i < 10; i  ) {
        	new Thread(r).start();
        }
        // 必须等到 10 人全部回来
        latch.await();
        System.out.println("比赛结束");
    }
}

构造 CountDownLatch 实例, 初始化 10 表示有 10 个任务需要完成

每个任务执行完毕, 都调用 latch.countDown() . 在 CountDownLatch 内部的计数器同时自减

主线程中使用 latch.await(); 阻塞等待所有任务执行完毕. 相当于计数器为 0 了

线程安全的集合类

原来的集合类, 大部分都不是线程安全的:Vector, Stack, HashTable是线程安全的(不建议用), 其他的集合类不是线程安全的

多线程环境使用 ArrayList:

自己使用同步机制 (synchronized 或者 ReentrantLock) Collections.synchronizedList(new ArrayList):synchronizedList 是标准库提供的一个基于 synchronized 进行线程同步的 List,synchronizedList 的关键操作上都带有 synchronized

使用 CopyOnWriteArrayList:CopyOnWrite容器即写时复制的容器

当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。

所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。

优点:在读多写少的场景下, 性能很高, 不需要加锁竞争

缺点:占用内存较多;新写的数据不能被第一时间读取到.

多线程环境使用队列:

ArrayBlockingQueue:基于数组实现的阻塞队列

LinkedBlockingQueue:基于链表实现的阻塞队列

PriorityBlockingQueue:基于堆实现的带优先级的阻塞队列

TransferQueue:最多只包含一个元素的阻塞队列

多线程环境使用哈希表:

HashMap 本身不是线程安全的

在多线程环境下使用哈希表可以使用:Hashtable;ConcurrentHashMap

Hashtable:只是简单的把关键方法(put,get)加上了 synchronized 关键字

相当于直接针对 Hashtable 对象本身加锁

如果多线程访问同一个 Hashtable 就会直接造成锁冲突,size 属性也是通过 synchronized 来控制同步, 也是比较慢的,一旦触发扩容, 就由该线程完成整个扩容过程. 这个过程会涉及到大量的元素拷贝, 效率会非常低

ConcurrentHashMap:

相比于 Hashtable 做出了一系列的改进和优化

读操作没有加锁(但是使用了 volatile 保证从内存读取结果), 只对写操作进行加锁. 加锁的方式仍然是是用 synchronized, 但是不是锁整个对象, 而是 “锁桶” (用每个链表的头结点作为锁对象), 大大降低了锁冲突的概率

充分利用 CAS 特性. 比如 size 属性通过 CAS 来更新. 避免出现重量级锁的情况.

优化了扩容方式: 化整为零

  1. 发现需要扩容的线程, 只需要创建一个新的数组, 同时只搬几个元素过去
  2. 扩容期间, 新老数组同时存在
  3. 后续每个来操作 ConcurrentHashMap 的线程, 都会参与搬家的过程. 每个操作负责搬运一小部分元素
  4. 搬完最后一个元素再把老数组删掉
  5. 这个期间, 插入只往新数组加
  6. 这个期间, 查找需要同时查新数组和老数组

0 人点赞