天天看點

基于數組的有界阻塞隊列ArrayBlockingQueue源碼分析

一:功能介紹

         基于數組的有界阻塞隊列,基于FIFO的存儲模式,支援公平非公平鎖。

二:源碼分析

//數組
    final Object[] items;
    //出隊索引
    int takeIndex;
    //入隊索引
    int putIndex;
    //隊列大小
    int count;
    //可重入鎖
    final ReentrantLock lock;
    //等待通知條件
    private final Condition notEmpty;
    //等待通知條件
    private final Condition notFull;
           

  構造函數

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        //初始化數組容量
        this.items = new Object[capacity];
        //内容采用可重入鎖ReentrantLock實作,支援公平非公平選擇
        lock = new ReentrantLock(fair);
        //阻塞隊列,等待條件
        notEmpty = lock.newCondition();
        //阻塞隊列,等待條件
        notFull =  lock.newCondition();
   }
           

   入隊操作

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //可中斷擷取鎖,如果出現了interrupted,不用一直阻塞
        lock.lockInterruptibly();
        try {
            //如果隊列已滿
            while (count == items.length)
                //入隊線程阻塞
                notFull.await();
            //插入資料
            insert(e);
        } finally {
            lock.unlock();
        }
    }
    private void insert(E x) {
        //将新的資料指派在數組的某一個索引處
        items[putIndex] = x;
        //重新指派putIndex,設定下一個被取出元素的索引
        putIndex = inc(putIndex);
        //隊列大小+1
        ++count;
        //喚醒take線程
        notEmpty.signal();
    }
     final int inc(int i) {
        //如果隊列滿了,重新初始化為0
        return (++i == items.length) ? 0 : i;
    }
           

    出隊操作

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        //同上,擷取中斷鎖
        lock.lockInterruptibly();
        try {
            //隊列沒有值,阻塞
            while (count == 0)
                notEmpty.await();
            //傳回被取走的資料
            return extract();
        } finally {
            lock.unlock();
        }
    }
     private E extract() {
        final Object[] items = this.items;
        //擷取takeIndex處的元素
        E x = this.<E>cast(items[takeIndex]);
        //置空takeIndex處的元素,引用不存在,便于GC,釋放記憶體
        items[takeIndex] = null;
        //重新指派takeIndex,設定下一個被取出的元素
        takeIndex = inc(takeIndex);
        //隊列大小-1
        --count;
        //喚醒put線程
        notFull.signal();
        return x;
    }
           

  移除資料

public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //從takeIndex處開始計算,每次i加1,最大為隊列最大容量count
            for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
                //如果移除元素在數組某個下标找到
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
    void removeAt(int i) {
        final Object[] items = this.items;
        //如果準備移除的索引和下一個被取出的元素索引一樣,直接移除
        if (i == takeIndex) {
            //指派null,便于GC
            items[takeIndex] = null;
            //重新設定下一個被取出元素的索引
            takeIndex = inc(takeIndex);
        //如果需要删除的元素索引不是目前被取出的索引
        } else {
            //一直循環,直到删除為止
            for (;;) {
                //假設隊列容量是4,目前存了3個元素,即takeIndex=0,putIndex=3,目前我打算删除數組下标為1的元素
                // nexti第一次為2
                int nexti = inc(i);
                if (nexti != putIndex) {
                    //相當于将隊列往前移
                    items[i] = items[nexti];
                    //相當于i+1
                    i = nexti;
                //待删除的索引與待put的索引相等,比如putIndex=2,i=1,inc(i) = 2
                } else {
                    //索引i處置null,偏于GC
                    items[i] = null;
                    //重新指派下一個即将放入元素的索引
                    putIndex = i;
                    break;
                }
            }
        }
        //隊列大小-1
        --count;
        //喚醒put線程,公平的話按FIFO順序,非公平的話可以搶占
        notFull.signal();
    }
           

    周遊隊列

public Iterator<E> iterator() {
        return new Itr();
    }
   
    private class Itr implements Iterator<E> {
        //隊列裡面還剩的元素個數
        private int remaining;
        //下一次調用next()傳回的索引
        private int nextIndex;
        //下一次調用next()傳回的元素
        private E nextItem; 
        //上一次調用next()傳回的元素
        private E lastItem; 
        //上一次調用next()傳回的索引 
        private int lastRet; 

        Itr() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                lastRet = -1;
                //隻有隊列裡面還有元素
                if ((remaining = count) > 0)
                    //擷取takeIndex處的元素
                    nextItem = itemAt(nextIndex = takeIndex);
            } finally {
                lock.unlock();
            }
        }

        public boolean hasNext() {
            return remaining > 0;
        }

        public E next() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                //如果隊列沒有值
                if (remaining <= 0)
                    throw new NoSuchElementException();
                lastRet = nextIndex;
                //擷取下一次擷取索引處的元素
                E x = itemAt(nextIndex);  // check for fresher value
                if (x == null) {
                    x = nextItem;         // we are forced to report old value
                    lastItem = null;      // but ensure remove fails
                }
                else
                 //将剛擷取的元素當做上一次擷取的元素
                    lastItem = x;
                 //當下一次擷取的元素不存在的時候
                while (--remaining > 0 && // skip over nulls
                       (nextItem = itemAt(nextIndex = inc(nextIndex))) == null)
                    ;
                return x;
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                int i = lastRet;
                if (i == -1)
                    throw new IllegalStateException();
                lastRet = -1;
                E x = lastItem;
                lastItem = null;
                // only remove if item still at index
                if (x != null && x == items[i]) {
                    boolean removingHead = (i == takeIndex);
                    removeAt(i);
                    if (!removingHead)
                        nextIndex = dec(nextIndex);
                }
            } finally {
                lock.unlock();
            }
        }
    }