Java的juc并发编程包

2023-03-03 14:19:21 浏览数 (1)

Juc并发编程包

一、介绍

关于Java如何创建线程,大家都可以马上能想到有两种方法,无非不就是继承 Thread类和实现 Runnable接口嘛,顶多再加上个实现 Callable接口。而且 synchronized解决并发问题,如果学艺不精,锁住的对象是哪个都不知道,实在是不友好。

所以,我们在企业开发中基本不这样使用线程。在线程的启动上,我们常使用线程池。对于线程池的使用,可以看我另一篇博客,讲到了线程池的使用。

本文将讲解,线程池所在的包 java.util.concurrent,在这个包下,还有什么值得关注的类和方法。

附上java8在线文档,边看边学

二、线程安全集合

在使用的集合中,ArrayList或者是 HashMap都是平常我们接触比较多的。但很遗憾,这两个集合类,他们在多线程的情况下,并不是安全的。如果需要使用线程安全的集合,将要有特殊的方法和类。

我们先来演示一下,在多线程情况下,此类集合发生的问题。

1)不安全集合示例

代码语言:javascript复制
package com.banmoon.collection;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.TreeSet;

/**
 * 线程不安全的集合
 */
public class Demo1 {

    public static void main(String[] args) {
        testArrayList();
//        testHashMap();
//        testSet();
    }

    public static void testArrayList(){
        ArrayList<String> list = new ArrayList<>();
        for (int i = 1; i <= 10; i  ) {
            new Thread(() -> {
                list.add(Thread.currentThread().getName());
                System.out.println(list);
            }, "线程"   i).start();
        }
    }

    public static void testHashMap(){
        HashMap<Integer, String> map = new HashMap<>();
        for (int i = 0; i < 10; i  ) {
            int temp = i;
            new Thread(() -> {
                map.put(temp, Thread.currentThread().getName());
                System.out.println(map);
            }, "线程"   i).start();
        }
    }

    public static void testSet(){
        TreeSet<String> set = new TreeSet<>();
        for (int i = 1; i <= 10; i  ) {
            new Thread(() -> {
                set.add(Thread.currentThread().getName());
                System.out.println(set);
            }, "线程"   i).start();
        }
    }

}

分别执行 testArrayListtestHashMaptestSet,执行结果如下

执行结果有时候会出现 java.util.ConcurrentModificationException异常,字面意思就是并发修改异常,也就说明了原本喜欢用的 ArrayListHashMap是线程不安全的。

为何线程不安全,主要还是关键字 synchronized,可以查看上述集合的添加方法,并没有添加这个关键字。所以,我们在多线程的时候,要避免使用以上这些不安全的集合类。

2)Collections工具类之sync方法

上述讲的集合类都是线程不安全的,但是有办法使他们转换成线程安全的集合类。只需要 Collections工具类使用对应的方法进行转换即可。

方法的实现就是将集合作为参数构造出了另一个线程安全的集合类。转换的方法还是比较多的,简单讲前三个就好。

代码语言:javascript复制
package com.banmoon.collection;

import java.util.*;

/**
 * 线程安全集合
 */
public class Demo2 {

    public static void main(String[] args) {
        testArrayList();
//        testHashMap();
//        testSet();
    }

    public static void testArrayList(){
        List<String> list = Collections.synchronizedList(new ArrayList<>());
        for (int i = 1; i <= 10; i  ) {
            new Thread(() -> {
                list.add(Thread.currentThread().getName());
                System.out.println(list);
            }, "线程"   i).start();
        }
    }

    public static void testHashMap(){
        Map<Integer, String> map = Collections.synchronizedMap(new HashMap<>());
        for (int i = 0; i < 10; i  ) {
            int temp = i;
            new Thread(() -> {
                map.put(temp, Thread.currentThread().getName());
                System.out.println(map);
            }, "线程"   i).start();
        }
    }

    public static void testSet(){
        Set<String> set = Collections.synchronizedSet(new TreeSet<>());
        for (int i = 1; i <= 10; i  ) {
            new Thread(() -> {
                set.add(Thread.currentThread().getName());
                System.out.println(set);
            }, "线程"   i).start();
        }
    }

}

就不贴执行结果了,自己可以试试,保证不出并发修改异常

3)juc下的安全集合

不对啊,上面的集合都在 java.util下面,怎么没有juc什么事呢?别急,要来了,juc包下的线程安全集合

代码语言:javascript复制
package com.banmoon.collection;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * juc并发包下的集合
 */
