天天看點

JUC 之阻塞隊列 ArrayBlockingQueue LinkedBlockingQueue 原理剖析

阻塞隊列繼承結構說明:

JUC 之阻塞隊列 ArrayBlockingQueue LinkedBlockingQueue 原理剖析

根據繼承結構可知:

1.Array 和 Linked 實作了統一的繼承結構,不同的地方在于内部實作不用阻塞機制和隊列資料存放結構

2.它們都實作了Iterable 接口,表示它們都是可疊代的,即實作了iterator() 方法,iterator()方法會傳回一個Iterator ,Iterator為疊代器 其中主要的方法為next hasNext 等, 那麼隊列中對疊代器的實作使用内部類實作,在ArrayBlockingQueue 中即為Itr 這個私有的内部類,我們發現在每次調用iterator() 方法時都會生成一個Itr , 在ArrayBlockingQueue 中還有一個Itrs内部類 Itrs使用單向連結清單的方式維護生成的多個疊代器,疊代器的實作原理會在其它的部落格中進行剖析,這裡就不多說了

3. ArrayBlockingQueue與ReentrantLock是組合關系,ArrayBlockingQueue中包含一個ReentrantLock對象(lock)。ReentrantLock是可重入的互斥鎖,ArrayBlockingQueue就是根據該互斥鎖實作“多線程對競争資源的互斥通路”。而且,ReentrantLock分為公平鎖和非公平鎖,關于具體使用公平鎖還是非公平鎖,在建立ArrayBlockingQueue時可以指定;而且,ArrayBlockingQueue預設會使用非公平鎖。   

主要方法:

要想了解這兩個阻塞隊列,先分析一下隊列提供的主要方法 BlockingQueue

// 将指定元素插入到隊列尾部 如果成功傳回true
 // 如果隊列已滿 抛出IllegalStateException  其内部調用的offer
 boolean add(E e);

 //将指定元素插入隊列尾部,如果成功傳回true 如果隊列已滿傳回false
 boolean offer(E e);

 //将指定元素插入隊列尾部,如果隊列已滿進行阻塞 直到notFull條件喚醒
 void put(E e) throws InterruptedException;

 //将指定元素插入隊列尾部,如果隊列已滿則等待指定時間
 boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

 //移除并擷取隊列頭部元素,如果隊列為空則阻塞 直到notEmpty條件喚醒
 E take() throws InterruptedException;

 //移除并擷取隊列頭部元素,如果隊列為空則等待指定時間
 E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

 //隊列剩餘可使用容量 items.length - count
 int remainingCapacity();

 //移除隊列中指定元素 改方法調用removeAt 兩個方法之後會詳細說明,因為remove移除的可能不是頭部元素是以需要隊列中元素重新排列和修改putIndex值
 boolean remove(Object o);
 
 //是否包含指定元素
 public boolean contains(Object o);
 
 //移除隊列中所有的可用元素,并添加到Collection中
 int drainTo(Collection<? super E> c);
 //移除指定個數的可用元素,并添加到Collection中,從takeIndex開始
 int drainTo(Collection<? super E> c, int maxElements);
 
 //移除并擷取隊列頭部元素,如果為空則抛NoSuchElementException 内部調用poll
 E remove();

 //移除并擷取隊列頭元素,如果為空則傳回null
 E poll();
 //隻擷取不移除隊列頭部元素,如果為空則抛NoSuchElementException 内部調用peek
 E element();
 //隻擷取不移除隊列頭部元素,如果隊列為空則傳回null 内部調用itemAt
 E peek();


 
           

ArrayBlockingQueue 構造方法和成員表量說明:

//存放隊列元素的數組
final Object[] items;

//下次take, poll, peek or remove 元素的位置
int takeIndex;

//下次 put, offer, or add 元素的位置
int putIndex;

//隊列元素的數量
int count;

//進行隊列同步操作的重入鎖
final ReentrantLock lock;

//不為空條件 當調用take隊列為空時進行wait
private final Condition notEmpty;

//不滿條件 當調用put隊列已滿時進行wait
private final Condition notFull;

//維護隊列疊代器 Itrs 内部使用單向連結清單維護多個疊代器
transient Itrs itrs = null;
           

 提供三個構造方法:

