天天看點

CompletionService<V>使用示例

構造了一個有傳回值的任務,用sleep(随機數)模拟工作時間:

import java.util.Random;
import java.util.concurrent.Callable;

public class WorkTask implements Callable<Integer> {
    private String name;
    public WorkTask(String name) {
        this.name = name;
    }

    @Override
    public Integer call() {
        int sleepTime = new Random().nextInt(1000);
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 傳回給調用者的值
        return sleepTime;
    }
}
           

兩種方法擷取線程池中任務的傳回結果:

  1. 自己建立一個阻塞隊列來儲存Future< V >,擷取時循環調用隊列的take()方法。主線程并不能保證首先獲得的是最先完成任務的線程傳回值。它隻是按加入線程池的順序傳回。因為take方法是阻塞方法,後面的任務完成了,前面的任務卻沒有完成,主程式就那樣等待在那兒,隻到前面的完成了,它才知道原來後面的也完成了。
  2. 使用CompletionService< V >來替代BlockingQueue<Future< V >>,主線程總是能夠拿到最先完成的任務的傳回值,而不管它們加入線程池的順序。
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;


public class CompletionCase {
    private final int POOL_SIZE = Runtime.getRuntime().availableProcessors();
    private final int TOTAL_TASK = Runtime.getRuntime().availableProcessors()*10;

    // 方法一,自己寫集合來實作擷取線程池中任務的傳回結果
    public void testByQueue() throws Exception {
    	long start = System.currentTimeMillis();
    	AtomicInteger count = new AtomicInteger(0);
        // 建立線程池
        ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
        //隊列,拿任務的執行結果
        BlockingQueue<Future<Integer>> queue = 
        		new LinkedBlockingQueue<>();

        // 向裡面扔任務
        for (int i = 0; i < TOTAL_TASK; i++) {
            Future<Integer> future = pool.submit(new WorkTask("ExecTask" + i));
            queue.add(future);
        }

        // 檢查線程池任務執行結果
        for (int i = 0; i < TOTAL_TASK; i++) {
        	int sleptTime = queue.take().get();
        	count.addAndGet(sleptTime);
        }

        // 關閉線程池
        pool.shutdown();
        System.out.println("-------------tasks sleep time "+count.get()
        		+"ms,and spend time "
        		+(System.currentTimeMillis()-start)+" ms-------------");
    }
    //方法二,利用CompletionService<V>來擷取線程池中任務的傳回結果
    public void testByCompletion() throws Exception{
        long start = System.currentTimeMillis();
        AtomicInteger count = new AtomicInteger(0);
        // 建立線程池
        ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);

        CompletionService<Integer> completionService
                = new ExecutorCompletionService<>(pool);

        // 向裡面扔任務
        for (int i = 0; i < TOTAL_TASK; i++) {
            completionService.submit(new WorkTask("ExecTask" + i));
        }

        // 檢查線程池任務執行結果
        for (int i = 0; i < TOTAL_TASK; i++) {
            int sleptTime = completionService.take().get();
            count.addAndGet(sleptTime);
        }

        // 關閉線程池
        pool.shutdown();
        System.out.println("-------------tasks sleep time "+count.get()
                +"ms,and spend time "
                +(System.currentTimeMillis()-start)+" ms-------------");

    }

    public static void main(String[] args) throws Exception {
        CompletionCase t = new CompletionCase();
        t.testByQueue();
        t.testByCompletion();
    }
}

           

結果:

CompletionService&lt;V&gt;使用示例

參考:Mark—筆記_Java并發程式設計

繼續閱讀