public class Demo3 {

    public static void main(String[] args) {
        testArrayList();
//        testHashMap();
//        testSet();
    }

    public static void testArrayList(){
        CopyOnWriteArrayList<Object> list = new CopyOnWriteArrayList<>();
        for (int i = 1; i <= 10; i  ) {
            new Thread(() -> {
                list.add(Thread.currentThread().getName());
                System.out.println(list);
            }, "线程"   i).start();
        }
    }

    public static void testHashMap(){
        ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
        for (int i = 0; i < 10; i  ) {
            int temp = i;
            new Thread(() -> {
                map.put(temp, Thread.currentThread().getName());
                System.out.println(map);
            }, "线程"   i).start();
        }
    }

    public static void testSet(){
        CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
        for (int i = 1; i <= 10; i  ) {
            new Thread(() -> {
                set.add(Thread.currentThread().getName());
                System.out.println(set);
            }, "线程"   i).start();
        }
    }

}

4)Vector类

关于有人提到一个叫 Vector的类,也是线程安全的。的确是这样,线程是安全的,我们查看他的 add方法

代码语言:javascript复制
public class Vector<E>
    extends AbstractList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable
{

	public synchronized boolean add(E e) {
        modCount  ;
        ensureCapacityHelper(elementCount   1);
        elementData[elementCount  ] = e;
        return true;
    }
}

比较简单,主要使用了 synchronized关键字,来保证了线程安全。

没什么问题,但更推荐使用新的 lock锁写的 CopyOnWriteArrayList等类。

至于什么是 lock,它的优势在哪里,可以继续查看下一章

三、Lock锁

由文档可知,Lock是个接口。使用得从它的几个实现类中入手

1)简单使用

使用 ReentrantLock类,来完成线程安全的取票操作

代码语言:javascript复制
package com.banmoon.collection;

import java.util.concurrent.locks.ReentrantLock;

/**
 * ReentrantLock
 * 1、创建公共Lock锁
 * 2、使用lock方法进行上锁
 * 3、在finally代码块中释放锁
 */
public class Demo4 {

    public static void main(String[] args) {
        TicketServer ticketServer = new TicketServer();
        new Thread(ticketServer, "A").start();
        new Thread(ticketServer, "B").start();
        new Thread(ticketServer, "C").start();
    }

}

class TicketServer implements Runnable{

    private int ticketNum = 10;

    // 创建锁
    ReentrantLock lock = new ReentrantLock();

    @Override
    public void run() {
        String name = Thread.currentThread().getName();

        while (true){
            // 上锁
            lock.lock();
            try {
                if(ticketNum<=0)
                    return;
                System.out.println(name   ":取到了第"   ticketNum   "张票");
                // 模拟网络延迟
                Thread.sleep(200);
                ticketNum--;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                // 最后始终都要释放锁
                lock.unlock();
            }
        }
    }

}

执行结果,与使用 synchronized关键字无异

lock锁的基本操作

  • 创建公共Lock锁,注意所有的线程访问到的 lock都是同一个
  • 使用lock方法进行上锁
  • 在finally代码块中释放锁,必须要手动释放

2)生产者消费者模式

在使用 synchronized关键词时,线程之间使用 wait方法进行阻塞释放锁,以及使用 notify方法进行唤醒阻塞线程。

而在 Lock锁中,需要创建锁的状态监视器,也就是 Condition。在 Condition中,也有相对应的方法,他们则是 await方法和 signal方法。

使用 Condition类来实现生产者消费者模式

代码语言:javascript复制
package com.banmoon.collection;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 简单的生产者消费者模式
 */
public class Demo5 {

    public static void main(String[] args) {
        MyDemo5 myDemo4 = new MyDemo5();
        new Thread(() -> {
            for(int i = 0; i < 10; i  ) myDemo4.increment();
        }, "线程A").start();
        new Thread(() -> {
            for(int i = 0; i < 10; i  ) myDemo4.decrement();
        }, "线程B").start();
        new Thread(() -> {
            for(int i = 0; i < 10; i  ) myDemo4.increment();
        }, "线程C").start();
        new Thread(() -> {
            for(int i = 0; i < 10; i  ) myDemo4.decrement();
        }, "线程D").start();
    }

}

class MyDemo5{

    private int number = 0;

    private ReentrantLock lock = new ReentrantLock();

    /**
     * 通过lock创建出监视状态对象
     */
    Condition condition = lock.newCondition();

