看了前兩篇你肯定已經了解了 java 并發程式設計的低層建構。然而,在實際程式設計中,應該經可能的遠離低層結構,畢竟太底層的東西用起來是比較容易出錯的,特别是并發程式設計,既難以調試,也難以發現問題,我們還是使用由并發處理的專業人員實作的較高層次的結構要友善、安全得多。
阻塞隊列
對于許多線程問題,都可以使用一個或多個隊列來安全、優雅的進行資料的傳遞。比如經典的生産者--消費者問題,生産者不停的生成某些資料,消費者需要處理資料,在多線程環境中,如何安全的将資料從生産者線程傳遞到消費者線程?
無需使用鎖和條件對象,java 自帶的阻塞隊列就能夠完美的解決這個問題。阻塞隊列中所有方法都是線程安全的,是以我們進行讀取、寫入操作時無需考慮并發問題。阻塞隊列主要有以下幾種方法:
方法 | 正常結果 | 異常結果 |
---|---|---|
add | 添加一個元素 | 隊列滿,抛出 IllegalStateException 異常 |
element | 傳回隊列頭元素 | 隊列空,抛出 NoSuckElementException 異常 |
offer | 添加一個元素,傳回 true | 隊列滿,傳回 false |
peek | 傳回隊列的頭元素 | 隊列空,傳回 null |
poll | 移出并傳回隊列頭元素 | 隊列空,傳回 null |
put | 添加一個元素 | 隊列滿,阻塞 |
remove | 移出并傳回頭元素 | 隊列空,抛出 NoSuckElementException 異常 |
take | 移出并傳回頭元素 | 隊列空,則阻塞 |
上面的方法主要分成了三類,第一類:異常情況下抛出異常;第二類:異常情況傳回 false/null;第三類:異常情況下阻塞。可以根據自身情況選擇合适的方法來操作隊列。
阻塞隊列的實作
在 java.util.concurrent 包中,提供了阻塞隊列的幾種實作,目前也可以自己實作 BlockingQueue 接口,實作自己的阻塞隊列。
- LinkdedBlockingQueue:鍊式阻塞隊列。一般情況下鍊式的結構容量都是沒有上限的,但是也可以選擇手動指定最大容量。
- LinkdedBlockingDeque:鍊式阻塞雙端隊列。
- PriorityBlockingQueue:優先級隊列。按照優先級移出,無容量上限。
- ArrayBlockingQueue:數組隊列,需指定容量。可選指定是否需要公平性,如果設定了公平性,等待了最長時間的線程會優先得到處理,但是會降低性能。
延遲隊列
DelayQueue 也是阻塞隊列的一種,不過它要求隊列中的元素實作
Delayed
接口。需要重新兩個方法:
- long getDelay(TimeUnit unit)傳回延遲的時間,負值表示延遲結束,隻有延遲結束的情況下,元素才能從隊列中移出。
- int compareTo(Delayed o)比較方法,DelayQueue 使用該方法對元素進行排序。
傳遞隊列
在 Java SE 7 中新增了一個 TransferQueue 接口,允許生産者等待,直到消費者消費了某個元素。原本生産者消費者是沒有關系的,生産者并不知道某個元素是否被消費者消費了。通過此接口可以讓生産者知道某個元素确實被消費了。如果生産者調用:
q.transer(item)
方法,這個調用會阻塞,知道 item 被消費線程取出消費。LinkedTransferQueue 實作了此接口。
線程安全的集合
如果多個線程并發的操作集合,會很容易出現問題,我們可以選擇鎖來保護共享資料,但是更好的選擇是使用線程安全的集合來作為替代。本節介紹 Java 類庫中提供的線程安全的集合(上一節介紹的阻塞隊列也在其中)。
這類集合,size 是通過便利得出的,較慢。而且如果 size 數量大于 20 億,有可能超過 int 的範圍,使用 size 方法無法擷取到大小,在 java8 中引入了 mappingCount 方法,傳回值類型為 long。
映射 map
映射是日常使用中非常常見的一種資料結構。共有以下幾種線程安全的映射:
- ConcurrentSkipListMap:有序映射,根據鍵排序
- ConcurrentHashMap:無序映射
映射條目的原子更新
一旦涉及到多線程環境,做啥都比較麻煩,比如更新一個 map 中某個鍵值對的值,下面的操作顯然是不正确的:
int old = map.get(key);
map.put(key,old+1);
假如有兩個線程同時操作一個 key,雖然 put 方法是線程安全的,但是由于兩個線程之前讀取的 old 是一樣的,這樣就會導緻某個線程的修改被覆寫掉。
有以下幾種安全的更新方法:
- 使用 repalce(key,oldValue,newValue)方法,此方法會在 key,oldValue 完全比對時将 oldValue 換為 newValue 傳回 true,否則傳回 false。
- 使用 AtomicLong 或者 LongAdder 作為映射的值,這兩個的操作方法是原子性的,是以可以安全的修改值。 3.使用 compute 類似方法完成更新。比如下面的:
# 如果key不再map中,v的值為null
map.compute(key,(k,v)->v==null?1:v+1);
# 如果不存在key
map.computeIfAbsent(key,key->new LongAdder())
# 如果存在key
map.computeIfPresent(key,key->key+1)
# 和compute方法類似,不過不處理鍵
map.merge(key,value,(existingValue,newValue)->existingValue+newValue+1)
批操作
java8 引入的,即使有其他線程在處理映射,批操作也能安全的執行。批操作會周遊映射,處理便利過程中找到的元素,且無需當機目前映射的快照。顯然通過批操作擷取的結果不是完全精确的,因為周遊過程中,元素可能會被改變。
有以下三種不同的操作:
- 搜尋(search),周遊結果直到傳回一個非 null 的結果
- 歸約(reduce),組合所有鍵或值,需提供累加函數
-
forEach,周遊所有的鍵值對
每個操作都有 4 個版本:
- operationKeys:處理鍵
- operationValues:處理值
- operation:處理鍵值
- operationEntries:處理需要 map.Entry 對象
并發集合
線程安全的 set 集合隻有以下一種:
-
ConcurrentSkipListSet:有序 set
如果我們想要一個 hash 結構的,線程安全的 set,有以下幾種辦法.
- 通過 ConcurrentHashMap.<Key>newKeySet()生成一個 Set,比如:
Set<String> sets = ConcurrentHashMap.<String>newKeySet();
這其實隻是 ConcurrentHashMap<Key,Boolean>的一個包裝器,所有的值都為 true
- 通過現有映射對象的 keySet 方法,生成這個映射的鍵集。如果删除這個集的某個元素,映射上對于元素也會被删除。但是不能添加元素,因為沒有相應的值。java8 新增了一個 keySet 方法,可以設定一個預設值,這樣就能為向集合中增加元素。
數組
在 Concurrent 包中隻有一個
CopyOnWriteArrayList
數組。該數組所有的修改都會對底層數組進行複制,也就是每插入一個元素都會将原來的數組複制一份并加入新的元素。
當建構一個疊代器時,疊代器指向的是目前數組的引用,如果後來數組被修改了,疊代器指向的任然是舊的數組。
任何集合類都可以通過使用同步包裝器變成線程安全的,如下:
//線程安全的清單
List<String> list1 = Collections.synchronizedList(new ArrayList<>());
//線程安全的map
Map<String,String> map1 = Collections.synchronizedMap(new HashMap<>());
//線程安全的set
Set<String> set1 = Collections.synchronizedSet(new HashSet<>());
并行數組算法
在 java 8 中,Arrays 類提供了大量的并行化操作。
- Arrays.parallelSort
對一個基本資料類型或對象的數組進行排序
- Arrays.paralletSetAll
用一個函數計算得到的值填充一個數組。這個函數接收元素索引,然後計算值。例如:
# 将所有值加上對于的序号
Arrays.parallelSetAll(arr,i->i+ arr[i]);
- parallelPrefix
用對應一個給定結合操作的字首的累加結果替換各個數組元素。看文字描述不太容易看懂,這裡用一個例子說明:
int[] arr = {1,2,3,4}
Arrays.parallelPrefix(arr,(x,y)->x*y);
// arr變成:[1,1*2,1*2*3,1*2*3*4]
本文原創釋出與::https://www.tapme.top/blog/detail/2019-04-10