天天看點

介紹java DelayQueue類

介紹java DelayQueue類

本文我們讨論java.util.concurrent 包中的DelayQueue類,是用于生産者和消費者模式的阻塞隊列。它有一個非常有用的特性————當消費者想從隊列中取消息時,隻能擷取已經過期一段時間的元素(延遲擷取)。

在DelayQueue隊列中實作元素延遲

每個需要放入DelayQueue隊列元素需要實作Delayed接口,下面我們建立DelayObject 類,其執行個體對象将被放入DelayQueue中。其構造函數包括字元串類型資料及延遲毫秒變量:

public class DelayObject implements Delayed {
    private String data;
    private long startTime;
 
    public DelayObject(String data, long delayInMilliseconds) {
        this.data = data;
        this.startTime = System.currentTimeMillis() + delayInMilliseconds;
    }
           

startTime 變量————隊列中元素被消費的時間。接下來需要實作getDelay方法,其傳回隊列中對象剩餘延遲時間,根據參數确定時間機關,是以我們需要使用TimeUnit.convert() 方法傳回響應機關的時間量:

@Override
public long getDelay(TimeUnit unit) {
    long diff = startTime - System.currentTimeMillis();
    return unit.convert(diff, TimeUnit.MILLISECONDS);
}
           

當消費者嘗試從隊列中擷取元素時,DelayQueue 隊列執行getDelay()方法查找允許從隊列中傳回的對象。如果方法傳回0或負數,則可以從隊列中擷取元素。

同時也需要實作compareTo() 方法,因為DelayQueue 隊列中的元素根據有效期進行排序。先要過期的元素在隊列的首位,而擁有最長有效期的元素在隊尾:

@Override
public int compareTo(Delayed o) {
    return Ints.saturatedCast(
      this.startTime - ((DelayObject) o).startTime);
}
           

通過延遲隊列實作生産者和消費者程式

為了測試DelayQueue 隊列,我們需要實作生産者和消費者邏輯。生産者類需要隊列、生産元素的數量以及每個元素延遲的毫秒作為參數。

然後執行run方法,在隊列中加入元素後并休眠500毫秒:

public class DelayQueueProducer implements Runnable {
  
    private BlockingQueue<DelayObject> queue;
    private Integer numberOfElementsToProduce;
    private Integer delayOfEachProducedMessageMilliseconds;
 
    // standard constructor
 
    @Override
    public void run() {
        for (int i = 0; i < numberOfElementsToProduce; i++) {
            DelayObject object
              = new DelayObject(
                UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
            System.out.println("Put object: " + object);
            try {
                queue.put(object);
                Thread.sleep(500);
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
        }
    }
}
           

消費者實作類似,但包括跟蹤消費消息的數量:

public class DelayQueueConsumer implements Runnable {
    private BlockingQueue<DelayObject> queue;
    private Integer numberOfElementsToTake;
    public AtomicInteger numberOfConsumedElements = new AtomicInteger();
 
    // standard constructors
 
    @Override
    public void run() {
        for (int i = 0; i < numberOfElementsToTake; i++) {
            try {
                DelayObject object = queue.take();
                numberOfConsumedElements.incrementAndGet();
                System.out.println("Consumer take: " + object);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
           

DelayQueue 應用測試

為了測試DelayQueue,我們建立一個生産者線程即一個消費者線程。生産者put兩個對象至隊列中,每個對象需要500毫秒延遲,測試斷言消費者消費2個消息:

@Test
public void givenDelayQueue_whenProduceElement
  _thenShouldConsumeAfterGivenDelay() throws InterruptedException {
    // given
    ExecutorService executor = Executors.newFixedThreadPool(2);
     
    BlockingQueue<DelayObject> queue = new DelayQueue<>();
    int numberOfElementsToProduce = 2;
    int delayOfEachProducedMessageMilliseconds = 500;
    DelayQueueConsumer consumer = new DelayQueueConsumer(
      queue, numberOfElementsToProduce);
    DelayQueueProducer producer = new DelayQueueProducer(
      queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
 
    // when
    executor.submit(producer);
    executor.submit(consumer);
 
    // then
    executor.awaitTermination(5, TimeUnit.SECONDS);
    executor.shutdown();
  
    assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);
}
           

我們能觀察到運作程式輸出為:

Put object: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Consumer take: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Put object: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
Consumer take: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
           

生産者在隊列中加入第一個元素,等待過期後被消費。第二個元素也一樣。

消費者在給定的時間内不能消費

生産者生産元素需要10秒後過期:

int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = 10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(
  queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
  queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
           

啟動測試程式,但5秒後終止。根據DelayQueue性質,消費者不能從隊列中消費未過期的消息:

executor.submit(producer);
executor.submit(consumer);
 
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 0);
           

是以,numberOfConsumedElements 數量為0.

生産立即過期元素

如果實作延遲消息的getDelay() 方法傳回負值,那意味着元素已經過期。這時消費者将立刻消費該消息。下面測試生産帶負延遲的消息場景:

int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = -10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
  queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
           

當我們開始這個測試場景,消費者将立刻消費元素,因為元素已過期:

executor.submit(producer);
executor.submit(consumer);
 
executor.awaitTermination(1, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 1);
           

總結

本文我們學習了DelayQueue資料結構。通過執行個體實作生産延遲元素的生産者和消費者程式。利用DelayQueue隊列的特性,消費者隻能消費已經過期的元素。

繼續閱讀