天天看點

Java8學習(4)-Stream流

Stream和Collection的差別是什麼

流和集合的差別是什麼?

粗略地說, 集合和流之間的差異就在于什麼時候進行計算。集合是一個記憶體中的資料結構,它包含資料結構中目前所有的值--集合中的每個元素都得先計算出來才能添加到記憶體裡。(你可以往集合裡加東西或者删東西,但是不管什麼時候,集合中的每個元素都是放在記憶體裡的,元素都得計算出來才能成為集合的一部分。)
相比之下,流則是在概念上固定的資料結構(你不能添加或者删除元素),其元素則是按需計算的。這對程式設計有很大的好處。使用者僅僅從流中提取需要的值,而這些值--在使用者看不見的地方--隻會按需生成。這是一種生産者 - 消費者的關系。從另一個角度來說,流就像一個延遲建立的集合:隻有在消費者要求的時候才會計算值。

Stream是内部疊代

一個明顯的差別是疊代方式不同。Collection需要手動

for-each

或者使用

Iterator

在外部疊代。而Stream則開啟後可以直接對單個元素進行操作,内部幫你做好了疊代工作。

内部疊代的好處是可一個更好的并行。自己手寫疊代需要處理好每次疊代的内容。為了提高執行效率,也許會把多個處理邏輯寫到同一個周遊裡。比如,有同僚看到從scala轉過來的同僚的代碼,說他寫的代碼經常重複好多次。scala是函數式語言,和流天然內建。而我們慣性的做法,還是把一堆操作邏輯寫到同一個循環體中,來滿足自己對所謂的性能要求的潔癖。這常常會使得可讀性變差。很厭煩閱讀超過100行的代碼,尤其代碼還有首尾同步處理的邏輯(for, try-catch),很容易出錯。多寫一次循環來做這些事情,心理又過不去。

Stream開啟流之後,系統内部會分析對元素的操作是否可以并行,然後合并執行。也就是說,看起來,自己

filter-map-filter-map-group

很多次,但真實執行的時候并不是周遊了很多次。至于到底周遊了多少次。這是一個好問題,後面會說明這個問題。

使用流Stream的注意事項

流隻能消費一次。比如,

foreach

隻能周遊一次stream。再次則會抛異常。

流操作

針對流的操作方式兩種:

中間操作

可以連接配接起來的流操作叫做中間操作。諸如

filter

map

等中間操作會傳回另一個流。這讓多個操作可以連接配接起來形成一個查詢。但是,除非調用一個終端操作,比如

collect

,

foreach

, 否則中間操作不會執行----它們很懶。這是因為中間操作一般可以合并起來,在終端操作時一次性全部處理。

終端操作

關閉流的操作叫做終端操作。終端操作會從流的流水線生成結果。

使用流

本文demo源碼:

https://github.com/Ryan-Miao/someTest/tree/master/src/main/java/com/test/java8/streams

建立一個Entity作為基本元素。

package com.test.java8.streams.entity;

/**
 * Created by Ryan Miao on 12/11/17.
 */
public class Dish {
    private final String name;
    private final boolean vegetarian;
    private final int calories;
    private final Type type;

    public Dish(String name, boolean vegetarian, int calories, Type type) {
        this.name = name;
        this.vegetarian = vegetarian;
        this.calories = calories;
        this.type = type;
    }

    public String getName() {
        return name;
    }

    public boolean isVegetarian() {
        return vegetarian;
    }

    public int getCalories() {
        return calories;
    }

    public Type getType() {
        return type;
    }

    public enum Type{
        MEAT, FISH, OTHER
    }

}           

最常用,最簡單的用法

Stream API支援許多操作,這些操作能讓你快速完成複雜的資料查詢,比如篩選、切片、映射、查找、比對和歸約。

package com.test.java8.streams;

import com.google.common.collect.Lists;
import com.test.java8.streams.entity.Dish;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

import static java.util.stream.Collectors.toList;

/**
 * Created by Ryan Miao on 12/11/17.
 */
public class StreamExample {

    private List<Dish> menu;

