天天看点

二、Java线程池

2.1、线程是不是越多越好?

答:不是,原因如下:

  1. 线程的创建+销毁时间 > 执行任务的时间,这时就不划算了;
  2. JVM规范,一个线程默认最大栈大小是1M,需要从系统内存分配,线程过多会消耗很多内存。
  3. OS会频繁切换线程上下文,导致每个线程比较慢,影响性能

合理的线程数量:

  1. 计算密集型:CPU(核心)数量的1~2倍。
  2. IO密集型:需要多一些线程,根据具体的IO阻塞时长决定。Tomcat默认200个。

也可考虑根据需要在一个最小数量和最大数量间自动增减线程数。

一般,CPU利用率达到80%,可认为充分利用了CPU。小于80%,说明利用不合理。大于80%,说明CPU利用过头了。

2.2、线程池原理

Java线程池的接口定义

二、Java线程池

相关API:

  • ExecutorService

boolean awaitTermination(long timeout, TimeUnit unit)

监测是否已经关闭,直到所有任务完成,或发生超时,或当前线程被中断。

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

执行给定的任务集合,执行完毕后,返回结果。

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,

                                  long timeout, TimeUnit unit)

执行给定的任务集合,执行完毕或超时后,返回结果,其他任务终止。

<T> T invokeAny(Collection<? extends Callable<T>> tasks)

执行给定的任务,任一执行成功则返回结果,其他任务终止

<T> T invokeAny(Collection<? extends Callable<T>> tasks,  long timeout, TimeUnit unit)

执行给定的任务,任一执行成功或超时后,返回结果,其他任务终止

boolean isShutdown()

线程池是否关闭

boolean isTerminated()

如果关闭后所有任务已完成,返回true。

void shutdown()

优雅关闭线程池,已提交的任务继续执行,但不接收新任务(走拒绝策略)

List<Runnable> shutdownNow()

尝试停止所有正执行的任务(抛出InterruptedException),

停止等待任务的处理,返回等待执行任务的列表。

<T> Future<T> submit(Callable<T> task)

提交一个用于执行Callable任务,返回一个Future,用于获取Callable执行结果

Future<T> submit(Runnable task)

提交可运行任务并执行,返回一个Future对象,执行结果为null。

<T> Future<T> submit(Runnable task, T result)

提交可运行任务并执行,返回Future,执行结果传入result。

  • ScheduledExecutorService

public <V> ScheduledFuture<V> schedule(Callable<V> callable,   long delay, TimeUnit unit)

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)

创建并执行一个一次性任务,过了延迟时间就执行。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,

                                                  long initialDelay,

                                                  long period,

                                                  TimeUnit unit)

创建并执行周期性任务,过了给定初始延迟时间会进行首次执行,后续进行周期性执行。

执行过程中发生异常,任务停止

一次任务执行时长超过了周期时长,下次任务会等到该次任务执行结束后立即执行。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,

                                                     long initialDelay,

                                                     long delay,

                                                     TimeUnit unit)

与前面的区别是:一次任务执行时长超过了周期时长,下次任务会在该次任务执行结束时间再进行延时。

2.3、标准线程池

public ThreadPoolExecutor(int corePoolSize, 

                          int maximumPoolSize, 

                          long keepAliveTime, 

                          TimeUnit unit, 

                          BlockingQueue<Runnable> workQueue, 

                          ThreadFactory threadFactory,  // 6

                          RejectedExecutionHandler handler );

corePoolSize:     核心线程数

maximumPoolSize:最大线程数

keepAliveTime:   超过核心线程数的线程存活时间

unit:            超过核心线程数的线程存活时间单位

workQueue:      等待队列

threadFactory:   线程工厂

handler:        当等待队列已满,并且到达最大线程数,任务进来时将执行拒绝处理。

2.3.1 标准线程池的执行流程

二、Java线程池

源码:

<code>

    public void execute(Runnable command) {

        if (command == null)

            throw new NullPointerException();

        if (workerCountOf(c) < corePoolSize) {  

            if (addWorker(command, true))

                return;

            c = ctl.get();

        }

        if (isRunning(c) && workQueue.offer(command)) {

            int recheck = ctl.get();

            if (! isRunning(recheck) && remove(command))

                reject(command);

            else if (workerCountOf(recheck) == 0)

                addWorker(null, false);

        }

        else if (!addWorker(command, false))

            reject(command);

    }

</code>

  1. 如果当前任务数小于核心池的大小,那么生成一个新的worker线程,并执行当前提交的任务
  2. 如果当前任务数超过了核心池的大小,或者addWorker()失败了,则判断线程状态是否是running,并将任务放到queue中,此时做了一个double check,目的是:
  • 一是防止任务加入到queue之后,状态忽然变更(如变成shutdown),此时会reject;
  • 二是防止任务加入到队列之后,所有的工作线程都die了,此时会起一个新的worker线程
  1. 如果queue也满了,此时会在maxiumPoolSize限定下,尝试起一个worker线程,如果失败了,则reject。

综上可看出:如果等待队列是无界的,则最大线程数就没有用了。因为等待队列永远不会满,所以当有任务进来时,只要到达核心线程数了,就会添加到队列中。只有等待队列是有界的,当有任务进来时,只要到达核心线程数,并且等待队列满了,才会创建线程直到线程数到达最大线程数为止。

2.4、ScheduleThreadPoolExecutor

public ScheduledThreadPoolExecutor(int corePoolSize,

                                   ThreadFactory threadFactory,

                                   RejectedExecutionHandler handler) {

    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,

          new DelayedWorkQueue(), threadFactory, handler);

}

corePoolSize:  核心线程数

threadFactory: 线程工厂

handler:      拒绝策略

默认情况下:

    最大线程数:      Integer.MAX_VALUE

    线程存活时间:    0

    线程存活时间单位:纳秒

    等待队列:        延迟队列

2.5、Executors工具类

2.5.1    newSingleThreadExecutor

单线程的线程池:启动一个线程负责按顺序执行任务,先提交的任务先执行。

1)    任务会被提交到一个队列里,

2)    启动的那个线程会从队里里取任务, 然后执行

3)    执行完,在取下一个。

如果任务执行失败并导致线程结束,会创建一个新的线程去执行队列里后续的任务。

public static ExecutorService newSingleThreadExecutor() {

    return new FinalizableDelegatedExecutorService

        (new ThreadPoolExecutor(1, 1,

                                0L, TimeUnit.MILLISECONDS,

                                new LinkedBlockingQueue<Runnable>()));

}

2.5.2    newFixedThreadPool

一个可重用的固定线程数量的线程池,核心线程数与最大线程数相等,且线程永远存活。

  1. 当无任务时,所有的线程都将等待。
  2. 当所有线程都在执行任务,此时再提交任务就在队列中等待,直到有可用线程。
  3. 当有线程由于异常而结束时,线程池就会再补充一条新线程。

public static ExecutorService newFixedThreadPool(int nThreads) {

    return new ThreadPoolExecutor(nThreads, nThreads,

                                  0L, TimeUnit.MILLISECONDS,

                                  new LinkedBlockingQueue<Runnable>());

}

2.5.3 newCachedThreadPool

一个不限制线程数量的动态线程池,核心线程数为0,最大线程数为Integer.MAX_VALUE,线程存活时间为1分钟。当对业务量无法估计时建议使用。

等待队列使用的是同步队列,其特性是:无界的,是一种无缓冲的等待队列,但是由于该Queue本身的特性,添加元素时必须等待其他线程取走后才能继续添加。这样,当线程池没有空闲线程取任务,offer就会失败,线程池就会新建一个线程用于对入队失败的任务进行处理。

public static ExecutorService newCachedThreadPool() {

    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                                  60L, TimeUnit.SECONDS,

                                  new SynchronousQueue<Runnable>());

}

2.5.4 newScheduledThreadPool

一个固定线程数的ScheduledExecutorService对象,在指定延时之后执行或者以固定的频率周期性的执行提交的任务。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {

    return new ScheduledThreadPoolExecutor(corePoolSize);

}

继续阅读