    public void increment(){
        lock.lock();
        try {
            while (number==1)
                condition.await();
            number  ;
            System.out.println(Thread.currentThread().getName()   ":"   number);
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void decrement(){
        lock.lock();
        try {
            while (number==0)
                condition.await();
            number--;
            System.out.println(Thread.currentThread().getName()   ":"   number);
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

}

执行结果,和 synchronized关键字无异

3)精准唤醒

Condition类和传统的 wait方法、notify方法不同,它可以实现精准的唤醒。

比如上面的生产消费模式,让A线程生产完成后,让B线程进行消费;B线程消费完成后,让C进行生产。

这一点在 Objectnotify方法是做不到的,notify方法唤醒的虽然只有一条线程,但这是cpu进行调度的,人为并不可控制。

代码语言:javascript复制
package com.banmoon.collection;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 使用多个Condition实现精准唤醒
 */
public class Demo6 {

    public static void main(String[] args) {
        MyDemo6 myDemo4 = new MyDemo6();
        new Thread(() -> {
            for(int i = 0; i < 10; i  ) myDemo4.increment();
        }, "线程A").start();
        new Thread(() -> {
            for(int i = 0; i < 10; i  ) myDemo4.decrement();
        }, "线程B").start();
        new Thread(() -> {
            for(int i = 0; i < 10; i  ) myDemo4.increment();
        }, "线程C").start();
        new Thread(() -> {
            for(int i = 0; i < 10; i  ) myDemo4.decrement();
        }, "线程D").start();
    }
}

class MyDemo6 {

    private int number = 0;

    private Lock lock = new ReentrantLock();

    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();
    private Condition condition4 = lock.newCondition();

    public void increment(){
        lock.lock();
        try {
            while (number==1)
                await();
            number  ;
            System.out.println(Thread.currentThread().getName()   ":"   number);
            signal();
        }  finally {
            lock.unlock();
        }
    }

    public void decrement(){
        lock.lock();
        try {
            while (number==0)
                await();
            number--;
            System.out.println(Thread.currentThread().getName()   ":"   number);
            signal();
        }  finally {
            lock.unlock();
        }
    }

    /**
     * 判断当前线程,并触发等待
     */
    public void await(){
        try {
            String name = Thread.currentThread().getName();
            switch (name){
                case "线程A": condition1.await(); break;
                case "线程B": condition2.await(); break;
                case "线程C": condition3.await(); break;
                case "线程D": condition4.await(); break;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 判断当前线程,并触发唤醒
     */
    public void signal(){
        String name = Thread.currentThread().getName();
        switch (name){
            case "线程A": condition2.signal(); break;
            case "线程B": condition3.signal(); break;
            case "线程C": condition4.signal(); break;
            case "线程D": condition1.signal(); break;
        }
    }

}

执行结果,发现了确实按照了我们的顺序来进行唤醒

根据锁创建出监视状态对象,用来判断线程的等待和唤醒

简单来说,就是用 Condition来做一个标记,我们需要做的就是,判断当前的条件,是否使用 Condition进行等待或者唤醒。

上述代码会出现BUG,但我不说,哈哈哈哈哈。 大家可以把 17行的线程B23行的线程D位置互换,你会发现问题的。 可以试着解决一下这个BUG,可以更好理解 Condition的精准唤醒。

4)读写锁

上述使用的都是 ReentrantLock类,这次讲讲另外的两个。处于 ReentrantReadWriteLock类下的两个静态内部类,ReadLockWriteLock

为什么会有读写锁呢,它解决了同步带来的效率低下问题。

在变量的使用上,我们无非就是读和写,只有多线程写入才会造成线程安全的问题,而多线程读永远不会修改变量值,也就不会造成线程的安全问题了。

正因为如此,读写锁出现了,它可以限制只能单线程写入,但可以允许多线程读取变量,由此来保证效率的最大化使用

代码语言:javascript复制
package com.banmoon.collection;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 读写锁
 */
public class Demo7 {

    public static void main(String[] args) {
        MyDemo7 demo7 = new MyDemo7();
        for (int i = 0; i < 10; i  ) {
            new Thread(() -> {
                demo7.add(Thread.currentThread().getName());
            }, "写线程"   i).start();
            new Thread(() -> {
                demo7.toString(true);
            }, "读线程"   i).start();
        }

    }

}

class MyDemo7{

    private List<String> list = new ArrayList<>();

    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
//    private ReentrantLock lock = new ReentrantLock();

    public void add(String str){
        lock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName()   ":正在添加");
            list.add(str);
            System.out.println(Thread.currentThread().getName()   ":添加成功");
        } finally {
            lock.writeLock().unlock();
        }
    }

    public void toString(boolean b){
        lock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName()   ":正在读取");
            System.out.println(Thread.currentThread().getName()   ":"   list);
            System.out.println(Thread.currentThread().getName()   ":读取成功");
        } finally {
            lock.readLock().unlock();
        }
    }

}

执行结果,主要查看与 ReentrantLock类的区别

可以看到的是,写入线程不会被抢锁,而读线程能被其他读线程插入。

四、辅助类

在juc并发包中,还有一些辅助工具类,让我们可以更好的使用多线程。

1)CountDownLatch

