天天看点

个人学习笔记:java常用线程池及相关的接口

1、Runnable接口与Callable接口

(1)Runnable接口

//Runnable接口的唯一方法,无返回值,也不会抛出异常
void run()
           

(2)Callable接口

//callable的唯一方法
//返回计算结果,或者如果无法执行则抛出异常。
V	call() throws Exception
           
2、Future框架

(1)Future接口主要方法:

//尝试取消执行此任务。如果任务已完成,已取消或由于某些其他原因无法取消,则此尝试将失败。
boolean cancel(boolean mayInterruptIfRunning)
//如果此任务在正常完成之前被取消则返回true。
boolean isCancelled()
//如果此任务完成,则返回true。
//完成可能是由于正常终止,例外或取消,在所有这些情况下,此方法将返回true。
boolean isDone()
//需要等待计算完成,然后获取其结果。
V get()throws InterruptedException,ExecutionException
V get(long timeout,TimeUnit unit)throws InterruptedException,ExecutionException,TimeoutException
           

(2)RunnableFuture接口,是Runnable接口Future接口的子接口。除此之外它本身只有一个run方法。

(3)FutureTask类,实现了RunnableFuture接口。

构造函数:

//创建一个FutureTask在运行时执行给定的Callable。
FutureTask(Callable<V> callable)
//在运行时创建一个FutureTask执行给定的Runnable,get并在成功完成时返回给定的结果result。
FutureTask(Runnable runnable, V result)
           

并可通过get方法获取计算结果。

3、Executor接口:

这个接口只有一个方法,在将来的某个时间执行给定的命令。根据Executor实现的判断,该命令可以在新线程,池化线程或调用线程中执行。

4、ExecutorService接口(Executor接口的子接口)

主要方法submit。

//提交值返回任务以执行并返回表示任务的挂起结果的Future。
//Future的get方法将在成功完成后返回任务的结果。
<T> Future<T> submit(Callable<T> task)
//提交Runnable任务以执行并返回表示该任务的Future。
//Future的get方法将在成功完成后返回给定的结果。
//task要提交的任务,result返回的结果
<T> Future<T> submit(Runnable task,T result)
//提交Runnable任务以执行并返回表示该任务的Future。
//Future的get方法将null在成功完成后返回
Future<?> submit(Runnable task)
           
5、Executors类

这个提供了几个非常有用的方法来创建线程池。

(1)newCachedThreadPool()方法。(线程数量可变)

创建一个根据需要创建新线程的线程池,但在它们可用时将重用以前构造的线程。

这些池通常会提高执行许多短期异步任务的程序的性能。 调用execute将重用以前构造的线程(如果可用)。如果没有可用的现有线程,则将创建一个新线程并将其添加到池中。未使用60秒的线程将终止并从缓存中删除。因此,长时间闲置的池不会消耗任何资源。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
           

(2)newFixedThreadPool()方法

创建一个大小固定线程池,该线程池重用在共享的无界队列中运行的固定数量的线程。池中的线程将一直存在,直到它显式出现shutdown。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
           

(3)newScheduledThreadPool()方法

创建一个线程池,可以调度命令在给定的延迟后运行,或者定期执行。

//corePoolSize  池中保留的线程数,即使它们处于空闲状态
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
           

(4)newSingleThreadScheduledExecutor()方法。

创建一个单线程执行程序,保证任务按顺序执行,并且不会有多个任务处于活动状态在任何给定的时间。

(5)newWorkStealingPool()方法

创建一个工作窃取线程池 ,产生的线程是精灵线程(守护线程、后台线程)

//Creates a work-stealing thread pool using all available processors as 
//its target parallelism level.
public static ExecutorService newWorkStealingPool()
           
6、ForkJoinPool类

可以将将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果,与MapReduce计算的过程有相似之处。任务应该是RecursiveAction类或者RecursiveTask的子类。

使用示例:

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class TestForkJoinPool {

    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    static Random r = new Random();

    static {
        for(int i=0; i<nums.length; i++) {
            nums[i] = r.nextInt(100);
        }
    }


    static class AddTask extends RecursiveTask<Long> {

        private static final long serialVersionUID = 1L;
        int start, end;

        AddTask(int s, int e) {
            start = s;
            end = e;
        }

        @Override
        protected Long compute() {

            if(end-start <= MAX_NUM) {
                long sum = 0L;
                for(int i=start; i<end; i++) sum += nums[i];
                return sum;
            }

            int middle = start + (end-start)/2;

            AddTask subTask1 = new AddTask(start, middle);
            AddTask subTask2 = new AddTask(middle, end);
            subTask1.fork();
            subTask2.fork();

            return subTask1.join() + subTask2.join();
        }

    }

    public static void main(String[] args) throws IOException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        AddTask task = new AddTask(0, nums.length);
        forkJoinPool.execute(task);
        long result = task.join();
        System.out.println(result);
    }
}
           
7、ThreadPoolExecutor类

可以看到Executors中创建newCachedThreadPool和newFixedThreadPool都是调用这个类的构造方法。而newScheduledThreadPool是先调用ScheduledThreadPoolExecutor类的一个构造方法,然后在ScheduledThreadPoolExecutor的构造方法中调用其父类ThreadPoolExecutor类的一个构造方法来创建的。也就是说,newCachedThreadPool、newFixedThreadPool和ScheduledThreadPoolExecutor这三个创建线程池的方法最终都是通过调用ThreadPoolExecutor这个类的构造方法来创建的。