1.Executor框架主要由3大部分组成:
- 任务。包括被执行任务需要实现的接口:Runnable接口或者Callable接口
- 任务的执行。包括执行机制的核心接口Executor,以及继承Executor的ExecutorService接口。Executor框架有两个关键类,ThreadPoolExecutor和ScheduledThreadPoolExecutor。
- 异步计算的结果。包括接口Future和实现Future接口的FutureTask类。
Executor框架示意图
2.Executor框架的使用示意图
主线程创建实现Runnable或者Callable接口的任务对象,工具类Executors可以把一个Runnable对象封装成一个Callable对象,使用Executors.callable(Runnable task)。然后交给ExecutorService用execute或者submit执行,前者不返回结果,后者可以返回实现了Future接口的对象(到目前为止返回的是FutureTask对象),FutureTask实现了Runnable,程序员可以创建FutureTask,然后直接交给ExecutorService。
最后主线程可以执行FutureTask.get()方法等待任务完成(阻塞方法),也可以使用FutureTask.cancell(boolean mayInterruptIfRunning) 来取消任务的执行。
3.Executor框架的成员介绍
3.1 ThreadPoolExecutor线程池
它通常通过工厂类Exectors创建,也可以自定义创建, Executors的创建有三个方法
- newFixedThreadPool:创建可重用且固定线程数的线程池,如果线程池中的所有线程都处于活动状态,此时再提交任务就在队列中等待,直到有可用线程;如果线程池中的某个线程由于异常而结束时,线程池就会再补充一条新线程。适用于负载比较重的服务器。
源代码如下,corePoolSize和maximumPoolSize都是相同的。使用LinkedBlockingQueue(阻塞最大值Integer.MAX_VALUE)作为阻塞队列,让线程多的都阻塞在这个队列里,所以这里一般不会有拒绝任务,RejectedExecutionHandler.rejectedExecution方法。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
//使用一个基于FIFO排序的阻塞队列,在所有corePoolSize线程都忙时新任务将在队列中等待
new LinkedBlockingQueue<Runnable>());
}
- newSingleThreadExecutor:创建一个单线程的Executor,如果该线程因为异常而结束就新建一条线程来继续执行后续的任务,适用于需要保证顺序的执行各个任务,并且在任意时间点不会有多个线程时活动的场景
源代码如下,基本和前者类似,corePoolSize和maximumPoolSize设置为了1
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- newCachedThreadPool:创建可缓存的线程池,如果线程池中的线程在60秒未被使用就将被移除,在执行新的任务时,当线程池中有之前创建的可用线程就重用可用线程,否则就新建一条线程,它是大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者负载较轻的服务器。
源代码如下,核心线程池为0,最大线程池可以达到Integer.MAX_VALUE个,keepAliveTime设置为60表示空闲线程最长等待任务时间60秒后就会被终止,阻塞队列使用没有容量的SynchronousQueue队列,而这个又是无界的,MAME当主线程提交任务的书读高于线程的处理速度,那么CachedThreadPool就会不听的创建线程,极端的情况下CachedThreadPool回应为过多创建线程而耗尽CPU和内存资源。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
3.2 ScheduledThreadPoolExecutor周期执行线程池
它继承ThreadPoolExecutor类,主要用来在给定延迟后运行任务,或者定期执行任务,类似Timer,但是可以创建多个后台线程数进行执行。通常通过工厂类Exectors创建
- newScheduledThreadPool:创建一个可延迟执行或定期执行的线程池,包含若干个线程。适用于需要多个后台程序执行周期的任务,同时为了满足资源管理的需求而限制后台线程的数量的场景。
- newSingleThreadScheduledExecutor:只包含一个线程的执行延迟或者定期的线程池。适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的场景。
3.3 FutureTask简介
FutureTask除了实现Future接口外,还实现了Runnable接口,因此FutureTask可以交给Executor执行,也可以调用线程直接执行FutureTask.run() .
1) Future有三种状态
未启动状态。FutureTask.run方法处于未执行之前,FutureTask处于未启动
已经启动。FutureTask.run()方法被执行的过程中,FutureTask处于启动状态
已完成。FutureTask.run执行完;或者FutureTask.cancell();或者执行run()方法抛出异常而结束。
图示如下:
- 当FutureTask处于未启动或者启动状态时,执行FutureTask.get()方法将导致调用线程阻塞,当FutureTask处于已经完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或者抛出异常。
- 当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会执行;
- 当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断此任务线程的方式来试图停止任务;
- 当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将不会对正在执行此任务 的线程产生影响(让正在执行的任务运行完成);
- 当FutureTask处于已完成的状态时,执行FutureTaskcancel(...)方法将返回false.
2) FutureTask的使用
FutureTask可以交给EXecutor执行,也可以通过ExecutorService.submit()方法返回一个FutureTask,然后执行FutureTask.get,或者cancel方法,除此之外还可以单独使用FutureTask。