天天看点

JAVA并发(一)任务执行框架Executor任务分类无限制创建线程的不足Executor框架执行策略----线程池扩展ThreadPoolExecutor参考文献

Executor框架是JAVA并发包中的重要框架,负责线程池的创建与任务的执行,本文将从Executor框架出发,逐步介绍任务的分类,以及线程池的创建,包含了BlockingQueue、任务拒绝策略、线程池大小、线程池选择,最后,介绍了ThreadPoolExecutor的扩展方法,对线程池的任务进行统计。

文章目录

  • 任务分类
  • 无限制创建线程的不足
  • Executor框架
    • 执行策略
    • 生命周期
    • 延迟任务与周期任务
    • 无返回值任务
    • 有返回值任务
      • 单任务提交
      • 批量任务提交
        • invokeAll与invokeAny
        • CompletionService
      • 任务运行
      • 任务的结果
    • 小结
  • 执行策略----线程池
    • 线程池的大小
    • 配置ThreadPoolExecutor
      • 线程创建与销毁
      • 阻塞队列BlockingQueue
        • BlockingDeque接口&LinkedBlockingDeque
        • LinkedBlockingQueue
        • TransferQueue&LinkedTransferQueue
        • PriorityBlockingQueue
        • SynchronousQueue
        • ArrayBlockingQueue
        • 有界与无界的选择
        • newFixedThreadPool和newCachedThreadPool区别
      • 饱和策略
      • 线程工厂
  • 扩展ThreadPoolExecutor
  • 参考文献

任务通常是一些抽象的且离散的工作单元,通过将应用程序的工作分解到多个任务中,可以简化程序的组织结构,提供一种自然的事务边界来优化错误恢复过程,以及提供一种自然的并行工作结构来提升并发性。多线程处理任务的唯一目的是:**提高CPU的利用率,尽量使所有CPU都处于忙碌状态。**

任务分类

任务分为CPU计算型和I/O密集型,针对不同的任务,应选择合适的策略,并非是线程数越多处理的越快,比如:

  • 对于计算型任务,线程的数量过多会耗费过多的资源在线程切换上
  • 对于I/O密集型任务,如果I/O等待的时间明显比计算时间长,就应该增加线程的数量。

无限制创建线程的不足

“为每个任务分配一个线程”这种方法存在一些缺陷,尤其是需要创建大量的线程时:

  • 线程的创建、销毁开销非常高
  • 活跃的线程会消耗内存,如果可运行的线程数>处理器数量,就会存在空闲的线程,这些空闲线程占据内存,给垃圾回收器带来压力,而且这些线程会竞争CPU的资源,也会带来性能消耗;总之,如果已经有足够多的线程使所有的CPU处于忙碌状态,再创建更多的线程只会降低性能。
  • 破坏稳定性,创建的线程过多,可能会导致OutOfMemoryError。

Executor框架

在java中,Executor框架用来解决无限制创建线程的问题,提供了任务执行策略、生命周期、延时任务、周期任务。executor的部分类图如下。

ThreadPoolExecutor实现了execute方法用于无返回值任务的处理,以及任务的拒绝策略、入队策略等,它的父类AbstractExecuratorService实现了ExecutorService的submit、incokeAll、invokeAny、get等方法用于有返回值任务的处理。

JAVA并发(一)任务执行框架Executor任务分类无限制创建线程的不足Executor框架执行策略----线程池扩展ThreadPoolExecutor参考文献

任务执行是线程完成的,任务执行的抽象接口是Executor,它是各种线程池的顶层接口。execute是向线程池提交任务,参数Runnable是具体的任务。

public interface Executor {
	void execute(Runable command);
}
           

执行策略

Executor负责任务的执行,它的各种实现类(线程池)提供了不同的执行策略。执行策略包含以下几个方面:

  • 在什么线程中执行
  • 任务按照什么顺序执行(FIFO、LIFO、优先级)
  • 有多少个任务可以并发执行
  • 在队列中有多少个任务在等待执行
  • 如果系统过载需要拒绝一个任务,应该选择哪一个任务,如何通知应用程序有任务被拒绝
  • 在执行任务前或后,应该进行什么动作

