天天看點

Java 8 - Stream基本執行個體及Stream的并行處理線上程上的表現什麼是流流 VS 集合需求Java 7及之前的實作Java8中流的處理Java8中流的并行處理

Java 8 - Stream基本執行個體及Stream的并行處理線上程上的表現什麼是流流 VS 集合需求Java 7及之前的實作Java8中流的處理Java8中流的并行處理

什麼是流

集合是Java中使用最多的API 。 可以讓你把資料分組并加以處理。盡管集合對于幾乎任何一個Java應用都是不可或缺的,但集合操作卻遠遠算不上完美。

流是Java API的新成員,它允許你以聲明性方式處理資料集合(通過查詢語句來表達,而不是臨時編寫一個實作)。就現在來說,可以把它們看成周遊資料集的進階疊代器。此外,流還可以透明地并行處理,無需寫任何多線程代碼 !

Java 8中的集合支援一個新的stream 方法,它會傳回一個流(接口定義在

java.util.stream.Stream

裡)

  • 元素序列
就像集合一樣,流也提供了一個接口,可以通路特定元素類型的一組有序值。因為集合是資料結構,是以它的主要目的是以特定的時間/空間複雜度存儲和通路元素(如 ArrayList 與 LinkedList )。但流的目的在于表達計算,比如 filter 、 sorted 和 map 。集合講的是資料,流講的是計算。
流會使用一個提供資料的源,如集合、數組或輸入/輸出資源。 請注意,從有序集合生成流時會保留原有的順序。由清單生成的流,其元素順序與清單一緻。
  • 資料處理操作
流的資料處理功能支援類似于資料庫的操作,以及函數式程式設計語言中的常用操作,如 filter 、 map 、 reduce 、 find 、 match 、 sort 等。流操作可以順序執行,也可并行執行

此外,流操作有兩個重要的特點

  • 流水線

很多流操作本身會傳回一個流,這樣多個操作就可以連結起來,形成一個大

的流水線。 流水線的操作可以看作對資料源進行資料庫式查詢。

  • 内部疊代
與使用疊代器顯式疊代的集合不同,流的疊代操作是在背後進行的。

執行個體解釋

/**
     * 需求: 卡路裡前三的dish的名字
     * @param dishList
     * @return
     */
    public static List<String> getTop3HighCalories(List<Dish> dishList){
        return dishList.stream().filter(dish->dish.getCalories()>300)
                .map(Dish::getName)
                .limit(3)
                .collect(Collectors.toList());
    }           

複制

Java 8 - Stream基本執行個體及Stream的并行處理線上程上的表現什麼是流流 VS 集合需求Java 7及之前的實作Java8中流的處理Java8中流的并行處理
Java 8 - Stream基本執行個體及Stream的并行處理線上程上的表現什麼是流流 VS 集合需求Java 7及之前的實作Java8中流的處理Java8中流的并行處理
  • 先是對 menu 調用 stream 方法,得到一個流。 資料源是dishList ,它給流提供一個元素清單
  • 接下來,對流應用一系列資料處理操作: filter 、 map 、 limit除了 collect 之外,所有這些操作都會傳回另一個流,這樣它們就可以接成一條 流水線 , 于是就可以看作對源的一個查詢
  • 最後, collect 操作開始處理流水線,并傳回結果(它和别的操作不一樣,因為它傳回的不是流,在這裡是一個 List )

在調用 collect 之前,沒有任何結果産生,實際上根本就沒有從 menu 裡選擇元素。可以這麼了解:鍊中的方法調用都在排隊等待,直到調用 collect 。

Java 8 - Stream基本執行個體及Stream的并行處理線上程上的表現什麼是流流 VS 集合需求Java 7及之前的實作Java8中流的處理Java8中流的并行處理
  • filter :接受Lambda,從流中排除某些元素。在本例中,通過傳遞lambda

    d ->d.getCalories() > 300

    ,選擇出超過300卡路裡的Dish
  • map : 接受一個Lambda,将元素轉換成其他形式或提取資訊。在本例中,通過傳遞方法引用

    Dish::getName

    ,相當于Lambda

    d -> d.getName()

    ,提取了名字。
  • limit :截斷流,使其元素不超過給定數量。
  • collect :将流轉換為其他形式。在本例中,流被轉換為一個清單。 可以把 collect 看作能夠接受各種方案作為參數,并将流中的元素累計成為一個彙總結果的操作。 這裡的toList() 就是将流轉換為清單的方案。

流 VS 集合

