Executor框架是JAVA并发包中的重要框架,负责线程池的创建与任务的执行,本文将从Executor框架出发,逐步介绍任务的分类,以及线程池的创建,包含了BlockingQueue、任务拒绝策略、线程池大小、线程池选择,最后,介绍了ThreadPoolExecutor的扩展方法,对线程池的任务进行统计。
文章目录
- 任务分类
- 无限制创建线程的不足
- Executor框架
-
- 执行策略
- 生命周期
- 延迟任务与周期任务
- 无返回值任务
- 有返回值任务
-
- 单任务提交
- 批量任务提交
-
- invokeAll与invokeAny
- CompletionService
- 任务运行
- 任务的结果
- 小结
- 执行策略----线程池
-
- 线程池的大小
- 配置ThreadPoolExecutor
-
- 线程创建与销毁
- 阻塞队列BlockingQueue
-
- BlockingDeque接口&LinkedBlockingDeque
- LinkedBlockingQueue
- TransferQueue&LinkedTransferQueue
- PriorityBlockingQueue
- SynchronousQueue
- ArrayBlockingQueue
- 有界与无界的选择
- newFixedThreadPool和newCachedThreadPool区别
- 饱和策略
- 线程工厂
- 扩展ThreadPoolExecutor
- 参考文献
任务通常是一些抽象的且离散的工作单元,通过将应用程序的工作分解到多个任务中,可以简化程序的组织结构,提供一种自然的事务边界来优化错误恢复过程,以及提供一种自然的并行工作结构来提升并发性。多线程处理任务的唯一目的是:**提高CPU的利用率,尽量使所有CPU都处于忙碌状态。**
任务分类
任务分为CPU计算型和I/O密集型,针对不同的任务,应选择合适的策略,并非是线程数越多处理的越快,比如:
- 对于计算型任务,线程的数量过多会耗费过多的资源在线程切换上
- 对于I/O密集型任务,如果I/O等待的时间明显比计算时间长,就应该增加线程的数量。
无限制创建线程的不足
“为每个任务分配一个线程”这种方法存在一些缺陷,尤其是需要创建大量的线程时:
- 线程的创建、销毁开销非常高
- 活跃的线程会消耗内存,如果可运行的线程数>处理器数量,就会存在空闲的线程,这些空闲线程占据内存,给垃圾回收器带来压力,而且这些线程会竞争CPU的资源,也会带来性能消耗;总之,如果已经有足够多的线程使所有的CPU处于忙碌状态,再创建更多的线程只会降低性能。
- 破坏稳定性,创建的线程过多,可能会导致OutOfMemoryError。
Executor框架
在java中,Executor框架用来解决无限制创建线程的问题,提供了任务执行策略、生命周期、延时任务、周期任务。executor的部分类图如下。
ThreadPoolExecutor实现了execute方法用于无返回值任务的处理,以及任务的拒绝策略、入队策略等,它的父类AbstractExecuratorService实现了ExecutorService的submit、incokeAll、invokeAny、get等方法用于有返回值任务的处理。
任务执行是线程完成的,任务执行的抽象接口是Executor,它是各种线程池的顶层接口。execute是向线程池提交任务,参数Runnable是具体的任务。
public interface Executor {
void execute(Runable command);
}
执行策略
Executor负责任务的执行,它的各种实现类(线程池)提供了不同的执行策略。执行策略包含以下几个方面:
- 在什么线程中执行
- 任务按照什么顺序执行(FIFO、LIFO、优先级)
- 有多少个任务可以并发执行
- 在队列中有多少个任务在等待执行
- 如果系统过载需要拒绝一个任务,应该选择哪一个任务,如何通知应用程序有任务被拒绝
- 在执行任务前或后,应该进行什么动作
这些执行策略都应该在选择线程池时确定下来,所以任务的执行策略=线程池的选择。线程池的创建有两种方式:Executors类的静态方法,创建ThreadPoolExecutor或ScheduledThreadPoolExecutor的对象。后文线程池中详细介绍各个策略。
生命周期
Executor接口中,execute方法定义了任务的线程池负责的执行策略,在ExecutorService接口中定义了线程的生命周期。
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 其他用于任务提交的方法,与本小节无关,省略。。
}
生命周期有三种:运行、关闭、已终止。
- ExecutorService在初始创建时就处于运行状态。
- shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已提交的任务执行完成,包括那些在等待队列中的任务。
- shutdownNow方法将执行粗暴的关闭过程,尝试取消所有运行中的任务,并且不再启动等待队列中尚未完成的任务。
- isShutdown方法返回是否掉用过shutdown或shutdownNow,而不关心结果,只要被调用过,返回true,它表示生命周期中的关闭。关闭后,继续提交任务,会抛出RejectExecutionException或者抛弃任务,这取决于所选择的线程池的任务拒绝策略。
- isTerminated方法返回executService是否已终止,即所有的任务都完成(正常完成或粗暴的关闭)。
- awaitTermination方法来等待executorService到达终止状态,可以设置超时,超时前未到达终止状态返回false,到达终止状态返回true,等待过程中被中断抛出InterruptedException。
// 示例
private ExecutorService executorService;
public synchronized ExecutorService init() {
if (executorService == null) {
executorService = Executors.newSingleThreadExecutor(); // 运行
}
return executorService;
}
public void execTask(Runnable command) {
if (!executorService.isShutdown()) {
executorService.execute(command); // 运行
}
}
public void close() {
executorService.shutdown(); // 关闭
if (!executorService.awaitTermination(5, TimeUnit.SECOND)) {
executorService.shutdownNow();
}
// 终止
}
延迟任务与周期任务
在java6以前,通过Timer类实现延时任务或周期任务,在java6以后,不要再使用Timer。Timer有以下缺陷:
- Timer在执行所有定时任务时只会创建一个线程。如果某一个提交的任务执行时间过长,那么将破坏其他TimerTask的精准性。
- 如果某一个任务抛出了未受检异常(RuntimeException),TimerTask将会终止定时线程。也不会进行恢复。
定时或周期任务可以交给:
Executors.newScheduledThreadPool()
或
new ScheduledExecutorService()
无返回值任务
Runnable接口的任务没有返回值,直接提交给线程池处理即可,实现也很简单。
任务的执行分为三步:
- 如果线程池中正在运行的线程数<corePoolSize,尝试创建新线程执行任务,若成功,直接返回,若失败,执行第二步。
- 尝试将线程放入等待队列
,具体怎么实现的视线程池选择的阻塞队列而定,如果成功入队,需要再次校验线程池是否正在运行,如果没有运行,删除并拒绝任务workQueue.offer(command)
、remove(comman)
,如果线程池在运行,但是没有线程在运行,开启一个新线程,保持始终有活跃的线程在处理任务,但不是处理当前的任务,因为可能会破坏任务处理的顺序优先级策略,只要有活跃的线程,当前的任务已成功入队,早晚都会被处理。reject(command)
- 如果第二步没有成功入队,再尝试创建新线程执行任务
public class ThreadPoolExecutor extends AbstractExecutorService {
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0) // 没有线程在运行
addWorker(null, false);
} else if (!addWorker(command, false)) {
reject(command);
}
}
}
有返回值任务
Runnable作为Executor接口的基本任务表示形式,具有很大的局限性,虽然Runnable能够将任务的计算结果存放在某个共享的数据结构、数据库中,但它不能返回一个值,或者抛出受检的异常(IOExecption或SQLException)。对于耗时的任务,比如查询数据库、从网络上获取资源、计算某个复杂功能等,Callable是一个更好的抽象。它支持返回一个值、抛出受检异常。
Callable与Future接口,以及ExecutorService接口扩展了Executor接口中的任务提交方法。
public interface Callable<V> {
V call() throws Exception;
}
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
public interface ExecutorService extends Executor {
// ...省略其它方法,与本节无关
// 单任务提交
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
// 批量提交
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future表示一个任务的生命周期,接口规范的隐含意义是,任务的生命周期只能前进不能后退,这也与ExecutorService的生命周期一样,都是只进不退的。只不过一个用来表示任务的生命周期,一个用来表示线程池的生命周期。
- cancel 取消任务
- isCancelled 判断任务是否取消
- isDone 是否已完成
- get 阻塞的获取任务的执行结果,直至抛异常或超时,如果任务抛出了非InterruptException,get将该异常封装为ExecutionException,需要通过getCause才能获取被封装的初始异常。
ExecutorService中的任务可以分为批量提交和单任务提交。
单任务提交
对于单任务提交,调用submit方法。
单任务的提交离不开FutureTask,类图如下,它实现了future的get、cancel、isCancelled等方法,也实现了RunableF ,通过submit返回的Future对象获取任务的执行结果。
FutureTask是Future接口的一个最重要的实现类。对于需要获取执行结果的任务,都是通过AbstractExecutorService调用FutureTask的相关方法实现的
// AbsractExecutorService中的submit实现
/** 提交一个callable task,返回一个Future对象,
任务的执行结果通过future.get获取
结果取决于任务的执行
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask); // 将任务交给线程池处理
return ftask; // 返回可获取任务执行结果的Future
}
/** 提交一个callable task,返回一个Future对象,
任务的执行结果通过future.get获取
结果是固定的,即传入的result
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/** 提交一个callable task,返回一个Future对象,
任务的执行结果通过future.get获取
结果取决于任务的执行
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
newTaskFor(task)将Runnable或Callable包装为RunnableFuture,这一过程是FutureTask做的
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
最终,对FutureTask中的callable与state属性分别进行设置。其中Executors.callable则是将根据Runnable对象构造callable对象的静态方法。
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
Executors.callable的实现如下,RunnableAdapter中的result的意思是,如果任务完成时,需要指定返回结果,才传入result,否则,将result传入null即可。因此,RunnableAdapter只是一种特殊的Callable,在平时的开发中,可以使用Executors.callable方法将Runnable对象转为Callable对象
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result; // RunnableAdapter的call方法的返回值是固定的,覆盖callable.call的返回值
}
}
批量任务提交
invokeAll与invokeAny
在FutureTask中,单任务提交使用的submit方法,批量任务提交使用的invokeAll、invokeAny方法。
- invokeAll:直到所有的任务都处理完,或者调用线程被中断、又或者超过指定时限,才会返回
,当返回时,任何未完成的线程都会被取消,所以,每个任务要么完成,要么取消。通过调用各个Future的get或isCancelled方法来判断究竟是哪种情况。List<Future>
- invokeAny:批量任务中,只要有一个任务完成,返回该任务的返回值,对于未完成的任务,将会被取消。
// invokeAll的使用示例
public void testBatchTask(List<CustomCallableTask> tasks) {
List<Future<CustomObject>> futures = executorService.invokeAll(tasks, 10, TimeUnit.SECOND); //阻塞操作
Iterator<CustomCallableTask> taskIter = tasks.iterator();
for(Future<CustomObject> future : futures) {
CustomCallableTask task = taskIter.next();
// task与future 一一对应
if (future.isCancelled) {
// ...任务失败的策略
} else {
// ...任务成功时的策略
}
}
}
invokeAll会按照任务集合的顺序一个一个的调用Future.get()获取执行结果,然后按照顺序输出执行结果。优点是有序性保证了各个Future可以与Callable关连起来,缺点是invokeAll()阻塞的时间可能会很久,如果设置的过短了,有些任务又无法完成。
CompletionService
CompletionService和invokeAll是互补的,它的批量任务之间都是独立的,任务执行结果也是无序的。所以,具体选择哪一个要看业务需求。
public interface CompletionService<V> {
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
/**
* 获取下一个已完成任务的执行结果
*/
// 如果没有已完成的任务就等待
Future<V> take() throws InterruptedException;
// 如果没有已完成的任务就返回null
Future<V> poll();
// 如果没有已完成的任务就等待一定的时间,如果等到超时都没有任务完成,返回null
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
ExecutorCompletionService类实现了CompletionService接口,它的实现非常简单,完成一个任务就添加一个Future到BlockingQueue中,获取时就从BlockingQueue中获取,所以先完成的任务先被获取到。BlockingQueue是一个重要的接口,后文将详细说明。
使用示例:
private CompletionService cps = new ExecutorCompletionService<CustomObject>(Executors.newFixedThreadPool());
public List<Futures<CustomObject>> testCompletService(List<CustomCallableTask> tasks) {
for(CustomCallableTask task : tasks) {
cps.submit(task);
}
// 假设只需要获取前5个完成的任务结果,后面的不关心
int count = 0;
List<Futures<CustomObject>> fs = new ArrayList<>();
while(count < 6) {
try {
Future<CustomObject> future = cps.take();
fs.add(future);
} catch (InterruptedException e1) {
// ...
} catch (ExecutionException e2) {
// ...
}
}
return fs;
}
任务运行
任务运行的逻辑也是FutureTask实现的,调用callable.call,
resulte=callable.call()
,获取result后,调用
set(result)
,最终会调用到
LockSupport.unpark(t)
,t是调用Future.get()方法的线程。
public void run() {
// 省略...
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex); // 抛异常
}
if (ran)
set(result); // 在完成任务的处理时,调用set方法,设置返回结果,唤醒被阻塞的get
}
} finally {
// 省略...
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion(); // 调用LockSupport.unpark(t);停止阻塞
}
}
任务的结果
调用Future.get()获取任务的执行结果,实际的逻辑是:
- 调用 LockSupport.parkNanos()或LockSupport.park(); 阻塞调用get方法的线程。
- 等待任务运行方法
执行到run()
,在这些set方法中,会调用LockSupport.unpark(t),这个t就是调用get方法的线程,取消阻塞。set(resulte)或setException()
简而言之,FutureTask是通过LockSupport实现的wait-notify机制的。
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
// 省略..
int s = state;
if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 省略..
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos); // 带有超时的阻塞
}
else
LockSupport.park(this); // 不带超时的阻塞
}
}
// 根据state设置返回值
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
小结
Executors的任务分为两类,一种是不需要通过Future获取执行结果的,一种是需要通过Future获取执行结果的。
对于不需要获取执行结果的任务,直接提交给线程池,线程池分配一个线程处理即可。建议调用execute方法,该方法在
ThreadPoolExecutor
中实现,也可以调用
submit
方法,但是会造成资源的浪费,毕竟它需要去耗费资源维护线程的执行进度,以及存在一套
wait-notify
机制,有一种特例是,不需要执行结果,但是是批量提交的任务,这个时候就要考虑使用有返回值任务中的批量任务提交了(
invokeAll、invokeAny、completionService
)。
对于需要获取执行结果的任务,依赖于FutureTask类,它是Future的一个具体实现类:
- 如果是单任务的提交,调用submit方法
- 如果是批量任务提交,且需要保证所有任务都完成,调用invokeAll方法,返回的
严格按照输入的List<Future>
顺序输出;List<Callable>
- 如果是批量任务提交,只需要任何一个任务完成,调用invokeAny方法
- 如果是批量任务提交,且不关心任务与返回结果的一一对应关系,更关心尽可能快的获取到所有任务的执行结果,使用completionService。
对于延时或定时任务,交给:
Executors.newScheduledThreadPool()
或
new ScheduledExecutorService()
对于线程的执行策略,则完全取决于选用的线程池。
执行策略----线程池
上一章介绍了Executor的使用,但是还没有介绍如何去选择Executor,即如何选择线程池。Executor的实例化有种方式,第一种是通过Executors的静态方法创建ExecutorService对象,第二类是通过new ThreadPoolExecutor创建对象。不过Executors的静态方法创建线程池时,底层还是通过new ThreadPoolExecutor创建对象,所以,本章将先介绍核心ThreadPoolExecutor,再介绍这些线程池的使用。
线程池的大小
线程池面临的首要问题就是,线程池大小是多少?
前文已介绍过,线程池大小的唯一标准是尽可能提高CPU的利用率,过小,CPU空闲,过大,CPU耗费资源创建、维护、切换、销毁这些活跃的线程。因此,要正确的设置线程池的大小,必须估算出任务的等待时间与计算时间的比值。公式如下:$ \eta指CPU利用率,N为数量,t为时间$
t c o m p u t e + t w a i t t c o m p u t e = N t h r e a d s N c p u s ∗ η = > N t h r e a d s = N c p u s ∗ η ∗ ( 1 + t w a i t t c o m p u t e ) \frac {t_{compute} + t_{wait}} {t_{compute}} = \frac {N_{threads}} {N_{cpus}*\eta} \\ => N_{threads} = N_{cpus} * \eta * (1+\frac {t_{wait}} {t_{compute}}) tcomputetcompute+twait=Ncpus∗ηNthreads=>Nthreads=Ncpus∗η∗(1+tcomputetwait)
配置ThreadPoolExecutor
ThreadPoolExecutor是一个灵活稳定、可各种定制的线程池。它定义了很多构造函数,一个比较典型的构造函数是:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {...}
线程创建与销毁
corePoolSize、maximumPoolSize、keepAliveTime、TimeUnit决定了线程的创建与销毁。
- corePoolSize是线程池的目标大小,在没有任务执行时线程池的大小,并且只有在workQueue满了以后,才会创建超出这个数量的线程。如果大小设置为0,那么一开始提交的任务将会全部进入workQueue,当workQueue满了以后才会创建线程去处理这些任务,所以,是否设置为0也要谨慎考虑。
- maximumPoolSize是线程池的最大大小,它表示可同时活跃的线程数量的上限,超过了这个上限,将会根据RejectedExecutionHandler拒绝策略拒绝任务。
- keepAliveTime、TimeUnit表示如果某个线程的空闲时间超过了keepAliveTime,那么将会被标记位可回收的,只要线程池的当前大小超过corePoolSize,这些线程就会被终止。虽说这些回收能够减少一些资源消耗,不过,不断地回收->创建->回收->创建活跃的线程反而会产生额外的延迟,具体设置的多大,视情况而定。
以Executors中的newFixedThreadPool与newCachedThreadPool做一个示例:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
- newFixedThreadPool:固定大小的线程池,所以它的
和corePoolSize
的大小相等,keepAliveTime为0表示不会超时。maximumPoolSize
- newCachedThreadPool:将线程池的最小大小设置为0,最大大小设置为
的最大值,将超时时间设置为超过1分钟,这种方式创造出来的线程池可以被无线扩展,并且当需求降低时自动收缩。Int
阻塞队列BlockingQueue
ThreadPoolExecutor的第四个参数是BlockingQueue,BlockingQueue是阻塞队列。它的作用:当线程池的大小达到最小大小后,后续提交的任务将被放置在阻塞队列,从而限制资源的消耗。
BlockingQueue分为3种:无界队列、有界队列、同步移交。
BlockingQueue的示意图如下,它继承了Queue接口,具有队列(FIFO)的特性,同时,它又加入了Block的特性,如图中红线,当队列满时,入队请求会被阻塞,当队列空时,出队请求被阻塞。
BlockingQueue接口的众多实现类会根据自身的特性进行改造,并非一定严格遵循FIFO(比如PriorityBlockingQueue根据自然顺序或Comparable接口的实现来决定出队顺序,比如SynchronousQueue甚至会优先将工作提交给线程池(maximumPoolSize>线程池大小>minPoolSize)而不是入队)
BlockingDeque接口&LinkedBlockingDeque
BlockingDequeu 是双端阻塞队列,两端都可以进队列,也可以出队列,它只有一个实现类:LinkedBlockingDeque。
LinkedBlockingDeque有两个构造器,若使用无参构造器,是无界队列,若使用含参构造器,是有界队列。
它依靠内部类Node完成
linkFirst、linkLast、unlinkFirst、unlinkLast
四个核心接口。这些操作都需要先获取锁才能执行,具体的链表入队出队操作比较简单,就不详细说了。
public class LinkedBlockingDeque<E> extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}
/** Doubly-linked list node class */
static final class Node<E> {
E item;
Node<E> prev;
Node<E> next;
Node(E x) {
item = x;
}
}
}
LinkedBlockingQueue
第二个是LinkedBlockingQueue,构造器有两个,分别对应有界与无界队列,入队时,添加到队尾,出队时,从头部取出一个。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 无界构造器
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 有界构造器
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
// 入队,添加到队尾
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
// 出队,head.item==null,head是在第一个有效节点前的节点,仅用于标记。出队时将head.next设置为head
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
}
TransferQueue&LinkedTransferQueue
LinkedTransferQueue的功能可以说是所有队列中最完善的,LinkedTransferQueue实际上是ConcurrentLinkedQueue、SynchronousQueue(公平模式)和LinkedBlockingQueue的超集。如果需要使用无界队列,第一个要考虑的就是LinkedTransferQueue。
它的性能更强,因为它是java7中新增的,而BlockingQueue的其他实现类是java5添加的,与它功能类似的是SynchronousQueue,在元素交付方面,它的性能是SynchronousQueue的3倍(非公平模式)和14倍(公平模式)。
构造器如下,是无界队列。
简要的原理是:
- 入队:入队包含了BlockingQueue的入队方式,也包含了transfer方式(transfer未必入队)
- 出队:若队列不为空,直接取走,若为空,就创建一个消费者线程等待。
带有transfer的方法是用于一次性交付的,在队列中已有元素的情况下,调用这些方法,可以确保队列中已存在的元素都能被处理。下面的代码块中:
- transfer(E e)若当前存在一个正在等待获取的消费者线程,即立刻将e移交之;否则将元素e插入到队列尾部,并且当前线程进入阻塞状态,直到有消费者线程取走该元素。
- tryTransfer(E e)若当前存在一个正在等待获取的消费者线程,则该方法会即刻转移e,并返回true;若不存在则返回false,但是并不会将e插入到队列中。这个方法不会阻塞当前线程,要么快速返回true,要么快速返回false。
- tryTransfer(E e, long timeout, TimeUnit unit)与 tryTransfer(E e)是一样的,只不过加了个超时,可以多等待一会。
public boolean tryTransfer(E e) {
return xfer(e, true, NOW, 0) == null;
}
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
PriorityBlockingQueue
优先级队列,包含有界与无界两种。对于有界队列,可以通过传入comparator对象,来定义优先级规则。
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
SynchronousQueue
- 入队:要将一个元素放置于SynchronousQueue中,必需有消费者线程在等待,如果没有消费者线程在等待,要么创建新线程处理(线程池大小小于最大值),要么拒绝任务。
- 出队:若队列不为空,直接取走,若为空,就创建一个消费者线程等待。
可以看出SynchronousQueue与LinkedTransferQueue的区别在入队:
- SynchronousQueue实际没有队列,来一个任务就要立刻召唤一个线程处理,如果召唤不来,就拒绝,所以使用SynchronousQueue时,需要将线程池的最大大小设置为Integer.MAX_VALUE,避免任务饿死。
- LinkedTransferQueue入队时,如果没有线程在等待,会阻塞当前线程、或直接返回、或等一段时间返回,SynchronousQueue入队时,会拒绝任务。
ArrayBlockingQueue
ArrayBlockingQueue的所有构造方法都带有参数capacity,因此,它只能是有界队列。它靠一个Object数组实现,原理相对比较简单,入队出队就是更新数组。
/** The queued items */
final Object[] items;
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
有界与无界的选择
选择BlockingQueue时,第一个问题是选择有界还是无界,原则是:
- 选择无界:只有当任务相互独立时,为队列设置边界才是合理的,如果任务之间存在依赖性,那么有界队列可能会导致线程“饥饿”死锁的问题。此时应该使用无界队列:如newCachedThreadPool,它能提供比固定大小的线程池更好的排队性能,它选择SynchronousQueue作为等待队列。
- 选择有界:需要限制线程池的资源消耗,那么可以选择固定大小的线程池
newFixedThreadPool和newCachedThreadPool区别
newFixedThreadPool、newCachedThreadPoo区别:
- newFixedThreadPool,线程池大小固定,采用无界的LinkedBlockingQueue,线程任务会无限制排队,即便不断地提交,也不会无限制创建线程,优点是节省资源,缺点是任务处理的慢。
- newCachedThreadPool,线程池最大大小无限制,corePoolSize为0,采用同步交付的SynchronousQueue,伸缩性强,任务处理的快,资源消耗大。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
顺便提下newSingleThreadExecutor,它的底层使用无界的LinkedBlockingQueue,且线程池的大小始终为1,效果就是:不管任务有多少,永远只有一个线程在处理,如果有线程意外中断,它会主动创建1个线程继续处理任务。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
饱和策略
饱和策略只在有界队列下才会生效,如果队列是无界的,永远不会饱和。一共有四种饱和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。
- AbortPolicy:默认策略,拒绝任务,抛出RejectedExecutionException
- CallerRunsPolicy:调用者运行,将任务退回给提交任务的线程(执行submit、execute、invokeAny、invokeAll的线程),提交任务的线程会被用来处理这个任务,这样,减缓了任务的提交速度,当服务器过载时,这种过载情况会逐渐向外蔓延,从线程池工作队列到调用者线程,再到TCP层,最终达到客户端,服务器在高负载下不是直接down掉,而是慢慢的down掉,所以有平缓降低性能的作用。
- DiscardPolicy:拒绝任务,但不抛异常
- DiscardOldestPolicy:抛弃最旧的任务,但不抛异常
线程工厂
线程工厂实际上是new ThreadPoolExecutor的第四个参数,ThreadPoolExecutor提供了构造器可以不带该参数,与饱和策略一样,它也有默认值,线程命名就会比较抽象,所以可以根据自定义的线程工厂,来为任务命名,更好的debug。
public class CustomThreadFactory implements ThreadFac {
private final Sting poolName;
private AtomicInteger threadNum = new AtomicInteger(1);
public CustomThreadFactory(String poolName) {
this.poolName = poolName;
}
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(poolName+threadNum.getAndIncrement());
return t;
}
}
扩展ThreadPoolExecutor
最后,还能扩展ThreadPoolExecutor,用来统计线程池中任务的执行情况。
public class ExtendExecutor extends ThreadPoolExecutor{
private Logger logger = LoggerFactory.getLogger(ExtendExecutor.class);
public ExtendExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
// do something 比如记录线程开始执行任务的时间
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
// do something 比如计算线程执行任务的时间、平均时间等。。
}
@Override
protected void terminated() {
// do something 比如统计线程被销毁的频率,用以更好的决策线程池的配置
super.terminated();
}
}
参考文献
- 《并发编程实战》 Doug Lea
- jdk 1.8并发包源码