在Java的java.util.concurrent包中定義了和多線程并發相關的操作,有許多好用的工具類,今天就來看下阻塞隊列。阻塞隊列很好的解決了多線程中資料的安全傳輸問題,其中最典型的例子就是客園很好的解決“生産者--消費者”問題。下面來看其中一個實作類ArrayBlockingQueue。看到這個名字,就很好了解這個隊列肯定是使用數組實作的隊列,即使用數組實作的“先進先出”的隊列,下面看其具體的實作。(均為JDK8)
一、構造方法
在ArrayBlockingQueue類中有下面的3個構造方法,
1、ArrayBlockingQueue(int)
接收一個整型的參數,這個整型參數指的是隊列的長度,其定義如下,
public ArrayBlockingQueue(intcapacity) {this(capacity, false);
}
可以看到這個方法調用的是ArrayBlockingQueue(int,boolean)方法,那麼看下這個方法,
2、ArrayBlockingQueue(int,boolean)
接收兩個參數,一個整型,一個boolean類型,前邊已經知道整型參數是隊列的長度,那麼boolean類型參數代表什麼意思那,其定義如下,
public ArrayBlockingQueue(int capacity, booleanfair) {if (capacity <= 0)throw newIllegalArgumentException();this.items = newObject[capacity];
lock= newReentrantLock(fair);
notEmpty=lock.newCondition();
notFull=lock.newCondition();
}
可以看到在這個構造方法中進行了相關邏輯實作,對items進行了數組初始化,boolean類型的參數是作為可重入鎖的參數進行初始化,規定可重入鎖是公平還是不公平,預設為false,另外初始化了notEmpty、notFull兩個信号量。
3、ArrayBlockingQueue(int,boolean,Collection extends E>)
接收兩三個參數,第一個整型,第二個boolean類型,第三個集合類型,此構造方法不常用,其定義如下,
public ArrayBlockingQueue(int capacity, booleanfair,
Collection extends E>c) {this(capacity, fair);final ReentrantLock lock = this.lock;
lock.lock();//Lock only for visibility, not mutual exclusion
try{int i = 0;try{for(E e : c) {
checkNotNull(e);
items[i++] =e;
}
}catch(ArrayIndexOutOfBoundsException ex) {throw newIllegalArgumentException();
}
count=i;
putIndex= (i == capacity) ? 0: i;
}finally{
lock.unlock();
}
}
通過上面的三個構造方法可以構造一個ArrayBlockingQueue的隊列,在構造方法中初始化了實列變量,下面是一些執行個體變量,
private static final long serialVersionUID = -817911632652898426L;
//儲存隊列元素的數組
finalObject[] items;
//取出元素的位置
inttakeIndex;
//添加元素的位置
intputIndex;
//隊列中元素的數量
intcount;
//鎖對象
finalReentrantLock lock;
//不空的信号量
private finalCondition notEmpty;
//不滿的信号量
private finalCondition notFull;
transient Itrs itrs = null;
二、隊列的操作
需要使用阻塞隊列,那麼就需要向隊列中添加或取出元素,在ArrayBlocking中已經實作了相關操作,對于添加/取出均是成對出現,提供的方法中有抛出異常、傳回false、線程阻塞等幾種情形。
1、add/peek
add/peek是一對互斥的操作,add向隊列種放入元素,peek取出元素。
1.1、add
add的定義如下
public booleanadd(E e) {return super.add(e);
}
從上面可以看出add方法調用了其父類的實作,父類實作如下
public booleanadd(E e) {if(offer(e))return true;else
throw new IllegalStateException("Queue full");
}
父類實作種調用的offer方法,在offer方法傳回true時,add方法傳回true,其他則抛出“Queue full”的異常。offer方法下面會講到。
1.2、peek
peek方法定義如下
publicE peek() {final ReentrantLock lock = this.lock;
lock.lock();try{return itemAt(takeIndex); //null when queue is empty
} finally{
lock.unlock();
}
}
在peek方法種使用可重入鎖,傳回takeIndex處的元素,前面注釋中寫道,此變量代表的是待取出元素的索引。itemAt方法定義如下,
@SuppressWarnings("unchecked")final E itemAt(inti) {return(E) items[i];
}
此方法未進行任何的判斷直接傳回takeIndex出的數組元素。
2、put/take
put/take是一對互斥的操作,put向隊列種放入元素,take取出元素,其實作方式和add/peek不一樣。
2.1、put
put的定義如下
public void put(E e) throwsInterruptedException {
checkNotNull(e);final ReentrantLock lock = this.lock;
lock.lockInterruptibly();try{while (count ==items.length)
notFull.await();
enqueue(e);
}finally{
lock.unlock();
}
}
此方法會判斷元素是否null,然後判斷目前隊列中的元素數量和隊列的長度,如果二者相等則阻塞目前線程;如果不相等則執行enqueue(e)方法,其定義如下,
private voidenqueue(E x) {//assert lock.getHoldCount() == 1;//assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex]=x;if (++putIndex ==items.length)
putIndex= 0;
count++;
notEmpty.signal();
}
把待插入的元素放在數組的putIndex位置,如果執行完插入後putIndex等于數組的長度,說明隊列已經滿了,那麼把putIndex的值置為0,即下次插入的位置為0,下次要插入成功的必要條件是取出了一個元素,取出的位置為takeIndex。
2.2、take
take方法是取出一個元素,其定義如下,
public E take() throwsInterruptedException {final ReentrantLock lock = this.lock;
lock.lockInterruptibly();try{while (count == 0)
notEmpty.await();returndequeue();
}finally{
lock.unlock();
}
}
此方法中首先判斷目前隊列的元素數量如果為0,則目前線程進行等待,等待notEmpty.singal(),如果不為空則執行dequeue()方法,其定義如下,
privateE dequeue() {//assert lock.getHoldCount() == 1;//assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x=(E) items[takeIndex];
items[takeIndex]= null;if (++takeIndex ==items.length)
takeIndex= 0;
count--;if (itrs != null)
itrs.elementDequeued();
notFull.signal();returnx;
}
此方法取出takeIndex位置的元素,把數組中此位置的引用置為null,判斷takeIndex和數組的長度,如果相等證明,已經取到了最後一個元素,下次再取元素需要從位置0開始,為此把takeIndex置為0。
3、offer(E)/poll
offer/poll是一對互斥的操作,offer向隊列種放入元素,poll取出元素,
3.1、offer
offer的定義如下
public booleanoffer(E e) {
checkNotNull(e);final ReentrantLock lock = this.lock;
lock.lock();try{if (count ==items.length)return false;else{
enqueue(e);return true;
}
}finally{
lock.unlock();
}
}
此方法判斷如果隊列中的元素數量和隊列長度相等,則直接傳回false,否則執行enqueue方法,put方法會将線程挂起,直到被中斷或插入成功。
3.2、poll
poll方法如下
publicE poll() {final ReentrantLock lock = this.lock;
lock.lock();try{return (count == 0) ? null: dequeue();
}finally{
lock.unlock();
}
}
此方法判斷目前隊列中的元素個數如果為0傳回null,否則執行dequeue操作,take方法會将線程挂起,直到被中斷或取出成功。
4、offer(E,long,TimeUnit)/poll(long,TimeUnnit)
這兩個方法是普通offer/poll方法的加強版,在隊列滿時指定了重試的時間,如果超過指定的時間後還是無法添加或取出則傳回false。
4.1、offer
offer方法如下
public boolean offer(E e, longtimeout, TimeUnit unit)throwsInterruptedException {
checkNotNull(e);
//逾時時間long nanos =unit.toNanos(timeout);final ReentrantLock lock = this.lock;
lock.lockInterruptibly();try{while (count ==items.length) {if (nanos <= 0)return false;//超過規定時間,傳回false
nanos=notFull.awaitNanos(nanos);
}
enqueue(e);return true;
}finally{
lock.unlock();
}
}
此方法會在指定的規定時間内一直重試,如果規定時間内無法退出循環即添加元素,則傳回false。
4.2、poll
poll方法如下,
public E poll(long timeout, TimeUnit unit) throwsInterruptedException {long nanos =unit.toNanos(timeout);final ReentrantLock lock = this.lock;
lock.lockInterruptibly();try{while (count == 0) {if (nanos <= 0)return null;
nanos=notEmpty.awaitNanos(nanos);
}returndequeue();
}finally{
lock.unlock();
}
}
此方法同樣是在取出元素時進行規定時間的重試,超過規定時間則傳回null。
三、方法比較
1、添加方法比較
序号
方法名
隊列滿時處理方式
方法傳回值
1
add(E e)
抛出“Queue full”異常
boolean
2
offer(E e)
傳回false
boolean
3
put(E e)
線程阻塞,直到中斷或被喚醒
void
4
offer(E e, long timeout, TimeUnit unit)
在規定時間内重試,超過規定時間傳回false
boolean
2、取出方法比較
序号
方法名
隊列空時處理方式
方法傳回值
1
peek()
傳回null
E
2
poll()
傳回null
E
3
take()
線程阻塞,指定中斷或被喚醒
E
4
poll(long timeout, TimeUnit unit)
在規定時間内重試,超過規定時間傳回null
E
四、總結
以上時關于ArrayBlockingQueue這個阻塞隊列的相關實作及方法介紹,此隊列以數組為載體,配合可重入鎖實作生産線程和消費線程共享資料,ArrayBlockingQueue作為共享池,實作了并發條件下的添加及取出等方法。
有不正之處歡迎指正,感謝!