天天看點

Java并發和多線程2:3種方式實作數組求和

本篇示範3個數組求和的例子。

例子1:單線程

例子2:多線程,同步求和(如果沒有計算完成,會阻塞)

例子3:多線程,異步求和(先累加已經完成的計算結果)

例子1-代碼

package cn.fansunion.executorservice;


public class BasicCaculator {


	public static long sum(int[] numbers){
	    long sum = 0;
	    for(int i=0;i<numbers.length;i++){
	    	sum += numbers[i];
	    }
	    return sum;
	}
}
           

例子2-代碼

ExecutoreService提供了submit()方法,傳遞一個Callable,或Runnable,傳回Future。如果Executor背景線程池還沒有完成Callable的計算,這調用傳回Future對象的get()方法,會阻塞直到計算完成。

package cn.fansunion.executorservice;


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
//并發計算數組的和,“同步”求和
public class ConcurrentCalculator {


	private ExecutorService exec;
	//這個地方,純粹是“一廂情願”,“并行執行”不受咱們控制,取決于作業系統的“态度”
	private int cpuCoreNumber;
	private List<Future<Long>> tasks = new ArrayList<Future<Long>>();
	
	class SumCalculator implements Callable<Long> {
		private int[] numbers;
		private int start;
		private int end;


		public SumCalculator(final int[] numbers, int start, int end) {
			this.numbers = numbers;
			this.start = start;
			this.end = end;
		}


		public Long call() throws Exception {
			Long sum = 0L;
			for (int i = start; i < end; i++) {
				sum += numbers[i];
			}
			return sum;
		}
	}


	public ConcurrentCalculator() {
		cpuCoreNumber = Runtime.getRuntime().availableProcessors();
		exec = Executors.newFixedThreadPool(cpuCoreNumber);
	}


	public Long sum(final int[] numbers) {
		// 根據CPU核心個數拆分任務,建立FutureTask并送出到Executor
		for (int i = 0; i < cpuCoreNumber; i++) {
			int increment = numbers.length / cpuCoreNumber + 1;
			int start = increment * i;
			int end = increment * i + increment;
			if (end > numbers.length)
				end = numbers.length;
			SumCalculator subCalc = new SumCalculator(numbers, start, end);
			FutureTask<Long> task = new FutureTask<Long>(subCalc);
			tasks.add(task);
			if (!exec.isShutdown()) {
				exec.submit(task);
			}
		}
		return getResult();
	}


