天天看點

Java并發容器之阻塞隊列BlockingQueue

    BlockingQueue提供了線程安全的隊列通路方式:當阻塞隊列進行插入資料時,如果隊列已滿,線程将會阻塞等待直到隊列非滿;從阻塞隊列取資料時,如果隊列已空,線程将會阻塞等待直到隊列非空。 

    BlockingQueue 具有 4 組不同的方法用于插入、移除以及對隊列中的元素進行檢查。如果請求的操作不能得到立即執行的話,每個方法的表現也不同。這些方法如下:

Java并發容器之阻塞隊列BlockingQueue

四組不同的行為方式解釋:

    • 抛異常:如果試圖的操作無法立即執行,抛一個異常。
    • 特定值:如果試圖的操作無法立即執行,傳回一個特定的值(常常是 true / false)。
    • 阻塞:如果試圖的操作無法立即執行,該方法調用将會發生阻塞,直到能夠執行。
    • 逾時:如果試圖的操作無法立即執行,該方法調用将會發生阻塞,直到能夠執行,但等待時間不會超過給定值。傳回一個特定值以告知該操作是否成功(典型的是true / false。

    阻塞隊列實作阻塞同步的方式很簡單,使用的就是是lock鎖的多條件(condition)阻塞控制。封裝了根據條件阻塞線程的過程,而我們就不用關心繁瑣的await/signal操作了。

    阻塞隊列的主要實作類有以下幾種:

    1:ArrayBlockingQueue:ArrayBlockingQueue是一個有界的阻塞隊列,其内部實作是一個數組。有界也就意味着,它不能夠存儲無限多數量的元素。它有一個同一時間能夠存儲元素數量的上限。你可以在對其初始化的時候設定這個上限,但之後就無法對這個上限進行修改了(因為它是基于數組實作的,一旦初始化,大小就無法修改,不會自動擴容)。内部以 FIFO(先進先出)的順序對元素進行存儲。隊列中的頭元素在所有元素之中是放入時間最久的那個,而尾元素則是最短的那個。

    2:DelayQueue:DelayQueue 對元素進行持有,直到一個特定的延遲到期。DelayQueue 将會在每個元素的 getDelay() 方法傳回的值的時間段之後才釋放掉該元素。如果傳回的是 0 或者負值,延遲将被認為過期,該元素将會在 DelayQueue 的下一次 take  被調用的時候被釋放掉。元素必須實作 java.util.concurrent.Delayed 接口。

    3:LinkedBlockingQueue:LinkedBlockingQueue 内部以一個連結清單存儲元素。可以選擇一個長度上限。如果沒有定義上限,将使用 Integer.MAX_VALUE 作為上限。内部以 FIFO(先進先出)的順序對元素進行存儲。隊列中的頭元素在所有元素之中是放入時間最久的那個,而尾元素則是最短的那個。

    4:PriorityBlockingQueue:PriorityBlockingQueue 是一個無界的并發隊列。所有插入到 PriorityBlockingQueue 的元素必須實作 java.lang.Comparable接口。隊列中元素的排序取決于你自己的 Comparable 實作。注意:如果你從一個 PriorityBlockingQueue 獲得一個 Iterator 的話,該 Iterator 并不能保證它對元素的周遊是以優先級為序的。

    5:SynchronousQueue:SynchronousQueue 是一個特殊的隊列,它的内部同時隻能夠容納一個元素。如果該隊列已有一進制素的話,試圖向隊列中插入一個新元素的線程将會阻塞,直到另一個線程将該元素從隊列中抽走。同樣,如果該隊列為空,試圖向隊列中抽取一個元素的線程将會阻塞,直到另一個線程向隊列中插入了一條新的元素。

     6:LinkedBlockingDeque :鍊阻塞雙端隊列.它是對BlockingDeque雙端隊列接口的實作,線程在雙端隊列的兩端都可以插入和提取元素。在它為空的時候,一個試圖從中抽取資料的線程将會阻塞,無論該線程是試圖從哪一端抽取資料。

    BlockingQueue 通常用于一個線程生産對象,而另外一個線程消費這些對象的場景,最常見的就是用來實作生産者消費者模式:因為使用者不需顯式指定什麼時候阻塞進行生産或者消費,隊列本身就提供了滿或空時的阻塞功能。

public class BlockingQueueTest {
    //生産者線程
    public static class Producer implements Runnable{
        private final BlockingQueue<Integer> blockingQueue;
        private volatile boolean flag;
        private Random random;

        public Producer(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
            flag=false;
            random=new Random();

        }
        public void run() {
            while(!flag){
                int info=random.nextInt(100);
                try {
                    blockingQueue.put(info);//生産對象并存入阻塞隊列中
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }               
            }
        }
        public void shutDown(){
            flag=true;
        }
    }
    //消費者線程
    public static class Consumer implements Runnable{
        private final BlockingQueue<Integer> blockingQueue;
        private volatile boolean flag;
        public Consumer(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }
        public void run() {
            while(!flag){
                int info;
                try {
                    info = blockingQueue.take();//從阻塞隊列中消費産品
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }               
            }
        }
        public void shutDown(){
            flag=true;
        }
    }
    public static void main(String[] args){
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>(10);
        Producer producer=new Producer(blockingQueue);
        Consumer consumer=new Consumer(blockingQueue);
        //建立5個生産者,5個消費者,并啟動生産和消費
        for(int i=0;i<10;i++){
            if(i<5){
                new Thread(producer,"producer"+i).start();
            }else{
                new Thread(consumer,"consumer"+(i-5)).start();
            }
        }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        producer.shutDown();//手動停止生産
        consumer.shutDown();//手動停止消費

    }
}      

轉載于:https://www.cnblogs.com/ygj0930/p/6543890.html