    @Before
    public void setUp(){
        menu = Lists.newArrayList(
                new Dish("pork", false, 800, Dish.Type.MEAT),
                new Dish("beef", false, 700, Dish.Type.MEAT),
                new Dish("chicken", false, 400, Dish.Type.MEAT),
                new Dish("french fries", true, 530, Dish.Type.OTHER),
                new Dish("rice", true, 350, Dish.Type.OTHER),
                new Dish("season fruit", true, 120, Dish.Type.OTHER),
                new Dish("pizza", true, 550, Dish.Type.OTHER),
                new Dish("prawns", false, 300, Dish.Type.FISH),
                new Dish("salmon", false, 450, Dish.Type.FISH)
        );
    }

    @Test
    public void demo(){
        List<String> threeHighCaloricDishNames = menu.stream()
                .filter(dish -> dish.getCalories() > 300)
                .map(Dish::getName)
                .limit(3)
                .collect(toList());

        System.out.println(threeHighCaloricDishNames);
    }
}           
  1. stream()

    将一個集合轉換成一個流,流和list一樣,都是單元素的集合體。
  2. filter()

    接受一個布爾值lambda,即一個謂詞。當表達式的value是

    true

    的時候,該元素通過篩選。
  3. map()

    接受一個轉換lambda,将一個元素class映射成另一個class。
  4. collect

    收集器,彙總結果,觸發流,終端操作。
Java8學習(4)-Stream流

謂詞篩選filter

謂詞是一個傳回boolean的函數,也就是條件,通過這個條件進行篩選。

@Test
public void testFilterMapLimit(){
    List<Entity> entities = Lists.newArrayList(new Entity(100), new Entity(12), new Entity(33), new Entity(41));
    List<Integer> collect = entities.stream()
            .filter(entity -> entity.getId() < 100)
            .map(Entity::getId)
            .collect(Collectors.toList());
    System.out.println(collect);
}           

這裡,filter的參數就是一個謂詞,配合filter,可以篩選結果,隻有傳回值是true的item會通過。

去重複distinct

distinct()

截短流limit

limit(n)

跳過元素skip

skip(n)。 通過limit(n)形成互補關系。

映射map

map, stream的核心操作。接收一個參數,用來把一個對象轉換為另一個。demo同上。下面看具體需求。

/**
 * Returns a stream consisting of the results of applying the given
 * function to the elements of this stream.
 *
 * <p>This is an <a href="package-summary.html#StreamOps">intermediate
 * operation</a>.
 *
 * @param <R> The element type of the new stream
 * @param mapper a <a href="package-summary.html#NonInterference">non-interfering</a>,
 *               <a href="package-summary.html#Statelessness">stateless</a>
 *               function to apply to each element
 * @return the new stream
 */
<R> Stream<R> map(Function<? super T, ? extends R> mapper);           

接收一個Function函數,然後傳回Stream. 而Function在前面已經介紹過了。我們看核心的方法。

/**
 * Represents a function that accepts one argument and produces a result.
 *
 * <p>This is a <a href="package-summary.html">functional interface</a>
 * whose functional method is {@link #apply(Object)}.
 *
 * @param <T> the type of the input to the function
 * @param <R> the type of the result of the function
 *
 * @since 1.8
 */
@FunctionalInterface
public interface Function<T, R> {

    /**
     * Applies this function to the given argument.
     *
     * @param t the function argument
     * @return the function result
     */
    R apply(T t);
}           

Function函數的功能就是把參數轉換成另一個類型的對象,傳回。也就是

a -> {return b;}

瞥一眼Peek

上面map的需求特别多,但有時候我并不想傳回另一個對象,我隻是想要把原來的對象加工一個下,還是傳回原來的對象。用map也是可以的,隻要傳回同一個對象就行。但IDEA會推薦用

peek()

比如,我想把list的user全部取出來,把updateDate更新為目前時間。

@Test
public void testPeek(){
    final List<Integer> list = Lists.newArrayList(1,2,3,4);

    List<Entity> collect = list.stream()
            .map(Entity::new)
            .peek(e -> e.setUpdateTime(new Date()))
            .collect(Collectors.toList());

    System.out.println(collect);
}           

源碼裡是這樣寫的