这些执行策略都应该在选择线程池时确定下来,所以任务的执行策略=线程池的选择。线程池的创建有两种方式:Executors类的静态方法,创建ThreadPoolExecutor或ScheduledThreadPoolExecutor的对象。后文线程池中详细介绍各个策略。

生命周期

Executor接口中,execute方法定义了任务的线程池负责的执行策略,在ExecutorService接口中定义了线程的生命周期。

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    // 其他用于任务提交的方法,与本小节无关,省略。。
}    
           

生命周期有三种:运行、关闭、已终止。

  • ExecutorService在初始创建时就处于运行状态。
  • shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已提交的任务执行完成,包括那些在等待队列中的任务。
  • shutdownNow方法将执行粗暴的关闭过程,尝试取消所有运行中的任务,并且不再启动等待队列中尚未完成的任务。
  • isShutdown方法返回是否掉用过shutdown或shutdownNow,而不关心结果,只要被调用过,返回true,它表示生命周期中的关闭。关闭后,继续提交任务,会抛出RejectExecutionException或者抛弃任务,这取决于所选择的线程池的任务拒绝策略。
  • isTerminated方法返回executService是否已终止,即所有的任务都完成(正常完成或粗暴的关闭)。
  • awaitTermination方法来等待executorService到达终止状态,可以设置超时,超时前未到达终止状态返回false,到达终止状态返回true,等待过程中被中断抛出InterruptedException。
// 示例
private ExecutorService executorService;

public synchronized ExecutorService init() {
    if (executorService == null) {
         executorService = Executors.newSingleThreadExecutor(); // 运行
    }  
    return executorService;
}

public void execTask(Runnable command) {
    if (!executorService.isShutdown()) {
        executorService.execute(command); // 运行
    }
}

public void close() {
    executorService.shutdown(); // 关闭
    if (!executorService.awaitTermination(5, TimeUnit.SECOND)) {
        executorService.shutdownNow();
    }
    // 终止
}
           

延迟任务与周期任务

在java6以前,通过Timer类实现延时任务或周期任务,在java6以后,不要再使用Timer。Timer有以下缺陷:

  • Timer在执行所有定时任务时只会创建一个线程。如果某一个提交的任务执行时间过长,那么将破坏其他TimerTask的精准性。
  • 如果某一个任务抛出了未受检异常(RuntimeException),TimerTask将会终止定时线程。也不会进行恢复。

定时或周期任务可以交给:

Executors.newScheduledThreadPool()

new ScheduledExecutorService()

无返回值任务

Runnable接口的任务没有返回值,直接提交给线程池处理即可,实现也很简单。

任务的执行分为三步:

  • 如果线程池中正在运行的线程数<corePoolSize,尝试创建新线程执行任务,若成功,直接返回,若失败,执行第二步。
  • 尝试将线程放入等待队列

    workQueue.offer(command)

    ,具体怎么实现的视线程池选择的阻塞队列而定,如果成功入队,需要再次校验线程池是否正在运行,如果没有运行,删除并拒绝任务

    remove(comman)

    reject(command)

    ,如果线程池在运行,但是没有线程在运行,开启一个新线程,保持始终有活跃的线程在处理任务,但不是处理当前的任务,因为可能会破坏任务处理的顺序优先级策略,只要有活跃的线程,当前的任务已成功入队,早晚都会被处理。
  • 如果第二步没有成功入队,再尝试创建新线程执行任务
public class ThreadPoolExecutor extends AbstractExecutorService {
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0) // 没有线程在运行
                addWorker(null, false);
        } else if (!addWorker(command, false)) { 
            reject(command);
        }
    }
    
}
           

有返回值任务

Runnable作为Executor接口的基本任务表示形式,具有很大的局限性,虽然Runnable能够将任务的计算结果存放在某个共享的数据结构、数据库中,但它不能返回一个值,或者抛出受检的异常(IOExecption或SQLException)。对于耗时的任务,比如查询数据库、从网络上获取资源、计算某个复杂功能等,Callable是一个更好的抽象。它支持返回一个值、抛出受检异常。

Callable与Future接口,以及ExecutorService接口扩展了Executor接口中的任务提交方法。

public interface Callable<V> {
    V call() throws Exception;
}

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

public interface ExecutorService extends Executor {
   // ...省略其它方法,与本节无关
    // 单任务提交
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    
    // 批量提交
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}    
           

