天天看點

非主流并發工具之 CompletionService 非主流并發工具之 CompletionService

非主流并發工具之 CompletionService

CompletionService

接口的執行個體可以充當生産者和消費者的中間處理引擎,進而達到将送出任務和處理結果的代碼進行解耦的目的。生産者調用

submit

方法送出任務,而消費者調用

poll

(非阻塞)或

take

(阻塞)方法擷取下一個結果:這一特征看起來和阻塞隊列(

BlockingQueue

)類似,兩者的差別在于

CompletionService

要負責任務的處理,而阻塞隊列則不會。

在 JDK 中,該接口隻有一個實作類

ExecutorCompletionService

,該類使用建立時提供的

Executor

對象(通常是線程池)來執行任務,然後将結果放入一個阻塞隊列中:果然本就是一家親啊!

ExecutorCompletionService

将線程池和阻塞隊列糅合在一起,僅僅通過三個方法,就實作了任務的異步處理,可謂并發程式設計初學者的神兵利器!

接下來看一個例子。樓主有一大堆 *.java 檔案,需要計算它們的代碼總行數。利用

ExecutorCompletionService

可以寫出很簡單的多線程處理代碼:

?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35

public

int

countLines(List<Path> javaFiles)

throws

Exception {

// 根據處理器數量建立線程池。雖然多線程并不保證能夠提升性能,但适量地

// 開線程一般可以從系統騙取更多資源。

ExecutorService es = Executors.newFixedThreadPool(

Runtime.getRuntime().availableProcessors() *

2

);

// 使用 ExecutorCompletionService 内建的阻塞隊列。

CompletionService cs =

new

ExecutorCompletionService(es);

// 按檔案向 CompletionService 送出任務。

for

(

final

Path javaFile : javaFiles) {

cs.submit(

new

Callable<Integer>() {

@Override

public

Integer call()

throws

Exception {

// 略去計算單個檔案行數的代碼。

return

countLines(javaFile);

}

});

}

try

{

int

loc =

;

int

size = javaFiles.size();

for

(

int

i =

; i < size; i++) {

// take 方法等待下一個結果并傳回 Future 對象。不直接傳回計算結果是為了

// 捕獲計算時可能抛出的異常。

// poll 不等待,有結果就傳回一個 Future 對象,否則傳回 null。

loc += cs.take().get();

}

return

loc;

}

finally

{

// 關閉線程池。也可以将線程池提升為字段以便重用。

// 如果任務線程(Callable#call)能響應中斷,用 shutdownNow 更好。

es.shutdown();

}

}

最後,

CompletionService

也不是到處都能用,它不适合處理任務數量有限但個數不可知的場景。例如,要統計某個檔案夾中的檔案個數,在周遊子檔案夾的時候也會“遞歸地”送出新的任務,但最後到底送出了多少,以及在什麼時候送出完了所有任務,都是未知數,無論

CompletionService

還是線程池都無法進行判斷。這種情況隻能直接用線程池來處理。