//指定數組容量
public ArrayBlockingQueue(int capacity) {
        //預設ReentrantLock 為非公平模式
        this(capacity, false);
    }

//指定數組容量并設定ReentrantLock 公平非公平模式
public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

//除了上面兩個設定,還設定隊列初始元素
public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }
           
JUC 之阻塞隊列 ArrayBlockingQueue LinkedBlockingQueue 原理剖析

ArrayBlockingQueue中的元素存在公平通路與非公平通路的差別,對于公平通路隊列,被阻塞的線程可以按照阻塞的先後順序通路隊列,即先阻塞的線程先通路隊列。而非公平隊列,當隊列可用時,阻塞的線程将進入争奪通路資源的競争中,也就是說誰先搶到誰就執行,沒有固定的先後順序。

非阻塞方法我們先不說明,先詳細剖析一下兩個阻塞方法put take

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //如果隊列已滿則進行阻塞等待 即添加到notFull Condition的等待隊列中
            while (count == items.length)
                notFull.await();
            //如果沒滿則直接入隊
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        //索引自增,如果已是最後一個位置,重新設定 putIndex = 0;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        //有元素入隊之後喚醒阻塞在notEmpty Condition 上的take操作
        notEmpty.signal();
    }
           

put方法是一個阻塞的方法,如果隊列元素已滿,那麼目前線程将會被notFull條件對象挂起加到等待隊列中,直到隊列有空檔才會喚醒執行添加操作。但如果隊列沒有滿,那麼就直接調用enqueue(e)方法将元素加入到數組隊列中。說白了就是當隊列滿時通過條件對象Condtion來阻塞目前調用put方法的線程,直到線程又再次被喚醒執行。總得來說添加線程的執行存在以下兩種情況,一是,隊列已滿,那麼新到來的put線程将添加到notFull的條件隊列中等待,二是,有移除線程執行移除操作,移除成功同時喚醒put線程,如下圖所示 

JUC 之阻塞隊列 ArrayBlockingQueue LinkedBlockingQueue 原理剖析

上圖為隊列已滿進行put操作的示意圖

當有線程進行take操作取出元素,示意圖如下:

JUC 之阻塞隊列 ArrayBlockingQueue LinkedBlockingQueue 原理剖析

接下來我們看下take操作,take操作在隊列為空的時候阻塞,當有put操作時喚醒notEmpty Condition

我們先看下源碼:

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //如果隊列為空進行等待
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }


    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        //取值索引到頭之後重置為0
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        //喚醒等待在notFull上的put操作
        notFull.signal();
        return x;
    }
           

從代碼上看邏輯還是很簡單的, 這裡就不畫示意圖了, 邏輯跟put相反

到這裡主要的方法基本上已經都說明了, 但是最後還要說一下remove方法,這個方法的邏輯稍微有點複雜,現在我們剖析一下remove方法

public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //不為空才删除,為空則直接傳回false
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                //循環隊列中的可用元素,如果有指定remove的對象則調用removeAt
                do {
                    if (o.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }


    void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        //如果要移除的元素位置正好是takeIndex 即在上面的remove循環中第一次就命中
        //這是一種特殊情況,其實就相當于take操作了
        if (removeIndex == takeIndex) {
            // removing front item; just advance
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                //重新設定疊代器元素
                itrs.elementDequeued();
        //如果移除的并非takeIndex元素
        } else {
            // an "interior" remove
            // slide over all others up through putIndex.
            final int putIndex = this.putIndex;
            //此處循環正是移除的邏輯,根據removeIndex 把removeIndex 後面的可用元素統一向前
            //挪動 然後把putIndex之前的值設定為null 重新設定putIndex
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)
                    next = 0;
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            if (itrs != null)
                //對應更新疊代器資料
                itrs.removedAt(removeIndex);
        }
        //喚醒notFull put 操作
        notFull.signal();
    }
           

隻看代碼對于移除可能還不是很清除,那我們就畫下圖了解一下:

JUC 之阻塞隊列 ArrayBlockingQueue LinkedBlockingQueue 原理剖析

到此位置ArrayBlockingQueue的實作原理基本上算是說完了,但是關于疊代器的實作,還在在并發操作中疊代器原理,還有在移除時疊代器是如何整理資料的, 這個之後單獨說明

繼續閱讀