JUC中线程池原理解析
首先我们先说一下一个线程池基本上会有哪些基本组件构成
- 线程管理器,即线程池,维护线程池中线程数量,当线程空闲数多时尽心线程的回收,当线程紧张时进行线程的创建
- 任务添加与拒绝策略
- 任务队列 BlockingQueue
- 线程的构造工厂 ThreadFactory
- 线程集合
- worker 即线程中相当于runabble 的固定的模板方法
这里我写了一个简易版的线程池,便于让大家了解线程池原理,放到了csdn上
https://blog.csdn.net/c1523456/article/details/81415759
juc 中线程池源码分析
首先我们看juc 中ThreadPoolExecutor类的继承关系图
execute源码分析
分析
如果我们要使用线程肯定是要到线程中去提交任务的,所以我们就以execute 方法为入口点,进行剖析。
public interface Executor {
//我们添加任务时调用的函数
void execute(Runnable command);
}
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//当前线程数小于工作线程数的话,添加一个worker
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//当前线程大于核心线程数,将任务放入队列中,如果队列没有满则进入if里面,队列满调到else if
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get()
//线程池被关闭时拒绝提交任务
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//当前线程大于核心线程数,并且放入队列失败,即队列满了,构造新的线程并执行该任务,但是线程数量大于maximumPoolSiz调用拒绝策略
else if (!addWorker(command, false))
reject(command);
}
//addWorker 代码核心逻辑为 创建一个worker,填充我们的任务 并启动他的线程,然后加入工作者队列
private boolean addWorker(Runnable firstTask, boolean core) {
//当前线程数量大于等于 maximumPoolSize 返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
w = new Worker(firstTask);
final Thread t = w.thread;
workerStarted = workers.add(w);
t.start();
return workerStarted ;
}
总结
关于何时分配线程,何时添加队列策略
通过execute() 方法的剖析,其大致逻辑为当在execute(Runnable)方法中提交一个新任务,并且运行的线程少于corePoolSize线程时,就会创建一个新线程来处理该请求,即使其他工作线程处于空闲状态。 如果运行的线程大于corePoolSize但小于maximumPoolSize,则只有在队列满时才会创建一个新线程。下面我们对worker 进行分析。
关于何时调用拒绝策略在方法execute(Runnable)中提交的新任务将在Executor关闭时被拒绝,当Executor对最大线程和工作队列容量都使用有限的界限,并且达到饱和时也会被拒绝。
worker 源码分析
分析
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
//线程
final Thread thread;
//我们提交的任务,可能为空
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
//通过线程工厂获取线程,,,,注意这里把this 放入了 ,也就是我们的run 方法是在另外一个线程中调用的屁
this.thread = getThreadFactory().newThread(this);
}
//线程池中线程具体执行逻辑,具体的逻辑在runWorker中
public void run() {
runWorker(this);
}
//暂且忽略锁的逻辑
}
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
try {
//通过getTask获取任务,值得注意的是getTask == null 也是线程的退出条件哦
while (task != null || (task = getTask()) != null) {
try {
//在任务执行前的回调函数
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//在任务执行后的回调函数
afterExecute(task, thrown);
}
} finally {
}
}
completedAbruptly = false;
} finally {
//线程因为异常退出的回调函数
processWorkerExit(w, completedAbruptly);
}
}
总结
worker 的执行逻辑很简单,只是while 的去任务队列获取数据,执行其run方法,但是需要特别指指出的是 在任务执行前的回调函数
在任务执行后的回调函数 线程因为异常退出的回调函数 ,我们可以利用这几个钩子方法做任务执行前后的特殊处理哦。还有 值得注意的是getTask == null 也是线程的退出条件哦。
getTask() 方法源码分析
分析
大家可能会问了,一个简单的getTask 方法有什么好说的,不就是从队列中获取数据吗,但是不止这么简单,结合上面线程的退出条件,可以做到动态的线程池中线程的回收工作。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//是否启用超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//条件1 当前线程数大于核心线程数并且获取数据等待keepAliveTime时间后超时
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果启用超时的话,就设置超时时间,否则直接阻塞等待
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
总结
简单说这段代码的意思就是 如果当前池中有超过corePoolSize的线程,如果空闲的线程超过了keepAliveTime的时间间隔,那么多余的线程将通过 while (gettask != null)条件成立而退出被终止。
总体思路
必知必会
学习了线程池的实现原理,我们从线程池的构造函数来分析,在实际开发中我们需要注意哪些问题?
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
workQueue的种类及其影响
SynchronousQueue将任务传递给线程,并不会缓存任务,所以其或直接调用线程,如果当前没有可用线程,并且线程数量小于最大线程数时,创建新的线程,如果否,则调用拒绝策略LinkedBlockingQueue因为是无存储大小的,所以不会到任务存储满,线程数维持在核心线程数,它可能会导致工作队列无限制增长而内存溢出ArrayBlockingQueue有界队列有助于防止资源耗尽,但是由于阻塞,可能会导致低吞吐量
线程池状态机
线程池线程数量设置
cpu密集型如加密、计算hash:最佳线程数为cpu核心线程数的1-2倍左右
IO密集型如读取文件、网络:最佳线程数一般会大于cpu核心数N倍,以jvm线程监控情况为依据
通用 线程数 = cpu 核心线程数 * (1 + 平均等待时间 /平均工作时间)
关于JUC 提供的几种任务拒绝策略
- AbortPolicy 抛出RejectedExecutionException异常
- CallerRunsPolicy 在当前线程中调用任务 抛出RejectedExecutionException异常
- DiscardPolicy 什么都不做
- DiscardOldestPolicy 从队列中弹出一个任务丢弃,放入我们的
关于JUC Executors 中提供的几种类型的线程池适用场景分析
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
固定线程数量的线程池,但是由于其是无边界队列,可以会有内存溢出问题
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
固定线程数量的线程池,但是由于其是无边界队列,可以会有内存溢出问题
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
如果当前线程没有空闲线程则会创建线程,可能由于线程数量过大而造成内存溢出
完结
到此为止我们分析就完成了,谢谢大家,如果觉得有帮助的话,可以点个赞哦
学习资料推广
我已经将springamqp 源码解析录制为视频上传到bibi,分为六个章节详细介绍了各个模块的具体内容
https://www.bilibili.com/video/BV1hN411Z7fn?share_source=copy_web
感兴趣的小伙伴可以看看
学习一下
录制不易,记得三联哦!