Future表示一个任务的生命周期,接口规范的隐含意义是,任务的生命周期只能前进不能后退,这也与ExecutorService的生命周期一样,都是只进不退的。只不过一个用来表示任务的生命周期,一个用来表示线程池的生命周期。

  • cancel 取消任务
  • isCancelled 判断任务是否取消
  • isDone 是否已完成
  • get 阻塞的获取任务的执行结果,直至抛异常或超时,如果任务抛出了非InterruptException,get将该异常封装为ExecutionException,需要通过getCause才能获取被封装的初始异常。

ExecutorService中的任务可以分为批量提交和单任务提交。

单任务提交

对于单任务提交,调用submit方法。

单任务的提交离不开FutureTask,类图如下,它实现了future的get、cancel、isCancelled等方法,也实现了RunableF ,通过submit返回的Future对象获取任务的执行结果。

FutureTask是Future接口的一个最重要的实现类。对于需要获取执行结果的任务,都是通过AbstractExecutorService调用FutureTask的相关方法实现的

JAVA并发(一)任务执行框架Executor任务分类无限制创建线程的不足Executor框架执行策略----线程池扩展ThreadPoolExecutor参考文献
// AbsractExecutorService中的submit实现

/**	提交一个callable task,返回一个Future对象,
	任务的执行结果通过future.get获取
	结果取决于任务的执行
*/
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask); // 将任务交给线程池处理
    return ftask; // 返回可获取任务执行结果的Future
}

/**	提交一个callable task,返回一个Future对象,
	任务的执行结果通过future.get获取
	结果是固定的,即传入的result
*/
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

/**	提交一个callable task,返回一个Future对象,
	任务的执行结果通过future.get获取
	结果取决于任务的执行
*/
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
           

newTaskFor(task)将Runnable或Callable包装为RunnableFuture,这一过程是FutureTask做的

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
	return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
           

最终,对FutureTask中的callable与state属性分别进行设置。其中Executors.callable则是将根据Runnable对象构造callable对象的静态方法。

public FutureTask(Runnable runnable, V result) {
	this.callable = Executors.callable(runnable, result);
	this.state = NEW;       // ensure visibility of callable
}

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
           

Executors.callable的实现如下,RunnableAdapter中的result的意思是,如果任务完成时,需要指定返回结果,才传入result,否则,将result传入null即可。因此,RunnableAdapter只是一种特殊的Callable,在平时的开发中,可以使用Executors.callable方法将Runnable对象转为Callable对象

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result; // RunnableAdapter的call方法的返回值是固定的,覆盖callable.call的返回值
    }
}
           

批量任务提交

invokeAll与invokeAny

在FutureTask中,单任务提交使用的submit方法,批量任务提交使用的invokeAll、invokeAny方法。

  • invokeAll:直到所有的任务都处理完,或者调用线程被中断、又或者超过指定时限,才会返回

    List<Future>

    ,当返回时,任何未完成的线程都会被取消,所以,每个任务要么完成,要么取消。通过调用各个Future的get或isCancelled方法来判断究竟是哪种情况。
  • invokeAny:批量任务中,只要有一个任务完成,返回该任务的返回值,对于未完成的任务,将会被取消。
// invokeAll的使用示例
public void testBatchTask(List<CustomCallableTask> tasks) {
    List<Future<CustomObject>> futures = executorService.invokeAll(tasks, 10, TimeUnit.SECOND); //阻塞操作
    
    Iterator<CustomCallableTask> taskIter = tasks.iterator();
    for(Future<CustomObject> future : futures) {
        CustomCallableTask task = taskIter.next();
        // task与future 一一对应
        if (future.isCancelled) {
            // ...任务失败的策略
        } else {
            // ...任务成功时的策略
        }
    }
}
           

invokeAll会按照任务集合的顺序一个一个的调用Future.get()获取执行结果,然后按照顺序输出执行结果。优点是有序性保证了各个Future可以与Callable关连起来,缺点是invokeAll()阻塞的时间可能会很久,如果设置的过短了,有些任务又无法完成。

CompletionService

CompletionService和invokeAll是互补的,它的批量任务之间都是独立的,任务执行结果也是无序的。所以,具体选择哪一个要看业务需求。

