聲明:本文是《 java 7 concurrency cookbook 》的第四章,作者: javier fernández gonzález 譯者:許巧輝 校對:方騰飛,葉磊
執行者分離任務的啟動和結果的處理
通常,當你使用執行者執行并發任務時,你将會送出 runnable或callable任務給這個執行者,并擷取future對象控制這個方法。你可以發現這種情況,你需要送出任務給執行者在一個對象中,而處理結果在另一個對象中。基于這種情況,java并發api提供completionservice類。
completionservice 類有一個方法來送出任務給執行者和另一個方法來擷取已完成執行的下個任務的future對象。在内部實作中,它使用executor對象執行任務。這種行為的優點是共享一個completionservice對象,并送出任務給執行者,這樣其他(對象)可以處理結果。其局限性是,第二個對象隻能擷取那些已經完成它們的執行的任務的future對象,是以,這些future對象隻能擷取任務的結果。
在這個指南中,你将學習如何使用completionservice類把執行者啟動任務和處理它們的結果分開。
準備工作…
這個指南的例子使用eclipse ide實作。如果你使用eclipse或其他ide,如netbeans,打開它并建立一個新的java項目。
如何做…
按以下步驟來實作的這個例子:
1.建立reportgenerator類,并指定其實作callable接口,參數化為string類型。
1
public class reportgenerator implements callable<string> {
2.聲明兩個私有的、string類型的屬性,sender和title,用來表示報告的資料。
private string sender;
2
private string title;
3.實作這個類的構造器,初始化這兩個屬性。
public reportgenerator(string sender, string title){
this.sender=sender;
3
this.title=title;
4
}
4.實作call()方法。首先,讓線程睡眠一段随機時間。
@override
public string call() throws exception {
try {
long duration=(long)(math.random()*10);
5
system.out.printf("%s_%s: reportgenerator: generating a report during %d seconds\n",this.sender,this.title,duration);
6
timeunit.seconds.sleep(duration);
7
} catch (interruptedexception e) {
8
e.printstacktrace();
9
5.然後,生成一個有sender和title屬性的字元串的報告,傳回這個字元串。
string ret=sender+": "+title;
return ret;
6.建立reportrequest類,實作runnable接口。這個類将模拟一些報告請求。
public class reportrequest implements runnable {
7.聲明私有的、string類型的屬性name,用來存儲reportrequest的名稱。
private string name;
8.聲明私有的、completionservice類型的屬性service。completionservice接口是個參數化接口,使用string類型參數化它。
private completionservice<string> service;
9.實作這個類的構造器,初始化這兩個屬性。
public reportrequest(string name, completionservice<string> service){
this.name=name;
this.service=service;
10.實作run()方法。建立1個reportgenerator對象,并使用submit()方法把它送出給completionservice對象。
public void run() {
reportgenerator reportgenerator=new reportgenerator(name,"report");
service.submit(reportgenerator);
11.建立reportprocessor類。這個類将擷取reportgenerator任務的結果,指定它實作runnable接口。
public class reportprocessor implements runnable {
12.聲明一個私有的、completionservice類型的屬性service。由于completionservice接口是個參數化接口,使用string類作為這個completionservice接口的參數。
13.聲明一個私有的、boolean類型的屬性end。
private boolean end;
14.實作這個類的構造器,初始化這兩個屬性。
public reportprocessor (completionservice<string> service){
end=false;
15.實作run()方法。當屬性end值為false,調用completionservice接口的poll()方法,擷取completionservice執行的下個已完成任務的future對象。
while (!end){
future<string> result=service.poll(20, timeunit.seconds);
16.然後,使用future對象的get()方法擷取任務的結果,并且将這些結果寫入到控制台。
if (result!=null) {
string report=result.get();
system.out.printf("reportreceiver: report received:%s\n",report);
} catch (interruptedexception | executionexception e) {
system.out.printf("reportsender: end\n");
10
17.實作setend()方法,用來修改屬性end的值。
public void setend(boolean end) {
this.end = end;
18.實作這個示例的主類,通過建立main類,并實作main()方法。
public class main {
public static void main(string[] args) {
19.使用executors類的newcachedthreadpool()方法建立threadpoolexecutor。
executorservice executor=(executorservice)executors.newcachedthreadpool();
20.建立completionservice,使用前面建立的執行者作為構造器的參數。
completionservice<string> service=new executorcompletionservice<>(executor);
21.建立兩個reportrequest對象,并用線程執行它們。
reportrequest facerequest=new reportrequest("face", service);
reportrequest onlinerequest=new reportrequest("online";,service);
thread facethread=new thread(facerequest);
thread onlinethread=new thread(onlinerequest);
22.建立一個reportprocessor對象,并用線程執行它。
reportprocessor processor=new reportprocessor(service);
thread senderthread=new thread(processor);
23.啟動這3個線程。
system.out.printf("main: starting the threads\n");
facethread.start();
onlinethread.start();
senderthread.start();
24.等待reportrequest線程的結束。
system.out.printf("main: waiting for the report
generators.\n");
facethread.join();
onlinethread.join();
25.使用shutdown()方法關閉執行者,使用awaittermination()方法等待任務的結果。
system.out.printf("main: shutting down the executor.\n");
executor.shutdown();
executor.awaittermination(1, timeunit.days);
26.設定reportsender對象的end屬性值為true,結束它的執行。
processor.setend(true);
system.out.println("main: ends");
這是如何工作的…
在示例的主類中,你使用executors類的newcachedthreadpool()方法建立threadpoolexecutor。然後,使用這個對象初始化一個completionservice對象,因為completionservice需要使用一個執行者來執行任務。利用completionservice執行一個任務,你需要使用submit()方法,如在reportrequest類中。
當其中一個任務被執行,completionservice完成這個任務的執行時,這個completionservice在一個隊列中存儲future對象來控制它的執行。poll()方法用來檢視這個列隊,如果有任何任務執行完成,那麼傳回列隊的第一個元素,它是一個已完成任務的future對象。當poll()方法傳回一個future對象時,它将這個future對象從隊列中删除。這種情況下,你可以傳兩個屬性給那個方法,表明你想要等任務結果的時間,以防隊列中的已完成任務的結果是空的。
一旦completionservice對象被建立,你建立2個reportrequest對象,用來執行3個reportgenerator任務,每個都在completionservice中,和一個reportsender任務,它将會處理已送出給2個reportrequest對象的任務所産生的結果。
不止這些…
completionservice類可以執行callable和runnable任務。在這個示例中,你已經使用callable,但你同樣可以送出runnable對象。由于runnable對象不會産生結果,completionservice類的理念不适用于這些情況。
這個類同樣提供其他兩個方法,用來擷取已完成任務的future對象。這兩個方法如下:
poll():不帶參數版本的poll()方法,檢查是否有任何future對象在隊列中。如果列隊是空的,它立即傳回null。否則,它傳回第一個元素,并從列隊中删除它。
take():這個方法,不帶參數。檢查是否有任何future對象在隊列中。如果隊列是空的,它阻塞線程直到隊列有一個元素。當隊列有元素,它傳回第一進制素,并從列隊中删除它。
參見
在第4章,線程執行者中的執行者執行傳回結果的任務指南