	/**
	 * 疊代每個隻任務,獲得部分和,相加傳回
	 */
	public Long getResult() {
		Long result = 0l;
		for (Future<Long> task : tasks) {
			try {
				// 如果計算未完成則阻塞
				Long subSum = task.get();
				result += subSum;
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
		return result;
	}


	public void close() {
		exec.shutdown();
	}
}
           

例子3-代碼

在剛在的例子中,getResult()方法的實作過程中,疊代了FutureTask的數組,如果任務還沒有完成則目前線程會阻塞。

如果我們希望任意字任務完成後就把其結果加到result中,而不用依次等待每個任務完成,可以使CompletionService。

生産者submit()執行的任務。使用者take()已完成的任務,并按照完成這些任務的順序處理它們的結果 。也就是調用CompletionService的take方法是,會傳回按完成順序放回任務的結果。

CompletionService内部維護了一個阻塞隊列BlockingQueue,如果沒有任務完成,take()方法也會阻塞。

修改剛才的例子2,使用CompletionService:

package cn.fansunion.executorservice;


import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


//并發計算數組的和,“異步”求和
public class ConcurrentCalculatorAsync {


	private ExecutorService exec;
	private CompletionService<Long> completionService;
	//這個地方,純粹是“一廂情願”,“并行執行”不受咱們控制,取決于作業系統的“态度”
	private int cpuCoreNumber;


	class SumCalculator implements Callable<Long> {
		private int[] numbers;
		private int start;
		private int end;


		public SumCalculator(final int[] numbers, int start, int end) {
			this.numbers = numbers;
			this.start = start;
			this.end = end;
		}


		public Long call() throws Exception {
			Long sum = 0l;
			for (int i = start; i < end; i++) {
				sum += numbers[i];
			}
			return sum;
		}
	}


	public ConcurrentCalculatorAsync() {
		cpuCoreNumber = Runtime.getRuntime().availableProcessors();
		exec = Executors.newFixedThreadPool(cpuCoreNumber);
		completionService = new ExecutorCompletionService<Long>(exec);
	}


	public Long sum(final int[] numbers) {
		// 根據CPU核心個數拆分任務,建立FutureTask并送出到Executor
		for (int i = 0; i < cpuCoreNumber; i++) {
			int increment = numbers.length / cpuCoreNumber + 1;
			int start = increment * i;
			int end = increment * i + increment;
			if (end > numbers.length){
				end = numbers.length;
			}
			SumCalculator subCalc = new SumCalculator(numbers, start, end);
			if (!exec.isShutdown()) {
				completionService.submit(subCalc);
			}
			
		}
		return getResult();
	}


	/**
	 * 疊代每個隻任務,獲得部分和,相加傳回
	 */
	public Long getResult() {
		Long result = 0l;
		for (int i = 0; i < cpuCoreNumber; i++) {			
			try {
				Long subSum = completionService.take().get();
				result += subSum;			
				System.out.println("subSum="+subSum+",result="+result);
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
		return result;
	}


	public void close() {
		exec.shutdown();
	}
}
           

運作代碼

package cn.fansunion.executorservice;


import java.math.BigDecimal;
//數組求和3個Demo
public class ArraySumDemo {
	public static void main(String[] args) {
		int n = 200000000;
		int[] numbers = new int[n];
		for(int i=1;i<=n;i++){
			numbers[i-1]=i;
		}
		basic(numbers);
		
		long time = System.currentTimeMillis();
		concurrentCaculatorAsync(numbers);
		long endTime=System.currentTimeMillis();
		System.out.println("多核并行計算,異步相加:"+time(time,endTime));
		
		long time2 = System.currentTimeMillis();
		concurrentCaculator(numbers);
		long endTime2=System.currentTimeMillis();
		System.out.println("多核并行計算,同步相加:"+time(time2,endTime2));
	}


	private static void basic(int[] numbers) {
		long time1 = System.currentTimeMillis();
		long sum=BasicCaculator.sum(numbers);
		long endTime1 = System.currentTimeMillis();
		System.out.println("單線程:"+time(time1,endTime1));
		System.out.println("Sum:"+sum);
	}


	private static double time(long time, long endTime) {
		long costTime = endTime-time;
		BigDecimal bd = new BigDecimal(costTime);
		//本來想着,把毫秒轉換成秒的,最後發現計算太快了
		BigDecimal unit = new BigDecimal(1L);
		BigDecimal s= bd.divide(unit,3);
		return s.doubleValue();
	}


	//并行計算,“同步”求和
	private static void concurrentCaculator(int[] numbers) {
		ConcurrentCalculator calc = new ConcurrentCalculator();
		Long sum = calc.sum(numbers);
		System.out.println(sum);
		calc.close();
	}


	//并行計算,“異步”求和
	private static void concurrentCaculatorAsync(int[] numbers) {
		ConcurrentCalculatorAsync calc = new ConcurrentCalculatorAsync();
		Long sum = calc.sum(numbers);
		System.out.println("Sum:"+sum);
		calc.close();
	}
}
           

控制台輸出

單線程:93.0

Sum:20000000100000000

subSum=3750000175000002,result=3750000175000002

subSum=1250000075000001,result=5000000250000003

subSum=6250000275000003,result=11250000525000006

subSum=8749999574999994,result=20000000100000000

Sum:20000000100000000

多核并行計算,異步相加:786.0

20000000100000000

多核并行計算,同步相加:650.0

個人看法:3段代碼的時間僅供參考,沒有排除幹擾因素。

總的來說,單線程執行更快一些,應該是由于“數組求和”本身,并不需要其它額外資源,不會阻塞。

而多線程,反而增加了“線程排程”的時間開銷。

還可以看出,CPU計算還是非常快的。“200000000”2億個整數相加,用了不到0.1秒的時間。

插曲

最開始看代碼的時候,誤解了。以為“根據CPU核心個數拆分任務”,這個時候的“多線程”就是“并行”了。

實際上,不一定,除了要看CPU的核數,還要看作業系統的配置設定。

// 根據CPU核心個數拆分任務,建立FutureTask并送出到Executor

for (int i = 0; i < cpuCoreNumber; i++) {

}

最開始,我還在考慮“單線程”、“多核并行+多線程并發”、“單核+多線程并發”,等好幾種情況來實作“數組求和”。

最後,感覺自己還是想多了。“并行”應該不受自己控制,隻能控制是“單線程”或者“多線程”。

“java并發程式設計-Executor架構”這篇文章中的“例子:并行計算數組的和。” 這句話,誤導了我,根本不能保證是“并行計算”。

友情提示:網絡上的文章,僅供參考學習,需要自己的判斷。

關于Java-多核-并行-多線程,我初步認為“多線程可以并行執行,但不受我們自己的控制,取決于作業系統”。

網友的一些看法:

看法1:

   java線程可以在運作在多個cpu核上嗎?

   我是一直都以為這個問題的答案是肯定的,也就是說可以運作在多核上。

但是有一天見到這樣的一個理論,我就頓時毀三觀了。

JVM在作業系統中是作為一個程序的,java所有的線程都運作自這個JVM程序中,

是以說java線程某個時間隻可能運作在一個核上。

這個說法對我的打擊太大了,我不能接受。于是就開始多方求證。網上搜尋 和朋友一起讨論,

最終證明了java線程是可以運作在多核上的,為什麼呢?

下面一句話将驚醒夢中人:

現代os都将線程作為最小排程機關,程序作為資源配置設定的最小機關。 在windows中程序是不活動的,

隻是作為線程的容器。

也就是說,java中的所有線程确實在JVM程序中,但是CPU排程的是程序中的線程。

看法2:

   JAVA中的多線程能在多CPU機器上并行執行嗎?注意,我說的不是并發執行哦 。

   我們用java寫一個多線程程式,就啟動了一個JVM程序,是以這些線程都是在這一個JVM程序之中的,我不知道同一時刻,能不能有多個CPU運作同一程序,進而并行執行這同一程序中的不同線程?一直很疑惑

你的思路是對的,CPU就是為了迎合作業系統的多線程進而提高系統的計算效率.但是具體配置設定任務到各個核心中去執行的并非JAVA與JVM而是作業系統.

也就是說,你所執行的多線程,可能會被配置設定到同一個CPU核心中運作.也可能非配到不同的cpu中運作.如果可以控制CPU的配置設定,那也應該是作業系統的api才能實作的了。

我用JAVA建立了一個線程,這時候有主線程和子線程都在運作,那意思雙核CPU有可能在同一時刻點并行運作這兩個線程咯?

我翻了好多JAVA的有關多線程的章節,似乎都沒有說道多核CPU運作JAVA多線程,貌似都是已單核為例講解的,是以我一直覺得可能都是并發的而不是并行的?

不是,你要将你的軟體線程和計算機的CPU處理線程區分開呀.簡單說,你是無法控制CPU對于任務的配置設定的.

更多代碼示例:

http://git.oschina.net/fansunion/Concurrent(逐漸更新中)

參考資料:

java并發程式設計-Executor架構

http://www.iteye.com/topic/366591

java線程可以在運作在多個cpu核上嗎?

http://blog.csdn.net/maosijunzi/article/details/42527553

JAVA中的多線程能在多CPU上并行執行嗎?注意,我說的不是并發執行哦

http://zhidao.baidu.com/link?url=e11sEOSNFoLTfVyP-5FfpktIXEgbMQkbLAzvgh8mn4V16n_qQas89voj5gVhOEkho0jRA7fp_vbnElxKgeQCDrOxGkcu6xAWaUniqpcWg33

繼續閱讀