public interface CompletionService<V> {
    Future<V> submit(Callable<V> task);
    Future<V> submit(Runnable task, V result);
    /**
    * 获取下一个已完成任务的执行结果
    */
    // 如果没有已完成的任务就等待
    Future<V> take() throws InterruptedException;
    // 如果没有已完成的任务就返回null
    Future<V> poll();
    // 如果没有已完成的任务就等待一定的时间,如果等到超时都没有任务完成,返回null
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

           

ExecutorCompletionService类实现了CompletionService接口,它的实现非常简单,完成一个任务就添加一个Future到BlockingQueue中,获取时就从BlockingQueue中获取,所以先完成的任务先被获取到。BlockingQueue是一个重要的接口,后文将详细说明。

使用示例:

private CompletionService cps = new ExecutorCompletionService<CustomObject>(Executors.newFixedThreadPool());

public List<Futures<CustomObject>> testCompletService(List<CustomCallableTask> tasks) {
    for(CustomCallableTask task : tasks) {
        cps.submit(task);
    }
    
    // 假设只需要获取前5个完成的任务结果,后面的不关心
    int count = 0;
    List<Futures<CustomObject>> fs = new ArrayList<>();
    while(count < 6) {
        try {
           Future<CustomObject> future = cps.take(); 
           fs.add(future);
        } catch (InterruptedException e1) {
           // ... 
        } catch (ExecutionException e2) {
           // ... 
        }
    }
    return fs;
}
           

任务运行

任务运行的逻辑也是FutureTask实现的,调用callable.call,

resulte=callable.call()

,获取result后,调用

set(result)

,最终会调用到

LockSupport.unpark(t)

,t是调用Future.get()方法的线程。

public void run() {
    // 省略...
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex); // 抛异常
            }
            if (ran)
                set(result); // 在完成任务的处理时,调用set方法,设置返回结果,唤醒被阻塞的get
        }
    } finally {
        // 省略...
    }
}

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion(); // 调用LockSupport.unpark(t);停止阻塞
    }
}
           

任务的结果

调用Future.get()获取任务的执行结果,实际的逻辑是:

  • 调用 LockSupport.parkNanos()或LockSupport.park(); 阻塞调用get方法的线程。
  • 等待任务运行方法

    run()

    执行到

    set(resulte)或setException()

    ,在这些set方法中,会调用LockSupport.unpark(t),这个t就是调用get方法的线程,取消阻塞。

简而言之,FutureTask是通过LockSupport实现的wait-notify机制的。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s); 
}

public V get(long timeout, TimeUnit unit)
    // 省略..
    int s = state;
if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
    throw new TimeoutException();
return report(s);
}

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        // 省略..
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos); // 带有超时的阻塞
        }
        else
            LockSupport.park(this); // 不带超时的阻塞
    }
}

// 根据state设置返回值
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
           

小结

Executors的任务分为两类,一种是不需要通过Future获取执行结果的,一种是需要通过Future获取执行结果的。

对于不需要获取执行结果的任务,直接提交给线程池,线程池分配一个线程处理即可。建议调用execute方法,该方法在

ThreadPoolExecutor

中实现,也可以调用

submit

方法,但是会造成资源的浪费,毕竟它需要去耗费资源维护线程的执行进度,以及存在一套

wait-notify

机制,有一种特例是,不需要执行结果,但是是批量提交的任务,这个时候就要考虑使用有返回值任务中的批量任务提交了(

invokeAll、invokeAny、completionService

)。

对于需要获取执行结果的任务,依赖于FutureTask类,它是Future的一个具体实现类:

  • 如果是单任务的提交,调用submit方法
  • 如果是批量任务提交,且需要保证所有任务都完成,调用invokeAll方法,返回的

    List<Future>

    严格按照输入的

    List<Callable>

    顺序输出;
  • 如果是批量任务提交,只需要任何一个任务完成,调用invokeAny方法
  • 如果是批量任务提交,且不关心任务与返回结果的一一对应关系,更关心尽可能快的获取到所有任务的执行结果,使用completionService。

对于延时或定时任务,交给:

Executors.newScheduledThreadPool()

new ScheduledExecutorService()

对于线程的执行策略,则完全取决于选用的线程池。

执行策略----线程池

