多线程之并发工具类

2019-10-23 17:51:07 浏览数 (3)

CountDownLatch

在开发过程中经常会碰到一个任务需要开启多个线程,然后将多个线程的执行结果汇总。比如说查询全量数据,考虑数据量的问题,我们基本上会做分页,这时候就需要多次循环调用。CountDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。 执行原理 CountDownLatch内部实现了AQS,初始化CountDownLatch的时候,会调用Sync的构造方法将count赋值给state变量。多个线程调用countDown的时候,是使用CAS递减state的值;调用await方法的线程会被放在AQS阻塞队列中,等待计数器为0时,唤醒该线程。 核心方法

代码语言:javascript复制
//构造方法,初始化计数器为count
 public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        //Sync实现了AQS
        this.sync = new Sync(count);
    }
//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
//挂起timeout,超过这个时间继续执行
 public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
//将count -1
public void countDown() {
        sync.releaseShared(1);
    }
//获取当前count值
 public long getCount() {
        return sync.getCount();
    }

走个栗子

代码语言:javascript复制
@Slf4j
public class CountDownLatchTest implements Runnable {

    //假设 10000条数据,每页100条需要,100次请求
    static final CountDownLatch countDownLatch = new CountDownLatch(100);

    private Integer i;

    public CountDownLatchTest(Integer i) {
        this.i = i;
    }

    @Override
    public void run() {
        try {
            //请求服务查询数据
            log.info("查到数据:{}-{}", i * 100, (i   1) * 100);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            countDownLatch.countDown();
        }
    }

    public static void main(String[] args) {
        //接口支持的最大并发 = corePoolSize, 总任务量 = maximumSize
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 100, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));

        for (int i = 0; i < 10; i  ) {
            threadPoolExecutor.submit(new CountDownLatchTest(i));
        }
        countDownLatch.await();
    }
}

CyclicBarrier

等待所有线程达到一个屏障时在执行. 原理 CyclicBarrier是由ReentrantLock可重入锁和Condition共同实现的。 核心方法

代码语言:javascript复制
    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();
  
//定义一个CyclicBarrier,parties 代表拦截的线程数量
 public CyclicBarrier(int parties) {
        this(parties, null);
    }

//定义一个CyclicBarrier,parties 代表拦截的线程数量,由最后一个线程执行barrierAction
public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

//调用
public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

//返回当前在屏障处等待的设备
public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }

//屏障是否可用
public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }
//返回参与屏障的数量
public int getParties() {
        return parties;
    }

跑个栗子

代码语言:javascript复制
@Slf4j
public class CyclicBarrierTest implements Runnable {
    private CyclicBarrier barrier;

    private String name;
    private Long time;

    public CyclicBarrierTest(CyclicBarrier barrier, String name, Long time) {
        this.barrier = barrier;
        this.name = name;
        this.time = time;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(time * 1000);
            System.out.println(name   "到了");
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

@Slf4j
public class MainThread implements Runnable {
    @Override
    public void run() {
        System.out.println("人到齐了,开饭.....");
    }

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new MainThread());
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.execute(new CyclicBarrierTest(cyclicBarrier,"小花",2L));
        executorService.execute(new CyclicBarrierTest(cyclicBarrier,"小红",1L));
        executorService.execute(new CyclicBarrierTest(cyclicBarrier,"小明",5L));

    }
}

cyclicbarriertest.gif

Semaphore

Semaphore 信号量,控制并发时线程的数量。

跑个栗子

代码语言:javascript复制
//健身房有好几个跑步机
public class FitnessRoom {

    class TreadMill {
        private Integer num;

        public Integer getNum() {
            return num;
        }

        public void setNum(Integer num) {
            this.num = num;
        }

        public TreadMill(Integer num) {
            this.num = num;
        }
    }

    private TreadMill[] treadMills = new TreadMill[]{new TreadMill(1), new TreadMill(2), new TreadMill(3), new TreadMill(4)};
    private boolean[] use = new boolean[4];

    Semaphore semaphore = new Semaphore(4, true);

    //获取一个跑步机
    public TreadMill get() throws InterruptedException {
        semaphore.acquire(1);
        return getAvailable();
    }

    // 遍历找一个没人的跑步机
    public TreadMill getAvailable() {
        for (int i = 0; i < use.length; i  ) {
            if (!use[i]) {
                use[i] = true;
                return treadMills[i];
            }
        }
        return null;
    }

    /**
     * 释放跑步机
     * @param treadMill
     */
    public void release(TreadMill treadMill) {
        for (int i = 0; i < use.length; i  ) {
            if (treadMills[i] == treadMill) {
                if (use[i]) {
                    use[i] = false;
                }
            }

        }
    }
}

public class MainThread implements Runnable {

    private String name;
    private FitnessRoom fitnessRoom;

    public MainThread(String name, FitnessRoom fitnessRoom) {
        this.name = name;
        this.fitnessRoom = fitnessRoom;
    }

    @Override
    public void run() {
        try {
            FitnessRoom.TreadMill treadMill = fitnessRoom.get();
            if (null != treadMill) {
                System.out.println(name   "在"   treadMill.getNum()   "号跑步机上跑步");
                TimeUnit.SECONDS.sleep(2);
                System.out.println(name   "跑完了");
                //跑步机腾出来了
                fitnessRoom.release(treadMill);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        FitnessRoom fitnessRoom = new FitnessRoom();
        for (int i = 0; i < 20; i  ) {
            executorService.execute(new MainThread(i   "", fitnessRoom));
        }
    }
}

semaphore.gif

Exchanger

两个线程之间交互数据工具类

原子操作类

AtomicBoolean AtomicInteger AtomicLong AtomicIntegerArray AtomicLongArray AtomicReferenceArray AtomicReference AtomicReferenceFieldUpdater AtomicMarkableReference AtomicIntegerFieldUpdater AtomicLongFieldUpdater AtomicStampedFieldUpdater AtomicReferenceFieldUpdater

0 人点赞