天天看點

多線程 ForkJoinPool介紹案列一:通過多線程分多個小任務進行列印資料  無傳回值的案列二:通過多線程分多個小任務進行資料累加  傳回結果集提供一些API 的方法,可以參考下總結

背景:ForkJoinPool的優勢在于,可以充分利用多cpu,多核cpu的優勢,把一個任務拆分成多個“小任務”,把多個“小任務”放到多個處理器核心上并行執行;當多個“小任務”執行完成之後,再将這些執行結果合并起來即可。這種思想值得學習。

介紹

Java7 提供了ForkJoinPool來支援将一個任務拆分成多個“小任務”并行計算,再把多個“小任務”的結果合并成總的計算結果。

ForkJoinPool是ExecutorService的實作類,是以是一種特殊的線程池。

使用方法:建立了ForkJoinPool執行個體之後,就可以調用ForkJoinPool的submit(ForkJoinTask<T> task) 或invoke(ForkJoinTask<T> task)方法來執行指定任務了。

其中ForkJoinTask代表一個可以并行、合并的任務。ForkJoinTask是一個抽象類,它還有兩個抽象子類:RecusiveAction和RecusiveTask。其中RecusiveTask代表有傳回值的任務,而RecusiveAction代表沒有傳回值的任務。

下面的UML類圖顯示了ForkJoinPool、ForkJoinTask之間的關系:

多線程 ForkJoinPool介紹案列一:通過多線程分多個小任務進行列印資料  無傳回值的案列二:通過多線程分多個小任務進行資料累加  傳回結果集提供一些API 的方法,可以參考下總結
案列一:通過多線程分多個小任務進行列印資料  無傳回值的

/**
 * 
 */
package forkJoinPool;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

/**
 * @author liuchaojun
 * @date 2018-9-10 下午02:57:29
 * 
 * RecursiveAction  無傳回值的
 * 
 */
public class PrintTask extends RecursiveAction {

	private static final long serialVersionUID = 1L;
	private static final int INDEX = 50;
	private int start;
	private int end;

	/**
	 * 
	 */
	public PrintTask(int start, int end) {
		// TODO Auto-generated constructor stub
		this.start = start;
		this.end = end;
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see java.util.concurrent.RecursiveAction#compute()
	 */
	@Override
	protected void compute() {
		if (end - start < INDEX) {
			for (int i = start; i < end; i++) {
				System.out.println(Thread.currentThread().getName() + "----"
						+ i);
			}
		} else {
			int middle = (end + start) / 2;
			PrintTask taskLeft = new PrintTask(start, middle);
			PrintTask taskRight = new PrintTask(middle, end);
			//taskLeft.invoke();執行給定的任務,在完成後傳回其結果。
			//并行執行兩個“小任務”
/*			taskLeft.fork();
			taskRight.fork();*/
			invokeAll(taskLeft, taskRight);//執行給定的任務
		}
	}

	public static void main(String[] args) throws InterruptedException {
		PrintTask task = new PrintTask(0, 300);
		ForkJoinPool pool = new ForkJoinPool();
		pool.submit(task);
		pool.awaitTermination(2, TimeUnit.SECONDS);//阻塞2秒
		pool.shutdown();
	}
}
           

運作結果

多線程 ForkJoinPool介紹案列一:通過多線程分多個小任務進行列印資料  無傳回值的案列二:通過多線程分多個小任務進行資料累加  傳回結果集提供一些API 的方法,可以參考下總結
多線程 ForkJoinPool介紹案列一:通過多線程分多個小任務進行列印資料  無傳回值的案列二:通過多線程分多個小任務進行資料累加  傳回結果集提供一些API 的方法,可以參考下總結

本機電腦cpu4核,通過上面觀察線程名稱,可以看出4個cpu都在進行

案列二:通過多線程分多個小任務進行資料累加  傳回結果集

package forkJoinPool;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

/**
 * @author liuchaojun
 * @date 2018-9-10 下午03:50:24
 */
public class PrintTask2 {
	public static void main(String[] args) throws Exception {
		int[] arr = new int[200];
		Random r = new Random();
		int tempSum = 0;// 普通總數
		for (int i = 0; i < arr.length; i++) {
			tempSum += (arr[i] = r.nextInt(10));
		}
		System.out.println("普通總數結果為:" + tempSum);

		ForkJoinPool pool = new ForkJoinPool();
		MyTask task = new MyTask(0, arr.length, arr);
		Future<Integer> sum = pool.submit(task);
		System.out.println("多線程的執行結果:" + sum.get());// get 如果需要,等待計算完成,然後檢索其結果。
		pool.awaitTermination(2, TimeUnit.SECONDS);
		pool.shutdown(); // 關閉線程池
	}
}

class MyTask extends RecursiveTask<Integer> {
	private static final long serialVersionUID = 1L;
	private static final int INDEX = 50;// 每個小任務執行50個
	private int start;
	private int end;
	private int arr[];