代码语言:javascript复制
package com.banmoon.utils;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch count = new CountDownLatch(5);
        for (int i = 1; i <= 5; i  ) {
            new Thread(() -> {
                try {
                    System.out.println(StrUtil.format("{}:{}", Thread.currentThread().getName(), DateUtil.now()));
                    TimeUnit.SECONDS.sleep(2);
                    count.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "线程" i).start();
        }

        // 等待上面几个线程完成,才能进行下面的逻辑
        count.await();
        System.out.println("主线程:"   DateUtil.now());
    }

}

CountDownLatch对线程进行减法计数,计算还有多少线程没有完成任务。

直到指定的线程数,到达指定位置后,才进行下一步的操作。

2)CyclicBarrier

如果上述 CountDownLatch是通过减法计算来达到屏障,那么 CyclicBarrier就是通过加法计算来达到屏障。

代码语言:javascript复制
package com.banmoon.utils;

import cn.hutool.core.util.StrUtil;

import java.util.concurrent.*;

public class CyclicBarrierTest {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
            System.out.println("======= 当前阶段已完成 =======");
        });
        new Thread(new Demo(cyclicBarrier), "线程A").start();
        new Thread(new Demo(cyclicBarrier), "线程B").start();
        new Thread(new Demo(cyclicBarrier), "线程C").start();
    }
  
}

class Demo implements Runnable{

    private CyclicBarrier cyclicBarrier;

    public Demo(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        try {
            String threadName = Thread.currentThread().getName();
            System.out.println(StrUtil.format("{}:准备就绪", threadName));
            TimeUnit.SECONDS.sleep(3);
            // 等待所有线程都就绪
            cyclicBarrier.await();

            System.out.println(StrUtil.format("{}:上台发言", threadName));
            TimeUnit.SECONDS.sleep(3);
            // 等待所有线程都上台发言
            cyclicBarrier.await();

            System.out.println(StrUtil.format("{}:散会回家", threadName));
            TimeUnit.SECONDS.sleep(1);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

CountDownLatch类似,当线程全部达到一个共同屏障时,从而再向下进行执行。

但和其不一样的是,CyclicBarrier可以执行多次,设立多点屏障。出现问题导致的计数异常后,也可以重新进行计数

3)Semaphore

Semaphore通常我们叫它信号量, 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。

比如去餐厅吃饭,没有座位需要进行等待,直到别人吃完有座位后,才能进入。

代码语言:javascript复制
package com.banmoon.utils;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreTest {

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 1; i <= 6; i  ) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()   ":排队完成,抓紧吃饭");
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName()   ":吃完了离开");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            }, "线程"   i).start();
        }

    }

}

适用于有限的资源使用,对同一个资源最多允许几个线程可以访问它。直到访问结束后才释放,让其他线程进入可以访问。

synchornized包住一段代码块比较类似,但 Semaphore允许多个线程,而前者只有一个。

五、阻塞队列

在Java线程池的讲解中,我初步的讲了阻塞队列的功能。但在此,我还是得详细讲讲,什么是阻塞队列。

阻塞队列,顾名思义就是会阻塞的队列。而队列的基本操作就只有两个,存和取。所以阻塞就此产生,有些存会发生阻塞,有些取会发生阻塞。

下面就一起来看看juc包中的阻塞队列吧

1)ArrayBlockingQueue

Array结构,没问题吧,基于数组结构实现的队列。既然叫阻塞队列,那就必然会有阻塞,有阻塞会有锁吧。简单看看源码

