介紹java DelayQueue類
本文我們讨論java.util.concurrent 包中的DelayQueue類,是用于生産者和消費者模式的阻塞隊列。它有一個非常有用的特性————當消費者想從隊列中取消息時,隻能擷取已經過期一段時間的元素(延遲擷取)。
在DelayQueue隊列中實作元素延遲
每個需要放入DelayQueue隊列元素需要實作Delayed接口,下面我們建立DelayObject 類,其執行個體對象将被放入DelayQueue中。其構造函數包括字元串類型資料及延遲毫秒變量:
public class DelayObject implements Delayed {
private String data;
private long startTime;
public DelayObject(String data, long delayInMilliseconds) {
this.data = data;
this.startTime = System.currentTimeMillis() + delayInMilliseconds;
}
startTime 變量————隊列中元素被消費的時間。接下來需要實作getDelay方法,其傳回隊列中對象剩餘延遲時間,根據參數确定時間機關,是以我們需要使用TimeUnit.convert() 方法傳回響應機關的時間量:
@Override
public long getDelay(TimeUnit unit) {
long diff = startTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
當消費者嘗試從隊列中擷取元素時,DelayQueue 隊列執行getDelay()方法查找允許從隊列中傳回的對象。如果方法傳回0或負數,則可以從隊列中擷取元素。
同時也需要實作compareTo() 方法,因為DelayQueue 隊列中的元素根據有效期進行排序。先要過期的元素在隊列的首位,而擁有最長有效期的元素在隊尾:
@Override
public int compareTo(Delayed o) {
return Ints.saturatedCast(
this.startTime - ((DelayObject) o).startTime);
}
通過延遲隊列實作生産者和消費者程式
為了測試DelayQueue 隊列,我們需要實作生産者和消費者邏輯。生産者類需要隊列、生産元素的數量以及每個元素延遲的毫秒作為參數。
然後執行run方法,在隊列中加入元素後并休眠500毫秒:
public class DelayQueueProducer implements Runnable {
private BlockingQueue<DelayObject> queue;
private Integer numberOfElementsToProduce;
private Integer delayOfEachProducedMessageMilliseconds;
// standard constructor
@Override
public void run() {
for (int i = 0; i < numberOfElementsToProduce; i++) {
DelayObject object
= new DelayObject(
UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
System.out.println("Put object: " + object);
try {
queue.put(object);
Thread.sleep(500);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
}
消費者實作類似,但包括跟蹤消費消息的數量:
public class DelayQueueConsumer implements Runnable {
private BlockingQueue<DelayObject> queue;
private Integer numberOfElementsToTake;
public AtomicInteger numberOfConsumedElements = new AtomicInteger();
// standard constructors
@Override
public void run() {
for (int i = 0; i < numberOfElementsToTake; i++) {
try {
DelayObject object = queue.take();
numberOfConsumedElements.incrementAndGet();
System.out.println("Consumer take: " + object);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
DelayQueue 應用測試
為了測試DelayQueue,我們建立一個生産者線程即一個消費者線程。生産者put兩個對象至隊列中,每個對象需要500毫秒延遲,測試斷言消費者消費2個消息:
@Test
public void givenDelayQueue_whenProduceElement
_thenShouldConsumeAfterGivenDelay() throws InterruptedException {
// given
ExecutorService executor = Executors.newFixedThreadPool(2);
BlockingQueue<DelayObject> queue = new DelayQueue<>();
int numberOfElementsToProduce = 2;
int delayOfEachProducedMessageMilliseconds = 500;
DelayQueueConsumer consumer = new DelayQueueConsumer(
queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
// when
executor.submit(producer);
executor.submit(consumer);
// then
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);
}
我們能觀察到運作程式輸出為:
Put object: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Consumer take: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007}
Put object: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
Consumer take: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}
生産者在隊列中加入第一個元素,等待過期後被消費。第二個元素也一樣。
消費者在給定的時間内不能消費
生産者生産元素需要10秒後過期:
int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = 10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(
queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
啟動測試程式,但5秒後終止。根據DelayQueue性質,消費者不能從隊列中消費未過期的消息:
executor.submit(producer);
executor.submit(consumer);
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 0);
是以,numberOfConsumedElements 數量為0.
生産立即過期元素
如果實作延遲消息的getDelay() 方法傳回負值,那意味着元素已經過期。這時消費者将立刻消費該消息。下面測試生産帶負延遲的消息場景:
int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = -10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
當我們開始這個測試場景,消費者将立刻消費元素,因為元素已過期:
executor.submit(producer);
executor.submit(consumer);
executor.awaitTermination(1, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 1);
總結
本文我們學習了DelayQueue資料結構。通過執行個體實作生産延遲元素的生産者和消費者程式。利用DelayQueue隊列的特性,消費者隻能消費已經過期的元素。