天天看点

Executor框架

1.Executor框架主要由3大部分组成:

  •  任务。包括被执行任务需要实现的接口:Runnable接口或者Callable接口
  •  任务的执行。包括执行机制的核心接口Executor,以及继承Executor的ExecutorService接口。Executor框架有两个关键类,ThreadPoolExecutor和ScheduledThreadPoolExecutor。
  • 异步计算的结果。包括接口Future和实现Future接口的FutureTask类。

Executor框架示意图

Executor框架

2.Executor框架的使用示意图

Executor框架

    主线程创建实现Runnable或者Callable接口的任务对象,工具类Executors可以把一个Runnable对象封装成一个Callable对象,使用Executors.callable(Runnable task)。然后交给ExecutorService用execute或者submit执行,前者不返回结果,后者可以返回实现了Future接口的对象(到目前为止返回的是FutureTask对象),FutureTask实现了Runnable,程序员可以创建FutureTask,然后直接交给ExecutorService。

   最后主线程可以执行FutureTask.get()方法等待任务完成(阻塞方法),也可以使用FutureTask.cancell(boolean mayInterruptIfRunning) 来取消任务的执行。

3.Executor框架的成员介绍

3.1 ThreadPoolExecutor线程池

    它通常通过工厂类Exectors创建,也可以自定义创建, Executors的创建有三个方法

  •        newFixedThreadPool:创建可重用且固定线程数的线程池,如果线程池中的所有线程都处于活动状态,此时再提交任务就在队列中等待,直到有可用线程;如果线程池中的某个线程由于异常而结束时,线程池就会再补充一条新线程。适用于负载比较重的服务器。

    源代码如下,corePoolSize和maximumPoolSize都是相同的。使用LinkedBlockingQueue(阻塞最大值Integer.MAX_VALUE)作为阻塞队列,让线程多的都阻塞在这个队列里,所以这里一般不会有拒绝任务,RejectedExecutionHandler.rejectedExecution方法。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
        //使用一个基于FIFO排序的阻塞队列,在所有corePoolSize线程都忙时新任务将在队列中等待
                                  new LinkedBlockingQueue<Runnable>());
}
           
  •       newSingleThreadExecutor:创建一个单线程的Executor,如果该线程因为异常而结束就新建一条线程来继续执行后续的任务,适用于需要保证顺序的执行各个任务,并且在任意时间点不会有多个线程时活动的场景

源代码如下,基本和前者类似,corePoolSize和maximumPoolSize设置为了1

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
           
  •       newCachedThreadPool:创建可缓存的线程池,如果线程池中的线程在60秒未被使用就将被移除,在执行新的任务时,当线程池中有之前创建的可用线程就重用可用线程,否则就新建一条线程,它是大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者负载较轻的服务器。

      源代码如下,核心线程池为0,最大线程池可以达到Integer.MAX_VALUE个,keepAliveTime设置为60表示空闲线程最长等待任务时间60秒后就会被终止,阻塞队列使用没有容量的SynchronousQueue队列,而这个又是无界的,MAME当主线程提交任务的书读高于线程的处理速度,那么CachedThreadPool就会不听的创建线程,极端的情况下CachedThreadPool回应为过多创建线程而耗尽CPU和内存资源。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
           

 3.2 ScheduledThreadPoolExecutor周期执行线程池

      它继承ThreadPoolExecutor类,主要用来在给定延迟后运行任务,或者定期执行任务,类似Timer,但是可以创建多个后台线程数进行执行。通常通过工厂类Exectors创建

  •       newScheduledThreadPool:创建一个可延迟执行或定期执行的线程池,包含若干个线程。适用于需要多个后台程序执行周期的任务,同时为了满足资源管理的需求而限制后台线程的数量的场景。
  •       newSingleThreadScheduledExecutor:只包含一个线程的执行延迟或者定期的线程池。适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的场景。

3.3 FutureTask简介

      FutureTask除了实现Future接口外,还实现了Runnable接口,因此FutureTask可以交给Executor执行,也可以调用线程直接执行FutureTask.run() .

 1) Future有三种状态 

         未启动状态。FutureTask.run方法处于未执行之前,FutureTask处于未启动

         已经启动。FutureTask.run()方法被执行的过程中,FutureTask处于启动状态

         已完成。FutureTask.run执行完;或者FutureTask.cancell();或者执行run()方法抛出异常而结束。

图示如下:

Executor框架
  •       当FutureTask处于未启动或者启动状态时,执行FutureTask.get()方法将导致调用线程阻塞,当FutureTask处于已经完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或者抛出异常。
  •      当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会执行;
  •      当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断此任务线程的方式来试图停止任务;
  •      当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将不会对正在执行此任务 的线程产生影响(让正在执行的任务运行完成);
  •      当FutureTask处于已完成的状态时,执行FutureTaskcancel(...)方法将返回false.

2) FutureTask的使用

      FutureTask可以交给EXecutor执行,也可以通过ExecutorService.submit()方法返回一个FutureTask,然后执行FutureTask.get,或者cancel方法,除此之外还可以单独使用FutureTask。            

继续阅读