java CyclicBarrier的使用

2021-05-14 17:05:51 浏览数 (2)

之前介绍了java中latch的主要作用和使用方法。本文主要介绍CyclicBarrier的使用方法。

首先Barrier(栅栏)是用来做线程间控制的,它能够等待指定数目的线程都准备好后,再执行一些操作(当然也可以在这些线程前做操作,这取决于什么使用调用barrier的await方法)。

好,下面以一个例子说明。

有worker线程和workerleader线程,要求每3个worker线程执行后,workerleader线程要统一的对worker的执行之间进行统计。

上代码:

worker线程

代码语言:javascript复制
package com.xueyou.demo.concurrent.barrier;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

public class Worker implements Runnable {
    private CyclicBarrier barrier;
    private int workerId;
    private ConcurrentHashMap<Integer, Integer> concurrentHashMap;

    public Worker(CyclicBarrier barrier, int workerId, ConcurrentHashMap<Integer, Integer> concurrentHashMap) {
        this.barrier = barrier;
        this.workerId = workerId;
        this.concurrentHashMap = concurrentHashMap;
    }

    @Override
    public void run() {
        try {
            int sleepTime = new Random().nextInt(3000);
            System.out.println(workerId   " is working...");
            Thread.sleep(sleepTime);
            concurrentHashMap.put(workerId, sleepTime);
            barrier.await();
            System.out.println(workerId   "with time "   sleepTime   " finish");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

workerleader线程 

代码语言:javascript复制
package com.xueyou.demo.concurrent.barrier;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class WorkerLeader implements Runnable {
    private ConcurrentHashMap<Integer, Integer> concurrentHashMap;
    private final List<Integer> workerIdList;

    public WorkerLeader(ConcurrentHashMap<Integer, Integer> concurrentHashMap, List<Integer> workerIdList) {
        this.concurrentHashMap = concurrentHashMap;
        this.workerIdList = workerIdList;
    }

    @Override
    public void run() {
        /**
         * workerLeader执行实在worker执行之后,因为CyclicBarrier构造函数的第二个参数代表着栅栏中的线程
         * 都到达后(这里的到达其实就是说最后一个线程调用barrier的await之后),才会执行的runnable。
         *
         */
        System.out.println("=====last batch is ok ......");
        for (Integer workerId : workerIdList) {
            System.out.println("workerId is:"   workerId   " ,executeTime is:"   concurrentHashMap.get(workerId));
        }
    }

}

 barrier例子程序

代码语言:javascript复制
package com.xueyou.demo.concurrent.barrier;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

public class BarrierDemo {

    public static final int Worker_Count = 9;

    public static void main(String[] args) {
        ConcurrentHashMap<Integer, Integer> concurrentHashMap = new ConcurrentHashMap<>();
        List<Integer> workIds = new ArrayList<>();
        for (int i = 0; i < Worker_Count; i  ) {
            workIds.add(i);
        }
        /**
         * 这里的barrier构造函数指定了两个内容一个是barrier的执行线程await的个数,另一个是当最后一个线程满足await后,要执行什么动作。
         * 具体可以参考CycliBarrier的构造函数中的说明。
         * 这里简单介绍一下,就是说,当最后一个worker,也就是第三个worker执行await的时候,那么就会触发WorkerLeader的动作。
         * 同时由于barrier是可多次的(相比于latch),所以可以有多个worker执行,但是WorkerLeader每次只取三个进行操作。
         * 这边使用了ConcurrentHashMap,作为多个线程间共享数据的方式,当然,也可以用future。
         */
        CyclicBarrier barrier = new CyclicBarrier(3, new WorkerLeader(concurrentHashMap, workIds));
        for (int i = 0; i < Worker_Count; i  ) {
            new Thread(new Worker(barrier, i, concurrentHashMap)).start();
        }
    }
}

执行结果:

代码语言:javascript复制
1 is working...
2 is working...
0 is working...
3 is working...
4 is working...
5 is working...
6 is working...
7 is working...
8 is working...
=====last batch is ok ......
workerId is:0 ,executeTime is:745
workerId is:1 ,executeTime is:3
workerId is:2 ,executeTime is:null
workerId is:3 ,executeTime is:null
workerId is:4 ,executeTime is:595
workerId is:5 ,executeTime is:null
workerId is:6 ,executeTime is:null
workerId is:7 ,executeTime is:null
workerId is:8 ,executeTime is:null
0with time 745 finish
1with time 3 finish
4with time 595 finish
=====last batch is ok ......
workerId is:0 ,executeTime is:745
workerId is:1 ,executeTime is:3
workerId is:2 ,executeTime is:null
workerId is:3 ,executeTime is:1512
workerId is:4 ,executeTime is:595
workerId is:5 ,executeTime is:2310
workerId is:6 ,executeTime is:null
workerId is:7 ,executeTime is:null
workerId is:8 ,executeTime is:862
5with time 2310 finish
8with time 862 finish
3with time 1512 finish
=====last batch is ok ......
workerId is:0 ,executeTime is:745
workerId is:1 ,executeTime is:3
workerId is:2 ,executeTime is:2589
workerId is:3 ,executeTime is:1512
workerId is:4 ,executeTime is:595
workerId is:5 ,executeTime is:2310
workerId is:6 ,executeTime is:2570
workerId is:7 ,executeTime is:2769
workerId is:8 ,executeTime is:862
7with time 2769 finish
6with time 2570 finish
2with time 2589 finish

Process finished with exit code 0

0 人点赞