	/**
	 * @param start
	 * @param end
	 * @param arr
	 */
	public MyTask(int start, int end, int[] arr) {

		this.start = start;
		this.end = end;
		this.arr = arr;
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see java.util.concurrent.RecursiveTask#compute()
	 */
	@Override
	protected Integer compute() {
		int sum = 0;
		if (end - start < INDEX) {
			for (int i = start; i < end; i++) {
				sum += arr[i];
			}
			return sum;
		} else {
			int middle = (end + start) / 2;
			MyTask taskLeft2 = new MyTask(start, middle, arr);
			MyTask taskRight2 = new MyTask(middle, end, arr);
			/*invokeAll(taskLeft2, taskRight2);*/
			taskLeft2.fork();
			taskRight2.fork();
			int leftValue = taskLeft2.join();// 當計算完成時傳回計算結果。
			int rightValue = taskRight2.join();
			return leftValue + rightValue;
		}
	}
}
           

運作結果: 

多線程 ForkJoinPool介紹案列一:通過多線程分多個小任務進行列印資料  無傳回值的案列二:通過多線程分多個小任務進行資料累加  傳回結果集提供一些API 的方法,可以參考下總結

提供一些API 的方法,可以參考下

ForkJoinPool類

    • 構造方法摘要

      Constructors 
      構造方法與描述

      ForkJoinPool()

      Creates a

      ForkJoinPool

      with parallelism equal to

      Runtime.availableProcessors()

      , using the default thread factory, no UncaughtExceptionHandler, and non-async LIFO processing mode.

      ForkJoinPool(int parallelism)

      Creates a

      ForkJoinPool

      with the indicated parallelism level, the default thread factory, no UncaughtExceptionHandler, and non-async LIFO processing mode.

      ForkJoinPool(int parallelism, ForkJoinPool.ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode)

      Creates a

      ForkJoinPool

      with the given parameters.
    • 方法摘要

      Methods 
      修飾符與類型 方法與描述

      boolean

      awaitTermination(long timeout, TimeUnit unit)

      Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.

      protected int

      drainTasksTo(Collection<? super ForkJoinTask<?>> c)

      Removes all available unexecuted submitted and forked tasks from scheduling queues and adds them to the given collection, without altering their execution status.

      void

      execute(ForkJoinTask<?> task)

      Arranges for (asynchronous) execution of the given task.

      void

      execute(Runnable task)

      Executes the given command at some time in the future.

      int

      getActiveThreadCount()

      Returns an estimate of the number of threads that are currently stealing or executing tasks.

      boolean

      getAsyncMode()

      Returns

      true

      if this pool uses local first-in-first-out scheduling mode for forked tasks that are never joined.

      ForkJoinPool.ForkJoinWorkerThreadFactory

      getFactory()

      Returns the factory used for constructing new workers.

      int

      getParallelism()

      Returns the targeted parallelism level of this pool.

      int

      getPoolSize()

      Returns the number of worker threads that have started but not yet terminated.

      int

      getQueuedSubmissionCount()

      Returns an estimate of the number of tasks submitted to this pool that have not yet begun executing.

      long

      getQueuedTaskCount()

      Returns an estimate of the total number of tasks currently held in queues by worker threads (but not including tasks submitted to the pool that have not begun executing).

      int

      getRunningThreadCount()

      Returns an estimate of the number of worker threads that are not blocked waiting to join tasks or for other managed synchronization.

      long

      getStealCount()

      Returns an estimate of the total number of tasks stolen from one thread's work queue by another.

      Thread.UncaughtExceptionHandler

      getUncaughtExceptionHandler()

      Returns the handler for internal worker threads that terminate due to unrecoverable errors encountered while executing tasks.

      boolean

      hasQueuedSubmissions()

      Returns

      true

      if there are any tasks submitted to this pool that have not yet begun executing.

      <T> T

      invoke(ForkJoinTask<T> task)

      Performs the given task, returning its result upon completion.

      <T> List<Future<T>>

      invokeAll(Collection<? extends Callable<T>> tasks)

      Executes the given tasks, returning a list of Futures holding their status and results when all complete.

      boolean

      isQuiescent()

      Returns

      true

      if all worker threads are currently idle.

      boolean

      isShutdown()

      Returns

      true

      if this pool has been shut down.

      boolean

      isTerminated()

      Returns

      true

      if all tasks have completed following shut down.

      boolean

      isTerminating()

      Returns

      true

      if the process of termination has commenced but not yet completed.

      static void

      managedBlock(ForkJoinPool.ManagedBlocker blocker)

      Blocks in accord with the given blocker.

      protected <T> RunnableFuture<T>

      newTaskFor(Callable<T> callable)

      Returns a RunnableFuture for the given callable task.

      protected <T> RunnableFuture<T>

      newTaskFor(Runnable runnable, T value)