上一章介绍了Executor的使用,但是还没有介绍如何去选择Executor,即如何选择线程池。Executor的实例化有种方式,第一种是通过Executors的静态方法创建ExecutorService对象,第二类是通过new ThreadPoolExecutor创建对象。不过Executors的静态方法创建线程池时,底层还是通过new ThreadPoolExecutor创建对象,所以,本章将先介绍核心ThreadPoolExecutor,再介绍这些线程池的使用。

线程池的大小

线程池面临的首要问题就是,线程池大小是多少?

前文已介绍过,线程池大小的唯一标准是尽可能提高CPU的利用率,过小,CPU空闲,过大,CPU耗费资源创建、维护、切换、销毁这些活跃的线程。因此,要正确的设置线程池的大小,必须估算出任务的等待时间与计算时间的比值。公式如下:$ \eta指CPU利用率,N为数量,t为时间$

t c o m p u t e + t w a i t t c o m p u t e = N t h r e a d s N c p u s ∗ η = > N t h r e a d s = N c p u s ∗ η ∗ ( 1 + t w a i t t c o m p u t e ) \frac {t_{compute} + t_{wait}} {t_{compute}} = \frac {N_{threads}} {N_{cpus}*\eta} \\ => N_{threads} = N_{cpus} * \eta * (1+\frac {t_{wait}} {t_{compute}}) tcompute​tcompute​+twait​​=Ncpus​∗ηNthreads​​=>Nthreads​=Ncpus​∗η∗(1+tcompute​twait​​)

配置ThreadPoolExecutor

ThreadPoolExecutor是一个灵活稳定、可各种定制的线程池。它定义了很多构造函数,一个比较典型的构造函数是:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {...}
           

线程创建与销毁

corePoolSize、maximumPoolSize、keepAliveTime、TimeUnit决定了线程的创建与销毁。

  • corePoolSize是线程池的目标大小,在没有任务执行时线程池的大小,并且只有在workQueue满了以后,才会创建超出这个数量的线程。如果大小设置为0,那么一开始提交的任务将会全部进入workQueue,当workQueue满了以后才会创建线程去处理这些任务,所以,是否设置为0也要谨慎考虑。
  • maximumPoolSize是线程池的最大大小,它表示可同时活跃的线程数量的上限,超过了这个上限,将会根据RejectedExecutionHandler拒绝策略拒绝任务。
  • keepAliveTime、TimeUnit表示如果某个线程的空闲时间超过了keepAliveTime,那么将会被标记位可回收的,只要线程池的当前大小超过corePoolSize,这些线程就会被终止。虽说这些回收能够减少一些资源消耗,不过,不断地回收->创建->回收->创建活跃的线程反而会产生额外的延迟,具体设置的多大,视情况而定。

以Executors中的newFixedThreadPool与newCachedThreadPool做一个示例:

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory 																			threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
           
  • newFixedThreadPool:固定大小的线程池,所以它的

    corePoolSize

    maximumPoolSize

    的大小相等,keepAliveTime为0表示不会超时。
  • newCachedThreadPool:将线程池的最小大小设置为0,最大大小设置为

    Int

    的最大值,将超时时间设置为超过1分钟,这种方式创造出来的线程池可以被无线扩展,并且当需求降低时自动收缩。

阻塞队列BlockingQueue

ThreadPoolExecutor的第四个参数是BlockingQueue,BlockingQueue是阻塞队列。它的作用:当线程池的大小达到最小大小后,后续提交的任务将被放置在阻塞队列,从而限制资源的消耗。

BlockingQueue分为3种:无界队列、有界队列、同步移交。

JAVA并发(一)任务执行框架Executor任务分类无限制创建线程的不足Executor框架执行策略----线程池扩展ThreadPoolExecutor参考文献

BlockingQueue的示意图如下,它继承了Queue接口,具有队列(FIFO)的特性,同时,它又加入了Block的特性,如图中红线,当队列满时,入队请求会被阻塞,当队列空时,出队请求被阻塞。

BlockingQueue接口的众多实现类会根据自身的特性进行改造,并非一定严格遵循FIFO(比如PriorityBlockingQueue根据自然顺序或Comparable接口的实现来决定出队顺序,比如SynchronousQueue甚至会优先将工作提交给线程池(maximumPoolSize>线程池大小>minPoolSize)而不是入队)

