構造了一個有傳回值的任務,用sleep(随機數)模拟工作時間:
import java.util.Random;
import java.util.concurrent.Callable;
public class WorkTask implements Callable<Integer> {
private String name;
public WorkTask(String name) {
this.name = name;
}
@Override
public Integer call() {
int sleepTime = new Random().nextInt(1000);
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 傳回給調用者的值
return sleepTime;
}
}
兩種方法擷取線程池中任務的傳回結果:
- 自己建立一個阻塞隊列來儲存Future< V >,擷取時循環調用隊列的take()方法。主線程并不能保證首先獲得的是最先完成任務的線程傳回值。它隻是按加入線程池的順序傳回。因為take方法是阻塞方法,後面的任務完成了,前面的任務卻沒有完成,主程式就那樣等待在那兒,隻到前面的完成了,它才知道原來後面的也完成了。
- 使用CompletionService< V >來替代BlockingQueue<Future< V >>,主線程總是能夠拿到最先完成的任務的傳回值,而不管它們加入線程池的順序。
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class CompletionCase {
private final int POOL_SIZE = Runtime.getRuntime().availableProcessors();
private final int TOTAL_TASK = Runtime.getRuntime().availableProcessors()*10;
// 方法一,自己寫集合來實作擷取線程池中任務的傳回結果
public void testByQueue() throws Exception {
long start = System.currentTimeMillis();
AtomicInteger count = new AtomicInteger(0);
// 建立線程池
ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
//隊列,拿任務的執行結果
BlockingQueue<Future<Integer>> queue =
new LinkedBlockingQueue<>();
// 向裡面扔任務
for (int i = 0; i < TOTAL_TASK; i++) {
Future<Integer> future = pool.submit(new WorkTask("ExecTask" + i));
queue.add(future);
}
// 檢查線程池任務執行結果
for (int i = 0; i < TOTAL_TASK; i++) {
int sleptTime = queue.take().get();
count.addAndGet(sleptTime);
}
// 關閉線程池
pool.shutdown();
System.out.println("-------------tasks sleep time "+count.get()
+"ms,and spend time "
+(System.currentTimeMillis()-start)+" ms-------------");
}
//方法二,利用CompletionService<V>來擷取線程池中任務的傳回結果
public void testByCompletion() throws Exception{
long start = System.currentTimeMillis();
AtomicInteger count = new AtomicInteger(0);
// 建立線程池
ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
CompletionService<Integer> completionService
= new ExecutorCompletionService<>(pool);
// 向裡面扔任務
for (int i = 0; i < TOTAL_TASK; i++) {
completionService.submit(new WorkTask("ExecTask" + i));
}
// 檢查線程池任務執行結果
for (int i = 0; i < TOTAL_TASK; i++) {
int sleptTime = completionService.take().get();
count.addAndGet(sleptTime);
}
// 關閉線程池
pool.shutdown();
System.out.println("-------------tasks sleep time "+count.get()
+"ms,and spend time "
+(System.currentTimeMillis()-start)+" ms-------------");
}
public static void main(String[] args) throws Exception {
CompletionCase t = new CompletionCase();
t.testByQueue();
t.testByCompletion();
}
}
結果:
參考:Mark—筆記_Java并發程式設計