天天看点

JUC中线程池原理解析JUC中线程池原理解析juc 中线程池源码分析必知必会完结学习资料推广

JUC中线程池原理解析

首先我们先说一下一个线程池基本上会有哪些基本组件构成

  1. 线程管理器,即线程池,维护线程池中线程数量,当线程空闲数多时尽心线程的回收,当线程紧张时进行线程的创建
  2. 任务添加与拒绝策略
  3. 任务队列 BlockingQueue
  4. 线程的构造工厂 ThreadFactory
  5. 线程集合
  6. worker 即线程中相当于runabble 的固定的模板方法
JUC中线程池原理解析JUC中线程池原理解析juc 中线程池源码分析必知必会完结学习资料推广

这里我写了一个简易版的线程池,便于让大家了解线程池原理,放到了csdn上

https://blog.csdn.net/c1523456/article/details/81415759

juc 中线程池源码分析

首先我们看juc 中ThreadPoolExecutor类的继承关系图

JUC中线程池原理解析JUC中线程池原理解析juc 中线程池源码分析必知必会完结学习资料推广

execute源码分析

分析

如果我们要使用线程肯定是要到线程中去提交任务的,所以我们就以execute 方法为入口点,进行剖析。

public interface Executor {
	//我们添加任务时调用的函数
    void execute(Runnable command);
}
           
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
       
        int c = ctl.get();
     //当前线程数小于工作线程数的话,添加一个worker
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
     //当前线程大于核心线程数,将任务放入队列中,如果队列没有满则进入if里面,队列满调到else if
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get()
                //线程池被关闭时拒绝提交任务
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
     //当前线程大于核心线程数,并且放入队列失败,即队列满了,构造新的线程并执行该任务,但是线程数量大于maximumPoolSiz调用拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }
           
//addWorker 代码核心逻辑为 创建一个worker,填充我们的任务 并启动他的线程,然后加入工作者队列
private boolean addWorker(Runnable firstTask, boolean core) {
          	//当前线程数量大于等于 maximumPoolSize 返回false
           if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
              
            w = new Worker(firstTask);
            final Thread t = w.thread;
    		
            workerStarted = workers.add(w);
            t.start();
            return workerStarted ;
    }

           

总结

关于何时分配线程,何时添加队列策略
通过execute() 方法的剖析,其大致逻辑为当在execute(Runnable)方法中提交一个新任务,并且运行的线程少于corePoolSize线程时,就会创建一个新线程来处理该请求,即使其他工作线程处于空闲状态。 如果运行的线程大于corePoolSize但小于maximumPoolSize,则只有在队列满时才会创建一个新线程。下面我们对worker 进行分析。
关于何时调用拒绝策略
在方法execute(Runnable)中提交的新任务将在Executor关闭时被拒绝,当Executor对最大线程和工作队列容量都使用有限的界限,并且达到饱和时也会被拒绝。

worker 源码分析

分析

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        //线程
        final Thread thread;
      //我们提交的任务,可能为空
        Runnable firstTask;
     
        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            //通过线程工厂获取线程,,,,注意这里把this 放入了 ,也就是我们的run 方法是在另外一个线程中调用的屁
            this.thread = getThreadFactory().newThread(this);
        }
		//线程池中线程具体执行逻辑,具体的逻辑在runWorker中
        public void run() {
            runWorker(this);
        }
    //暂且忽略锁的逻辑
    }
           
final void runWorker(Worker w) {
        Runnable task = w.firstTask;
        w.firstTask = null;
      
        try {
            //通过getTask获取任务,值得注意的是getTask  ==  null 也是线程的退出条件哦
            while (task != null || (task = getTask()) != null) {
            
                try {
                    //在任务执行前的回调函数
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //在任务执行后的回调函数
                        afterExecute(task, thrown);
                    }
                } finally {
                
                }
            }
            completedAbruptly = false;
        } finally {
            //线程因为异常退出的回调函数
            processWorkerExit(w, completedAbruptly);
        }
    }
           

总结

worker 的执行逻辑很简单,只是while 的去任务队列获取数据,执行其run方法,但是需要特别指指出的是 在任务执行前的回调函数

在任务执行后的回调函数 线程因为异常退出的回调函数 ,我们可以利用这几个钩子方法做任务执行前后的特殊处理哦。还有 值得注意的是getTask == null 也是线程的退出条件哦。

getTask() 方法源码分析

分析

大家可能会问了,一个简单的getTask 方法有什么好说的,不就是从队列中获取数据吗,但是不止这么简单,结合上面线程的退出条件,可以做到动态的线程池中线程的回收工作。

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

         	//是否启用超时
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
           //条件1  当前线程数大于核心线程数并且获取数据等待keepAliveTime时间后超时   
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //如果启用超时的话,就设置超时时间,否则直接阻塞等待
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
           

总结

简单说这段代码的意思就是 如果当前池中有超过corePoolSize的线程,如果空闲的线程超过了keepAliveTime的时间间隔,那么多余的线程将通过 while (gettask != null)条件成立而退出被终止。

总体思路

JUC中线程池原理解析JUC中线程池原理解析juc 中线程池源码分析必知必会完结学习资料推广

必知必会

学习了线程池的实现原理,我们从线程池的构造函数来分析,在实际开发中我们需要注意哪些问题?

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
           

workQueue的种类及其影响

SynchronousQueue
将任务传递给线程,并不会缓存任务,所以其或直接调用线程,如果当前没有可用线程,并且线程数量小于最大线程数时,创建新的线程,如果否,则调用拒绝策略
LinkedBlockingQueue
因为是无存储大小的,所以不会到任务存储满,线程数维持在核心线程数,它可能会导致工作队列无限制增长而内存溢出
ArrayBlockingQueue
有界队列有助于防止资源耗尽,但是由于阻塞,可能会导致低吞吐量

线程池状态机

JUC中线程池原理解析JUC中线程池原理解析juc 中线程池源码分析必知必会完结学习资料推广

线程池线程数量设置

cpu密集型如加密、计算hash:最佳线程数为cpu核心线程数的1-2倍左右

IO密集型如读取文件、网络:最佳线程数一般会大于cpu核心数N倍,以jvm线程监控情况为依据

通用 线程数 = cpu 核心线程数 * (1 + 平均等待时间 /平均工作时间)

关于JUC 提供的几种任务拒绝策略

  1. AbortPolicy 抛出RejectedExecutionException异常
  2. CallerRunsPolicy 在当前线程中调用任务 抛出RejectedExecutionException异常
  3. DiscardPolicy 什么都不做
  4. DiscardOldestPolicy 从队列中弹出一个任务丢弃,放入我们的

关于JUC Executors 中提供的几种类型的线程池适用场景分析

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
           

固定线程数量的线程池,但是由于其是无边界队列,可以会有内存溢出问题

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
           

固定线程数量的线程池,但是由于其是无边界队列,可以会有内存溢出问题

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
           

如果当前线程没有空闲线程则会创建线程,可能由于线程数量过大而造成内存溢出

完结

到此为止我们分析就完成了,谢谢大家,如果觉得有帮助的话,可以点个赞哦

学习资料推广

我已经将springamqp 源码解析录制为视频上传到bibi,分为六个章节详细介绍了各个模块的具体内容

JUC中线程池原理解析JUC中线程池原理解析juc 中线程池源码分析必知必会完结学习资料推广

https://www.bilibili.com/video/BV1hN411Z7fn?share_source=copy_web

感兴趣的小伙伴可以看看

学习一下

录制不易,记得三联哦!