天天看點

線程池ThreadPoolExecutor源碼解析

轉載請标明出處:

https://blog.csdn.net/bingospunky/article/details/80234457

本文出自馬彬彬的部落格

建立ThreadPoolExecutor

ExecutorService fixedThreadPool = new ThreadPoolExecutor(, , , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
           

我們使用Executors.newXXX方法建立線程池也是通過上面的new方法進行建立的,隻是傳遞的初始化參數不同罷了。

第1、2個參數是線程池線程的數量,第3、4個參數是當線程沒有runnable執行時,多長時間結束,第5個參數是當runnable多于線程數時,把runnable存放的隊列。

原理

ThreadPoolExecutor有一個private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));參數表示目前執行的線程,ctl是一個32位的整數,AtomicInteger是為了線程安全。這個ctl前3位相當于占位功能,後29位表示目前啟動的線程數量,比如11100000000000000000000000000000表示沒有執行的線程,11100000000000000000000000000001表示目前有一個線程在執行。當添加線程或者線程結束時,會響應的改變ctl的值。

有一個Worker的類,這個Worker繼承AbstractQueuedSynchronizer,實作鎖的功能,在執行runnable時互斥。這個Worker包含一個Thread,Thread的run方法是while循環,不斷的從queue裡擷取runnable執行。

當我們執行的runnable數量 超過線程數+queue.size 時如何

當我們使用Executors.newXXX建立線程池時一般不會遇到這個問題,因為這樣建立的隊列容量是Integer.MAX,當我們想控制線程池的queue容量時,就有可能會遇到超過上限的這種情況。

當出現這種情況時,會抛出異常。execute方法代碼如下:

Code1

ThreadPoolExecutor.ThreadPoolExecutor

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == )
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
           

我就不自己解釋了,借用一下代碼裡的一段注釋來解釋。

/*
 * Proceed in  steps:
 *
 *  If fewer than corePoolSize threads are running, try to
 * start a new thread with the given command as its first
 * task.  The call to addWorker atomically checks runState and
 * workerCount, and so prevents false alarms that would add
 * threads when it shouldn't, by returning false.
 *
 *  If a task can be successfully queued, then we still need
 * to double-check whether we should have added a thread
 * (because existing ones died since last checking) or that
 * the pool shut down since entry into this method. So we
 * recheck state and if necessary roll back the enqueuing if
 * stopped, or start a new thread if there are none.
 *
 *  If we cannot queue task, then we try to add a new
 * thread.  If it fails, we know we are shut down or saturated
 * and so reject the task.
 */
           

當我們的runnable出現異常時如何處理的

從使用經驗上來看,我們的runnable方法抛出了異常,後面的runnable依然可以執行。那麼這是如何實作的呢。

首先來看Worker裡的Thread的run方法。

Code2

ThreadPoolExecutor.runWorker

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
           

Code2第19行抛出異常後,會執行第37行的processWorkerExit方法,這時completedAbruptly=true,表示這時異常結束的前面的while循環。我們再看一下processWorkerExit方法的代碼。

Code3

ThreadPoolExecutor.processWorkerExit

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ?  : corePoolSize;
                if (min ==  && ! workQueue.isEmpty())
                    min = ;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }
           

由于completedAbruptly值為true,先執行第3行代碼,改變前文中提到的ctl的值,讓其表示的線程數減1。Code3第8行,remove掉該Worker。會執行Code3第22行代碼,使用runnable=null,添加一個Worker對象,這句話的意思就是啟動一個新的線程,去queue裡擷取runnable執行。

是以,我們自己runnable抛出了異常,是kill掉了執行它的線程,并且補充一個新的線程。

ThreadPoolExecutor構造方法裡的第3、4個方法如何生效

ThreadPoolExecutor第3、4個方法表示當線程空閑多久後kill。

這個線程kill是針對多餘core線程數生效的,或則我們可以設定對于所有線程生效。如果設定對于所有線程生效,我們需要執行ThreadPoolExecutor.allowCoreThreadTimeOut(true)設定。

那麼這是如何生效的呢。我們還是來看Code2的代碼。該線程一直在while裡執行,當它跳出while時,就是線程結束的條件。跳出while的條件就是Code2第8行的getTask方法傳回null。接下來看getTask方法。

Code4

ThreadPoolExecutor.getTask

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc >  || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?  workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
           

Code4第13行,當allowCoreThreadTimeOut或者當先線程數超過core線程數時,設定timed=true。Code第21行,當timed==true時,queue擷取runnable時阻塞我們設定的時間。這樣來實作當線程空閑某一段時間後,線程kill的。

補充

ThreadPoolExecutor記錄線程數量等使用的是一個AtomicInteger類型的變量ctl,把這個變量前3位和後29位分開使用的,使用了位運算,下面的代碼方面我們檢視資料的變化。

System.out.println(- & );
    System.out.println(- & ~);
    System.out.println(Integer.toBinaryString(-));
    System.out.println(Integer.toBinaryString(-));
    System.out.println(Integer.toBinaryString());
    System.out.println(Integer.toBinaryString());
    System.out.println(Integer.toBinaryString(-));
    System.out.println(Integer.toBinaryString(-));
    System.out.println(Integer.toBinaryString(- << ));
    System.out.println(Integer.toBinaryString((- << ) | ));
           

繼續閱讀