天天看點

JDK 源碼解析 —— ArrayBlockingQueue

零. 簡介 ArrayBlockingQueue 是一個由 數組作為基礎資料結構的 有界阻塞隊列。出隊入隊使用先進先出算法,即 FIFO (first in first out)。head 是停留在隊列中最長的節點,tail 停留在隊列中最短的節點。從 head 出隊列,從 tail 入隊列。

這是一個典型的「有界緩沖區」,一個固定大小的數組持有從生産者(producers)産生的和被消費者(consumers)消費的資料。 此隊列一旦建立,隊列大小就固定了,不能再改變。嘗試将資料加入到滿隊列中,将會被阻塞;嘗試從空隊列中取資料也同樣會被阻塞。

本類提供生産者和消費者線程通路順序的一個可選的公平政策。預設情況下,為了性能,這個通路順序不被保證。不過,在初始化構造本類的時候,将 fairness 設定為 true,則可以保證線程通路的公平性,這樣設定的壞處是降低吞吐量,好處是減少了可變性和線程的饑餓現象。

一. 代碼

(1)核心變量

/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
           

阻塞隊列實作的關鍵參數,一個可重入鎖,和兩個條件,使用經典的雙狀态算法(two-condition algorithm)

/** The queued items */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count;
           

一些基本參數,基礎的數組對象,隊頭的位置,隊尾的位置,隊列中已有的資料量

(2)構造器

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
           

隊列構造方法,設定固定隊列大小,是否需要公平通路隊列,公平鎖和非公平鎖由 ReentrantLock (公平鎖是正常走鎖排隊申請流程,非公平鎖先嘗試擷取 AQS stat 狀态鎖,然後才走正常鎖排隊申請)提供

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
           

預設構造方法是非公平的

(3) add 方法:新增資料

實際上是調用繼承的抽象類 AbstractQueue 的 add 方法

public boolean add(E e) {
    return super.add(e);
}
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
           

上述的 offer(e) 是接口 Queue 未實作的方法,具體實作在 ArrayBlockingQueue

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            insert(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
           

如果隊列還沒滿,則加入隊尾并傳回 true; 可以看出來 offer 方法如果插入不了不是進入阻塞狀态,是直接傳回一個 false 狀态

将資料插入隊尾,移動數組下标( inc(putIndex) 保證循環移動),隊列總數 count 加 1,notEmpty.signal 喚醒等待拿資料的線程(在 AQS 的等待隊列中的線程)

private void insert(E x) {
    items[putIndex] = x;
    putIndex = inc(putIndex);
    ++count;
    notEmpty.signal();
}
           

(4)put 方法:新增資料 (如果滿了就阻塞)

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        insert(e);
    } finally {
        lock.unlock();
    }
}
           

可以看到如果隊列資料量 count == items.length 數組大小,則線程阻塞 await()

(5)poll:取資料,不是阻塞方法

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : extract();
    } finally {
        lock.unlock();
    }
}

private E extract() {
    final Object[] items = this.items;
    E x = this.<E>cast(items[takeIndex]);
    items[takeIndex] = null;
    takeIndex = inc(takeIndex);
    --count;
    notFull.signal();
    return x;
}
           

如果資料為空則傳回 null,不為空則傳回資料,并且喚醒 notFull 狀态挂起的線程

(5)take:取資料,如果為空則阻塞

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return extract();
    } finally {
        lock.unlock();
    }
}
           

(6) drainTo 方法:字面意思就是排幹,就是把資料批量導入到一個集合類中,比一個一個 poll 效率高,因為加鎖次數少

public int drainTo(Collection<? super E> c) {
    checkNotNull(c);
    if (c == this)
        throw new IllegalArgumentException();
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = takeIndex;
        int n = 0;
        int max = count;
        while (n < max) {
            c.add(this.<E>cast(items[i]));
            items[i] = null;
            i = inc(i);
            ++n;
        }
        if (n > 0) {
            count = 0;
            putIndex = 0;
            takeIndex = 0;
            notFull.signalAll();
        }
        return n;
    } finally {
        lock.unlock();
    }
}
           

三. 總結 看懂 ArrayBlockingQueue 需要先看懂 AbstractQueuedSynchronizer 和 ReentrantLock,阻塞就是靠 ReentrantLock 來實作的,而 ReentrantLock 是靠 AbstractQueuedSynchronizer 來實作加鎖和釋放鎖。主要的算法就是上文提到的 two-condition algorithm,這個算法應該在學生時代《作業系統》課程上見過很多次了