代码语言:javascript复制
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
  
    // 内容存在这的
    final Object[] items;
  
    // 锁,保护所有的访问
    final ReentrantLock lock;
  
    // 关于取操作的 Condition
    private final Condition notEmpty;
  
    // 关于存操作的 Condition
    private final Condition notFull;
  
    // 构造方法,传入一个容量。还有两个重载,具体自己看源码吧
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
  
    // 取数
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 上锁
        lock.lockInterruptibly();
        try {
            // 判断个数是否为0,是则阻塞进行等待
            while (count == 0)
                notEmpty.await();
            // 返回取数结果
            return dequeue();
        } finally {
            // 解锁
            lock.unlock();
        }
    }
  
  
    // 存数
	public void put(E e) throws InterruptedException {
        // 检查元素是否为空,为空则会抛出空指针异常
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 上锁
        lock.lockInterruptibly();
        try {
            // 如果个数已经和队列容量相等了,则阻塞进行等待
            while (count == items.length)
                notFull.await();
            // 存数
            enqueue(e);
        } finally {
            // 解锁
            lock.unlock();
        }
    }
  
}

ArrayBlockingQueue有一把 Lock锁以及它的两个 Condition监听器,分别来对队列为空或满的时候,进行阻塞操作。

上面源码只是一小部分,dequeueenqueue方法就交给你们去看啦,不要怕,挺简单的。

测试一下,取和存之间照成的阻塞

代码语言:javascript复制
package com.banmoon.queue;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;