JAVA并发(一)任务执行框架Executor任务分类无限制创建线程的不足Executor框架执行策略----线程池扩展ThreadPoolExecutor参考文献

BlockingDeque接口&LinkedBlockingDeque

BlockingDequeu 是双端阻塞队列,两端都可以进队列,也可以出队列,它只有一个实现类:LinkedBlockingDeque。

JAVA并发(一)任务执行框架Executor任务分类无限制创建线程的不足Executor框架执行策略----线程池扩展ThreadPoolExecutor参考文献

LinkedBlockingDeque有两个构造器,若使用无参构造器,是无界队列,若使用含参构造器,是有界队列。

它依靠内部类Node完成

linkFirst、linkLast、unlinkFirst、unlinkLast

四个核心接口。这些操作都需要先获取锁才能执行,具体的链表入队出队操作比较简单,就不详细说了。

public class LinkedBlockingDeque<E> extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable { 
	
    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }
	/** Doubly-linked list node class */
	static final class Node<E> {
    	E item;
   		Node<E> prev;
    	Node<E> next;
    	Node(E x) {
        	item = x;
    	}
	}
}
           

LinkedBlockingQueue

第二个是LinkedBlockingQueue,构造器有两个,分别对应有界与无界队列,入队时,添加到队尾,出队时,从头部取出一个。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    
    // 无界构造器
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

	// 有界构造器
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    static class Node<E> {
        E item;
        Node<E> next;
       	Node(E x) { item = x; }
    }

    // 入队,添加到队尾
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

	// 出队,head.item==null,head是在第一个有效节点前的节点,仅用于标记。出队时将head.next设置为head
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    } 
}
           

TransferQueue&LinkedTransferQueue

LinkedTransferQueue的功能可以说是所有队列中最完善的,LinkedTransferQueue实际上是ConcurrentLinkedQueue、SynchronousQueue(公平模式)和LinkedBlockingQueue的超集。如果需要使用无界队列,第一个要考虑的就是LinkedTransferQueue。

它的性能更强,因为它是java7中新增的,而BlockingQueue的其他实现类是java5添加的,与它功能类似的是SynchronousQueue,在元素交付方面,它的性能是SynchronousQueue的3倍(非公平模式)和14倍(公平模式)。

构造器如下,是无界队列。

简要的原理是:

  • 入队:入队包含了BlockingQueue的入队方式,也包含了transfer方式(transfer未必入队)
  • 出队:若队列不为空,直接取走,若为空,就创建一个消费者线程等待。

带有transfer的方法是用于一次性交付的,在队列中已有元素的情况下,调用这些方法,可以确保队列中已存在的元素都能被处理。下面的代码块中:

  • transfer(E e)若当前存在一个正在等待获取的消费者线程,即立刻将e移交之;否则将元素e插入到队列尾部,并且当前线程进入阻塞状态,直到有消费者线程取走该元素。
  • tryTransfer(E e)若当前存在一个正在等待获取的消费者线程,则该方法会即刻转移e,并返回true;若不存在则返回false,但是并不会将e插入到队列中。这个方法不会阻塞当前线程,要么快速返回true,要么快速返回false。
  • tryTransfer(E e, long timeout, TimeUnit unit)与 tryTransfer(E e)是一样的,只不过加了个超时,可以多等待一会。
public boolean tryTransfer(E e) {
    return xfer(e, true, NOW, 0) == null;
}

public boolean tryTransfer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}

public void transfer(E e) throws InterruptedException {
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted(); // failure possible only due to interrupt
        throw new InterruptedException();
    }
}
           

PriorityBlockingQueue

优先级队列,包含有界与无界两种。对于有界队列,可以通过传入comparator对象,来定义优先级规则。

public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
}
           

SynchronousQueue

  • 入队:要将一个元素放置于SynchronousQueue中,必需有消费者线程在等待,如果没有消费者线程在等待,要么创建新线程处理(线程池大小小于最大值),要么拒绝任务。
  • 出队:若队列不为空,直接取走,若为空,就创建一个消费者线程等待。

