天天看點

Java核心技術知識點筆記—并行流

作者:書生聽雨景闌珊林

前言:流使得并行處理塊操作變得容易。這個過程幾乎是自動的,但需遵守一些規則。

1、擷取并行流:

(1)使用Collection.parallelStream()方法從任何集合中擷取一個并行流:

List<String> list = Arrays.asList("a", "b", "c", "d", "e");
Stream<String> parallelStream = list.parallelStream();           
Java核心技術知識點筆記—并行流

(2)使用Stream.parallel()方法将任意順序流轉換為并行流:

Stream<String> stream = Stream.of("a", "b", "c", "d", "e");
Stream<String> parallelled = stream.parallel();           
Java核心技術知識點筆記—并行流

2、注意:

(1)隻要在終結方法執行時,流處于并行模式,那麼所有的中間流操作都将被并行化。

(2)流操作并行運作時,其目标是讓其傳回結果與順序執行時傳回的結果相同。重要的是,這些操作可以任意順序執行。

(3)示例:假設要對字元串流中的短單詞計數,下面的方法将非常糟糕:

int[] shortWords = new int[12];
Stream<String> stream = Stream.of("abc", "bdfafasf", "c", "ddwadf", "dsadfafe");
stream.parallel().forEach(s -> {
    if (s.length() < 12) {
        shortWords[s.length()]++;
    }
});
System.out.print(Arrays.toString(shortWords));           
Java核心技術知識點筆記—并行流

上例中,傳遞給forEach的函數會在多個并發線程中運作,每個都會更新共享的數組,這是一種經典的競争情況。多次運作這個程式将會發現每次的結果都不一樣,并且結果都是錯的。

為了得到正确的結果,要確定傳遞給并行流操作的任何函數都可以安全地并行執行,達到這個目的的最佳方式是遠離易變狀态。例如,用長度将字元串群組,然後分别進行計數:

Stream<String> stream = Stream.of("abc", "bdfafasf", "c", "ddwadf", "dsadfafe");
Map<Integer, Long> collect = stream.parallel()
        .filter(s -> s.length() < 5)
        .collect(Collectors.groupingBy(String::length,
                Collectors.counting()));
System.out.print(collect);           
Java核心技術知識點筆記—并行流

注意:不要修改在執行某項流操作後會将元素傳回到流中的集合(即使這種修改是線程安全的)。流并不會收集它們的資料,資料總是在單獨的集合中。如果修改了這樣的集合,那麼流操作的結果就是未定義的。更準确地将,因為中間的流操作都是惰性的,是以直到執行終結操作時才對集合進行修改仍舊是可行的。讓并行流正常工作需要滿足大量的條件:

(1)傳遞給并行流操作的函數不應被阻塞。并行流使用fork-join池來操作流的各個部分,如果多個流操作被阻塞,池可能就無法執行任何操作。

(2)資料應該在記憶體中。必須等到資料到達是非常低效的。

(3)流應該可以被高效地分成若幹個字部份。由數組或平衡二叉樹支撐的流都可以工作得很好,但Stream.iterate傳回的結果不行。

(4)流操作的工作量應該具有較大的規模。如果總工作負載并不是很大,那麼搭建并行計算時所付出的代價就沒有意義。

(5)隻有在對已經位于記憶體中的資料執行大量計算操作時,才應該使用并行流。

繼續閱讀