天天看點

終結操作之forEach、forEachOrdered 、reduce、collect

終結操作周遊流來産生一個結果或是副作用。在一個流上執行終結操作之後,該流被消費,無法再次被消費

  • forEach 和 forEachOrdered 對流中的每個元素執行由 Consumer 給定的實作。

    在使用 forEach時,并沒有确定的處理元素的順序;forEachOrdered 則按照流的相遇順序來處理元素,如果流有确定的相遇順序的話

  • reduce進行遞歸計算
  • collect生成新的資料結構

1. forEach 和 forEachOrdered

//在這裡forEach執行是順序的
System.out.println("forEach Demo");
Stream.of("AAA","BBB","CCC").forEach(s->System.out.println("Output:"+s));
System.out.println("forEachOrdered Demo");
Stream.of("AAA","BBB","CCC").forEachOrdered(s->System.out.println("Output:"+s));

//輸出:
forEach Demo
Output:AAA
Output:BBB
Output:CCC

forEachOrdered Demo
Output:AAA
Output:BBB
Output:CCC


//在這裡forEach執行順序是不能保障的
Stream.of("AAA","BBB","CCC").parallel().forEach(s->System.out.println("Output:"+s));
Stream.of("AAA","BBB","CCC").parallel().forEachOrdered(s->System.out.println("Output:"+s));

//第二行将始終輸出
Output:AAA
Output:BBB
Output:CCC
//而第一個不保證,因為forEach處理的元素順序是不确定的。 
//forEachOrdered将按其源指定的順序處理流的元素,無論流是順序流還是并行流。
           