Java現有的集合概念和新的流概念都提供了接口,來配合代表元素型有序值的資料接口。

粗略地說,集合與流之間的差異就在于什麼時候進行計算。集合是一個記憶體中的資料結構,它包含資料結構中目前所有的值——集合中的每個元素都得先算出來才能添加到集合中。(你可以往集合裡加東西或者?東西,但是不管什麼時候,集合中的每個元素都是放在記憶體裡的,元素都得先算出來才能成為集合的一部分。)

相比之下,流則是在概念上固定的資料結構(你不能添加或删除元素),其元素則是按需計算的。 是一種生産者?消費者的關系。

從另一個角度來說,流就像是一個延遲建立的集合:隻有在消費者要求的時候才會計算值 。 與此相反,集合則是急切建立的。

以質數為例,要是想建立一個包含所有質數的集合,那這個程式算起來就沒完沒了了,因為總有新的質數要算,然後把它加到集合裡面。當然這個集合是永遠也建立不完的,消費者這輩子都見不着了。

另一個例子是用浏覽器進行網際網路搜尋。假設你搜尋的短語在Google或是網?裡面有很多比對項。你用不着等到所有結果和照片的集合下載下傳完,而是得到一個流,裡面有最好的10個或20個比對項,還有一個按鈕檢視下面10個或20個。當你作為消費者點“下面10個”的時候,供應商就按需計算這些結果,然後再傳回你的浏覽器上顯示。

Java 8 - Stream基本執行個體及Stream的并行處理線上程上的表現什麼是流流 VS 集合需求Java 7及之前的實作Java8中流的處理Java8中流的并行處理

隻能周遊一次

和疊代器類似,流隻能周遊一次。周遊完之後,我們就說這個流已經被消費了。可以從原始資料源那裡再獲得一個新的流來重新周遊一遍,就像疊代器一樣(這裡假設它是集合之類的可重複的源,如果是I/O通道就不行了)

Java 8 - Stream基本執行個體及Stream的并行處理線上程上的表現什麼是流流 VS 集合需求Java 7及之前的實作Java8中流的處理Java8中流的并行處理
public static void  testConsumeMoreTime(List<Dish> dishList){
        Stream<Dish> stream = dishList.stream();
        stream.forEach(System.out::println);
        stream.forEach(System.out::println);
    }           

複制

Java 8 - Stream基本執行個體及Stream的并行處理線上程上的表現什麼是流流 VS 集合需求Java 7及之前的實作Java8中流的處理Java8中流的并行處理

集合和流的另一個關鍵差別在于它們周遊資料的方式.

内部疊代與外部疊代

使用 Collection 接口需要使用者去做疊代(比如用 for-each ),這稱為外部疊代。 相反,Streams庫使用内部疊代——它幫你把疊代做了,還把得到的流值存在了某個地方,你隻要給出一個函數說要幹什麼就可以了。下面的代碼清單說明了這種差別。

【集合 】

  • 用 for-each 循環外部疊代
Java 8 - Stream基本執行個體及Stream的并行處理線上程上的表現什麼是流流 VS 集合需求Java 7及之前的實作Java8中流的處理Java8中流的并行處理
  • 用背後的疊代器做外部疊代
Java 8 - Stream基本執行個體及Stream的并行處理線上程上的表現什麼是流流 VS 集合需求Java 7及之前的實作Java8中流的處理Java8中流的并行處理

【流:内部疊代】

Java 8 - Stream基本執行個體及Stream的并行處理線上程上的表現什麼是流流 VS 集合需求Java 7及之前的實作Java8中流的處理Java8中流的并行處理

内部疊代時,項目可以透明地并行處理,或者用更優化的順序進行處理

Streams庫的内部疊代可以自動選擇一種适合你硬體的資料表示和并行實作。

與此相反,一旦通過寫 for-each 而選擇了外部疊代,那你基本上就要自己管理所有的并行問題了

Java 8 - Stream基本執行個體及Stream的并行處理線上程上的表現什麼是流流 VS 集合需求Java 7及之前的實作Java8中流的處理Java8中流的并行處理

需求

需求: 輸出小于400的Dish的名字 , 并按照卡路裡排序

Java 7及之前的實作

package com.artisan.java8.stream;

import com.artisan.java8.Dish;

import java.util.*;
import java.util.stream.Collectors;

/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/3/7 8:23
 * @mark: show me the code , change the world
 */
public class StreamTest {

