天天看点

Master-Worker模式的java代码模拟实现

Master-Worker模式是常用的并行模式。它的核心思想是系统由两类进程协作工作。Master进程和Worker进程。Master负
    责接收和分配任务。Worker负责处理master分配的子任务。当Worker子进程处理完成后,会将结果返回给Master,由
    Master做归纳总结。它的好处是将一个大任务分解成若干小任务,并行执行,从而提高了系统的吞吐量。
    以下为代码实现部分,各部分有注释。
           

Master的java代码实现

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Master {
	//存放Task工作任务的容器
	private ConcurrentLinkedQueue<Task> tasks = new ConcurrentLinkedQueue<Task>();
	
	//存放work/工作线程的容器
	private HashMap<String, Thread> workers = new HashMap<>();
	
	//存放每个work执行结果的容器
	private ConcurrentHashMap<Integer, Object> resultMap = new ConcurrentHashMap<>();
	
	//num代表需要启动work线程数量
	public Master(Worker worker,Integer num){
		worker.setTasks(this.tasks);
		worker.setResultMap(this.resultMap);
		
		for(int i=0;i<num;i++){
			workers.put("第"+i+"个工作线程", new Thread(worker));
		}
	}
	
	//接收任务并提交任务
	public void submit(Task task){
		tasks.add(task);
	}
	
	//启动各个work线程,让work为“(master)”开始工作
	public void execute(){
		for(Map.Entry<String, Thread> me:workers.entrySet()){
			me.getValue().start();
		}
	}

	public boolean complete() {
		
		for(Map.Entry<String, Thread> me:workers.entrySet()){
			if(me.getValue().getState()!=Thread.State.TERMINATED){
				return false;
			}
		}
		
		return true;
	}

	public int getResult() {
		int result=0;
		for(Map.Entry<Integer, Object> me: resultMap.entrySet()){
			result+=(int)me.getValue();
		}
		return result;
	}
	
}


           

Worker的java代码实现

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Worker implements Runnable {
	private ConcurrentLinkedQueue<Task> tasks;
	private ConcurrentHashMap<Integer, Object> resultMap;

	@Override
	public void run() {
		while(true){
			Task task = tasks.poll();
			if(task==null) break;
			Object handleResult = handle(task);
			resultMap.put(task.getId(), handleResult);
		}
	
	}

	private Object handle(Task task) {
		//假定此方法在处理数据时需要耗时0.5秒
		try {
			Thread.sleep(500);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
		Integer price = task.getPrice();
		return price;
	}

	public void setTasks(ConcurrentLinkedQueue<Task> tasks) {
		this.tasks = tasks;
		
	}
	
	
	public void setResultMap(ConcurrentHashMap<Integer, Object> resultMap) {
		this.resultMap = resultMap;
		
	}



}

           

Task为任务小单元

public class Task {
	
	private int id;
	private int price ;
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public int getPrice() {
		return price;
	}
	public void setPrice(int price) {
		this.price = price;
	} 
	
}


           

//main方法测试‘

import java.util.Random;

public class Main {

	public static void main(String[] args) {
		//指定master需要几个线程为其工作
		Master master = new Master(new Worker(),10);
		
		//r为随机数,以方便在设置price参数时,使设置的数为变量,以便于模拟测试
		int r = new Random().nextInt(100);
		
		//有100个task任务需要处理
		for(int i=0;i<100;i++){
			Task task = new Task();
			task.setId(i);
			task.setPrice(r);
			master.submit(task);	
		}
		
		master.execute();
		//系统当前时间
		long start = System.currentTimeMillis();
		//此处必须使用循环,假如不用循环的话,
		while(true){
			if(master.complete()){
				int result = master.getResult();
				//系统耗时秒数
				System.out.println(System.currentTimeMillis()-start);
				
				System.out.println("最终结果"+result);
				break;
			}
		}
		
	}
}