之前介绍了java中latch的主要作用和使用方法。本文主要介绍CyclicBarrier的使用方法。
首先Barrier(栅栏)是用来做线程间控制的,它能够等待指定数目的线程都准备好后,再执行一些操作(当然也可以在这些线程前做操作,这取决于什么使用调用barrier的await方法)。
好,下面以一个例子说明。
有worker线程和workerleader线程,要求每3个worker线程执行后,workerleader线程要统一的对worker的执行之间进行统计。
上代码:
worker线程
代码语言:javascript复制package com.xueyou.demo.concurrent.barrier;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
public class Worker implements Runnable {
private CyclicBarrier barrier;
private int workerId;
private ConcurrentHashMap<Integer, Integer> concurrentHashMap;
public Worker(CyclicBarrier barrier, int workerId, ConcurrentHashMap<Integer, Integer> concurrentHashMap) {
this.barrier = barrier;
this.workerId = workerId;
this.concurrentHashMap = concurrentHashMap;
}
@Override
public void run() {
try {
int sleepTime = new Random().nextInt(3000);
System.out.println(workerId " is working...");
Thread.sleep(sleepTime);
concurrentHashMap.put(workerId, sleepTime);
barrier.await();
System.out.println(workerId "with time " sleepTime " finish");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
workerleader线程
代码语言:javascript复制package com.xueyou.demo.concurrent.barrier;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
public class WorkerLeader implements Runnable {
private ConcurrentHashMap<Integer, Integer> concurrentHashMap;
private final List<Integer> workerIdList;
public WorkerLeader(ConcurrentHashMap<Integer, Integer> concurrentHashMap, List<Integer> workerIdList) {
this.concurrentHashMap = concurrentHashMap;
this.workerIdList = workerIdList;
}
@Override
public void run() {
/**
* workerLeader执行实在worker执行之后,因为CyclicBarrier构造函数的第二个参数代表着栅栏中的线程
* 都到达后(这里的到达其实就是说最后一个线程调用barrier的await之后),才会执行的runnable。
*
*/
System.out.println("=====last batch is ok ......");
for (Integer workerId : workerIdList) {
System.out.println("workerId is:" workerId " ,executeTime is:" concurrentHashMap.get(workerId));
}
}
}
barrier例子程序
代码语言:javascript复制package com.xueyou.demo.concurrent.barrier;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
public class BarrierDemo {
public static final int Worker_Count = 9;
public static void main(String[] args) {
ConcurrentHashMap<Integer, Integer> concurrentHashMap = new ConcurrentHashMap<>();
List<Integer> workIds = new ArrayList<>();
for (int i = 0; i < Worker_Count; i ) {
workIds.add(i);
}
/**
* 这里的barrier构造函数指定了两个内容一个是barrier的执行线程await的个数,另一个是当最后一个线程满足await后,要执行什么动作。
* 具体可以参考CycliBarrier的构造函数中的说明。
* 这里简单介绍一下,就是说,当最后一个worker,也就是第三个worker执行await的时候,那么就会触发WorkerLeader的动作。
* 同时由于barrier是可多次的(相比于latch),所以可以有多个worker执行,但是WorkerLeader每次只取三个进行操作。
* 这边使用了ConcurrentHashMap,作为多个线程间共享数据的方式,当然,也可以用future。
*/
CyclicBarrier barrier = new CyclicBarrier(3, new WorkerLeader(concurrentHashMap, workIds));
for (int i = 0; i < Worker_Count; i ) {
new Thread(new Worker(barrier, i, concurrentHashMap)).start();
}
}
}
执行结果:
代码语言:javascript复制1 is working...
2 is working...
0 is working...
3 is working...
4 is working...
5 is working...
6 is working...
7 is working...
8 is working...
=====last batch is ok ......
workerId is:0 ,executeTime is:745
workerId is:1 ,executeTime is:3
workerId is:2 ,executeTime is:null
workerId is:3 ,executeTime is:null
workerId is:4 ,executeTime is:595
workerId is:5 ,executeTime is:null
workerId is:6 ,executeTime is:null
workerId is:7 ,executeTime is:null
workerId is:8 ,executeTime is:null
0with time 745 finish
1with time 3 finish
4with time 595 finish
=====last batch is ok ......
workerId is:0 ,executeTime is:745
workerId is:1 ,executeTime is:3
workerId is:2 ,executeTime is:null
workerId is:3 ,executeTime is:1512
workerId is:4 ,executeTime is:595
workerId is:5 ,executeTime is:2310
workerId is:6 ,executeTime is:null
workerId is:7 ,executeTime is:null
workerId is:8 ,executeTime is:862
5with time 2310 finish
8with time 862 finish
3with time 1512 finish
=====last batch is ok ......
workerId is:0 ,executeTime is:745
workerId is:1 ,executeTime is:3
workerId is:2 ,executeTime is:2589
workerId is:3 ,executeTime is:1512
workerId is:4 ,executeTime is:595
workerId is:5 ,executeTime is:2310
workerId is:6 ,executeTime is:2570
workerId is:7 ,executeTime is:2769
workerId is:8 ,executeTime is:862
7with time 2769 finish
6with time 2570 finish
2with time 2589 finish
Process finished with exit code 0