2. reduce

  • reduce一般用于遞歸操作,最常見的用法就是将stream中一連串的值合成為單個值,比如為一個包含一系列數值的數組求和。
  • reduce方法有三個重載的方法,方法簽名如下:
    Optional<T> reduce(BinaryOperator<T> accumulator);
    T reduce(T identity, BinaryOperator<T> accumulator);
    <U> U reduce(U identity,
                     BiFunction<U, ? super T, U> accumulator,
                     BinaryOperator<U> combiner);
               
  • reduce執行過程分析
    //接受一個BinaryOperator類型的lambada表達式
    List<Integer> numList = Arrays.asList(1,2,3,4,5);
    int result = numList.stream().reduce((a,b) -> a + b ).get();
    System.out.println(result);
               
    代碼實作了對numList中的元素累加。lambada表達式的a參數是表達式的執行結果的緩存,也就是表達式這一次的執行結果會被作為下一次執行的參數,而第二個參數b則是依次為stream中每個元素。如果表達式是第一次被執行,a則是stream中的第一個元素。
    int result = numList.stream().reduce((a,b) -> {
      System.out.println("a=" + a + ",b=" + b);
      return a + b;
    } ).get();
               
    在表達式中假如列印參數的代碼,列印出來的内容如下:
    a=1,b=2
    a=3,b=3
    a=6,b=4
    a=10,b=5
               
    //與第一個簽名的實作的唯一差別是它首次執行時表達式第一次參數并不是stream的第一個元素,
    //而是通過簽名的第一個參數identity來指定
    //第一種比第一種僅僅多了一個字定義初始值罷了。 
    //-------------------------------------------------------------------
    //此外,因為存在stream為空的情況,是以第一種實作并不直接方法計算的結果,
    //而是将計算結果用Optional來包裝,我們可以通過它的get方法獲得一個Integer類型的結果
    //,而Integer允許null。
    //
    //第二種實作因為允許指定初始值,是以即使stream為空,也不會出現傳回結果為null的情況,
    //當stream為空,reduce為直接把初始值傳回
    List<Integer> numList = Arrays.asList(1,2,3,4,5);
    int result = numList.stream().reduce(0,(a,b) ->  a + b );
    System.out.println(result);
               
    //第三種簽名的用法相較前兩種稍顯複雜,由于前兩種實作有一個缺陷,
    //它們的計算結果必須和stream中的元素類型相同,如上面的代碼示例,stream中的類型為int,
    //那麼計算結果也必須為int,這導緻了靈活性的不足,甚至無法完成某些任務,
    //比入我們咬對一個一系列int值求和,但是求和的結果用一個int類型已經放不下,
    //必須更新為long類型,此實第三簽名就能發揮價值了,它不将執行結果與stream中元素的類型綁死。
    
    List<Integer> numList = Arrays.asList(Integer.MAX_VALUE,Integer.MAX_VALUE);
    long result = numList.stream().reduce(0L,(a,b) ->  a + b, (a,b)-> 0L );
    System.out.println(result);
    
    //如上代碼所示,它能将int類型的清單合并成long類型的結果。
    
    //當然這隻是其中一種應用罷了,由于擺脫了類型的限制我們還可以通過他來靈活的完成許多任務,
    //比如将一個int類型的ArrayList轉換成一個String類型的ArrayList	
    List<Integer> numList = Arrays.asList(1, 2, 3, 4, 5, 6);
    ArrayList<String> result = numList.stream().reduce(new ArrayList<String>(), (a, b) -> {
        a.add("element-" + Integer.toString(b));
        return a;
    }, (a, b) -> null);
    System.out.println(result);
    
    //執行結果為
    [element-1, element-2, element-3, element-4, element-5, element-6]
    
    //這個示例顯得有點雞肋,一點不實用,不過在這裡我們的主要目的是說明代碼能達到什麼樣的效果,
    //是以代碼示例也不必取自實際的應用場景。
    
    //現在解釋下這個reduce的簽名還包含第三個參數,一個BinaryOperator<U>類型的表達式。
    //在正常情況下我們可以忽略這個參數,敷衍了事的随便指定一個表達式即可,目的是為了通過編譯器的檢查,
    //因為在正常的stream中它并不會被執行到,然而, 雖然此表達式形同虛設,
    //可是我們也不是把它設定為null,否者還是會報錯。
    //在并行stream中,此表達式則會被執行到
    /**
     * lambda文法:
     * System.out.println(Stream.of(1, 2, 3).parallel().reduce(4, (s1, s2) -> s1 + s2
     , (s1, s2) -> s1 + s2));
     **/
    System.out.println(Stream.of(1, 2, 3).parallel().reduce(4, 
    		new BiFunction<Integer, Integer, Integer>() {
    			@Override
    			public Integer apply(Integer integer, Integer integer2) {
    				return integer + integer2;
    			}
    		}
    		, new BinaryOperator<Integer>() {
    			@Override
    			public Integer apply(Integer integer, Integer integer2) {
    				return integer + integer2;
    			}
    		}));
    
    //并行時的計算結果是18,而非并行時的計算結果是10!
    //為什麼會這樣?
    //先分析下非并行時的計算過程:
    //第一步計算4 + 1 = 5,第二步是5 + 2 = 7,第三步是7 + 3 = 10。這樣解釋好像沒有問題呀。
    //那問題就是非并行的情況與了解有不一緻的地方了!
    //先分析下它可能是通過什麼方式來并行的?
    //按非并行的方式來看它是分了三步的,每一步都要依賴前一步的運算結果!
    //那應該是沒有辦法進行并行計算的啊!
    //可實際上現在并行計算出了結果并且關鍵其結果與非并行時是不一緻的!
    //那要不就是了解上有問題,要不就是這種方式在并行計算上存在BUG。
    //暫且認為其不存在BUG,先來看下它是怎麼樣出這個結果的。
    //猜測初始值4是存儲在一個變量result中的;并行計算時,線程之間沒有影響,
    //是以每個線程在調用第二個參數BiFunction進行計算時,直接都是使用result值當其第一個參數
    //(由于Stream計算的延遲性,在調用最終方法前,都不會進行實際的運算,
    //是以每個線程取到的result值都是原始的4),是以計算過程現在是這樣的:
    //線程1:1 + 4 = 5;線程2:2 + 4 = 6;線程3:3 + 4 = 7;
    //Combiner函數: 5 + 6 + 7 = 18!
    
    //在來一個例子
    /**
     * lambda文法:
     * System.out.println(Stream.of(1, 2, 3).parallel().reduce(4, (s1, s2) -> s1 + s2
     , (s1, s2) -> s1 * s2));
     */
    System.out.println(Stream.of(1, 2, 3).parallel().reduce(4, 
    		new BiFunction<Integer, Integer, Integer>() {
    			@Override
    			public Integer apply(Integer integer, Integer integer2) {
    				return integer + integer2;
    			}
    		}
    		, new BinaryOperator<Integer>() {
    			@Override
    			public Integer apply(Integer integer, Integer integer2) {
    				return integer * integer2;
    			}
    		}));
    
    //以上示例輸出的結果是210!
    //它表示的是,使用4與1、2、3中的所有元素按(s1,s2) -> s1 + s2(accumulator)的方式進行第一次計算,
    //得到結果序列4+1, 4+2, 4+3,即5、6、7;
    //然後将5、6、7按combiner即(s1, s2) -> s1 * s2的方式進行彙總,也就是5 * 6 * 7 = 210。
    //使用函數表示就是:(4+1) * (4+2) * (4+3) = 210;
    //reduce的這種寫法可以與以下寫法結果相等(但過程是不一樣的,三個參數時會進行并行處理):
    System.out.println(Stream.of(1, 2, 3).map(n -> n + 4).reduce((s1, s2) -> s1 * s2));
    
    //這種方式有助于了解并行三個參數時的場景,實際上就是第一步使用accumulator進行轉換
    //(它的兩個輸入參數一個是identity, 一個是序列中的每一個元素),由N個元素得到N個結果;
    //第二步是使用combiner對第一步的N個結果做彙總。
    /**
     * 模拟Filter查找其中含有字母a的所有元素,列印結果将是aa ab ad
     * lambda文法:
     * s1.parallel().reduce(new ArrayList<String>(), 
     * 			             (r, t) -> {if (predicate.test(t)) r.add(t);  return r; },
    						 (r1, r2) -> {System.out.println(r1==r2); return r2;
    		}).stream().forEach(System.out::println);
     */
    Stream<String> s1 = Stream.of("aa", "ab", "c", "ad");
    Predicate<String> predicate = t -> t.contains("a");
    s1.parallel().reduce(new ArrayList<String>(), 
    	  new BiFunction<ArrayList<String>, String, ArrayList<String>>() {
    			@Override
    			public ArrayList<String> apply(ArrayList<String> strings, String s) {
    				if (predicate.test(s)) {
    					strings.add(s);
    				}
    
    				return strings;
    			}
    		},
    		new BinaryOperator<ArrayList<String>>() {
    			@Override
    			public ArrayList<String> apply(ArrayList<String> strings, ArrayList<String> strings2) {
    				System.out.println(strings == strings2);
    				return strings;
    			}
    		}).stream().forEach(System.out::println);
    
      //其中System.out.println(r1==r2)這句列印的結果是什麼呢?經過運作後發現是True!
     //為什麼會這樣?這是因為每次第二個參數也就是accumulator傳回的都是第一個參數中New的ArrayList對象!
     //是以combiner中傳入的永遠都會是這個對象,這樣r1與r2就必然是同一樣對象!
     //是以如果按了解的,combiner是将不同線程操作的結果彙總起來,那麼一般情況下上述代碼就會這樣寫(lambda):
    
    Stream<String> s1 = Stream.of("aa", "ab", "c", "ad");
    
    //模拟Filter查找其中含有字母a的所有元素,由于使用了r1.addAll(r2),
    //其列印結果将不會是預期的aa ab ad
    Predicate<String> predicate = t -> t.contains("a");
    s1.parallel().reduce(new ArrayList<String>(), 
    				(r, t) -> {if (predicate.test(t)) r.add(t);  return r; },
    				(r1, r2) -> {r1.addAll(r2); return r1; })
    			.stream().forEach(System.out::println);
    //這個時候出來的結果與預期的結果就完全不一樣了,要多了很多元素!
               
    //更多例子
    	// 字元串連接配接,concat = "ABCD"
    	String concat = Stream.of("A", "B", "C", "D").reduce("", String::concat);
    	// 求最小值,minValue = -3.0
    	double minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.MAX_VALUE, Double::min);
    	// 求和,sumValue = 10, 有起始值
    	int sumValue = Stream.of(1, 2, 3, 4).reduce(0, Integer::sum);
    	// 求和,sumValue = 10, 無起始值
    	sumValue = Stream.of(1, 2, 3, 4).reduce(Integer::sum).get();
    	// 過濾,字元串連接配接,concat = "ace"
    	concat = Stream.of("a", "B", "c", "D", "e", "F").
    	 filter(x -> x.compareTo("Z") > 0).
    	 reduce("", String::concat);
    	```
    
               

3. collect

collect含義與Reduce有點相似,先看其定義:

<R> R collect(Supplier<R> supplier,
			  BiConsumer<R, ? super T> accumulator,
			  BiConsumer<R, R> combiner);
           

仍舊先分析其參數(參考其JavaDoc):

  • supplier:動态的提供初始化的值;建立一個可變的結果容器(JAVADOC);對于并行計算,這個方法可能被調用多次,每次傳回一個新的對象;
  • accumulator:類型為BiConsumer,注意這個接口是沒有傳回值的;它必須将一個元素放入結果容器中(JAVADOC)。
  • combiner:類型也是BiConsumer,是以也沒有傳回值。它與三參數的Reduce類型,隻是在并行計算時彙總不同線程計算的結果。它的輸入是兩個結果容器,必須将第二個結果容器中的值全部放入第一個結果容器中(JAVADOC)。

可見Collect與分并行與非并行兩種情況。

下面對并行情況進行分析。

直接使用上面Reduce模拟Filter的示例進行示範(使用lambda文法):

/**
 * 模拟Filter查找其中含有字母a的所有元素,列印結果将是aa ab ad
 */
Stream<String> s1 = Stream.of("aa", "ab", "c", "ad");
Predicate<String> predicate = t -> t.contains("a");
System.out.println(s1.parallel().collect(() -> new ArrayList<String>(),
		(array, s) -> {if (predicate.test(s)) array.add(s); },
		(array1, array2) -> array1.addAll(array2)));
           

根據以上分析,這邊了解起來就很容易了:

  • 每個線程都建立了一個結果容器ArrayList,假設每個線程處理一個元素,那麼處理的結果将會是[aa],[ab],[],[ad]四個結果容器(ArrayList);
  • 最終再調用第三個BiConsumer參數将結果全部Put到第一個List中,是以傳回結果就是列印的結果了。

關于collect其他用法:

  • Collectors.toList():轉換成List集合。
  • Collectors.toSet():轉換成set集合。
  • Collectors.toCollection()
  • Collectors.toConcurrentMap()
  • Collectors.toCollection(TreeSet::new):轉換成特定的set集合。
    TreeSet<String> treeSet = Stream.of("a", "c", "b", "a")
    								.collect(Collectors.toCollection(TreeSet::new));
    System.out.println(treeSet);
               
  • Collectors.toMap(keyMapper, valueMapper, mergeFunction):轉換成map。
    private static void testToConcurrentMap() {
        Optional.of(menu.stream()
                .collect(Collectors.toConcurrentMap(Dish::getName, Dish::getCalories)))
                .ifPresent(v -> {
                    System.out.println(v);
                    System.out.println(v.getClass());
                });
    }
    
    private static void testToConcurrentMapWithBinaryOperator() {
        Optional.of(menu.stream()
                .collect(Collectors.toConcurrentMap(Dish::getType, v -> 1L, (a, b) -> a + b)))
                .ifPresent(v -> {
                    System.out.println(v);
                    System.out.println(v.getClass());
                });
    }
    
    private static void testToConcurrentMapWithBinaryOperatorAndSupplier() {
        Map<Thread, StackTraceElement[]> allStackTraces = Thread.getAllStackTraces();
        Optional.of(menu.stream()
                .collect(Collectors.toConcurrentMap(Dish::getType, v -> 1L, (a, b) -> a + b, ConcurrentSkipListMap::new)))
                .ifPresent(v -> {
                    System.out.println(v);
                    System.out.println(v.getClass());
                });
    }
    ------------------------
    Map<String, String> collect = Stream.of("a", "b", "c", "a")
    									.collect(Collectors.toMap(x -> x, x -> x + x,(oldVal, newVal) -> newVal)));
    collect.forEach((k,v) -> System.out.println(k + ":" + v));
    //a:aa
    //b:bb
    //c:cc
    
    //補充關于合并函數 BinaryOperator<U> mergeFunction對象
    //當toMap中沒有mergeFunction時,出現key重複時,會抛出異常 :
    //    Exception in thread "main" java.lang.IllegalStateException: Duplicate key aa
    //當使用mergeFunction時,可通過Labmda表達式,對重複值進行處理
               
  • Collectors.minBy(Integer::compare):求最小值,相對應的當然也有maxBy方法。
  • Collectors.averagingInt(x->x):求平均值,同時也有averagingDouble、averagingLong方法。
  • Collectors.summingInt(x -> x)):求和。
  • Collectors.summarizingDouble(x -> x):可以擷取最大值、最小值、平均值、總和值、總數。
    DoubleSummaryStatistics summaryStatistics = Stream.of(1, 3, 4)
    			.collect(Collectors.summarizingDouble(x -> x));
    System.out.println(summaryStatistics .getAverage());
               
  • Collectors.groupingBy(x -> x):有三種方法,檢視源碼可以知道前兩個方法最終調用第三個方法,

    第二個參數預設HashMap::new 第三個參數預設Collectors.toList()

    //第一種
    Map<Integer, List<Integer>> map = Stream.of(1, 3, 3, 2)
    									.collect(Collectors.groupingBy(Function.identity()));
    System.out.println(map);
    //{1=[1], 2=[2], 3=[3, 3]}
    
    //第二種	
    Map<Integer, Integer> map1 = Stream.of(1, 3, 3, 2)
    		.collect(Collectors.groupingBy(Function.identity(), Collectors.summingInt(x -> x)));
    System.out.println(map1);
    //{1=1, 2=2, 3=6}
    
    //第三種	
    HashMap<Integer, List<Integer>> hashMap = Stream.of(1, 3, 3, 2)
    		.collect(Collectors.groupingBy(Function.identity(), HashMap::new, Collectors.mapping(x -> x + 1, Collectors.toList())));
    System.out.println(hashMap);
    //{1=[2], 2=[3], 3=[4, 4]}
    
    //補充: identity()是Function類的靜态方法,和 x->x 是一個意思,
    //當僅僅需要自己傳回自己時,使用identity()能更清楚的表達作者的意思.
    //寫的複雜一點,繞一點,對了解很有好處
    
    // 按照字元串長度進行分組符合條件的元素将組成一個 List 映射到以條件長度為key的
    //Map<Integer, List<String>> 中
    servers.stream.collect(Collectors.groupingBy(String::length))	
    
    //如果我不想 Map 的 value 為 List 怎麼辦? 上面的實作實際上調用了下面的方式:
    //Map<Integer, Set<String>>
    servers.stream.collect(Collectors.groupingBy(String::length, Collectors.toSet()))
    
    //我要考慮同步安全問題怎麼辦? 當然使用線程安全的同步容器啊,那前兩種都用不成了吧! 
    //别急! 我們來推斷一下,其實第二種等同于下面的寫法: 
    Supplier<Map<Integer,Set<String>>> mapSupplier = HashMap::new;
    Map<Integer,Set<String>> collect = servers.stream
    		.collect(Collectors.groupingBy(String::length, mapSupplier, Collectors.toSet()));
    //這就非常好辦了,我們提供一個同步 Map 不就行了,于是問題解決了:
    Supplier<Map<Integer, Set<String>>> mapSupplier = () -> Collections.synchronizedMap(new HashMap<>());
    Map<Integer, Set<String>> collect = servers.stream.collect(Collectors.groupingBy(String::length, mapSupplier, Collectors.toSet()));	
    其實同步安全問題 Collectors 的另一個方法 groupingByConcurrent 給我們提供了解決方案,用法和 groupingBy 差不多。
               
  • Collectors.partitioningBy(x -> x > 2),把資料分成兩部分,key為ture/false。第一個方法也是調用第二個方法,第二個參數預設為Collectors.toList()
Map<Boolean, List<Integer>> map = Stream.of(1, 3, 3, 2)
                .collect(Collectors.partitioningBy(x -> x > 2));
        map.forEach((k,v) -> System.out.println(k + ":" + v));
        
        Map<Boolean, Long> longMap = Stream.of(1, 3, 3, 2)
                .collect(Collectors.partitioningBy(x -> x > 1, Collectors.counting()));
        longMap.forEach((k,v) -> System.out.println(k + ":" + v));
		//false:[1, 2]
		//true:[3, 3]
		
		//false:1
		//true:3
           
  • Collectors.joining(","):拼接字元串。
    //輸出 FelordcnTomcatJettyUndertowResin
     servers.stream().collect(Collectors.joining());
    
     //輸出 Felordcn,Tomcat,Jetty,Undertow,Resin
     servers.stream().collect(Collectors.joining("," ));
    
     //輸出 [Felordcn,Tomcat,Jetty,Undertow,Resin]
     servers.stream().collect(Collectors.joining(",", "[", "]")); 
               
  • Collectors.collectingAndThen先執行了一個歸納操作,然後再對歸納的結果進行 Function 函數處理輸出一個新的結果
    //先執行collect操作後再執行第二個參數的表達式。這裡是先塞到集合,再得出集合長度
    Integer integer = Stream.of("1", "2", "3")
    						.collect(Collectors.collectingAndThen(Collectors.toList(), x -> x.size()));
    //輸出:3
    
     //我們将servers joining 然後轉成大寫,結果為: FELORDCN,TOMCAT,JETTY,UNDERTOW,RESIN   
     servers.stream.collect(Collectors.collectingAndThen(Collectors.joining(","), 
     						String::toUpperCase));
               
  • Collectors.mapping(…):跟Stream的map操作類似,隻是參數有點差別
    //該方法是先對元素使用 Function 進行再加工操作,然後用另一個Collector 歸納。
    //比如我們先去掉 servers 中元素的首字母,然後将它們裝入 List 。
    
     // [elordcn, omcat, etty, ndertow, esin]
     servers.stream.collect(Collectors.mapping(s -> s.substring(1), Collectors.toList()));
    
    //有點類似 Stream 先進行了map, 操作再進行collect:
    servers.stream.map(s -> s.substring(1)).collect(Collectors.toList());