import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ArrayBlockingQueueTest {

    public static void main(String[] args) throws InterruptedException {
        putTest();
        System.out.println("========= 分割线 =========");
        takeTest();
    }

    public static void putTest() throws InterruptedException {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
//        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(3);

        new Thread(() -> {
            try {
                // 3秒后再取出
                TimeUnit.SECONDS.sleep(3);
                String ele = queue.take();
                System.out.println(ele   "已取出:"   DateUtil.formatTime(new Date()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        queue.put("1");
        System.out.println(StrUtil.format("{}已插入,{}", 1, DateUtil.formatTime(new Date())));
        queue.put("2");
        System.out.println(StrUtil.format("{}已插入,{}", 2, DateUtil.formatTime(new Date())));
        queue.put("3");
        System.out.println(StrUtil.format("{}已插入,{}", 3, DateUtil.formatTime(new Date())));
        queue.put("4");
        System.out.println(StrUtil.format("{}已插入,{}", 4, DateUtil.formatTime(new Date())));
    }

    private static void takeTest() throws InterruptedException {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
//        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(3);

        // 其他线程插入数据
        for (int i = 1; i <= 3; i  ) {
            String str = StrUtil.toString(i);
            new Thread(() -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                    queue.put(str);
                    System.out.println(StrUtil.format("{}已插入,{}", str, DateUtil.formatTime(new Date())));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        // 直接取数
        System.out.println(queue.take()   "已取出:"   DateUtil.formatTime(new Date()));
        System.out.println(queue.take()   "已取出:"   DateUtil.formatTime(new Date()));
        System.out.println(queue.take()   "已取出:"   DateUtil.formatTime(new Date()));


    }

}

执行结果,不要问我为什么分割线下面取出插入搞反了,问就是打印慢了。在插入的结束的马上,就马上唤醒被阻塞的取的线程。

就这样会出现取出在前的打印问题。

2)LinkedBlockingQueue

在简单查看了 ArrayBlockingQueue的源码,它只有一把 Lock锁,

LinkedBlockingQueue不同,它有两把锁。简单看看源码吧

代码语言:javascript复制
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    // 好的出现了,静态内部类,链表形式的指针引向。一个节点对象存一个元素,同时指向下一个节点对象
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }
  
    // 容量
    private final int capacity;
  
    // 当前的元素个数
    private final AtomicInteger count = new AtomicInteger();
  
    // 头部节点
    transient Node<E> head;
  
    // 最后的节点
    private transient Node<E> last;
  
    // take、poll、etc等方法,取操作持有的锁
    private final ReentrantLock takeLock = new ReentrantLock();

	// 不为空的状态Condition
    private final Condition notEmpty = takeLock.newCondition();

    // put、offer、etc等方法,存操作持有的锁
    private final ReentrantLock putLock = new ReentrantLock();

    // 容量未满的状态Condition
    private final Condition notFull = putLock.newCondition();
  
}

对应的取存操作方法都差不多,这边的测试代码就不贴了。和上面的 ArrayBlockingQueue差不多,把注释掉的代码打开换成 LinkedBlockingQueue就好。

3)存取的4组API

在上述的阻塞队列中,只举例了一组存取的方法。不过除了这一组,还有其他存取操作的API。

这边以 ArrayBlockingQueue为例,总结进行测试一下。

功能说明

队列空或者满的时候会抛出异常

add(E e)

remove()

满了还去存则直接返回false空了还去取就会返回null

offer(E e)

poll()

满了还去存就会一直阻塞,直到被唤醒空了还去取就会一直阻塞,直到被唤醒

put(E e)

take()

满了还去存就会阻塞一段时间,超过后就返回false空了还去取就会阻塞一段时间,超过时间后就会返回null

offer(E e, long timeout, TimeUnit unit)

poll(long timeout, TimeUnit unit)

简单测试下这四组API,可以根据自己业务需求来选择对应的API

代码语言:javascript复制
package com.banmoon.queue;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class FourApiTest {

    public static void main(String[] args) throws InterruptedException {
//        test1();
//        test2();
//        test3();
        test4();
    }

    /**
     * 会抛出异常
     */
    private static void test1() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        System.out.println(queue.add("A"));
        System.out.println(queue.add("B"));
//        System.out.println(queue.add("C"));// 队列已满,将报错

        System.out.println("======== 分割线 ========");

        System.out.println(queue.remove());
        System.out.println(queue.remove());
//        System.out.println(queue.remove());// 队列已空,将报错
    }

    /**
     * 满了还去存则直接返回false,空了还去取就会返回null
     */
    private static void test2() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        System.out.println(queue.offer("A"));
        System.out.println(queue.offer("B"));
        System.out.println(queue.offer("C"));// 队列已满,存失败就直接返回false

        System.out.println("======== 分割线 ========");

        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll());// 队列已空,直接返回null
    }

    /**
     * 满了还去存就会一直阻塞,直到被唤醒,空了还去取就会一直阻塞,直到被唤醒
     * @throws InterruptedException
     */
    private static void test3() throws InterruptedException {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        queue.put("A");
        System.out.println("A已插入");
        queue.put("B");
        System.out.println("B已插入");
//        queue.put("C");// 队列已满,到此将会阻塞
//        System.out.println("C已插入");

        System.out.println("======== 分割线 ========");

        System.out.println(queue.take());
        System.out.println(queue.take());
//        System.out.println(queue.take());// 队列已空,将会持续阻塞
    }

    /**
     * 满了还去存就会阻塞一段时间,超过后就返回false
     * 空了还去取就会阻塞一段时间,超过时间后就会返回null
     */
    private static void test4() throws InterruptedException {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        String template = "{}:{}";
        System.out.println(StrUtil.format(template, queue.offer("A", 2, TimeUnit.SECONDS), DateUtil.now()));
        System.out.println(StrUtil.format(template, queue.offer("B", 2, TimeUnit.SECONDS), DateUtil.now()));
        System.out.println(StrUtil.format(template, queue.offer("C", 2, TimeUnit.SECONDS), DateUtil.now()));// 队列已满,等待2秒后返回false

        System.out.println("======== 分割线 ========");

        System.out.println(StrUtil.format(template, queue.poll(2, TimeUnit.SECONDS), DateUtil.now()));
        System.out.println(StrUtil.format(template, queue.poll(2, TimeUnit.SECONDS), DateUtil.now()));
        System.out.println(StrUtil.format(template, queue.poll(2, TimeUnit.SECONDS), DateUtil.now()));// 队列已空,等待2秒后返回null
    }

}

test1执行结果,会抛出异常

test2执行结果,满了还去存则直接返回false,空了还去取就会返回null

test3执行结果,可以看到程序一直都没有关闭。存的在等待位置,取的在等元素

test4执行结果,注意看时间,不会死等

4)SynchronousQueue

这是一个比较特殊的阻塞队列,存取互相阻塞。生产者存入一个元素就马上阻塞,必须被另一个线程消费者取出这个元素,生产者才解锁。

简单测试一下

代码语言:javascript复制
package com.banmoon.queue;

