Executor: The Executor Framework and Thread Pool Internals

The Executor framework

In Java, creating a new thread object is a resource-intensive operation. In scenarios that require many threads, a thread pool solves this problem.

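A thread pool not only reduces the overhead of creating and destroying threads; when configured sensibly, it also prevents unbounded thread creation and keeps the system stable.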

The JDK ships with the Executor framework, which can be used to implement thread pools. Its overall structure is as follows:

[Figure: overall class structure of the Executor framework]

Executor

The Executor interface defines just one method, execute, which runs an object of type Runnable:

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     */
    void execute(Runnable command);
}           
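Because Executor has only this one method, an implementation is free to decide where each task runs. The following minimal sketch is modeled on the two examples in the Executor Javadoc: DirectExecutor runs the task in the calling thread, while ThreadPerTaskExecutor starts a new thread for every task (exactly the per-task creation cost a pool avoids). The class names and the helper threadNameUsedBy are illustrative, not part of the JDK.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

class SimpleExecutors {

    // Runs every task synchronously in the thread that calls execute().
    static final class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            command.run();
        }
    }

    // Starts a brand-new thread for every task; this is the per-task cost a pool avoids.
    static final class ThreadPerTaskExecutor implements Executor {
        @Override
        public void execute(Runnable command) {
            new Thread(command).start();
        }
    }

    // Hypothetical helper: reports the name of the thread that actually ran the task.
    static String threadNameUsedBy(Executor executor) throws InterruptedException {
        String[] name = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            name[0] = Thread.currentThread().getName();
            done.countDown(); // countDown/await also make name[0] visible to the caller
        });
        done.await();
        return name[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("caller:         " + Thread.currentThread().getName());
        System.out.println("DirectExecutor: " + threadNameUsedBy(new DirectExecutor()));
        System.out.println("ThreadPerTask:  " + threadNameUsedBy(new ThreadPerTaskExecutor()));
    }
}
```

The first executor reports the caller's own thread name; the second reports a freshly created thread.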

ExecutorService

ExecutorService extends the Executor interface with a richer set of methods (the listing below is an excerpt):

public interface ExecutorService extends Executor {

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     */
    void shutdown();

    /**
     * Submits a value-returning task for execution and returns a
     * Future representing the pending results of the task. The
     * Future's {@code get} method will return the task's result upon
     * successful completion.
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * Submits a Runnable task for execution and returns a Future
     * representing that task. The Future's {@code get} method will
     * return the given result upon successful completion.
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * Submits a Runnable task for execution and returns a Future
     * representing that task. The Future's {@code get} method will
     * return {@code null} upon <em>successful</em> completion.
     */
    Future<?> submit(Runnable task);
}           
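A short sketch of the lifecycle these methods describe, using a fixed pool created by Executors (covered later in this article): submit two Callables, collect their results with Future.get(), then call shutdown() and, because shutdown() does not wait, awaitTermination(). shutdownNow() is a real ExecutorService method that the excerpt above omits; the class and method names are illustrative.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class ExecutorServiceLifecycle {

    // Submits two value-returning tasks and adds their results.
    static int sumInPool() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> a = pool.submit(() -> 20); // submit(Callable<T>)
            Future<Integer> b = pool.submit(() -> 22);
            return a.get() + b.get();                  // get() blocks until each result is ready
        } finally {
            pool.shutdown();                           // queued tasks still run; new ones are refused
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();                    // fall back to interrupting stragglers
            }
        }
    }

    // A shut-down pool refuses new work; with the default handler, submit() throws.
    static boolean rejectsAfterShutdown() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.shutdown();
        try {
            pool.submit(() -> 1);
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumInPool());            // 42
        System.out.println(rejectsAfterShutdown()); // true
    }
}
```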

Difference between submit and execute

A task can be run with either submit or execute. How do the two differ?

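1. execute accepts only Runnable tasks and returns nothing;

2. submit accepts both Runnable and Callable tasks and returns a Future. Since Runnable.run() returns void, get() on the Future returned by submit(Runnable) yields null, whereas submit(Runnable, T result) yields the result passed in.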
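The overloads can be compared side by side in a small sketch (class and method names are illustrative). It also shows one further difference: an exception thrown by a submitted task is captured in the Future and resurfaces from get() wrapped in an ExecutionException, whereas a task run with execute lets the exception escape to the worker thread's uncaught-exception handler.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitVsExecute {

    // Returns what each style of submission hands back to the caller.
    static Object[] results() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch ran = new CountDownLatch(1);
            pool.execute(ran::countDown);                             // execute: no Future to query
            ran.await();

            Future<?> plain = pool.submit(() -> { });                  // submit(Runnable)    -> null
            Future<String> withResult = pool.submit(() -> { }, "ok"); // submit(Runnable, T) -> "ok"
            Future<Integer> computed = pool.submit(() -> 6 * 7);       // submit(Callable<T>) -> 42

            // The cast selects submit(Runnable); the exception is stored in the Future, not thrown here.
            Future<?> failing = pool.submit((Runnable) () -> { throw new IllegalStateException("boom"); });
            String failure;
            try {
                failing.get();
                failure = "none";
            } catch (ExecutionException e) {
                failure = e.getCause().getClass().getSimpleName(); // the task's original exception
            }
            return new Object[] { plain.get(), withResult.get(), computed.get(), failure };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(Arrays.toString(results())); // [null, ok, 42, IllegalStateException]
    }
}
```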

ThreadPoolExecutor

ThreadPoolExecutor represents a thread pool. It implements the Executor interface (through ExecutorService), so any Runnable task can be scheduled by a ThreadPoolExecutor.

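The JDK also provides the Executors class for creating thread pools conveniently. Executors acts as a thread pool factory that returns pools with specific characteristics. Its main API is as follows: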

public class Executors {

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue. 
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue.
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    /**
     * Creates a single-threaded executor that can schedule commands
     * to run after a given delay, or to execute periodically.
     */
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
}
           

API overview:

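newFixedThreadPool: returns a pool with a fixed number of threads; the thread count never changes. When a task is submitted, it runs immediately if a thread is free; otherwise it waits in the task queue (an unbounded LinkedBlockingQueue) until a thread becomes available.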

newSingleThreadExecutor: creates a pool with a single thread. A submitted task runs if that thread is idle; otherwise it waits in the task queue.

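newCachedThreadPool: returns a pool whose corePoolSize is 0 and maximumPoolSize is Integer.MAX_VALUE. Because corePoolSize is 0, every thread is reclaimed once it has been idle for 60 seconds. The number of threads is not fixed: an idle thread is reused if one exists; if all threads are busy when a new task arrives, a new thread is created to handle it, and each idle thread is reclaimed automatically after the timeout.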

newSingleThreadScheduledExecutor: returns a ScheduledExecutorService with a pool size of 1. The ScheduledExecutorService interface builds on ExecutorService by adding the ability to run a task at a given time, for example after a fixed delay or periodically.

newScheduledThreadPool: also returns a ScheduledExecutorService, but lets you specify the number of threads in the pool.
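A brief sketch of the two scheduled factories in use: schedule runs a Callable once after a delay, and scheduleAtFixedRate repeats a Runnable until its ScheduledFuture is cancelled. The delay, period and window values are arbitrary, and the tick count depends on timing, so only lower bounds are meaningful.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ScheduledDemo {

    // Runs a Callable once after delayMs and returns how long the caller actually waited.
    static long delayedCallMillis(long delayMs) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            long start = System.nanoTime();
            ScheduledFuture<String> once = scheduler.schedule(() -> "done", delayMs, TimeUnit.MILLISECONDS);
            once.get();                                    // blocks until the delayed task has run
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            scheduler.shutdown();
        }
    }

    // Repeats a task every periodMs for roughly windowMs, then cancels it and reports the run count.
    static int ticksWithin(long periodMs, long windowMs) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger ticks = new AtomicInteger();
        ScheduledFuture<?> repeating =
                scheduler.scheduleAtFixedRate(ticks::incrementAndGet, 0, periodMs, TimeUnit.MILLISECONDS);
        Thread.sleep(windowMs);
        repeating.cancel(false);                           // stop future runs; a running one may finish
        scheduler.shutdown();
        return ticks.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("waited ~" + delayedCallMillis(100) + " ms");
        System.out.println("ticks: " + ticksWithin(50, 300)); // timing-dependent count
    }
}
```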

Thread pool internals

The following Executors methods are essentially wrappers around ThreadPoolExecutor:

newFixedThreadPool(int nThreads)
newSingleThreadExecutor()
newCachedThreadPool()           
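One way to check this is to cast the returned ExecutorService back to ThreadPoolExecutor and read its configuration. A minimal sketch (the class and describe helper are illustrative): the cast works for newFixedThreadPool and newCachedThreadPool, whereas newSingleThreadExecutor wraps its pool in a delegating service (FinalizableDelegatedExecutorService in the source shown above), so the same cast would fail there.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class FactoryInspection {

    // Formats the constructor arguments a ThreadPoolExecutor was built with.
    static String describe(ThreadPoolExecutor pool) {
        return "core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize()
                + " keepAlive=" + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s"
                + " queue=" + pool.getQueue().getClass().getSimpleName();
    }

    public static void main(String[] args) {
        ThreadPoolExecutor fixed = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
        ThreadPoolExecutor cached = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        ExecutorService single = Executors.newSingleThreadExecutor();

        System.out.println("fixed:  " + describe(fixed));  // core=3 max=3 keepAlive=0s queue=LinkedBlockingQueue
        System.out.println("cached: " + describe(cached)); // core=0 max=2147483647 keepAlive=60s queue=SynchronousQueue
        System.out.println("single is a ThreadPoolExecutor? " + (single instanceof ThreadPoolExecutor)); // false

        fixed.shutdown();
        cached.shutdown();
        single.shutdown();
    }
}
```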

The ThreadPoolExecutor constructor is as follows:

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

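Parameter meanings (threadFactory and handler belong to the full seven-argument constructor, to which the constructor above delegates with default values):

corePoolSize: the number of core threads the pool keeps, even when they are idle (unless allowCoreThreadTimeOut is enabled);

maximumPoolSize: the maximum number of threads allowed in the pool;

keepAliveTime: when the pool has more than corePoolSize threads, how long an excess thread may stay idle before it is terminated;

unit: the time unit of keepAliveTime;

workQueue: the task queue, which holds tasks that have been submitted but not yet executed;

threadFactory: the factory used to create threads; the default is usually sufficient;

handler: the rejection policy, which decides how to reject tasks when they arrive faster than the pool can handle them.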
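A sketch that spells out all seven arguments through the full constructor. The sizes, the keep-alive time, the queue capacity and the "worker-N" thread-naming scheme are arbitrary choices for illustration; AbortPolicy is passed explicitly even though it is the default.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolConstructionDemo {

    // Builds a pool with every constructor parameter given explicitly.
    static ThreadPoolExecutor newPool() {
        AtomicInteger seq = new AtomicInteger();
        // Hypothetical naming scheme so the pool's threads are easy to spot in thread dumps.
        ThreadFactory factory = r -> new Thread(r, "worker-" + seq.incrementAndGet());
        return new ThreadPoolExecutor(
                2,                                     // corePoolSize
                4,                                     // maximumPoolSize
                30L, TimeUnit.SECONDS,                 // keepAliveTime and unit for threads above the core
                new ArrayBlockingQueue<>(100),         // workQueue: bounded, up to 100 waiting tasks
                factory,                               // threadFactory
                new ThreadPoolExecutor.AbortPolicy()); // handler: the default policy, made explicit
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = newPool();
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        Future<String> name = pool.submit(whoAmI);
        System.out.println(name.get()); // worker-1: the first task starts the first core thread
        pool.shutdown();
    }
}
```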

Task queues

workQueue takes a BlockingQueue, which can hold only Runnable objects. The blocking queues commonly used with ThreadPoolExecutor are described below.

SynchronousQueue

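SynchronousQueue is a special BlockingQueue with no capacity: every insert must wait for a corresponding remove. Submitted tasks are never actually stored; each new task is handed straight to a thread. If no idle thread is available, a new thread is created, and if the thread count has already reached the maximum, the rejection policy is applied.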

The pool returned by Executors.newCachedThreadPool() uses this kind of queue.
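A small experiment makes the hand-off visible. It assumes a pool shaped like newCachedThreadPool but capped at two threads (core 0, max 2, SynchronousQueue); each task blocks on a latch so no thread ever becomes free. The queue stays empty, every accepted task gets its own thread, and the third submission is rejected. Class and method names are illustrative.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SynchronousQueueDemo {

    // Returns {poolSize, queueSize, rejectedCount} after submitting three blocking tasks.
    static int[] saturate() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 2, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<>());
        Runnable blocker = () -> {
            try {
                release.await();           // keep the thread busy until we are done observing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        for (int i = 0; i < 3; i++) {
            try {
                pool.execute(blocker);     // offer() fails (no idle taker), so a thread is created
            } catch (RejectedExecutionException e) {
                rejected++;                // third task: already at maximumPoolSize
            }
        }
        int[] snapshot = { pool.getPoolSize(), pool.getQueue().size(), rejected };
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return snapshot;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(saturate())); // [2, 0, 1]
    }
}
```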

ArrayBlockingQueue

This is a bounded blocking queue backed by an array. Its constructor is:

/**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and default access policy.
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }           

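If the pool has fewer than corePoolSize threads, a new thread is created first; otherwise the task is added to the queue. If the queue is full, a new thread is created to run the task as long as the thread count is still below maximumPoolSize; once maximumPoolSize has been reached, the rejection policy is applied.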

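Consequently, a bounded queue raises the thread count above corePoolSize only when the queue is full. Unless the system is very busy, a bounded queue keeps the number of threads at corePoolSize.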
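The order described above (core threads first, then the queue, then extra threads up to maximumPoolSize, then rejection) can be observed with a tiny bounded pool. This sketch assumes corePoolSize 1, maximumPoolSize 2 and an ArrayBlockingQueue of capacity 1; every task blocks on a latch so none finishes while the pool size and queue length are recorded after each submission.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedQueueGrowth {

    // Records "threads/queued" (or "rejected") after each of four submissions.
    static List<String> trace() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1));
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> steps = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            try {
                pool.execute(blocker);
                steps.add(pool.getPoolSize() + "/" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                steps.add("rejected");
            }
        }
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return steps;
    }

    public static void main(String[] args) throws InterruptedException {
        // 1/0: core thread created; 1/1: queued; 2/1: queue full, extra thread; then rejected
        System.out.println(trace()); // [1/0, 1/1, 2/1, rejected]
    }
}
```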

LinkedBlockingQueue

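This is also a bounded queue, but backed by a linked list. If no capacity is specified at construction, the capacity defaults to Integer.MAX_VALUE, which makes it effectively unbounded: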

/**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }           
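One practical consequence is sketched below, assuming a pool with corePoolSize 1, maximumPoolSize 4 and a default-capacity LinkedBlockingQueue. Because offer() effectively never fails, execute never reaches the branch that adds threads beyond the core, so maximumPoolSize has no effect: extra tasks simply pile up in the queue, which can grow without bound.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class UnboundedQueueDemo {

    // Returns {poolSize, queueSize, initial remainingCapacity} after five blocking tasks.
    static long[] observe() throws InterruptedException {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); // capacity Integer.MAX_VALUE
        long initialCapacity = queue.remainingCapacity();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 4, 60L, TimeUnit.SECONDS, queue);
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        long[] snapshot = { pool.getPoolSize(), queue.size(), initialCapacity };
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return snapshot;
    }

    public static void main(String[] args) throws InterruptedException {
        // one thread despite maximumPoolSize = 4; the other four tasks wait in the queue
        System.out.println(Arrays.toString(observe())); // [1, 4, 2147483647]
    }
}
```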

PriorityBlockingQueue

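An unbounded blocking queue ordered by priority, which lets you control the order in which tasks run: tasks are taken according to their own priority (their natural ordering via Comparable, or a Comparator supplied to the queue).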
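A sketch of a priority-ordered pool, under two stated assumptions. First, the hypothetical PriorityTask class supplies the ordering by implementing Comparable (a smaller number runs first here). Second, tasks are handed over with execute(), because submit() wraps each task in a FutureTask, which is not Comparable, and the queue would then throw a ClassCastException once the task is queued. A single worker is kept busy by a first task so the others accumulate in the queue before any is taken.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityPoolDemo {

    // Hypothetical task type: a lower priority value runs earlier.
    static final class PriorityTask implements Runnable, Comparable<PriorityTask> {
        final int priority;
        final Runnable body;

        PriorityTask(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }

        @Override
        public int compareTo(PriorityTask other) {
            return Integer.compare(priority, other.priority);
        }
    }

    // Queues tasks in the order 3, 1, 2 and returns the order in which they actually ran.
    static List<Integer> runOrder() throws InterruptedException {
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<>());
        // Occupies the only thread so the following tasks must wait in the priority queue.
        pool.execute(new PriorityTask(0, () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        for (int p : new int[] { 3, 1, 2 }) {
            pool.execute(new PriorityTask(p, () -> order.add(p))); // execute, not submit
        }
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOrder()); // [1, 2, 3]
    }
}
```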

execute

The actual execution path of execute shows exactly how corePoolSize and maximumPoolSize are applied:

/**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }           

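workerCountOf(c) returns the current number of worker threads. While that number is below corePoolSize, the task is scheduled directly through addWorker(), which starts a new thread with the task as its first job; otherwise the task is placed in the task queue through workQueue.offer() to wait.

After a successful offer(), execute rechecks the pool state: if the pool has been shut down in the meantime, the task is removed and rejected; if no worker thread is left (possible when corePoolSize is 0), addWorker(null, false) starts one to drain the queue.

If the task cannot be queued (for example, a bounded queue is full or a SynchronousQueue is used), execute tries to start a new thread for it directly. If the thread count has already reached maximumPoolSize, this fails and the rejection policy is applied.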
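The branching just described can be summarized as a pure function. This is a simplified, single-threaded model written for illustration (the names Action, decide and simulate are hypothetical): it ignores the running-state recheck and assumes no task finishes, so it only reproduces the order in which execute() tries its three options.

```java
import java.util.ArrayList;
import java.util.List;

class ExecuteDecisionModel {

    enum Action { ADD_CORE_WORKER, ENQUEUE, ADD_NON_CORE_WORKER, REJECT }

    // Mirrors the three branches of execute(): core thread, then queue, then extra thread, else reject.
    static Action decide(int workerCount, int corePoolSize, int maximumPoolSize, boolean queueAccepts) {
        if (workerCount < corePoolSize) {
            return Action.ADD_CORE_WORKER;      // addWorker(command, true)
        }
        if (queueAccepts) {
            return Action.ENQUEUE;              // workQueue.offer(command) succeeded
        }
        if (workerCount < maximumPoolSize) {
            return Action.ADD_NON_CORE_WORKER;  // addWorker(command, false)
        }
        return Action.REJECT;                   // reject(command) -> RejectedExecutionHandler
    }

    // Feeds `tasks` submissions into the model, assuming none of them completes meanwhile.
    static List<Action> simulate(int tasks, int corePoolSize, int maximumPoolSize, int queueCapacity) {
        List<Action> actions = new ArrayList<>();
        int workers = 0;
        int queued = 0;
        for (int i = 0; i < tasks; i++) {
            Action a = decide(workers, corePoolSize, maximumPoolSize, queued < queueCapacity);
            if (a == Action.ADD_CORE_WORKER || a == Action.ADD_NON_CORE_WORKER) {
                workers++;
            } else if (a == Action.ENQUEUE) {
                queued++;
            }
            actions.add(a);
        }
        return actions;
    }

    public static void main(String[] args) {
        // core 2, max 4, queue capacity 2: two core threads, two queued, two extra threads, then rejection
        System.out.println(simulate(7, 2, 4, 2));
        // [ADD_CORE_WORKER, ADD_CORE_WORKER, ENQUEUE, ENQUEUE, ADD_NON_CORE_WORKER, ADD_NON_CORE_WORKER, REJECT]
    }
}
```

A queue capacity of 0 models a SynchronousQueue with no waiting taker: every accepted task then gets a new thread until maximumPoolSize is reached.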

The execution flow is illustrated below:

[Figure: flow chart of ThreadPoolExecutor.execute()]

Rejection policies

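When the thread count has reached maximumPoolSize and the task queue is also full, the load exceeds what the pool can handle, and the rejection policy is applied (a task submitted after the pool has been shut down is rejected the same way). The JDK has four built-in rejection policies, all defined as nested classes of ThreadPoolExecutor: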

public class ThreadPoolExecutor extends AbstractExecutorService {
    ...
    
    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
        }
    }

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
    }

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
}           

AbortPolicy

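This policy immediately throws a RejectedExecutionException, interrupting the normal submission flow. It is the default policy: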

public class ThreadPoolExecutor extends AbstractExecutorService {
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
}           

CallerRunsPolicy

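This policy runs the rejected task directly in the caller's thread (unless the pool has been shut down). Tasks are not actually discarded under this policy, but the submitting thread slows down because it is busy running the task itself.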

DiscardOldestPolicy

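This policy discards the oldest request, i.e. the one at the head of the queue that would have been executed next, and then tries to submit the current task again.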

DiscardPolicy

This policy silently discards tasks it cannot handle and does nothing else. Do not use it if losing tasks is unacceptable.
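To compare the four policies side by side, the sketch below saturates a pool that has one thread and a one-slot queue (the first task blocks on a latch, the second waits in the queue) and then submits a third task under each policy. The pool shape and the runWith helper are assumptions made for the demo; the returned log shows what happened to the third task.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionPolicyDemo {

    // Saturates a 1-thread pool with a 1-slot queue, submits a third task, and returns what ran.
    static List<String> runWith(RejectedExecutionHandler handler) throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), handler);
        pool.execute(() -> {                  // task1 occupies the only thread
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.add("task1");
        });
        pool.execute(() -> log.add("task2")); // task2 fills the only queue slot
        try {                                 // task3 has nowhere to go, so the handler is invoked
            pool.execute(() -> log.add("task3 on " + Thread.currentThread().getName()));
        } catch (RejectedExecutionException e) {
            log.add("task3 rejected");
        }
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Abort:         " + runWith(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("CallerRuns:    " + runWith(new ThreadPoolExecutor.CallerRunsPolicy()));
        System.out.println("Discard:       " + runWith(new ThreadPoolExecutor.DiscardPolicy()));
        System.out.println("DiscardOldest: " + runWith(new ThreadPoolExecutor.DiscardOldestPolicy()));
    }
}
```

Run from main, the Abort log starts with "task3 rejected", the CallerRuns log starts with "task3 on main" (the caller ran it before task1 was even released), the Discard log holds only task1 and task2, and the DiscardOldest log holds task1 and task3 but no task2.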

Custom policies

All of the built-in policies implement the RejectedExecutionHandler interface:

public interface RejectedExecutionHandler {
    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}           

If the built-in policies do not meet your needs, you can define your own by implementing RejectedExecutionHandler:

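import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomerRejectPoliceDemo {
    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + " Thread ID: " + Thread.currentThread().getId());
        }
    }

    public static void main(String[] args) {
        MyTask task = new MyTask();
        // 5 fixed threads plus at most 10 waiting tasks; anything beyond that goes to the custom handler
        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(10), Executors.defaultThreadFactory(),
                // custom RejectedExecutionHandler: report the rejected task instead of throwing
                (r, executor) -> System.out.println(r.toString() + " is discarded"));

        for (int i = 0; i < 30; i++) {
            executorService.submit(task);
        }
        // without shutdown() the non-daemon worker threads would keep the JVM running
        executorService.shutdown();
    }
}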
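Another kind of custom policy, sketched here as one possible design rather than a JDK feature: instead of dropping the task, make the submitting thread wait until a queue slot frees up. BlockUntilQueuedPolicy and completedOf are hypothetical names. Two caveats apply: the ThreadPoolExecutor documentation describes getQueue() as intended mainly for monitoring and debugging, and putting directly into the queue skips the "add a thread up to maximumPoolSize" step, so this fits best when corePoolSize equals maximumPoolSize.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingRejectDemo {

    // Hypothetical policy: the submitting thread waits for a free queue slot instead of losing the task.
    static final class BlockUntilQueuedPolicy implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("Task " + r + " rejected: executor is shut down");
            }
            try {
                executor.getQueue().put(r);         // blocks until a worker takes a queued task
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Interrupted while queueing task " + r, e);
            }
        }
    }

    // Pushes `tasks` short tasks through a 2-thread pool with a 2-slot queue and counts completions.
    static int completedOf(int tasks) throws InterruptedException {
        AtomicInteger done = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(2), new BlockUntilQueuedPolicy());
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(5);                 // simulate a little work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.incrementAndGet();
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed " + completedOf(20) + " of 20"); // no task is dropped
    }
}
```

Unlike the discarding handler above, this trades throughput on the submitting side for never losing work, similar in spirit to CallerRunsPolicy but without running tasks on the caller's thread.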