    public static void main(String[] args) {
        //have a dish list (menu)

        List<Dish> menu = Arrays.asList(
                new Dish("pork", false, 800, com.artisan.java8.Dish.Type.MEAT),
                new Dish("beef", false, 700, com.artisan.java8.Dish.Type.MEAT),
                new Dish("chicken", false, 400, com.artisan.java8.Dish.Type.MEAT),
                new Dish("french fries", true, 530, com.artisan.java8.Dish.Type.OTHER),
                new Dish("rice", true, 350, com.artisan.java8.Dish.Type.OTHER),
                new Dish("season fruit", true, 120, com.artisan.java8.Dish.Type.OTHER),
                new Dish("pizza", true, 550, com.artisan.java8.Dish.Type.OTHER),
                new Dish("prawns", false, 300, com.artisan.java8.Dish.Type.FISH),
                new Dish("salmon", false, 450, com.artisan.java8.Dish.Type.FISH));


        System.out.println(getDiskNamesByCollections(menu));
 
    }

    /**
     * 需求: 輸出小于400的Dish的名字 , 并按照卡路裡排序 
     * @param dishList
     * @return
     */
    public static List<String> getDiskNamesByCollections(List<Dish> dishList){
        List<Dish> lowCalories = new ArrayList<>();


        //  filter  過濾小于400的
        for(Dish dish : dishList){
            if (dish.getCalories() < 400) {
                lowCalories.add(dish);
            }
        }

        // sort   按照卡路裡排序
        // Collections.sort(lowCalories,(d1,d2)->Integer.compare(d1.getCalories(),d2.getCalories()));

        Collections.sort(lowCalories,Comparator.comparingInt(Dish::getCalories));


        // 處理排序後的資料
        List<String> dishNames = new ArrayList<>();
        for (Dish dish :lowCalories){
            dishNames.add(dish.getName());
        }
        return dishNames;
    }
}           

複制

可以看到需要寫這麼多代碼,這麼多步驟

Java 8 - Stream基本執行個體及Stream的并行處理線上程上的表現什麼是流流 VS 集合需求Java 7及之前的實作Java8中流的處理Java8中流的并行處理

還有一個“垃圾變量” lowCalories ,它唯一的作用就是作為一次性的中間容器。

我們來看下Java8的試下

Java8中流的處理

/**
     * 需求: 輸出小于400的Dish的名字 , 按照卡路裡從第到高輸出
     * @param dishList
     * @return
     */
    public static List<String> getDiskNamesByStream(List<Dish> dishList){
        return dishList.stream().filter(dish -> dish.getCalories() < 400)
                .sorted(Comparator.comparing(Dish::getCalories))
                .map(Dish::getName).collect(Collectors.toList());
    }           

複制

Java 8 - Stream基本執行個體及Stream的并行處理線上程上的表現什麼是流流 VS 集合需求Java 7及之前的實作Java8中流的處理Java8中流的并行處理

處理流程如下:

Java 8 - Stream基本執行個體及Stream的并行處理線上程上的表現什麼是流流 VS 集合需求Java 7及之前的實作Java8中流的處理Java8中流的并行處理

可以把幾個基礎操作連結起來,來表達複雜的資料處理流水線(在 filter 後面接上sorted 、 map 和 collect 操作,如上圖所示),同時保持代碼清晰可讀。 filter 的結果被傳給了 sorted 方法,再傳給 map 方法,最後傳給 collect 方法。

Java8中流的并行處理

為了利用多核架構并行執行這段代碼,你隻需要把 stream() 換成 parallelStream()

public static List<String> getDiskNamesByStream(List<Dish> dishList){
        return dishList.parallelStream().filter(dish -> dish.getCalories() < 400)
                .sorted(Comparator.comparing(Dish::getCalories))
                .map(Dish::getName).collect(Collectors.toList());
    }           

複制

為了友善觀察,我們在擷取卡路裡這一步加個休眠 ,啟動Jconsole 來 觀察下線程情況

public static List<String> getDiskNamesByParallStream(List<Dish> dishList){
        return dishList.parallelStream().filter(dish -> {
            try {
                Thread.sleep(1000*1000); // 模拟休眠,觀察parallelStream是否開啟了多個線程計算
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return dish.getCalories() < 400 ;
        })
                .sorted(Comparator.comparing(Dish::getCalories))
                .map(Dish::getName).collect(Collectors.toList());
    }           

複制

Java 8 - Stream基本執行個體及Stream的并行處理線上程上的表現什麼是流流 VS 集合需求Java 7及之前的實作Java8中流的處理Java8中流的并行處理