      Returns a RunnableFuture for the given runnable and default value.

      protected ForkJoinTask<?>

      pollSubmission()

      Removes and returns the next unexecuted submission if one is available.

      void

      shutdown()

      Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.

      List<Runnable>

      shutdownNow()

      Attempts to cancel and/or stop all tasks, and reject all subsequently submitted tasks.

      <T> ForkJoinTask<T>

      submit(Callable<T> task)

      Submits a value-returning task for execution and returns a Future representing the pending results of the task.

      <T> ForkJoinTask<T>

      submit(ForkJoinTask<T> task)

      Submits a ForkJoinTask for execution.

      ForkJoinTask<?>

      submit(Runnable task)

      Submits a Runnable task for execution and returns a Future representing that task.

      <T> ForkJoinTask<T>

      submit(Runnable task, T result)

      Submits a Runnable task for execution and returns a Future representing that task.

      String

      toString()

      Returns a string identifying this pool, as well as its state, including indications of run state, parallelism level, and worker and task counts.
  • ExecutorService接口

    • 方法摘要

      Methods 
      修飾符與類型 方法與描述

      boolean

      awaitTermination(long timeout, TimeUnit unit)

      Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.

      <T> List<Future<T>>

      invokeAll(Collection<? extends Callable<T>> tasks)

      Executes the given tasks, returning a list of Futures holding their status and results when all complete.

      <T> List<Future<T>>

      invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

      Executes the given tasks, returning a list of Futures holding their status and results when all complete or the timeout expires, whichever happens first.

      <T> T

      invokeAny(Collection<? extends Callable<T>> tasks)

      Executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception), if any do.

      <T> T

      invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

      Executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception), if any do before the given timeout elapses.

      boolean

      isShutdown()

      Returns true if this executor has been shut down.

      boolean

      isTerminated()

      Returns true if all tasks have completed following shut down.

      void

      shutdown()

      Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.

      List<Runnable>

      shutdownNow()

      Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.

      <T> Future<T>

      submit(Callable<T> task)

      Submits a value-returning task for execution and returns a Future representing the pending results of the task.

      Future<?>

      submit(Runnable task)

      Submits a Runnable task for execution and returns a Future representing that task.

      <T> Future<T>

      submit(Runnable task, T result)

      Submits a Runnable task for execution and returns a Future representing that task.
  • 總結

1,invokeAll(task)方法,主動執行其它的ForkJoinTask,并等待Task完成。(同步的)

2,fork方法,讓一個task執行(異步的)

3,join方法,讓一個task執行(同步的,它和fork不同點是同步或者異步的差別)

4,可以使用join來取得ForkJoinTask的傳回值。由于RecursiveTask類實作了Future接口,是以也可以使用get()取得傳回值。 

get()和join()有兩個主要的差別: 

join()方法不能被中斷。如果你中斷調用join()方法的線程,這個方法将抛出InterruptedException異常。 

如果任務抛出任何未受檢異常,get()方法将傳回一個ExecutionException異常,而join()方法将傳回一個RuntimeException異常。

5,ForkJoinTask在不顯示使用ForkJoinPool.execute/invoke/submit()方法進行執行的情況下,也可以使用自己的fork/invoke方法進行執行。 

使用fork/invoke方法執行時,其實原理也是在ForkJoinPool裡執行,隻不過使用的是一個“在ForkJoinPool内部生成的靜态的”ForkJoinPool。

6,ForkJoinTask有兩個子類,RecursiveAction和RecursiveTask。他們之間的差別是,RecursiveAction沒有傳回值,RecursiveTask有傳回值。

7,看看ForkjoinTask的Complete方法的使用場景 

這個方法好要是用來使一個任務結束。這個方法被用在結束異步任務上,或者為那些能不正常結束的任務,提供一個選擇。

8,Task的completeExceptionally方法是怎麼回事。 

這個方法被用來,在異步的Task中産生一個exception,或者強制結束那些“不會結束”的任務 

這個方法是在Task想要“自己結束自己”時,可以被使用。而cancel方法,被設計成被其它TASK調用。 

當你在一個任務中抛出一個未檢查異常時,它也影響到它的父任務(把它送出到ForkJoinPool類的任務)和父任務的父任務,以此類推。

9,可以使用ForkJoinPool.execute(異步,不傳回結果)、invoke(同步,傳回結果)、submit(異步,傳回結果)方法,來執行ForkJoinTask。

10,ForkJoinPool有一個方法commonPool(),這個方法傳回一個ForkJoinPool内部聲明的靜态ForkJoinPool執行個體。 在jdk1.8裡面才有。文檔上說,這個方法适用于大多數的應用。這個靜态執行個體的初始線程數,為“CPU核數-1 ”(Runtime.getRuntime().availableProcessors() - 1)。 

ForkJoinTask自己啟動時,使用的就是這個靜态執行個體。

繼續閱讀