天天看點

[java隊列]——PriorityBlockingQueuePriorityBlockingQueue介紹PriorityBlockingQueue内部實作PriorityBlockingQueue總結

[java隊列]——PriorityBlockingQueue

  • PriorityBlockingQueue介紹
  • PriorityBlockingQueue内部實作
    • 基本屬性
    • 構造方法
    • 入隊
    • 擴容
    • 出隊
  • PriorityBlockingQueue總結

PriorityBlockingQueue介紹

上一篇[[java隊列]——PriorityQueue介紹了優先級隊列,回顧一下PriorityQueue有哪些特點:

  • 資料實作,自動擴容,無界
  • 非線程安全
  • 使用小頂堆作為實作,入隊就是堆的插入,出隊就是堆的删除堆頂元素

這一篇将介紹另一個優先級隊列PriorityBlockingQueue,阻塞式的優先級隊列,它相比PriorityQueue有以下特點

  • 線程安全
  • 阻塞
  • 同樣是數組實作,自動擴容,無界,同樣底層是使用小頂堆實作優先級

PriorityBlockingQueue内部實作

基本屬性

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    //預設初始容量
    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    //數組最大容量
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    //存儲元素的數組
    private transient Object[] queue;
    //元素個數
    private transient int size;
    //比較器,用于比較元素之間的大小
    private transient Comparator<? super E> comparator;
    //可重入鎖,控制并發安全
    private final ReentrantLock lock;
    //可重入鎖上的非空條件
    private final Condition notEmpty;
    //???通過這個變量CAS更新成功的,就可以進行擴容
    private transient volatile int allocationSpinLock;
    //???據說用于序列化
    private PriorityQueue<E> q;
           

結論:

  • 數組實作
  • 可重入鎖+非空條件控制并發
  • 為什麼不需要非滿條件,因為隊列是無界的
  • 結構基本與PriorityQueue一樣,隻是加多了鎖,擴容使用了一個volatile變量CAS來控制并發。

構造方法

public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }

    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }
           

結論:

  • 構造方法非常簡單,直接看代碼吧

入隊

還是跟前面介紹的一樣,阻塞隊列一般有四個入隊方法。基本都是調用了offer(E e)方法

public boolean add(E e) {
        return offer(e);
    }

    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        //入隊加鎖
        lock.lock();
        int n, cap;
        Object[] array;
        //循環判斷如果元素個數大于等于數組長度,則進行擴容。
        while ((n = size) >= (cap = (array = queue).length))
        	//調用tryGrow完畢要繼續判斷,因為調用tryGrow傳回并不代表目前線程已經擴容完畢,有可能是其他線程正在擴容,
        	//如果其他線程正在擴容,目前線程的tryGrow将會直接傳回,要繼續while循環判斷,直到其他線程擴容完畢。
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            //入隊方法與PriorityQueue類似,這裡就不再介紹了
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            //入隊成功,發出信号
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

    public void put(E e) {
        offer(e); // never need to block
    }
    
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e); // never need to block
    }

           

結論:

  • 入隊加鎖,入隊完畢,發出非空信号通知
  • 循環判斷數組元素個數是否已滿,進行tryGrow擴容

擴容

private void tryGrow(Object[] array, int oldCap) {
		//這裡先釋放鎖,目的是由于已經達到擴容條件了,如果有多個線程正在嘗試入隊,這些線程将會被阻塞
		//為了減少阻塞的線程,這裡直接釋放鎖,使用CAS的方式進行擴容。
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        //CAS更新allocationSpinLock,更新成功才能進行擴容
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
            	//跟PriorityQueue一樣的擴容增量規則
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
            	//擴容後解鎖
                allocationSpinLock = 0;
            }
        }
        //若newArray為null,說明cas更新失敗,其他線程正在擴容。這個時候将目前線程讓出cpu
        if (newArray == null) // back off if another thread is allocating
        //yield是讓目前線程讓出cpu,從運作狀态變為就緒狀态,但并不代表,cpu就一定會先執行其他線程。因為是搶占式的,有可能又搶到cpu
            Thread.yield();
        //擴容成功,或者,線程讓出cpu後傳回回來,繼續加鎖
        lock.lock();
        //如果擴容成功并且就數組沒有被替換過。。沒看太懂這裡??
        if (newArray != null && queue == array) {
        	//隊列更新為新數組
            queue = newArray;
            //複制新數組
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }
           

結論:

  • 擴容容量規則與PriorityQueue一緻
  • 擴容前先釋放鎖,為了減少阻塞線程的數量。
  • 擴容前先釋放鎖,利用CAS來控制擴容。CAS更新失敗,則讓目前線程讓出CPU
  • 最後再次加鎖,再确認是否擴容成功

出隊

同樣出隊有四個方法

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        //出隊加鎖,響應中斷
        lock.lockInterruptibly();
        E result;
        try {
        	//若出隊隊列為空,阻塞等待,dequeue與PriorityQueue的出隊一樣,這裡就不介紹了
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
        	//解鎖
            lock.unlock();
        }
        return result;
    }

	public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null && nanos > 0)
                nanos = notEmpty.awaitNanos(nanos);
        } finally {
            lock.unlock();
        }
        return result;
    }
           

結論:

  • 出隊加鎖,若dequeue傳回為空則一直阻塞,直到收到隊列不為空的通知或者中斷才會傳回
  • dequeue方法基本跟PriorityQueue一緻

PriorityBlockingQueue總結

  • 出入隊線程安全,使用鎖和非空條件實作
  • 擴容線程安全,使用CAS
  • 阻塞
  • 與PriorityQueue同樣是數組實作,自動擴容,無界,同樣底層是使用小頂堆實作優先級,出入隊的堆操作也一樣