构造了一个有返回值的任务,用sleep(随机数)模拟工作时间:
import java.util.Random;
import java.util.concurrent.Callable;
public class WorkTask implements Callable<Integer> {
private String name;
public WorkTask(String name) {
this.name = name;
}
@Override
public Integer call() {
int sleepTime = new Random().nextInt(1000);
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 返回给调用者的值
return sleepTime;
}
}
两种方法获取线程池中任务的返回结果:
- 自己创建一个阻塞队列来保存Future< V >,获取时循环调用队列的take()方法。主线程并不能保证首先获得的是最先完成任务的线程返回值。它只是按加入线程池的顺序返回。因为take方法是阻塞方法,后面的任务完成了,前面的任务却没有完成,主程序就那样等待在那儿,只到前面的完成了,它才知道原来后面的也完成了。
- 使用CompletionService< V >来替代BlockingQueue<Future< V >>,主线程总是能够拿到最先完成的任务的返回值,而不管它们加入线程池的顺序。
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class CompletionCase {
private final int POOL_SIZE = Runtime.getRuntime().availableProcessors();
private final int TOTAL_TASK = Runtime.getRuntime().availableProcessors()*10;
// 方法一,自己写集合来实现获取线程池中任务的返回结果
public void testByQueue() throws Exception {
long start = System.currentTimeMillis();
AtomicInteger count = new AtomicInteger(0);
// 创建线程池
ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
//队列,拿任务的执行结果
BlockingQueue<Future<Integer>> queue =
new LinkedBlockingQueue<>();
// 向里面扔任务
for (int i = 0; i < TOTAL_TASK; i++) {
Future<Integer> future = pool.submit(new WorkTask("ExecTask" + i));
queue.add(future);
}
// 检查线程池任务执行结果
for (int i = 0; i < TOTAL_TASK; i++) {
int sleptTime = queue.take().get();
count.addAndGet(sleptTime);
}
// 关闭线程池
pool.shutdown();
System.out.println("-------------tasks sleep time "+count.get()
+"ms,and spend time "
+(System.currentTimeMillis()-start)+" ms-------------");
}
//方法二,利用CompletionService<V>来获取线程池中任务的返回结果
public void testByCompletion() throws Exception{
long start = System.currentTimeMillis();
AtomicInteger count = new AtomicInteger(0);
// 创建线程池
ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
CompletionService<Integer> completionService
= new ExecutorCompletionService<>(pool);
// 向里面扔任务
for (int i = 0; i < TOTAL_TASK; i++) {
completionService.submit(new WorkTask("ExecTask" + i));
}
// 检查线程池任务执行结果
for (int i = 0; i < TOTAL_TASK; i++) {
int sleptTime = completionService.take().get();
count.addAndGet(sleptTime);
}
// 关闭线程池
pool.shutdown();
System.out.println("-------------tasks sleep time "+count.get()
+"ms,and spend time "
+(System.currentTimeMillis()-start)+" ms-------------");
}
public static void main(String[] args) throws Exception {
CompletionCase t = new CompletionCase();
t.testByQueue();
t.testByCompletion();
}
}
结果:
参考:Mark—笔记_Java并发编程