BlockingQueue提供了線程安全的隊列通路方式:當阻塞隊列進行插入資料時,如果隊列已滿,線程将會阻塞等待直到隊列非滿;從阻塞隊列取資料時,如果隊列已空,線程将會阻塞等待直到隊列非空。
BlockingQueue 具有 4 組不同的方法用于插入、移除以及對隊列中的元素進行檢查。如果請求的操作不能得到立即執行的話,每個方法的表現也不同。這些方法如下:
四組不同的行為方式解釋:
-
- 抛異常:如果試圖的操作無法立即執行,抛一個異常。
- 特定值:如果試圖的操作無法立即執行,傳回一個特定的值(常常是 true / false)。
- 阻塞:如果試圖的操作無法立即執行,該方法調用将會發生阻塞,直到能夠執行。
- 逾時:如果試圖的操作無法立即執行,該方法調用将會發生阻塞,直到能夠執行,但等待時間不會超過給定值。傳回一個特定值以告知該操作是否成功(典型的是true / false。
阻塞隊列實作阻塞同步的方式很簡單,使用的就是是lock鎖的多條件(condition)阻塞控制。封裝了根據條件阻塞線程的過程,而我們就不用關心繁瑣的await/signal操作了。
阻塞隊列的主要實作類有以下幾種:
1:ArrayBlockingQueue:ArrayBlockingQueue是一個有界的阻塞隊列,其内部實作是一個數組。有界也就意味着,它不能夠存儲無限多數量的元素。它有一個同一時間能夠存儲元素數量的上限。你可以在對其初始化的時候設定這個上限,但之後就無法對這個上限進行修改了(因為它是基于數組實作的,一旦初始化,大小就無法修改,不會自動擴容)。内部以 FIFO(先進先出)的順序對元素進行存儲。隊列中的頭元素在所有元素之中是放入時間最久的那個,而尾元素則是最短的那個。
2:DelayQueue:DelayQueue 對元素進行持有,直到一個特定的延遲到期。DelayQueue 将會在每個元素的 getDelay() 方法傳回的值的時間段之後才釋放掉該元素。如果傳回的是 0 或者負值,延遲将被認為過期,該元素将會在 DelayQueue 的下一次 take 被調用的時候被釋放掉。元素必須實作 java.util.concurrent.Delayed 接口。
3:LinkedBlockingQueue:LinkedBlockingQueue 内部以一個連結清單存儲元素。可以選擇一個長度上限。如果沒有定義上限,将使用 Integer.MAX_VALUE 作為上限。内部以 FIFO(先進先出)的順序對元素進行存儲。隊列中的頭元素在所有元素之中是放入時間最久的那個,而尾元素則是最短的那個。
4:PriorityBlockingQueue:PriorityBlockingQueue 是一個無界的并發隊列。所有插入到 PriorityBlockingQueue 的元素必須實作 java.lang.Comparable接口。隊列中元素的排序取決于你自己的 Comparable 實作。注意:如果你從一個 PriorityBlockingQueue 獲得一個 Iterator 的話,該 Iterator 并不能保證它對元素的周遊是以優先級為序的。
5:SynchronousQueue:SynchronousQueue 是一個特殊的隊列,它的内部同時隻能夠容納一個元素。如果該隊列已有一進制素的話,試圖向隊列中插入一個新元素的線程将會阻塞,直到另一個線程将該元素從隊列中抽走。同樣,如果該隊列為空,試圖向隊列中抽取一個元素的線程将會阻塞,直到另一個線程向隊列中插入了一條新的元素。
6:LinkedBlockingDeque :鍊阻塞雙端隊列.它是對BlockingDeque雙端隊列接口的實作,線程在雙端隊列的兩端都可以插入和提取元素。在它為空的時候,一個試圖從中抽取資料的線程将會阻塞,無論該線程是試圖從哪一端抽取資料。
BlockingQueue 通常用于一個線程生産對象,而另外一個線程消費這些對象的場景,最常見的就是用來實作生産者消費者模式:因為使用者不需顯式指定什麼時候阻塞進行生産或者消費,隊列本身就提供了滿或空時的阻塞功能。
public class BlockingQueueTest {
//生産者線程
public static class Producer implements Runnable{
private final BlockingQueue<Integer> blockingQueue;
private volatile boolean flag;
private Random random;
public Producer(BlockingQueue<Integer> blockingQueue) {
this.blockingQueue = blockingQueue;
flag=false;
random=new Random();
}
public void run() {
while(!flag){
int info=random.nextInt(100);
try {
blockingQueue.put(info);//生産對象并存入阻塞隊列中
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void shutDown(){
flag=true;
}
}
//消費者線程
public static class Consumer implements Runnable{
private final BlockingQueue<Integer> blockingQueue;
private volatile boolean flag;
public Consumer(BlockingQueue<Integer> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void run() {
while(!flag){
int info;
try {
info = blockingQueue.take();//從阻塞隊列中消費産品
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void shutDown(){
flag=true;
}
}
public static void main(String[] args){
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>(10);
Producer producer=new Producer(blockingQueue);
Consumer consumer=new Consumer(blockingQueue);
//建立5個生産者,5個消費者,并啟動生産和消費
for(int i=0;i<10;i++){
if(i<5){
new Thread(producer,"producer"+i).start();
}else{
new Thread(consumer,"consumer"+(i-5)).start();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
producer.shutDown();//手動停止生産
consumer.shutDown();//手動停止消費
}
}
轉載于:https://www.cnblogs.com/ygj0930/p/6543890.html