import cn.hutool.core.util.StrUtil;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueTest {

    public static void main(String[] args) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        new Thread(() -> {
            try {
                String name = Thread.currentThread().getName();
                queue.put("A");
                System.out.println(name   "插入A成功");
                TimeUnit.SECONDS.sleep(1);
                queue.put("B");
                System.out.println(name   "插入B成功");
                TimeUnit.SECONDS.sleep(1);
                queue.put("C");
                System.out.println(name   "插入C成功");
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程A").start();

        new Thread(() -> {
            try {
                String name = Thread.currentThread().getName();
                String template = "{}取出了{}";
                System.out.println(StrUtil.format(template, name, queue.take()));
                TimeUnit.SECONDS.sleep(1);
                System.out.println(StrUtil.format(template, name, queue.take()));
                TimeUnit.SECONDS.sleep(1);
                System.out.println(StrUtil.format(template, name, queue.take()));
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程B").start();

    }

}

执行结果,存取交替进行,没问题

现在我们把线程B的代码注释掉,让其只有生产的线程。再次查看结果

好的,被阻塞了。也就是在15行存入后,线程A直接阻塞了,要不然怎么连16行的插入成功信息都没有打印呢?

所以,我们需要进入 SynchronousQueue的源码,简单查看下

代码语言:javascript复制
public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
  
    // 首先出现了就是一个抽象静态内部类,还有它的两个子类
	abstract static class Transferer<E> {
        abstract E transfer(E e, boolean timed, long nanos);
    }
  
    // Transferer的子类,也是一个静态内部类。栈结构,后进先出
    static final class TransferStack<E> extends Transferer<E> {}
  
    // Transferer的子类,也是一个静态内部类。队列结构,先进先出
    static final class TransferQueue<E> extends Transferer<E> {}
  
    // 转让器
    private transient volatile Transferer<E> transferer;
  
    // 无参构造器
	public SynchronousQueue() {
        this(false);
    }

    // 构造器,传入是否公平,如果公平,则创建队列结构的转让器,否则创建栈结构的转让器
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
  
	// 存方法
    public void put(E e) throws InterruptedException {
        // 判断是否为空
        if (e == null) throw new NullPointerException();
		// 直接通过转让器来存
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }
  
    // 取方法
    public E take() throws InterruptedException {
        // 又是通过转让器来取?还是同样的方法?
        E e = transferer.transfer(null, false, 0);
        // 如果取到的元素为null,直接抛出打断异常
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }
  
}

看到这,我有点蒙,看来还是逃避不过要查看 TransferStack或者 TransferQueue这两个内部类啊。后续开个专章来讲述源码好了。简单说下它的流程,transferer方法

  1. 根据实参是否为null来判断出是哪种操作,并为其打上标记,这边分为 DATAREQUEST
    1. DATA说明是存,REQUEST说明是取
  2. 判断队列是否为空,为空就将此操作线程作为节点阻塞住
  3. 判断队列是否为空,不为空就判断队尾的节点模式和此操作的模式是否匹配
    1. 匹配的意思是指,一个 DATA操作就要匹配一个 REQUEST
  4. 如果模式相等,没有匹配上,就将此操作线程作为节点阻塞住
  5. 如果模式不相等,可以匹配上,则将 DATA操作的元素交给 REQUEST,然后进行消除。

TransferStackTransferQueue的区别在与 判断队尾节点的模式是否相等,TransferQueue是将头部的节点进行匹配消除,而 TransferStack全部都是队尾的节点。这也体现的队列的栈的区别,一个是先进先出,一个是后进先出。 源码内部还使用到了CAS自旋锁,计划出一章关于锁类型的文章。

5)LinkedTransferQueue

如果理解了前面的 SynchronousQueue的话,那么 LinkedTransferQueue就很好理解了。

简单来说,SynchronousQueue中的 TransferQueue直接维护在 LinkedTransferQueue里面,少了一层抽象内部类。

如果 SynchronousQueue存取操作都会阻塞,只有配对上才会唤醒。那么 LinkedTransferQueue做了一定的简化和增强,其中一项就是可以自己决定是否阻塞存取操作。

一起来测试一下吧

代码语言:javascript复制
package com.banmoon.queue;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;

import java.util.Date;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

public class LinkedTransferQueueTest {

    public static void main(String[] args) throws InterruptedException {
        test1();
//        test2();
//        test3();
    }

    /**
     * 效果与SynchronousQueue基本一致
     * transfer插入时,没有匹配的取操作则会阻塞
     * @throws InterruptedException
     */
    private static void test1() throws InterruptedException {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

        new Thread(() -> {
            try {
                for (int i = 0; i < 3; i  ) {
                    String str = queue.take();
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(StrUtil.format("取到了{},{}", str, DateUtil.formatTime(new Date())));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        queue.transfer("A");
        System.out.println("插入了A");
        queue.transfer("B");
        System.out.println("插入了B");
        queue.transfer("C");
        System.out.println("插入了C");
    }

    /**
     * 存元素时
     * 如果有取操作阻塞的话,则进行匹配,返回true
     * 没有取操作的话,不阻塞,直接返回false
     */
    private static void test2() throws InterruptedException {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

        for (int i = 0; i < 3; i  ) {
            new Thread(() -> {
                try {
                    System.out.println(StrUtil.format("取到了{},{}", queue.take(), DateUtil.formatTime(new Date())));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        // 等待三个取的操作都执行完
        TimeUnit.SECONDS.sleep(2);

        System.out.println(StrUtil.format("插入A:{},{}", queue.tryTransfer("A"), DateUtil.formatTime(new Date())));
        System.out.println(StrUtil.format("插入B:{},{}", queue.tryTransfer("B"), DateUtil.formatTime(new Date())));
        System.out.println(StrUtil.format("插入C:{},{}", queue.tryTransfer("C"), DateUtil.formatTime(new Date())));
    }

    /**
     * 存操作时,没有取操作匹配,将会等待一段时间再进行返回
     * @throws InterruptedException
     */
    private static void test3() throws InterruptedException {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

        new Thread(() -> {
            for (int i = 0; i < 3; i  ) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(StrUtil.format("取到了{},{}", queue.take(), DateUtil.formatTime(new Date())));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        System.out.println(StrUtil.format("插入A:{},{}", queue.tryTransfer("A", 3, TimeUnit.SECONDS), DateUtil.formatTime(new Date())));
        System.out.println(StrUtil.format("插入B:{},{}", queue.tryTransfer("B", 3, TimeUnit.SECONDS), DateUtil.formatTime(new Date())));
        System.out.println(StrUtil.format("插入C:{},{}", queue.tryTransfer("C", 3, TimeUnit.SECONDS), DateUtil.formatTime(new Date())));
    }


}

执行 test1transfer方法插入的操作会阻塞,直到有取的操作进入相匹配,这效果和 SynchronousQueue一样

执行 test2。简单来说,存元素时,如果已有取操作阻塞了,将返回true。否则,直接返回false

执行 test3,存操作时,没有取操作匹配,将会等待一段时间再进行返回

5)PriorityBlockingQueue

PriorityBlockingQueue优先级阻塞队列,存元素时不会阻塞,取元素时为空则阻塞。和其他阻塞队列相比,他的数组维护了一个二叉堆,对元素进行了优先级的排序。

测试下列代码,自定义实现比较器,这边就按照数字大小排序

代码语言:javascript复制
package com.banmoon.queue;

import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueTest {

    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<Integer>(3, new MyComparator());
        queue.put(1);
        queue.put(3);
        queue.put(2);

        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());

    }

}

class MyComparator implements Comparator<Integer> {

    @Override
    public int compare(Integer o1, Integer o2) {
        if(o1==o2)
            return 0;
        return o1>o2? -1: 1;
    }
}

执行结果

6)LinkedBlockingDeque

这个和 LinkedBlockingQueue好像,我可以称其为 LinkedBlockingQueue的升级版。

LinkedBlockingDeque被称为双端队列,是因为 LinkedBlockingDeque可以往链表两头插入元素。他的存储结构是链表,同 LinkedBlockingQueue一致,又因为存取可以在双端进行,所以不能像 LinkedBlockingQueue一样给两把锁,这里保持了和 ArrayBlockingQueue一样,仅有一把锁保持线程安全。

代码语言:javascript复制
package com.banmoon.queue;

import java.util.concurrent.LinkedBlockingDeque;

public class LinkedBlockingDequeTest {

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(4);
        deque.put("A");
        deque.put("B");
        deque.put("C");
        deque.putFirst("D");

        System.out.println(deque.take());
        System.out.println(deque.takeLast());
    }

}

执行结果,此队列中还有其他的api,都可以指定在队首或者队尾存元素,取元素同理

六、最后

在以前,并没有熟悉去使用过并发包的东西,在这次整理后,我对并发包有一定的了解。尤其是阻塞队列这一块,在看源码的时候还是挺有意思的。

当然本文没有列出所有使用 Lock锁的api方法,提供上jkd8在线文档,学习要对着文档。

关于本文出现的代码示例,已提交至码云,只看文章不懂时,一定要敲代码进行理解。

我是半月,祝你幸福!!!

0 人点赞