天天看点

并发处理之master-worker 模式

master-worker模式是一种将顺序执行的任务转为并发执行的模式,前提是这些任务相互之间没有依赖关系。

如图:

并发处理之master-worker 模式

相关代码实现简易版:

1)master 实现

package com.lwd.worker_master;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Distributes tasks to worker threads and exposes the collected results.
 *
 * <p>Tasks are queued with {@link #submit(Object)}, the worker threads are
 * started with {@link #execute()}, and results can be read from
 * {@link #getResultMap()} while the workers are still running.
 *
 * @author liuwd
 */
public class Master {
    /**
     * Task queue shared by every worker thread.
     */
    private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
    /**
     * Worker threads, keyed by their index rendered as a string.
     * Only written in the constructor, read afterwards.
     */
    private final Map<String, Thread> threadMap = new HashMap<>(16);
    /**
     * Results produced by the workers. All worker threads insert into this map
     * concurrently while the caller may be reading and removing entries, so it
     * has to be a concurrent map; a plain HashMap can lose entries or throw
     * ConcurrentModificationException here.
     */
    private final Map<String, Object> resultMap =
            new java.util.concurrent.ConcurrentHashMap<>(16);

    /**
     * Wires the worker to the shared queue and result map, and creates
     * {@code count} threads that all run that same worker instance.
     * The threads are not started until {@link #execute()} is called.
     *
     * @param worker the task handler shared by all threads
     * @param count  number of worker threads to create
     */
    public Master(Worker worker, int count) {
        worker.setWorkerQueue(queue);
        worker.setResultMap(resultMap);
        for (int i = 0; i < count; i++) {
            String name = Integer.toString(i);
            threadMap.put(name, new Thread(worker, name));
        }
    }


    /**
     * Reports whether every worker thread has terminated.
     *
     * @return {@code true} when all threads are in state TERMINATED
     *         (threads that were never started count as not complete)
     */
    public boolean isComplte() {
        for (Thread thread : threadMap.values()) {
            if (thread.getState() != Thread.State.TERMINATED) {
                return false;
            }
        }
        return true;
    }

    /**
     * Adds a task to the shared queue.
     *
     * @param obj the task; the underlying queue rejects {@code null}
     */
    public void submit(Object obj) {
        queue.add(obj);
    }

    /**
     * Returns the live result map that the workers write into.
     *
     * @return the shared result map (not a copy)
     */
    public Map<String, Object> getResultMap() {
        return resultMap;
    }

    /**
     * Starts every worker thread.
     */
    public void execute() {
        for (Thread thread : threadMap.values()) {
            thread.start();
        }
    }

}

2)worker实现

package com.lwd.worker_master;

import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Task handler: repeatedly takes a task from the queue, processes it with
 * {@link #handle(Object)} and stores the outcome in the result map.
 *
 * <p>Subclasses override {@link #handle(Object)} to supply the real work.
 *
 * @author liuwd
 */
public class Worker implements Runnable {
    /**
     * Task queue to poll from.
     */
    private ConcurrentLinkedQueue<?> workerQueue;
    /**
     * Map that receives one entry per processed task.
     */
    private Map<String, Object> resultMap;
    /**
     * Source of result keys. Keys were previously derived from the result's
     * hashCode, so two tasks with equal (or hash-colliding) results overwrote
     * each other; a sequence number gives every result its own entry, even
     * when several threads run this same instance.
     */
    private final java.util.concurrent.atomic.AtomicLong resultSequence =
            new java.util.concurrent.atomic.AtomicLong();


    /**
     * @param workerQueue the queue this worker polls tasks from
     */
    public void setWorkerQueue(ConcurrentLinkedQueue<?> workerQueue) {
        this.workerQueue = workerQueue;
    }


    /**
     * @param resultMap the map this worker stores results in
     */
    public void setResultMap(Map<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    /**
     * Processes tasks until the queue is empty, then returns.
     * Each non-null result is stored under a key unique within this worker.
     */
    @Override
    public void run() {
        Object task;
        while ((task = workerQueue.poll()) != null) {
            Object result = handle(task);
            if (result == null) {
                // Nothing to record; also avoids a NullPointerException that
                // would otherwise end this thread and strand the queued tasks.
                continue;
            }
            resultMap.put(Long.toString(resultSequence.getAndIncrement()), result);
        }
    }

    /**
     * Processes a single task. The default implementation returns the task
     * unchanged.
     *
     * @param obj the task taken from the queue
     * @return the result to store, or {@code null} to store nothing
     */
    public Object handle(Object obj) {
        return obj;
    }
}

3)RealWorker实现

package com.lwd.worker_master;

/**
 * Concrete worker whose tasks are integers; each task's result is its square.
 *
 * @author liuwd
 */
public class RealWorker extends Worker {
    /**
     * Squares the given task.
     *
     * @param obj the task, cast to {@code Integer}
     * @return the task value multiplied by itself
     */
    @Override
    public Object handle(Object obj) {
        int value = (Integer) obj;
        return value * value;
    }
}

4)WokerMasterMain.java 需求运行实现

package com.lwd.worker_master;

import java.util.Iterator;
import java.util.Map;


/**
 * Example driver for the master-worker pattern: sums squares computed by
 * worker threads.
 *
 * @author liuwd
 */
public class WokerMasterMain {

    public static void main(String[] args) {
        Master master = new Master(new RealWorker(), 5);
        Integer integer = squaresSum(master, 100);
        System.out.println(integer);

    }

    /**
     * Submits the integers {@code 0 .. num-1} as tasks, starts the workers and
     * adds up every result they produce (with the {@code RealWorker} used in
     * {@code main}, the sum of the squares of 0..num-1).
     *
     * @param master the master that owns the queue, the workers and the results
     * @param num    number of tasks to submit
     * @return the sum of all worker results
     */
    public static Integer squaresSum(Master master, int num) {
        for (int i = 0; i < num; i++) {
            master.submit(i);
        }
        master.execute();

        // NOTE(review): this loop reads and removes entries while worker threads
        // may still be inserting, so the map handed out by Master has to tolerate
        // concurrent access.
        Map<String, Object> resultMap = master.getResultMap();
        int result = 0;
        boolean finished;
        do {
            // Sample completion BEFORE draining: if the workers were already done
            // at this point, the drain below is guaranteed to see every result.
            // The previous condition (size > 0 && !complete) returned immediately
            // when no result had arrived yet and dropped results still in the map
            // once the workers finished.
            finished = master.isComplte();
            Iterator<Object> values = resultMap.values().iterator();
            while (values.hasNext()) {
                Object value = values.next();
                if (value != null) {
                    result += (Integer) value;
                }
                // Remove consumed entries so they are not counted twice.
                values.remove();
            }
        } while (!finished);
        return result;
    }
}
上一篇: Java命名规范
下一篇: HashMap