天天看點

并發處理之master-worker 模式

master-worker模式是一種将順序執行的任務轉為并發執行,順序執行的任務之間互相之間沒有關系

如圖:

并發處理之master-worker 模式

相關代碼實作簡易版:

1)master 實作

package com.lwd.worker_master;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 *  配置設定任務/合并結果集
 * @author liuwd
 */
public class Master {
    /**
     * 任務隊列
     */
    private ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
    /**
     * 工作程序
     */
    private Map<String,Thread> threadMap = new HashMap<>(16);
    /**
     * 子任務處理結果集
     */
    private Map<String,Object> resultMap = new HashMap<>(16);

    public Master(Worker worker,int count){
        worker.setWorkerQueue(queue);
        worker.setResultMap(resultMap);
        for (int i = 0; i < count; i++) {
            threadMap.put(Integer.toString(i),new Thread(worker,Integer.toString(i)));
        }
    }


    /**
     * 是否子任務都結束了
     * @return
     */
    public boolean isComplte(){
        Set<Map.Entry<String, Thread>> entries = threadMap.entrySet();
        for (Map.Entry<String, Thread> entry : entries) {
            Thread thread = entry.getValue();
            if(thread.getState()!=Thread.State.TERMINATED){
                return false;
            }
        }
        return true;
    }

    /**
     * 送出任務
     * @param obj
     */
    public void submit(Object obj){
       queue.add(obj);
    }

    /**
     *  傳回結果集
     * @return
     */
    public Map<String,Object> getResultMap(){
        return resultMap;
    }

    /**
     * 執行任務 開啟程序
     */
    public void execute(){
        Set<Map.Entry<String, Thread>> entries = threadMap.entrySet();
        for (Map.Entry<String, Thread> entry : entries) {
            entry.getValue().start();
        }
    }

}      

2)worker實作

package com.lwd.worker_master;

import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * 任務對象 用于處理相關任務
 * @author liuwd
 */
public class Worker implements Runnable{
    /**
     * 任務隊列
     */
    private ConcurrentLinkedQueue workerQueue;
    /**
     * 結果集
     */
    private Map<String,Object>  resultMap;


    public void setWorkerQueue(ConcurrentLinkedQueue workerQueue) {
        this.workerQueue = workerQueue;
    }


    public void setResultMap(Map<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    @Override
    public void run() {
        while (true){
            Object poll = workerQueue.poll();
            if(null == poll){
                break;
            }
            Object handle = handle(poll);
            resultMap.put(Integer.toString(handle.hashCode()),handle);
        }
    }

    /**
     * 處理任務
     * @param obj
     * @return
     */
    public Object handle(Object obj){
        return obj;
    }
}      

3)RealWork實作

package com.lwd.worker_master;

/**
 *  實際任務類
 * @author liuwd
 */
public class RealWorker extends Worker {
    @Override
    public Object handle(Object obj) {
        Integer i = (Integer)obj;
        return i*i;
    }
}      

4)WorkMasterMain.java 需求運作實作

package com.lwd.worker_master;

import java.util.Iterator;
import java.util.Map;


/**
 *  目前模式的使用主體類
 * @author liuwd
 */
public class WokerMasterMain {

    public static void main(String[] args) {
        Master master = new Master(new RealWorker(), 5);
        Integer integer = squaresSum(master, 100);
        System.out.println(integer);

    }

    /**
     * 1-100平方和
     */
    public static Integer squaresSum(Master master,int num){
        for (int i = 0; i <num ; i++) {
            master.submit(i);
        }
        master.execute();
        int result = 0;
        Map<String, Object> resultMap = master.getResultMap();
        while (resultMap.size()>0&&!master.isComplte()){
            Iterator<String> iterator = resultMap.keySet().iterator();
            while (iterator.hasNext()){
                String key = iterator.next();
                Object o = resultMap.get(key);
                if(null != o){
                    Integer i = (Integer)o;
                    result+=i;
                }
                iterator.remove();
            }

        }
        return result;
    }
}      
上一篇: Java命名規範
下一篇: HashMap