/**
 * Returns a stream consisting of the elements of this stream, additionally
 * performing the provided action on each element as elements are consumed
 * from the resulting stream.
 *
 * <p>This is an <a href="package-summary.html#StreamOps">intermediate
 * operation</a>.
 *
 * <p>For parallel stream pipelines, the action may be called at
 * whatever time and in whatever thread the element is made available by the
 * upstream operation.  If the action modifies shared state,
 * it is responsible for providing the required synchronization.
 *
 * @apiNote This method exists mainly to support debugging, where you want
 * to see the elements as they flow past a certain point in a pipeline:
 * <pre>{@code
 *     Stream.of("one", "two", "three", "four")
 *         .filter(e -> e.length() > 3)
 *         .peek(e -> System.out.println("Filtered value: " + e))
 *         .map(String::toUpperCase)
 *         .peek(e -> System.out.println("Mapped value: " + e))
 *         .collect(Collectors.toList());
 * }</pre>
 *
 * @param action a <a href="package-summary.html#NonInterference">
 *                 non-interfering</a> action to perform on the elements as
 *                 they are consumed from the stream
 * @return the new stream
 */
Stream<T> peek(Consumer<? super T> action);           

而Consumer同樣也在之前出現過

@FunctionalInterface
public interface Consumer<T> {

    /**
     * Performs this operation on the given argument.
     *
     * @param t the input argument
     */
    void accept(T t);
}           

也就是說,peek()的本意是将對象取出來,消一遍,并不是像我的說的那樣傳回原對象,因為參數并不是Function, 而是Consumer。我之是以這麼說是因為Function也可以做到這個功能,隻要将傳回值變為目前對象即可。而peek裡,我們可以修改目前對象的屬性,也是會生效的。

流的扁平化faltMap

我們前面講的函數都是處理一個序列,一個list,一個Stream裡的資料。如果一個Stream的元素也是另一個stream呢?我還想把這個Stream的元素的stream打散,最終輸出一個stream。比如下面這個例子。統計單詞清單中出現的字母。

final List<String> words = Lists.newArrayList( "Hello", "worlds");

List<String[]> rs = words.stream()
        .map(w -> w.split(""))
        .distinct()
        .collect(Collectors.toList());

rs.forEach(e -> {
    for (String i : e) {
        System.out.print(i + ",");
    }
    System.out.println();
});           

列印的結果為:

H,e,l,l,o,

w,o,r,l,d,s,

顯然,目标沒達到。map之後的stream已經變成

Stream<Stream<String>>

。應該如何把裡面的Stream打開,最後拼接起來呢。最直覺的想法就是用一個新的list,将我們剛才foreach列印的步驟中的操作變成插入list。但這顯然不是函數式程式設計。

flatMap

可以接收一個參數,傳回一個流,這個流可以拼接到最外層的流。說的太啰嗦,看具體用法。

@Test
public void flatMap() {
    final List<String> words = Lists.newArrayList( "Hello", "worlds");

    List<String> collect = words.stream()
            .map(w -> w.split(""))
            .flatMap(a -> Arrays.stream(a))
            .distinct()
            .collect(Collectors.toList());

    System.out.println(collect);

}           
  1. 第一步,用map将一個String對象映射成String[]數組。
  2. 第二步,将這個傳回的對象映射成Stream,這裡的數組轉Stream即

    Arrays::stream

    .
  3. 第三步,用flatMap

以上可以合并為一步:

.flatMap(w -> Arrays.stream(w.split("")))

最終列印結果:

[H, e, l, o, w, r, d, s]

查找和比對

另一個常見的資料處理套路是看看資料集中的某些元素是否比對一個給定的屬性。Stream API通過allMatch, anyMatch,noneMatch,findFirst,findAny方法提供了這樣的工具。

比如,找到任何一個比對條件的。

@Test
public void anyMatchTest() {
    final List<Entity> entities = Lists.newArrayList(new Entity(101),
            new Entity(12), new Entity(33), new Entity(42));

    boolean b = entities.stream().anyMatch(e -> {
        System.out.println(e.getId());
        return e.getId() % 2 == 0;
    });

    if (b) {
        System.out.println("有偶數");
    }

}           

101

12

有偶數

上述隻是确定下是不是存在,在很多情況下這就夠了。至于FindAny和FindFirst則是找到後傳回,目前還沒遇到使用場景。

歸約Reduce

Google搜尋提出的Map Reduce模型,Hadoop提供了經典的開源實作。在Java中,我們也可以手動實作這個。

Java8學習(4)-Stream流

reduce的操作在函數式程式設計中很常見,作用是将一個曆史值與目前值做處理。比如求和,求最大值。

