Master-Worker模式是常用的并行计算模式 核心思想是系统由两类进程协作工作:Master进程和Worker进程。Master进程负责接收和分配任务,Worker负责处理子任务。当各个Worker子进程处理完成后 会将结果返回给Master 由Master做归纳和总结 其好处是能一个大任务分解成若干个小任务 并行执行 从而提高系统的吞吐量。
示例:
ConcurrentLinkedQueue 并发放入队列中 所以使用这个 ConcurrentHashMap 也是并发把结果放入HashMap中
Bug 任务类:
代码语言:javascript复制/**
* @author: xiepanpan
* @Date: 2019/11/20
* @Description: 任务
*/
public class Bug {
/**
* bug编号
*/
private int id;
/**
* 一个bug价值多少钱。。
*/
private int price ;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getPrice() {
return price;
}
public void setPrice(int price) {
this.price = price;
}
}
Master 类 (项目经理)。。:
代码语言:javascript复制import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* @author: xiepanpan
* @Date: 2019/11/20
* @Description: 项目经理
*/
public class Master {
//1 有一个盛放任务的容器 (相当于禅道 jira)
private ConcurrentLinkedQueue<Bug> workQueue = new ConcurrentLinkedQueue<Bug>();
//2 需要有一个盛放worker的集合 (开发小组)
private HashMap<String, Thread> workers = new HashMap<String, Thread>();
//3 需要有一个盛放每一个worker执行任务的结果集合
private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
//4 构造方法 (项目经理组织开发人员。。)
public Master(Worker worker , int workerCount){
worker.setWorkQueue(this.workQueue);
worker.setResultMap(this.resultMap);
for(int i = 0; i < workerCount; i ){
this.workers.put(Integer.toString(i), new Thread(worker));
}
}
//5 需要一个提交任务的方法 (测试提bug。。)
public void submit(Bug task){
this.workQueue.add(task);
}
//6 需要有一个执行的方法,启动所有的worker方法去执行任务 (程序员开始fix bug)
public void execute(){
for(Map.Entry<String, Thread> me : workers.entrySet()){
me.getValue().start();
}
}
//7 判断是否运行结束的方法 (项目经理问问bug改完了么。。)
public boolean isComplete() {
for(Map.Entry<String, Thread> me : workers.entrySet()){
//线程TERMINATED表示已停止 (程序员开始划水。。)
if(me.getValue().getState() != Thread.State.TERMINATED){
return false;
}
}
return true;
}
//8 计算结果方法 (项目经理做总结 去客户现场做汇报。。)
public int getResult() {
int priceResult = 0;
for(Map.Entry<String, Object> me : resultMap.entrySet()){
priceResult = (Integer)me.getValue();
}
return priceResult;
}
}
Worker (程序员):
代码语言:javascript复制import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* @author: xiepanpan
* @Date: 2019/11/20
* @Description: 码农
*/
public class Worker implements Runnable {
private ConcurrentLinkedQueue<Bug> workQueue;
private ConcurrentHashMap<String, Object> resultMap;
public void setWorkQueue(ConcurrentLinkedQueue<Bug> workQueue) {
this.workQueue = workQueue;
}
public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
this.resultMap = resultMap;
}
@Override
public void run() {
while(true){
Bug input = this.workQueue.poll();
if(input == null) break;
Object output = handle(input);
this.resultMap.put(Integer.toString(input.getId()), output);
}
}
private Object handle(Bug input) {
Object output = null;
try {
//处理任务的耗时。。 比如说兼容IE678910 各种分辨率。。。
Thread.sleep(500);
output = input.getPrice();
} catch (InterruptedException e) {
e.printStackTrace();
}
return output;
}
}
Main 类:
代码语言:javascript复制import java.util.Random;
public class Main {
public static void main(String[] args) {
Master master = new Master(new Worker(), 20);
Random r = new Random();
//测试提了100个bug
for(int i = 1; i <= 100; i ){
Bug t = new Bug();
t.setId(i);
产生一个0到1000的随机数
t.setPrice(r.nextInt(1000));
master.submit(t);
}
master.execute();
long start = System.currentTimeMillis();
while(true){
if(master.isComplete()){
long end = System.currentTimeMillis() - start;
int priceResult = master.getResult();
System.out.println("最终结果:bug总价值" priceResult "RMB," ", 解决这些bug耗时:" String.format("%.3f", (double)end / 1000) "天");
break;
}
}
}
}
运行结果:
一共有100个bug 一个码农改掉一个bug耗时0.5天 一个码农解决掉所有的bug耗时50天 现在20个人并发执行 结果就是50/20=2.5天 (随便玩玩 其他因素不考虑 比如不能一次性fix bug。。)