天天看點

Java8使用并行流(ParallelStream)注意事項

Java8并行流ParallelStream和Stream的差別就是支援并行執行,提高程式運作效率。但是如果使用不當可能會發生線程安全的問題。Demo如下:

public static void concurrentFun() {
        List<Integer> listOfIntegers =
                new ArrayList<>();
        for (int i = 0; i <100; i++) {
            listOfIntegers.add(i);
        }
        List<Integer> parallelStorage = new ArrayList<>() ;
        listOfIntegers
                .parallelStream()
                .filter(i->i%2==0)
                .forEach(i->parallelStorage.add(i));
        System.out.println();

        parallelStorage
                .stream()
                .forEachOrdered(e -> System.out.print(e + " "));

        System.out.println();
        System.out.println("Sleep 5 sec");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        parallelStorage
                .stream()
                .forEachOrdered(e -> System.out.print(e + " "));
    }           

複制

程式運作結果如下:

null 72 56 58 60 74 34 36 68 70 54 28 30 50 52 26 16 44 12 14 48 22 46 40 24 42 18 20 38 6 8 10 0 null 4 82 66 84 86 78 80 76 62 64 90 92 94 88 96 98 
Sleep 5 sec
null 72 56 58 60 74 34 36 68 70 54 28 30 50 52 26 16 44 12 14 48 22 46 40 24 42 18 20 38 6 8 10 0 null 4 82 66 84 86 78 80 76 62 64 90 92 94 88 96 98            

複制

除了以上在ForEach裡面添加集合元素會出現這種問題,以下這種方式也會:

listOfIntegers
                .parallelStream()
                .map(e -> {
                    parallelStorage.add(e);
                    return e;
                })
                .forEachOrdered(e -> System.out.print(e + " "));           

複制

兩個問題:

1.為什麼parallelStorage的大小不固定?

2.為什麼parallelStorage會有null元素?

最初我以為是因為主線程執行完成後并行流中的線程并未結束,sleep了主線程後發現結果并沒有發生改變,其實我們可以認為ArrayList内部維護了一個數組Arr其定義一個變量 n用以表式這個數組的大小那麼向這個ArrayList中存儲資料的過程可以分解為這麼幾步:

1.讀取數組的長度存入n

2.向這個數組中儲入元素arr[n]=a

3.将n+1

4.儲存n

而對于parrallelStorage元素數量不固定的原因就是多線程有可能同時讀取到相同的下标n同時指派,這樣就會出現元素缺失的問題了

如何解決這個問題呢?我們可以将其轉化為一個同步集合也就是

Collections.synchronizedList(new ArrayList<>())            

複制

在使用并行流的時候是無法保證元素的順序的,也就是即使你用了同步集合也隻能保證元素都正确但無法保證其中的順序。

除了以上這種方式,還有什麼方法可以防止并行流出現線程不安全操作?

那就是最後調用collect(Collectors.tolist()),這種收集起來所有元素到新集合是線程安全的。Demo如下:

public static void collectFun() {
        List<Integer> listOfIntegers =
                new ArrayList<>();

        for (int i = 0; i <100; i++) {
            listOfIntegers.add(i);
        }

        List<Integer> parallelStorage = listOfIntegers
                .parallelStream()
                .filter(i -> i % 2 == 0)
                .collect(Collectors.toList());


        System.out.println();

        parallelStorage
                .stream()
                .forEachOrdered(e -> System.out.print(e + " "));

        System.out.println();
        System.out.println("Sleep 5 sec");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        parallelStorage
                .stream()
                .forEachOrdered(e -> System.out.print(e + " "));
    }           

複制

程式運作結果如下:

0 2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38 40 42 44 46 48 50 52 54 56 58 60 62 64 66 68 70 72 74 76 78 80 82 84 86 88 90 92 94 96 98 
Sleep 5 sec
0 2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38 40 42 44 46 48 50 52 54 56 58 60 62 64 66 68 70 72 74 76 78 80 82 84 86 88 90 92 94 96 98            

複制

不光沒有出現Null和數量不一緻問題,還排序了!是以,在采用并行流收集元素到集合中時,最好調用collect方法,一定不要采用Foreach方法或者map方法。