求和的時候,我們會将每個元素累加給sum。用reduce即可實作:

/**
 * 沒有初始值,傳回Optional
 */
@Test
public void demo(){
    OptionalInt rs = IntStream.rangeClosed(1, 100)
            .reduce((left, right) -> {
                System.out.println(left + "\t" + right);
                return left + right;
            });

    if (rs.isPresent()){
        System.out.println("===========");
        System.out.println(rs.getAsInt());
    }
}           

列印結果為:

1   2
3   3
6   4
...
...
4851    99
4950    100
===========
5050           

給一個初始值

int rs = IntStream.rangeClosed(1, 100)
                .reduce(10, (a, b) -> a + b);           

同樣,可以用來求最大值。

List<Integer> nums = Lists.newArrayList(3, 1, 4, 0, 8, 5);
Optional<Integer> max = nums.stream().reduce((a, b) -> b > a ? b : a);           

這裡的比較函數恰好是Integer的一個方法,為增強可讀性,可以替換為:

nums.stream().reduce(Integer::max).ifPresent(System.out::println);           

接下來,回歸我們最初的目标,實作偉大的Map-Reduce模型。比如,想要知道有多少個菜(一個dish list)。

@Test
public void mapReduce() {
    final ArrayList<Dish> dishes = Lists.newArrayList(
            new Dish("pork", false, 800, Type.MEAT),
            new Dish("beef", false, 700, Type.MEAT),
            new Dish("chicken", false, 400, Type.MEAT),
            new Dish("french fries", true, 530, Type.OTHER),
            new Dish("rice", true, 350, Type.OTHER),
            new Dish("season fruit", true, 120, Type.OTHER),
            new Dish("pizza", true, 550, Type.OTHER),
            new Dish("prawns", false, 300, Type.FISH),
            new Dish("salmon", false, 450, Type.FISH)
    );

    Integer sum = dishes.stream()
            .map(d -> 1)
            .reduce(0, (a, b) -> a + b);

}           

歸約的優勢和并行化

相比于用foreach逐漸疊代求和,使用reduce的好處在于,這裡的疊代被内部疊代抽象掉了,這讓内部實作得以選擇并行執行reduce操作。而疊代式求和例子要更新共享變量sum,這不是那麼容易并行化的。如果你加入了同步,很可能會發現線程競争抵消了并行本應帶來的性能提升!這種計算的并行化需要另一種方法:将輸入分塊,分塊求和,最後再合并起來。但這樣的話代碼看起來就完全不一樣了。後面會用分支/合并架構來做這件事。但現在重要的是要認識到,可變的累加模式對于并行化來說是死路一條。你需要一種新的模式,這正是reduce所提供的。傳遞給reduce的lambda不能更改狀态(如執行個體變量),而且操作必須滿足結合律才可以按任意順序執行。

流操作的狀态:無狀态和有狀态

你已經看到了很多的流操作,乍一看流操作簡直是靈丹妙藥,而且隻要在從集合生成流的時候把Stream換成parallelStream就可以實作并行。但這些操作的特性并不相同。他們需要操作的内部狀态還是有些問題的。

諸如map和filter等操作會從輸入流中擷取每一個元素,并在輸出流中得到0或1個結果。這些操作一般是無狀态的:他們沒有内部狀态(假設使用者提供的lambda或者方法引用沒有内部可變狀态)。

但諸如reduce、sum、max等操作需要内部狀态來累積結果。在前面的情況下,内部狀态很小。在我們的例子裡就是一個int或者double。不管流中有多少元素要處理,内部狀态都是有界的。

相反,諸如sort或distinct等操作一開始都和filter和map差不多--都是接受一個流,再生成一個流(中間操作), 但有一個關鍵的差別。從流中排序和删除重複項都需要知道先前的曆史。例如,排序要求所有元素都放入緩沖區後才能給輸出流加入一個項目,這一操作的存儲要求是無界的。要是流比較大或是無限的,就可能會有問題(把質數流倒序會做什麼呢?它應當傳回最大的質數,但數學告訴我們他不存在)。我們把這些操作叫做有狀态操作。

以上内容均來自《Java8 In Action》。

    關注我的公衆号

Java8學習(4)-Stream流

唯有不斷學習方能改變!

--

Ryan Miao