【并发编程】Master-Worker模式

2022-10-25 15:23:23 浏览数 (1)

Master-Worker模式是常用的并行计算模式 核心思想是系统由两类进程协作工作:Master进程和Worker进程。Master进程负责接收和分配任务,Worker负责处理子任务。当各个Worker子进程处理完成后 会将结果返回给Master 由Master做归纳和总结 其好处是能一个大任务分解成若干个小任务 并行执行 从而提高系统的吞吐量。

示例:

ConcurrentLinkedQueue 并发放入队列中 所以使用这个 ConcurrentHashMap 也是并发把结果放入HashMap中

Bug 任务类:

代码语言:javascript复制
/**
 *  @author: xiepanpan
 *  @Date: 2019/11/20 
 *  @Description:  任务
 */
public class Bug {

	/**
	 * bug编号
	 */
	private int id;
	/**
	 * 一个bug价值多少钱。。
	 */
	private int price ;
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public int getPrice() {
		return price;
	}
	public void setPrice(int price) {
		this.price = price;
	} 
	
}

Master 类 (项目经理)。。:

代码语言:javascript复制
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 *  @author: xiepanpan
 *  @Date: 2019/11/20 
 *  @Description:  项目经理
 */
public class Master {

	//1 有一个盛放任务的容器  (相当于禅道 jira)
	private ConcurrentLinkedQueue<Bug> workQueue = new ConcurrentLinkedQueue<Bug>();
	
	//2 需要有一个盛放worker的集合 (开发小组)
	private HashMap<String, Thread> workers = new HashMap<String, Thread>();
	
	//3 需要有一个盛放每一个worker执行任务的结果集合
	private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
	
	//4 构造方法  (项目经理组织开发人员。。)
	public Master(Worker worker , int workerCount){
		worker.setWorkQueue(this.workQueue);
		worker.setResultMap(this.resultMap);
		
		for(int i = 0; i < workerCount; i   ){
			this.workers.put(Integer.toString(i), new Thread(worker));
		}
		
	}
	
	//5 需要一个提交任务的方法 (测试提bug。。)
	public void submit(Bug task){
		this.workQueue.add(task);
	}
	
	//6 需要有一个执行的方法,启动所有的worker方法去执行任务 (程序员开始fix bug)
	public void execute(){
		for(Map.Entry<String, Thread> me : workers.entrySet()){
			me.getValue().start();
		}
	}

	//7 判断是否运行结束的方法  (项目经理问问bug改完了么。。)
	public boolean isComplete() {
		for(Map.Entry<String, Thread> me : workers.entrySet()){
			//线程TERMINATED表示已停止  (程序员开始划水。。)
			if(me.getValue().getState() != Thread.State.TERMINATED){
				return false;
			}
		}		
		return true;
	}

	//8 计算结果方法  (项目经理做总结 去客户现场做汇报。。)
	public int getResult() {
		int priceResult = 0;
		for(Map.Entry<String, Object> me : resultMap.entrySet()){
			priceResult  = (Integer)me.getValue();
		}
		return priceResult;
	}
	
	
}

Worker (程序员):

代码语言:javascript复制
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 *  @author: xiepanpan
 *  @Date: 2019/11/20 
 *  @Description: 码农
 */
public class Worker implements Runnable {

	private ConcurrentLinkedQueue<Bug> workQueue;
	private ConcurrentHashMap<String, Object> resultMap;
	
	public void setWorkQueue(ConcurrentLinkedQueue<Bug> workQueue) {
		this.workQueue = workQueue;
	}

	public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
		this.resultMap = resultMap;
	}
	
	@Override
	public void run() {
		while(true){
			Bug input = this.workQueue.poll();
			if(input == null) break;
			Object output = handle(input);
			this.resultMap.put(Integer.toString(input.getId()), output);
		}
	}

	private Object handle(Bug input) {
		Object output = null;
		try {
			//处理任务的耗时。。 比如说兼容IE678910 各种分辨率。。。
			Thread.sleep(500);
			output = input.getPrice();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return output;
	}



}

Main 类:

代码语言:javascript复制
import java.util.Random;

public class Main {

	public static void main(String[] args) {
		
		Master master = new Master(new Worker(), 20);
		
		Random r = new Random();
		//测试提了100个bug
		for(int i = 1; i <= 100; i  ){
			Bug t = new Bug();
			t.setId(i);
			产生一个0到1000的随机数
			t.setPrice(r.nextInt(1000));
			master.submit(t);
		}
		master.execute();
		long start = System.currentTimeMillis();
		
		while(true){
			if(master.isComplete()){
				long end = System.currentTimeMillis() - start;
				int priceResult = master.getResult();
				System.out.println("最终结果:bug总价值"   priceResult "RMB,"   ", 解决这些bug耗时:"   String.format("%.3f", (double)end / 1000) "天");
				break;
			}
		}
		
	}
}

运行结果:

一共有100个bug 一个码农改掉一个bug耗时0.5天 一个码农解决掉所有的bug耗时50天 现在20个人并发执行 结果就是50/20=2.5天 (随便玩玩 其他因素不考虑 比如不能一次性fix bug。。)

0 人点赞