可以看出SynchronousQueue与LinkedTransferQueue的区别在入队:

  • SynchronousQueue实际没有队列,来一个任务就要立刻召唤一个线程处理,如果召唤不来,就拒绝,所以使用SynchronousQueue时,需要将线程池的最大大小设置为Integer.MAX_VALUE,避免任务饿死。
  • LinkedTransferQueue入队时,如果没有线程在等待,会阻塞当前线程、或直接返回、或等一段时间返回,SynchronousQueue入队时,会拒绝任务。

ArrayBlockingQueue

ArrayBlockingQueue的所有构造方法都带有参数capacity,因此,它只能是有界队列。它靠一个Object数组实现,原理相对比较简单,入队出队就是更新数组。

/** The queued items */
final Object[] items;

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

/**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}
           

有界与无界的选择

选择BlockingQueue时,第一个问题是选择有界还是无界,原则是:

  • 选择无界:只有当任务相互独立时,为队列设置边界才是合理的,如果任务之间存在依赖性,那么有界队列可能会导致线程“饥饿”死锁的问题。此时应该使用无界队列:如newCachedThreadPool,它能提供比固定大小的线程池更好的排队性能,它选择SynchronousQueue作为等待队列。
  • 选择有界:需要限制线程池的资源消耗,那么可以选择固定大小的线程池

newFixedThreadPool和newCachedThreadPool区别

newFixedThreadPool、newCachedThreadPoo区别:

  • newFixedThreadPool,线程池大小固定,采用无界的LinkedBlockingQueue,线程任务会无限制排队,即便不断地提交,也不会无限制创建线程,优点是节省资源,缺点是任务处理的慢。
  • newCachedThreadPool,线程池最大大小无限制,corePoolSize为0,采用同步交付的SynchronousQueue,伸缩性强,任务处理的快,资源消耗大。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}  
           

顺便提下newSingleThreadExecutor,它的底层使用无界的LinkedBlockingQueue,且线程池的大小始终为1,效果就是:不管任务有多少,永远只有一个线程在处理,如果有线程意外中断,它会主动创建1个线程继续处理任务。

public static ExecutorService newSingleThreadExecutor() {    
    return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,                                0L, TimeUnit.MILLISECONDS,
                     new LinkedBlockingQueue<Runnable>()));
}
           

饱和策略

饱和策略只在有界队列下才会生效,如果队列是无界的,永远不会饱和。一共有四种饱和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。

  • AbortPolicy:默认策略,拒绝任务,抛出RejectedExecutionException
  • CallerRunsPolicy:调用者运行,将任务退回给提交任务的线程(执行submit、execute、invokeAny、invokeAll的线程),提交任务的线程会被用来处理这个任务,这样,减缓了任务的提交速度,当服务器过载时,这种过载情况会逐渐向外蔓延,从线程池工作队列到调用者线程,再到TCP层,最终达到客户端,服务器在高负载下不是直接down掉,而是慢慢的down掉,所以有平缓降低性能的作用。
  • DiscardPolicy:拒绝任务,但不抛异常
  • DiscardOldestPolicy:抛弃最旧的任务,但不抛异常

线程工厂

线程工厂实际上是new ThreadPoolExecutor的第四个参数,ThreadPoolExecutor提供了构造器可以不带该参数,与饱和策略一样,它也有默认值,线程命名就会比较抽象,所以可以根据自定义的线程工厂,来为任务命名,更好的debug。

public class CustomThreadFactory implements ThreadFac {
	private final Sting poolName;
    private AtomicInteger threadNum = new AtomicInteger(1);
    
    public CustomThreadFactory(String poolName) {
        this.poolName = poolName;
    }
    
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName(poolName+threadNum.getAndIncrement());
        return t;
    }
}
           

扩展ThreadPoolExecutor

最后,还能扩展ThreadPoolExecutor,用来统计线程池中任务的执行情况。

public class ExtendExecutor extends ThreadPoolExecutor{
    private Logger logger = LoggerFactory.getLogger(ExtendExecutor.class);

    public ExtendExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        // do something 比如记录线程开始执行任务的时间
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // do something 比如计算线程执行任务的时间、平均时间等。。
    }

    @Override
    protected void terminated() {
        // do something 比如统计线程被销毁的频率,用以更好的决策线程池的配置
        super.terminated();
    }
}
           

参考文献

  • 《并发编程实战》 Doug Lea
  • jdk 1.8并发包源码