天天看点

juc--线程池

利用Executors类来创建三种线程池

这三种线程池分别是FixedThreadPool,SingleThreadPool,CachedThreadPool

类型 特点
FixedThreadPool 可以指定固定的线程数的线程池
SingleThreadPool 只提供一个线程的线程池
CachedThreadPool 能根据当前业务繁忙情况动态地增加和减少线程数的线程池

代码演示

/**
	 * 
	 * @Title: testThree
	 * @Description: 用Executors类创建三种线程池
	 * @return: void
	 */
	public static void testThree() {
		//利用Executors类可以创建三种线程池:固定线程数,1个线程,动态伸缩线程数
		
		//固定线程数的线程池(这里给它5个线程)
		ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
		try {
			for(int i = 0; i < 10; i++) {
				fixedThreadPool.execute(()->{
					System.out.println("fixedThreadPool " + Thread.currentThread().getName() + "正在执行...");
				});
			}
		} finally {
			fixedThreadPool.shutdown();
		}
		
		
		//只包含一个线程的线程池
//		ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
//		try {
//			for(int i = 0; i < 10; i++) {
//				singleThreadPool.execute(()->{
//					System.out.println("singleThreadPool " + Thread.currentThread().getName() + "正在执行...");
//				});
//			}
//		} finally {
//			singleThreadPool.shutdown();
//		}
//		
//		//动态伸缩线程数的线程池
//		ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
//		try {
//			for(int i = 0; i < 10; i++) {
//				cachedThreadPool.execute(()->{
//					System.out.println("cachedThreadPool " + Thread.currentThread().getName() + "正在执行...");
//				});
//				TimeUnit.SECONDS.sleep(1);
//			}
//		} catch (InterruptedException e) {
//			
//			e.printStackTrace();
//		} finally {
//			cachedThreadPool.shutdown();
//		}
	}
           

这三种线程池其实底层都是调用了ThreadPoolExecutor类的同一个构造方法

//newFixedThreadPool(int nThreads)的源码
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
}

//newSingleThreadExecutor()的源码
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                 0L, TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<Runnable>()));
 }

//newCachedThreadPool的源码
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                 60L, TimeUnit.SECONDS,
                 new SynchronousQueue<Runnable>());
}
           

可以看出,这三种线程池其实都可以看成是一种

ThreadPoolExecutor构造方法七个参数的含义

前面的三种线程池的底层都是调用的ThreadPoolExecutor五个参数的构造方法,我们点进去这个构造方法,发现其调用的是ThreadPoolExecutor七个参数的构造方法

/**
 * The default rejected execution handler
*/ 
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

//五个参数的构造方法
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

//七个参数的构造方法
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
           

我们重点来看看这七个参数的含义

int corePoolSize //线程池中的核心常驻线程数
int maximumPoolSize //线程池中同时运行的最大线程数(核心常驻线程+非核心线程)
long keepAliveTime //最大空闲时间,超过这个时间都空闲的非常驻线程都会被回收
TimeUnit unit //配合keepAliveTime使用,是其的时间单位
BlockingQueue<Runnable> workQueue //用来存放Runnable任务的阻塞队列
ThreadFactory threadFactory //产生线程的线程工厂,一般使用默认的(Executors.defaultThreadFactory())即可
RejectedExecutionHandler handler //拒绝策略,当线程池已经启用最大线程数并且阻塞队列已满时会触发拒绝策略,处理那些无法及时处理的任务
           

线程池的构造原理

线程池 == 许多可循环利用的线程 + 队列(存放Runable任务)

线程池结构

juc--线程池

线程池工作流程图解

1.来了两个任务,两个核心常驻线程分别处理这两个任务

juc--线程池

2.又来了两个任务,发现现在能提供服务的线程均被占用着,于是这两个任务进入队列排队等待

juc--线程池

3.又过来3个任务,刚好把队列占满了,于是看了看还有没有非核心线程未启动,发现还有,于是就启动非核心线程,把队列前三个任务分给三个非核心线程处理

juc--线程池
juc--线程池

4.这时候又来了3个任务,又一次把队列塞满

juc--线程池

5.又来了一个任务,发现所有可以处理任务的线程都已经被占用了,又看了看队列,发现队列也满了,再看看有没有非核心线程可以开启,发现非核心线程也被占用完了,于是线程池启动拒绝策略,由拒绝策略决定如何处理这个任务

juc--线程池

6.过了一段时间,任务都处理得差不多了,非核心线程空闲了下来,一旦非核心线程空闲超过keepAliveTime指定的时间,那这些非空闲线程将会被回收,重新回到不启用的状态

juc--线程池

总结线程池的主要处理流程

juc--线程池

几种拒绝策略

拒绝策略 说明
AbortPolicy(默认策略) 直接抛出异常阻止系统正常运行
CallerRunsPolicy 该策略既不会抛弃任务也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量
DiscardOldestPolicy 抛弃队列中等待最久的任务,然后把当前任务加入队列中,尝试再次提交当前任务
DiscardPolicy 该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略

工作中我们应该选择创建哪种线程池呢

答案是:我们尽量不要使用Executors类来创建线程池

我们可以查看Executors类创建这三种线程池的源码

FixedThreadPool

new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>())
           

可以看到第五个参数是一个阻塞队列,但该队列并没有显示提供最大容量,我们看一下它默认给的最大容量

/**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
           

该队列默认给的最大容量为Integer.MAX_VALUE,这个队列将“无限”地增大,这将造成大量任务塞在队列里得不到及时处理,而且可能会造成OOM内存溢出

SingleThread

new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                 0L, TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<Runnable>()));
           

通过源码可知,SingleThread也存在和FixedThreadPool一样的问题

CachedThreadPool

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                 60L, TimeUnit.SECONDS,
                 new SynchronousQueue<Runnable>());
           

可以看到,其第二个参数说明了该线程池的最大容量是Integer.MAX_VALUE,这是一个非常庞大的数字,一旦出现高并发,该线程池就会创建数量庞大的线程,很容易就出现OOM内存溢出

自定义线程池(工作中推荐使用)

由于通过Executors类创建的这三种线程池有各种问题,我们并不能够干预它,所以,推荐直接通过

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

构造方法创建一个线程池,线程池的各项配置我们可以直接通过传参的方式来定制和修改

按照上面“线程池工作流程图解”里线程池的配置自定义了一个线程池

new ThreadPoolExecutor(2, 5, 1, TimeUnit.SECOND, new LinkedBlockingQueue<Runnable>(5),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());