天天看點

高效實作Java并發程式設計之Queue

在實際的軟體開發過程中,經常會遇到如下場景:某個子產品負責産生資料,這些資料由另一個子產品負責處理。

産生資料的子產品形象地稱為生産者;

而處理資料的子產品就稱為消費者。

生産者和消費者之間通常還有一個緩沖,生産者把資料放入緩沖,而消費讀取緩沖中的資料,這樣的好處是:

1.支援解耦:生産者和消費者不需要知道對方的資訊,比如郵件投遞,隻需要把郵件交給郵差就行,郵差如何把郵件送到,使用者不需要關心。

2.異步:生産者把資料放入緩存即可傳回,不需要等待消費者處理完畢。這樣的好處是提高了生産者的性能。

3.隔離:生産子產品和消費子產品出現異常不會互相影響。比如生産者出現異常無法生産資料,但不影響消費者消費。

反之,如果消費子產品暫時出了問題不能消費,那麼生産者還可以把資料放入緩沖,消費者從故障中恢複後可以繼續消費資料。

這種緩沖在 Java 中通過 BlockingQueue 子類實作。消息中間件(比如 RabbitMQ)也提供了這種緩沖功能,如下圖所示。

高效實作Java并發程式設計之Queue

示意圖

盡管提供了隔離特性,但當消費者出現故障或消費資料緩慢時,生産者産生的資料放入隊列逐漸累積成海量資料,将導緻隊列所在的系統出現記憶體溢出等故障。

即使消費者端的故障恢複,但也可能消費過時的資料。一般處理方法是丢棄,或者在丢棄前儲存資料,比如 RabbitMQ提供死信來儲存丢棄的資料。

BlockingQueue 類實作了緩沖,線程生成的資料放到 BlockingQueue 中,消費線程從BlockingQueue 中獲得資料。

BlockingQueue 的核心方法如下:

一、 放入資料:

m offer(anObject):表示如果可能的話,将 anObject 加到 BlockingQueue 中,即如果BlockingQueue 可以容納,則傳回 true,否則傳回 false。

m offer(E o, long timeout, TimeUnit unit):可以設定等待的時間,如果在指定的時間内,還不能往隊列中加入 BlockingQueue,則傳回失敗。

m put(anObject):把 anObject 加到 BlockingQueue 中,如果 BlockQueue 沒有空間,則調用此方法的線程被阻斷,直到 BlockingQueue 裡面有空間再繼續加入。

二、擷取資料:

m poll(time):取走 BlockingQueue 中排在首位的對象,若不能立即取出,則可以等 time參數規定的時間過後再取,取不到時傳回 null。

m poll(long timeout, TimeUnit unit):從 BlockingQueue 取出一個隊首的對象,如果在指定時間内,隊列一旦有資料可取,則立即傳回隊列中的資料。否則直到時間逾時還沒有資料可取,傳回失敗。

m take():取走 BlockingQueue 中排在首位的對象,若 BlockingQueue 為空,則阻斷進入等待狀态直到 BlockingQueue 有新的資料被加入。

m drainTo():一次性從 BlockingQueue 中擷取所有可用的資料對象(還可以指定擷取資料的個數),通過該方法可以提升擷取資料效率;不需要多次分批加鎖或釋放鎖。

常見的 BlockingQueue 有如下圖所示的子類。

高效實作Java并發程式設計之Queue

ArrayBlockingQueue:基于數組的阻塞隊列實作,在 ArrayBlockingQueue 内部維護了一個定長數組,以便緩存隊列中的資料對象。

是以 ArrayBlockingQueue 容納的資料是有限的,如果隊列滿,則生産者線程無法再放入新的資料,線程阻塞。

我們還可以控制 ArrayBlockingQueue 的内部鎖是否采用公平鎖,預設采用非公平鎖。

LinkedBlockingQueue:基于連結清單的阻塞隊列,同 ArrayListBlockingQueue 類似,其内部也維持着一個資料緩沖隊列,該隊列由一個連結清單構成,連結清單既可以固定大小,也可以不限制容量。

在不限制容量情況下,如果生産者的速度大于消費者的速度,則LinkedBlockingQueue 的容量會不斷增加,系統記憶體就有可能被消耗殆盡。

實際系統建議使用固定大小的 ArrayListBlockingQueue。

因為 LinkedBlockingQueue 内部是連結清單構成的,是以每次新增資料都會建立一個 Node對象,消費資料後删除 Node 對象,這樣會造成垃圾回收負擔。

從性能角度看,不如ArrayBlockingQueue 性能好,對虛拟機影響較大。

PriorityBlockingQueue:基于優先級的無大小限制的隊列,通過構造函數傳入的Comparator 對象來決定優先級。

BlockingQueue 都是先進先出的,PriorityBlockingQueue可以設定元素的優先級,比如優先級高的請求放入 PriorityBlockingQueue,會優先被處理。

DelayQueue:DelayQueue 是一個沒有限制容量的隊列,隻有當元素指定的延遲時間到了,才能夠從隊列中擷取到該元素。

SynchronousQueue:一種無緩沖的等待隊列,線程調用 take 擷取元素,必須等待另外一個線程調用 put 設定 SynchronousQueue 隊列的元素,隊列沒有任何容量,這是一種快速傳遞元素的方式。

也就是說,在這種情況下,元素總是以最快的方式從插入者(生産者)傳遞給移除者(消費者),這在多任務隊列中是最快的處理任務的方式。HikariCP連接配接池使用 SynchronousQueue 來擷取一個可用的資料庫連接配接。

内容摘自《高性能Java系統權威指南》第三章

高效實作Java并發程式設計之Queue

李家智 著

本書特點:

内容上,總結作者從事Java開發20年來在頭部IT企業的高并發系統經曆的真實案例,極具參考意義和可讀性。

對于程式員和架構師而言,Java 系統的性能優化是一個超正常的挑戰。這是因為 Java 語言和 Java 運作平台,以及 Java 生态的複雜性決定了 Java 系統的性能優化不再是簡單的更新配置或者簡單的 “空間換時間”的技術實作,這涉及 Java 的各種知識點。

本書從高性能、易維護、代碼增強以及在微服務系統中編寫Java代碼的角度來描述如何實作高性能Java系統,結合真實案例,讓讀者能夠快速上手實戰。

風格上,本書的風格偏實戰,讀者可以下載下傳書中的示例代碼并運作測試。讀者可以從任意一章開始閱讀,掌握性能優化知識為公司的系統所用。

本書适合:

中進階程式員和架構師;

以及有志從事基礎技術研發、開源工具研發的極客閱讀;

也可以作為 Java 筆試和面試的參考書。

繼續閱讀