天天看点

CompletionService<V>使用示例

构造了一个有返回值的任务,用sleep(随机数)模拟工作时间:

import java.util.Random;
import java.util.concurrent.Callable;

public class WorkTask implements Callable<Integer> {
    private String name;
    public WorkTask(String name) {
        this.name = name;
    }

    @Override
    public Integer call() {
        int sleepTime = new Random().nextInt(1000);
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 返回给调用者的值
        return sleepTime;
    }
}
           

两种方法获取线程池中任务的返回结果:

  1. 自己创建一个阻塞队列来保存Future< V >,获取时循环调用队列的take()方法。主线程并不能保证首先获得的是最先完成任务的线程返回值。它只是按加入线程池的顺序返回。因为take方法是阻塞方法,后面的任务完成了,前面的任务却没有完成,主程序就那样等待在那儿,只到前面的完成了,它才知道原来后面的也完成了。
  2. 使用CompletionService< V >来替代BlockingQueue<Future< V >>,主线程总是能够拿到最先完成的任务的返回值,而不管它们加入线程池的顺序。
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;


public class CompletionCase {
    private final int POOL_SIZE = Runtime.getRuntime().availableProcessors();
    private final int TOTAL_TASK = Runtime.getRuntime().availableProcessors()*10;

    // 方法一,自己写集合来实现获取线程池中任务的返回结果
    public void testByQueue() throws Exception {
    	long start = System.currentTimeMillis();
    	AtomicInteger count = new AtomicInteger(0);
        // 创建线程池
        ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
        //队列,拿任务的执行结果
        BlockingQueue<Future<Integer>> queue = 
        		new LinkedBlockingQueue<>();

        // 向里面扔任务
        for (int i = 0; i < TOTAL_TASK; i++) {
            Future<Integer> future = pool.submit(new WorkTask("ExecTask" + i));
            queue.add(future);
        }

        // 检查线程池任务执行结果
        for (int i = 0; i < TOTAL_TASK; i++) {
        	int sleptTime = queue.take().get();
        	count.addAndGet(sleptTime);
        }

        // 关闭线程池
        pool.shutdown();
        System.out.println("-------------tasks sleep time "+count.get()
        		+"ms,and spend time "
        		+(System.currentTimeMillis()-start)+" ms-------------");
    }
    //方法二,利用CompletionService<V>来获取线程池中任务的返回结果
    public void testByCompletion() throws Exception{
        long start = System.currentTimeMillis();
        AtomicInteger count = new AtomicInteger(0);
        // 创建线程池
        ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);

        CompletionService<Integer> completionService
                = new ExecutorCompletionService<>(pool);

        // 向里面扔任务
        for (int i = 0; i < TOTAL_TASK; i++) {
            completionService.submit(new WorkTask("ExecTask" + i));
        }

        // 检查线程池任务执行结果
        for (int i = 0; i < TOTAL_TASK; i++) {
            int sleptTime = completionService.take().get();
            count.addAndGet(sleptTime);
        }

        // 关闭线程池
        pool.shutdown();
        System.out.println("-------------tasks sleep time "+count.get()
                +"ms,and spend time "
                +(System.currentTimeMillis()-start)+" ms-------------");

    }

    public static void main(String[] args) throws Exception {
        CompletionCase t = new CompletionCase();
        t.testByQueue();
        t.testByCompletion();
    }
}

           

结果:

CompletionService&lt;V&gt;使用示例

参考:Mark—笔记_Java并发编程

继续阅读