天天看點

Java中的BlockingQueue1 Java中的阻塞隊列2 生産者和消費者例子2 Java裡的阻塞隊列

1 Java中的阻塞隊列

Java中的BlockingQueue1 Java中的阻塞隊列2 生産者和消費者例子2 Java裡的阻塞隊列

1.1 簡介

一種支援兩個附加操作的隊列,是一系列阻塞隊列類的接口

當存取條件不滿足時,阻塞在操作處

  • 隊列滿時,阻塞存儲元素的線程,直到隊列可用
  • 隊列空時,擷取元素的線程會等待隊列非空

阻塞隊列常用于生産者/消費者場景,生産者是向隊列裡存元素的線程,消費者是從隊列裡取元素的線程.阻塞隊列就是生産者存儲元素、消費者擷取元素的容器

Java中的BlockingQueue1 Java中的阻塞隊列2 生産者和消費者例子2 Java裡的阻塞隊列

BlockingQueue繼承體系

Java中的BlockingQueue1 Java中的阻塞隊列2 生産者和消費者例子2 Java裡的阻塞隊列

阻塞隊列不可用時,兩個附加操作提供了4種處理方式

  • 抛出異常
    • 當隊列滿時,如果再往隊列裡插入元素,會抛出IllegalStateException("Queuefull")異常
    • 當隊列空時,從隊列裡擷取元素會抛出NoSuchElementException異常
  • 傳回特殊值
    • 當往隊列插入元素時,會傳回元素是否插入成功,成功則傳回true
    • 若是移除方法,則是從隊列裡取出一個元素,若沒有則傳回null
  • 一直阻塞
    • 當阻塞隊列滿時,如果生産者線程往隊列裡put元素,隊列會一直阻塞生産者線程,直到隊列有可用空間或響應中斷退出
    • 當隊列空時,若消費者線程從隊列裡take元素,隊列會阻塞住消費者線程,直到隊列非空
  • 逾時退出
    • 當阻塞隊列滿時,若生産者線程往隊列裡插入元素,隊列會阻塞生産者線程

      一段時間,若超過指定的時間,生産者線程就會退出

若是無界阻塞隊列,隊列不會出現滿的情況,是以使用put或offer方法永遠不會被阻塞,使用offer方法時,永遠傳回true

BlockingQueue 不接受 null 元素,抛 NullPointerException

null 被用作訓示 poll 操作失敗的警戒值(無法通過編譯)

BlockingQueue 實作主要用于生産者/使用者隊列,但它另外還支援 Collection 接口。

是以,舉例來說,使用 remove(x) 從隊列中移除任意一個元素是有可能的。

然而,這種操作通常表現并不高效,隻能有計劃地偶爾使用,比如在取消排隊資訊時。

BlockingQueue 的實作是線程安全的

所有排隊方法都可使用内置鎖或其他形式的并發控制來自動達到它們的目的

然而,大量的Collection 操作(addAll、containsAll、retainAll 和 removeAll)沒有必要自動執行,除非在實作中特别說明

是以,舉例來說,在隻添加 c 中的一些元素後,addAll(c) 有可能失敗(抛出一個異常)

BlockingQueue 實質上不支援使用任何一種“close”或“shutdown”操作來訓示不再添加任何項

這種功能的需求和使用有依賴于實作的傾向

例如,一種常用的政策是:對于生産者,插入特殊的 end-of-stream 或 poison 對象,并根據使用者擷取這些對象的時間來對它們進行解釋

2 生産者和消費者例子

在介紹具體的阻塞類之前,先來看看阻塞隊列最常應用的場景,即生産者和消費者例子

一般而言,有n個生産者,各自生産産品,并放入隊列

同時有m個消費者,各自從隊列中取出産品消費

當隊列已滿時(隊列可以在初始化時設定Capacity容量),生産者會在放入隊列時阻塞;當隊列空時,消費者會在取出産品時阻塞。代碼如下:

public class BlockingQueueExam {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3);
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            service.submit(new Producer("Producer" + i, blockingQueue));
        }
        for (int i = 0; i < 5; i++) {
            service.submit(new Consumer("Consumer" + i, blockingQueue));
        }
        service.shutdown();
    }
}

class Producer implements Runnable {
    private final String name;
    private final BlockingQueue<String> blockingQueue;
    private static Random rand = new Random(47);
    private static AtomicInteger productID = new AtomicInteger(0);

    Producer(String name, BlockingQueue<String> blockingQueue) {
        this.name = name;
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                SECONDS.sleep(rand.nextInt(5));
                String str = "Product" + productID.getAndIncrement();
                blockingQueue.add(str);
                //注意,這裡得到的size()有可能是錯誤的
                System.out.println(name + " product " + str + ", queue size = " + blockingQueue.size());
            }
            System.out.println(name + " is over");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable {
    private final String name;
    private final BlockingQueue<String> blockingQueue;
    private static Random rand = new Random(47);

    Consumer(String name, BlockingQueue<String> blockingQueue) {
        this.name = name;
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                SECONDS.sleep(rand.nextInt(5));
                String str = blockingQueue.take();
                //注意,這裡得到的size()有可能是錯誤的
                System.out.println(name + " consume " + str + ", queue size = " + blockingQueue.size());
            }
            System.out.println(name + " is over");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
           

以上代碼中的阻塞隊列是LinkedBlockingQueue,初始化容量為3

生産者5個,每個生産者間隔随機時間後生産一個産品put放入隊列,每個生産者生産10個産品

消費者也是5個,每個消費者間隔随機時間後take取出一個産品進行消費,每個消費者消費10個産品

可以看到,當隊列滿時,所有生産者被阻塞

當隊列空時,所有消費者被阻塞

代碼中還用到了AtomicInteger原子整數,用來確定産品的編号不會混亂

2 Java裡的阻塞隊列

Java中的BlockingQueue1 Java中的阻塞隊列2 生産者和消費者例子2 Java裡的阻塞隊列

BlockingQueue的實作類

至JDK8,Java提供了7個阻塞隊列

  • ArrayBlockingQueue:數組結構組成的有界阻塞隊列
  • LinkedBlockingQueue:連結清單結構組成的有界(預設MAX_VALUE容量)阻塞隊列
  • PriorityBlockingQueue:支援優先級排程的無界阻塞隊列,排序基于compareTo或Comparator完成
  • DelayQueue:支援延遲擷取元素,在OS調用中較多或者應用于某些條件變量達到要求後需要做的事情
  • SynchronousQueue:一個不存儲元素的阻塞隊列。
  • LinkedTransferQueue:連結清單結構的TransferQueue,無界阻塞隊列
  • LinkedBlockingDeque:連結清單結構的雙向阻塞隊列

2.1 LinkedBlockingQueue和ArrayBlockingQueue

Java中的BlockingQueue1 Java中的阻塞隊列2 生産者和消費者例子2 Java裡的阻塞隊列
Java中的BlockingQueue1 Java中的阻塞隊列2 生産者和消費者例子2 Java裡的阻塞隊列

基于數組的阻塞隊列實作,在

ArrayBlockingQueue

内部,維護了一個定長數組,以便緩存隊列中的資料對象,這是一個常用的阻塞隊列,除了一個定長數組外,ArrayBlockingQueue内部還儲存着兩個整形變量,分别辨別着隊列的頭部和尾部在數組中的位置。

ArrayBlockingQueue在生産者放入資料和消費者擷取資料,都是共用同一個鎖對象,由此也意味着兩者無法真正并行運作,這點尤其不同于LinkedBlockingQueue;按照實作原理來分析,ArrayBlockingQueue完全可以采用分離鎖,進而實作生産者和消費者操作的完全并行運作。Doug Lea之是以沒這樣去做,也許是因為ArrayBlockingQueue的資料寫入和擷取操作已經足夠輕巧,以至于引入獨立的鎖機制,除了給代碼帶來額外的複雜性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在于,前者在插入或删除元素時不會産生或銷毀任何額外的對象執行個體,而後者則會生成一個額外的Node對象。這在長時間内需要高效并發地處理大批量資料的系統中,其對于GC的影響還是存在一定的差別。而在建立ArrayBlockingQueue時,我們還可以控制對象的内部鎖是否采用公平鎖,預設采用非公平鎖。

都是FIFO隊列

正如其他Java集合一樣,連結清單形式的隊列,其存取效率要比數組形式的隊列高

但是在一些并發程式中,數組形式的隊列由于具有一定的可預測性,是以可以在某些場景中獲得更好的效率

另一個不同點在于,ArrayBlockingQueue支援“公平”政策

若在構造函數中指定了“公平”政策為true,可以有效避免一些線程被“餓死”,公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性"

BlockingQueue queue = new ArrayBlockingQueue<>(3, true);

總體而言,LinkedBlockingQueue是阻塞隊列的最經典實作,在不需要“公平”政策時,基本上使用它就夠了

所謂公平通路隊列是指阻塞的線程,可以按照阻塞的先後順序通路隊列,即先阻塞的線程先通路隊列

非公平性是對先等待的線程是非公平的,當隊列有可用空間時,阻塞的線程都可以争奪通路隊列的資格,有可能先阻塞的線程最後才通路隊列

為保證公平性,通常會降低吞吐量.我們可以使用以下代碼建立一個公平的阻塞隊列

ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
           
Java中的BlockingQueue1 Java中的阻塞隊列2 生産者和消費者例子2 Java裡的阻塞隊列

通路者的公平性是使用可重入鎖實作的

2.2 SynchronousQueue同步隊列

Java中的BlockingQueue1 Java中的阻塞隊列2 生産者和消費者例子2 Java裡的阻塞隊列

比較特殊的阻塞隊列,它具有以下幾個特點:

  1. 一個插入方法的線程必須等待另一個線程調用取出
  2. 隊列沒有容量Capacity(或者說容量為0),事實上隊列中并不存儲元素,它隻是提供兩個線程進行資訊交換的場所
  3. 由于以上原因,隊列在很多場合表現的像一個空隊列。不能對元素進行疊代,不能peek元素,poll會傳回null
  4. 隊列中不允許存入null元素
  5. SynchronousQueue如同ArrayedBlockingQueue一樣,支援“公平”政策

下面是一個例子,5個Producer産生産品,存入隊列

5個Consumer從隊列中取出産品,進行消費。

public class SynchronizeQueueExam {
    public static void main(String[] args) {
        SynchronousQueue<String> queue = new SynchronousQueue<>(false);
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            service.submit(new Producer(queue, "Producer" + i));
        }
        for (int i = 0; i < 5; i++) {
            service.submit(new Consumer(queue, "Consumer" + i));
        }
        service.shutdown();
    }

    static class Producer implements Runnable {
        private final SynchronousQueue<String> queue;
        private final String name;
        private static Random rand = new Random(47);
        private static AtomicInteger productID = new AtomicInteger(0);

        Producer(SynchronousQueue<String> queue, String name) {
            this.queue = queue;
            this.name = name;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 5; i++) {
                    TimeUnit.SECONDS.sleep(rand.nextInt(5));
                    String str = "Product" + productID.incrementAndGet();
                    queue.put(str);
                    System.out.println(name + " put " + str);
                }
                System.out.println(name + " is over.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    static class Consumer implements Runnable {
        private final SynchronousQueue<String> queue;
        private final String name;

        Consumer(SynchronousQueue<String> queue, String name) {
            this.queue = queue;
            this.name = name;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 5; i++) {
                    String str = queue.take();
                    System.out.println(name + " take " + str);
                }
                System.out.println(name + " is over.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
           

2.3 PriorityBlockingQueue優先級阻塞隊列

Java中的BlockingQueue1 Java中的阻塞隊列2 生産者和消費者例子2 Java裡的阻塞隊列
  1. 隊列中的元素總是按照“自然順序”排序,或者根據構造函數中給定的Comparator進行排序
  2. 隊列中不允許存在null,也不允許存在不能排序的元素
  3. 對于排序值相同的元素,其序列是不保證的,當然你可以自己擴充這個功能
  4. 隊列容量是沒有上限的,但是如果插入的元素超過負載,有可能會引起OOM
  5. 使用疊代子iterator()對隊列進行輪詢,其順序不能保證
  6. 具有BlockingQueue的put和take方法,但是由于隊列容量沒有上線,是以put方法是不會被阻塞的,但是take方法是會被阻塞的
  7. 可以給定初始容量,這個容量會按照一定的算法自動擴充

下面是一個PriorityBlockingQueue的例子,例子中定義了一個按照字元串倒序排列的隊列

5個生産者不斷産生随機字元串放入隊列

5個消費者不斷從隊列中取出随機字元串

同一個線程取出的字元串基本上是倒序的(因為不同線程同時存元素,是以取的字元串列印到螢幕上往往不是倒序的了)

public class PriorityBlockingQueueExam {
    public static void main(String[] args) {
        //建立一個初始容量為3,排序為字元串排序相反的隊列
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(3, (o1, o2) -> {
            if (o1.compareTo(o2) < 0) {
                return 1;
            } else if (o1.compareTo(o2) > 0) {
                return -1;
            } else {
                return 0;
            }
        });

        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            service.submit(new Producer("Producer" + i, queue));
        }
        for (int i = 0; i < 5; i++) {
            service.submit(new Consumer("Consumer" + i, queue));
        }
        service.shutdown();
    } 

    static class Producer implements Runnable {
        private final String name;
        private final PriorityBlockingQueue<String> queue;
        private static Random rand = new Random(System.currentTimeMillis());

        Producer(String name, PriorityBlockingQueue<String> queue) {
            this.name = name;
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                String str = "Product" + rand.nextInt(1000);
                queue.put(str);
                System.out.println("->" + name + " put " + str);
                try {
                    TimeUnit.SECONDS.sleep(rand.nextInt(5));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(name + " is over");
        }
    }


    static class Consumer implements Runnable {
        private final String name;
        private final PriorityBlockingQueue<String> queue;
        private static Random rand = new Random(System.currentTimeMillis());

        Consumer(String name, PriorityBlockingQueue<String> queue) {
            this.name = name;
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; i++) {
                    String str = queue.take();
                    System.out.println("<-" + name + " take " + str);
                    TimeUnit.SECONDS.sleep(rand.nextInt(5));
                }
                System.out.println(name + " is over");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
           

2.4 DelayQueue

隊列中隻能存入Delayed接口實作的對象

Java中的BlockingQueue1 Java中的阻塞隊列2 生産者和消費者例子2 Java裡的阻塞隊列

DelayQueue

Java中的BlockingQueue1 Java中的阻塞隊列2 生産者和消費者例子2 Java裡的阻塞隊列
Java中的BlockingQueue1 Java中的阻塞隊列2 生産者和消費者例子2 Java裡的阻塞隊列

DelayQueue中存入的對象要同時實作getDelay和compareTo

  • getDelay方法是用來檢測隊列中的元素是否到期
  • compareTo方法是用來給隊列中的元素進行排序
    Java中的BlockingQueue1 Java中的阻塞隊列2 生産者和消費者例子2 Java裡的阻塞隊列

    DelayQueue持有一個PriorityBlockingQueue

    每個Delayed對象實際上都放入了這個隊列,并按照compareTo方法進行排序

當隊列中對象的getDelay方法傳回的值<=0(即對象已經逾時)時,才可以将對象從隊列中取出

若使用take方法,則方法會一直阻塞,直到隊列頭部的對象逾時被取出

若使用poll方法,則當沒有逾時對象時,直接傳回null

總結來說,有如下幾個特點:

  1. 隊列中的對象都是Delayed對象,它實作了getDelay和compareTo
  2. 隊列中的對象按照優先級(按照compareTo)進行了排序,隊列頭部是最先逾時的對象
  3. take方法會在沒有逾時對象時一直阻塞,直到有對象逾時;poll方法會在沒有逾時對象時傳回null。
  4. 隊列中不允許存儲null,且iterator方法傳回的值不能確定按順序排列

下面是一個列子,特别需要注意getDelay和compareTo方法的實作:

public class DelayQueueExam {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayElement> queue = new DelayQueue<>();
        for (int i = 0; i < 10; i++) {
            queue.put(new DelayElement(1000 * i, "DelayElement" + i));
        }
        while (!queue.isEmpty()) {
            DelayElement delayElement = queue.take();
            System.out.println(delayElement.getName());
        }
    }

    static class DelayElement implements Delayed {
        private final long delay;
        private long expired;
        private final String name;

        DelayElement(int delay, String name) {
            this.delay = delay;
            this.name = name;
            expired = System.currentTimeMillis() + delay;
        }

        public String getName() {
            return name;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expired - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            long d = (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
            return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
        }
    }
}
           

DelayQueue通過PriorityQueue,使得逾時的對象最先被處理,将take對象的操作阻塞住,避免了周遊方式的輪詢,提高了性能。在很多需要回收逾時對象的場景都能用上

BlockingDeque阻塞雙向隊列

BlockingDeque中各種特性上都非常類似于BlockingQueue,事實上它也繼承自BlockingQueue,它們的不同點主要在于BlockingDeque可以同時從隊列頭部和尾部增删元素。

是以,總結一下BlockingDeque的四組增删元素的方法:

第一組,抛異常的方法,包括addFirst(e),addLast(e),removeFirst(),removeLast(),getFirst()和getLast();

第二組,傳回特殊值的方法,包括offerFirst(e) ,offerLast(e) ,pollFirst(),pollLast(),peekFirst()和peekLast();

第三組,阻塞的方法,包括putFirst(e),putLast(e),takeFirst()和takeLast();

第四組,逾時的方法,包括offerFirst(e, time, unit),offerLast(e, time, unit),pollFirst(time, unit)和pollLast(time, unit)。

BlockingDeque目前隻有一個實作類LinkedBlockingDeque,其用法與LinkedBlockingQueue非常類似,這裡就不給出執行個體了。

TransferQueue傳輸隊列

TransferQueue繼承自BlockingQueue,之是以将它獨立成章,是因為它是一個非常重要的隊列,且提供了一些阻塞隊列所不具有的特性。

簡單來說,TransferQueue提供了一個場所,生産者線程使用transfer方法傳入一些對象并阻塞,直至這些對象被消費者線程全部取出。前面介紹的SynchronousQueue很像一個容量為0的TransferQueue。

下面是一個例子,一個生産者使用transfer方法傳輸10個字元串,兩個消費者線程則各取出5個字元串,可以看到生産者在transfer時會一直阻塞直到所有字元串被取出:

public class TransferQueueExam {
    public static void main(String[] args) {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        ExecutorService service = Executors.newCachedThreadPool();
        service.submit(new Producer("Producer1", queue));
        service.submit(new Consumer("Consumer1", queue));
        service.submit(new Consumer("Consumer2", queue));
        service.shutdown();
    }

    static class Producer implements Runnable {
        private final String name;
        private final TransferQueue<String> queue;

        Producer(String name, TransferQueue<String> queue) {
            this.name = name;
            this.queue = queue;
        }
        @Override
        public void run() {
            System.out.println("begin transfer objects");

            try {
                for (int i = 0; i < 10; i++) {
                    queue.transfer("Product" + i);
                    System.out.println(name + " transfer "+"Product"+i);
                }
                System.out.println("after transformation");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + " is over");
        }
    }

    static class Consumer implements Runnable {
        private final String name;
        private final TransferQueue<String> queue;
        private static Random rand = new Random(System.currentTimeMillis());

        Consumer(String name, TransferQueue<String> queue) {
            this.name = name;
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 5; i++) {
                    String str = queue.take();
                    System.out.println(name + " take " + str);
                    TimeUnit.SECONDS.sleep(rand.nextInt(5));
                }
                System.out.println(name + " is over");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
           

上面的代碼中隻使用了transfer方法,TransferQueue共包括以下方法:

  1. transfer(E e),若目前存在一個正在等待擷取的消費者線程,即立刻移交之;否則會将元素e插入到隊列尾部,并進入阻塞狀态,直到有消費者線程取走該元素。
  2. tryTransfer(E e),若目前存在一個正在等待擷取的消費者線程(使用take()或者poll()函數),即立刻移交之; 否則傳回false,并且不進入隊列,這是一個非阻塞的操作。
  3. tryTransfer(E e, long timeout, TimeUnit unit) 若目前存在一個正在等待擷取的消費者線程,即立刻移交之;否則會将元素e插入到隊列尾部,并且等待被消費者線程擷取消費掉,若在指定的時間内元素e無法被消費者線程擷取,則傳回false,同時該元素被移除。
  4. hasWaitingConsumer() 判斷是否存在消費者線程。
  5. getWaitingConsumerCount() 擷取所有等待擷取元素的消費線程數量。

    再來看兩個生産者和兩個消費者的例子:

public class TransferQueueExam2 {
    public static void main(String[] args) {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        ExecutorService service = Executors.newCachedThreadPool();
        service.submit(new Producer("Producer1", queue));
        service.submit(new Producer("Producer2", queue));
        service.submit(new Consumer("Consumer1", queue));
        service.submit(new Consumer("Consumer2", queue));
        service.shutdown();
    }

    static class Producer implements Runnable {
        private final String name;
        private final TransferQueue<String> queue;

        Producer(String name, TransferQueue<String> queue) {
            this.name = name;
            this.queue = queue;
        }

        @Override
        public void run() {
            System.out.println(name + " begin transfer objects");

            try {
                for (int i = 0; i < 5; i++) {
                    queue.transfer(name + "_Product" + i);
                    System.out.println(name + " transfer " + name + "_Product" + i);
                }
                System.out.println(name + " after transformation");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + " is over");
        }
    }

    static class Consumer implements Runnable {
        private final String name;
        private final TransferQueue<String> queue;
        private static Random rand = new Random(System.currentTimeMillis());

        Consumer(String name, TransferQueue<String> queue) {
            this.name = name;
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 5; i++) {
                    String str = queue.take();
                    System.out.println(name + " take " + str);
                    TimeUnit.SECONDS.sleep(rand.nextInt(5));
                }
                System.out.println(name + " is over");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
           

它的作者Doug Lea 這樣評價它:TransferQueue是一個聰明的隊列,它是ConcurrentLinkedQueue, SynchronousQueue (在公平模式下), 無界的LinkedBlockingQueues等的超集。

是以,在合适的場景中,請盡量使用TransferQueue,目前它隻有一個實作類LinkedTransferQueue。

ConcurrentLinkedQueue并發連結隊列

1 并發與并行

寫到此處時,應該認真梳理一下關于多線程程式設計中的一些名詞了,具體包括多線程(MultiThread)、并發(Concurrency)和并行(Parrellism)。多線程的概念應該是比較清晰的,是指計算機和程式設計語言都提供線程的概念,多個線程可以同時在一台計算機中運作。

而并發和并行則是兩個非常容易混淆的概念,第一種區分方法是以程式在計算機中的執行方式來區分。我稱之為“并發執行”和“并行執行”的區分:

并發執行是指多個線程(例如n個)在一台計算機中宏觀上“同時”運作,它們有可能是一個CPU輪換的處理n個線程,也有可能是m個CPU以各種排程政策來輪換處理n個線程;

并行執行是指多個線程(n個)在一台計算機的多個CPU(m個,m>=n)上微觀上同時運作,并行執行時作業系統不需要排程這n個線程,每個線程都獨享一個CPU持續運作直至結束。

第二種區分方法則是“并發程式設計”和“并行程式設計”的差別:

并發程式設計可以了解為多線程程式設計,并發程式設計的代碼必定以“并發執行”的方式運作;

并行程式設計則是一種更加特殊的程式設計方法,它需要使用特殊的程式設計語言(例如Cilk語言),或者特殊的程式設計架構(例如Parallel Java 2 Library)。另外,我在本系列的第一篇中提到的Fork-Join架構也是一種并行程式設計架構。

2 并發的基礎

了解了并發的概念,我們再來看首次遇到的帶有并發字眼(Concurrent)的類ConcurrentLinkedQueue,并發連結隊列。

目前看來,可以這麼認為,在java.util.concurrency包内,凡是帶有Concurrent字眼的類,都是以CAS為基礎的非阻塞工具類。例如ConcurrentLinkedQueue、ConcurrentLinkedDeque、ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet。

好了,那麼什麼是CAS呢?CAS即CompareAndSwap“比較并交換”,具體的細節将會在後續的關于原子變量(Atomic)章節中介紹。簡而言之,當代的很多CPU提供了一種CAS指令,由于指令運作不會被打斷,是以依賴這種指令就可以設計出一種不需要鎖的非阻塞并發算法,依賴這種算法,就可以設計出各種并發類。

3 各類隊列的例子

下面的例子中,我們使用參數控制,分别測試了四種隊列在多個線程同時存儲變量時的表現:

public class ConcurrentLinkedQueueExam {
    private static final int TEST_INT = 10000000;

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        Queue<Integer> queue = null;
        if (args.length < 1) {
            System.out.println("Usage: input 1~4 ");
            System.exit(1);
        }
        int param = Integer.parseInt(args[0]);
        switch (param) {
            case 1:
                queue = new LinkedList<>();
                break;
            case 2:
                queue = new LinkedBlockingQueue<>();
                break;
            case 3:
                queue = new ArrayBlockingQueue<Integer>(TEST_INT * 5);
                break;
            case 4:
                queue = new ConcurrentLinkedQueue<>();
                break;
            default:
                System.out.println("Usage: input 1~4 ");
                System.exit(2);
        }
        System.out.println("Using " + queue.getClass().getSimpleName());

        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            service.submit(new Putter(queue, "Putter" + i));
        }
        TimeUnit.SECONDS.sleep(2);
        for (int i = 0; i < 5; i++) {
            service.submit(new Getter(queue, "Getter" + i));
        }
        service.shutdown();
        service.awaitTermination(1, TimeUnit.DAYS);
        long end = System.currentTimeMillis();
        System.out.println("Time span = " + (end - start));
        System.out.println("queue size = " + queue.size());
    }

    static class Putter implements Runnable {
        private final Queue<Integer> queue;
        private final String name;

        Putter(Queue<Integer> queue, String name) {
            this.queue = queue;
            this.name = name;
        }


        @Override
        public void run() {
            for (int i = 0; i < TEST_INT; i++) {
                queue.offer(1);
            }
            System.out.println(name + " is over");
        }
    }

    static class Getter implements Runnable {
        private final Queue<Integer> queue;
        private final String name;

        Getter(Queue<Integer> queue, String name) {
            this.queue = queue;
            this.name = name;
        }

        @Override
        public void run() {
            int i = 0;
            while (i < TEST_INT) {
                synchronized (Getter.class) {
                    if (!queue.isEmpty()) {
                        queue.poll();
                        i++;
                    }
                }
            }
            System.out.println(name + " is over");
        }
    }
}
           
輸入1,結果如下:

Using LinkedList
…
Time span = 16613
queue size = 10296577
輸入2,結果如下:

Using LinkedBlockingQueue
…
Time span = 16847
queue size = 0
輸入3,結果如下:

Using ArrayBlockingQueue
…
Time span = 6815
queue size = 0
輸入4,結果如下:

Using ConcurrentLinkedQueue
…
Time span = 22802
queue size = 0
           

分析運作的結果,有如下結論:

第一,非并發類例如LinkedList在多線程環境下運作是會出錯的,結果的最後一行輸出了隊列的size值,隻有它的size值不等于0,這說明在多線程運作時許多poll操作并沒有彈出元素,甚至很多offer操作也沒有能夠正确插入元素。其他三種并發類都能夠在多線程環境下正确運作;

第二,并發類也不是完全不需要注意加鎖,例如這一段代碼:

while (i < TEST_INT) {
    synchronized (Getter.class) {
        if (!queue.isEmpty()) {
            queue.poll();
            i++;
        }
    }
}
           

如果不加鎖,那麼isEmpty和poll之間有可能被其他線程打斷,造成結果的不确定性。

第三,本例中LinkedBlockingQueue和ArrayBlockingQueue并沒有因為生産-消費關系阻塞,因為容量設定得足夠大。它們的元素插入和彈出操作是加鎖的,而ConcurrentLinkedQueue的元素插入和彈出操作是不加鎖的,而觀察性能其實并沒有數量級上的差異(有待進一步測試)。

第四,ArrayBlockingQueue性能明顯好于LinkedBlockingQueue,甚至也好于ConcurrentLinkedQueue,這是因為它的内部存儲結構是原生數組,而其他兩個是連結清單,需要new一個Node。同時,連結清單也會造成更多的GC。

ConcurrentLinkedDeque并發連結雙向隊列

ConcurrentLinkedDeque與ConcurrentLinkedQueue非常類似,不同之處僅在于它是一個雙向隊列。

3 阻塞隊列的實作原理

Java的并發隊列,具體包括BlockingQueue阻塞隊列、BlockingDeque阻塞雙向隊列、TransferQueue傳輸隊列、ConcurrentLinkedQueue并發連結隊列和ConcurrentLinkedDeque并發連結雙向隊列。BlockingQueue和BlockingDeque的内部使用鎖來保護元素的插入彈出操作,同時它們還提供了生産者-消費者場景的阻塞方法;TransferQueue被用來在多個線程之間優雅的傳遞對象;ConcurrentLinkedQueue和ConcurrentLinkedDeque依靠CAS指令,來實作非阻塞的并發算法。

若隊列為空,消費者會一直等待,當生産者添加元素時,消費者是如何知道目前隊列有元素的呢?讓我們看看JDK是如何實作的。

使用通知模式實作。所謂通知模式,就是當生産者往滿的隊列裡添加元素時會阻塞住生産者,當消費者消費了一個隊列中的元素後,會通知生産者目前隊列可用。通過檢視源碼發現ArrayBlockingQueue使用了Condition來實作,代碼如下。

private final Condition notFull;
    private final Condition notEmpty;
    
    public ArrayBlockingQueue(int capacity, boolean fair) {
        // 省略其他代碼
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    
     private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

           

當往隊列裡插入一個元素時,如果隊列不可用,那麼阻塞生産者主要通過

LockSupport.park(this)來實作。

public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
           

繼續進入源碼,發現調用setBlocker先儲存一下将要阻塞的線程,然後調用unsafe.park阻塞目前線程。

public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        unsafe.park(false, 0L);
        setBlocker(t, null);
    }
           

unsafe.park是個native方法,代碼如下。

public native void park(boolean isAbsolute, long time);
           

park這個方法會阻塞目前線程,隻有以下4種情況中的一種發生時,該方法才會傳回。

  • 與park對應的unpark執行或已經執行時。“已經執行”是指unpark先執行,然後再執行park的情況。
  • 線程被中斷時。
  • 等待完time參數指定的毫秒數時。
  • 異常現象發生時,這個異常現象沒有任何原因。