天天看点

Java 8 - Stream基本实例及Stream的并行处理在线程上的表现什么是流流 VS 集合需求Java 7及之前的实现Java8中流的处理Java8中流的并行处理

Java 8 - Stream基本实例及Stream的并行处理在线程上的表现什么是流流 VS 集合需求Java 7及之前的实现Java8中流的处理Java8中流的并行处理

什么是流

集合是Java中使用最多的API 。 可以让你把数据分组并加以处理。尽管集合对于几乎任何一个Java应用都是不可或缺的,但集合操作却远远算不上完美。

流是Java API的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。就现在来说,可以把它们看成遍历数据集的高级迭代器。此外,流还可以透明地并行处理,无需写任何多线程代码 !

Java 8中的集合支持一个新的stream 方法,它会返回一个流(接口定义在

java.util.stream.Stream

里)

  • 元素序列
就像集合一样,流也提供了一个接口,可以访问特定元素类型的一组有序值。因为集合是数据结构,所以它的主要目的是以特定的时间/空间复杂度存储和访问元素(如 ArrayList 与 LinkedList )。但流的目的在于表达计算,比如 filter 、 sorted 和 map 。集合讲的是数据,流讲的是计算。
流会使用一个提供数据的源,如集合、数组或输入/输出资源。 请注意,从有序集合生成流时会保留原有的顺序。由列表生成的流,其元素顺序与列表一致。
  • 数据处理操作
流的数据处理功能支持类似于数据库的操作,以及函数式编程语言中的常用操作,如 filter 、 map 、 reduce 、 find 、 match 、 sort 等。流操作可以顺序执行,也可并行执行

此外,流操作有两个重要的特点

  • 流水线

很多流操作本身会返回一个流,这样多个操作就可以链接起来,形成一个大

的流水线。 流水线的操作可以看作对数据源进行数据库式查询。

  • 内部迭代
与使用迭代器显式迭代的集合不同,流的迭代操作是在背后进行的。

实例解释

/**
     * 需求: 卡路里前三的dish的名字
     * @param dishList
     * @return
     */
    public static List<String> getTop3HighCalories(List<Dish> dishList){
        return dishList.stream().filter(dish->dish.getCalories()>300)
                .map(Dish::getName)
                .limit(3)
                .collect(Collectors.toList());
    }           

复制

Java 8 - Stream基本实例及Stream的并行处理在线程上的表现什么是流流 VS 集合需求Java 7及之前的实现Java8中流的处理Java8中流的并行处理
Java 8 - Stream基本实例及Stream的并行处理在线程上的表现什么是流流 VS 集合需求Java 7及之前的实现Java8中流的处理Java8中流的并行处理
  • 先是对 menu 调用 stream 方法,得到一个流。 数据源是dishList ,它给流提供一个元素列表
  • 接下来,对流应用一系列数据处理操作: filter 、 map 、 limit除了 collect 之外,所有这些操作都会返回另一个流,这样它们就可以接成一条 流水线 , 于是就可以看作对源的一个查询
  • 最后, collect 操作开始处理流水线,并返回结果(它和别的操作不一样,因为它返回的不是流,在这里是一个 List )

在调用 collect 之前,没有任何结果产生,实际上根本就没有从 menu 里选择元素。可以这么理解:链中的方法调用都在排队等待,直到调用 collect 。

Java 8 - Stream基本实例及Stream的并行处理在线程上的表现什么是流流 VS 集合需求Java 7及之前的实现Java8中流的处理Java8中流的并行处理
  • filter :接受Lambda,从流中排除某些元素。在本例中,通过传递lambda

    d ->d.getCalories() > 300

    ,选择出超过300卡路里的Dish
  • map : 接受一个Lambda,将元素转换成其他形式或提取信息。在本例中,通过传递方法引用

    Dish::getName

    ,相当于Lambda

    d -> d.getName()

    ,提取了名字。
  • limit :截断流,使其元素不超过给定数量。
  • collect :将流转换为其他形式。在本例中,流被转换为一个列表。 可以把 collect 看作能够接受各种方案作为参数,并将流中的元素累计成为一个汇总结果的操作。 这里的toList() 就是将流转换为列表的方案。

流 VS 集合

Java现有的集合概念和新的流概念都提供了接口,来配合代表元素型有序值的数据接口。

粗略地说,集合与流之间的差异就在于什么时候进行计算。集合是一个内存中的数据结构,它包含数据结构中目前所有的值——集合中的每个元素都得先算出来才能添加到集合中。(你可以往集合里加东西或者?东西,但是不管什么时候,集合中的每个元素都是放在内存里的,元素都得先算出来才能成为集合的一部分。)

相比之下,流则是在概念上固定的数据结构(你不能添加或删除元素),其元素则是按需计算的。 是一种生产者?消费者的关系。

从另一个角度来说,流就像是一个延迟创建的集合:只有在消费者要求的时候才会计算值 。 与此相反,集合则是急切创建的。

以质数为例,要是想创建一个包含所有质数的集合,那这个程序算起来就没完没了了,因为总有新的质数要算,然后把它加到集合里面。当然这个集合是永远也创建不完的,消费者这辈子都见不着了。

另一个例子是用浏览器进行互联网搜索。假设你搜索的短语在Google或是网?里面有很多匹配项。你用不着等到所有结果和照片的集合下载完,而是得到一个流,里面有最好的10个或20个匹配项,还有一个按钮查看下面10个或20个。当你作为消费者点“下面10个”的时候,供应商就按需计算这些结果,然后再返回你的浏览器上显示。

Java 8 - Stream基本实例及Stream的并行处理在线程上的表现什么是流流 VS 集合需求Java 7及之前的实现Java8中流的处理Java8中流的并行处理

只能遍历一次

和迭代器类似,流只能遍历一次。遍历完之后,我们就说这个流已经被消费了。可以从原始数据源那里再获得一个新的流来重新遍历一遍,就像迭代器一样(这里假设它是集合之类的可重复的源,如果是I/O通道就不行了)

Java 8 - Stream基本实例及Stream的并行处理在线程上的表现什么是流流 VS 集合需求Java 7及之前的实现Java8中流的处理Java8中流的并行处理
public static void  testConsumeMoreTime(List<Dish> dishList){
        Stream<Dish> stream = dishList.stream();
        stream.forEach(System.out::println);
        stream.forEach(System.out::println);
    }           

复制

Java 8 - Stream基本实例及Stream的并行处理在线程上的表现什么是流流 VS 集合需求Java 7及之前的实现Java8中流的处理Java8中流的并行处理

集合和流的另一个关键区别在于它们遍历数据的方式.

内部迭代与外部迭代

使用 Collection 接口需要用户去做迭代(比如用 for-each ),这称为外部迭代。 相反,Streams库使用内部迭代——它帮你把迭代做了,还把得到的流值存在了某个地方,你只要给出一个函数说要干什么就可以了。下面的代码列表说明了这种区别。

【集合 】

  • 用 for-each 循环外部迭代
Java 8 - Stream基本实例及Stream的并行处理在线程上的表现什么是流流 VS 集合需求Java 7及之前的实现Java8中流的处理Java8中流的并行处理
  • 用背后的迭代器做外部迭代
Java 8 - Stream基本实例及Stream的并行处理在线程上的表现什么是流流 VS 集合需求Java 7及之前的实现Java8中流的处理Java8中流的并行处理

【流:内部迭代】

Java 8 - Stream基本实例及Stream的并行处理在线程上的表现什么是流流 VS 集合需求Java 7及之前的实现Java8中流的处理Java8中流的并行处理

内部迭代时,项目可以透明地并行处理,或者用更优化的顺序进行处理

Streams库的内部迭代可以自动选择一种适合你硬件的数据表示和并行实现。

与此相反,一旦通过写 for-each 而选择了外部迭代,那你基本上就要自己管理所有的并行问题了

Java 8 - Stream基本实例及Stream的并行处理在线程上的表现什么是流流 VS 集合需求Java 7及之前的实现Java8中流的处理Java8中流的并行处理

需求

需求: 输出小于400的Dish的名字 , 并按照卡路里排序

Java 7及之前的实现

package com.artisan.java8.stream;

import com.artisan.java8.Dish;

import java.util.*;
import java.util.stream.Collectors;

/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2021/3/7 8:23
 * @mark: show me the code , change the world
 */
public class StreamTest {

    public static void main(String[] args) {
        //have a dish list (menu)

        List<Dish> menu = Arrays.asList(
                new Dish("pork", false, 800, com.artisan.java8.Dish.Type.MEAT),
                new Dish("beef", false, 700, com.artisan.java8.Dish.Type.MEAT),
                new Dish("chicken", false, 400, com.artisan.java8.Dish.Type.MEAT),
                new Dish("french fries", true, 530, com.artisan.java8.Dish.Type.OTHER),
                new Dish("rice", true, 350, com.artisan.java8.Dish.Type.OTHER),
                new Dish("season fruit", true, 120, com.artisan.java8.Dish.Type.OTHER),
                new Dish("pizza", true, 550, com.artisan.java8.Dish.Type.OTHER),
                new Dish("prawns", false, 300, com.artisan.java8.Dish.Type.FISH),
                new Dish("salmon", false, 450, com.artisan.java8.Dish.Type.FISH));


        System.out.println(getDiskNamesByCollections(menu));
 
    }

    /**
     * 需求: 输出小于400的Dish的名字 , 并按照卡路里排序 
     * @param dishList
     * @return
     */
    public static List<String> getDiskNamesByCollections(List<Dish> dishList){
        List<Dish> lowCalories = new ArrayList<>();


        //  filter  过滤小于400的
        for(Dish dish : dishList){
            if (dish.getCalories() < 400) {
                lowCalories.add(dish);
            }
        }

        // sort   按照卡路里排序
        // Collections.sort(lowCalories,(d1,d2)->Integer.compare(d1.getCalories(),d2.getCalories()));

        Collections.sort(lowCalories,Comparator.comparingInt(Dish::getCalories));


        // 处理排序后的数据
        List<String> dishNames = new ArrayList<>();
        for (Dish dish :lowCalories){
            dishNames.add(dish.getName());
        }
        return dishNames;
    }
}           

复制

可以看到需要写这么多代码,这么多步骤

Java 8 - Stream基本实例及Stream的并行处理在线程上的表现什么是流流 VS 集合需求Java 7及之前的实现Java8中流的处理Java8中流的并行处理

还有一个“垃圾变量” lowCalories ,它唯一的作用就是作为一次性的中间容器。

我们来看下Java8的试下

Java8中流的处理

/**
     * 需求: 输出小于400的Dish的名字 , 按照卡路里从第到高输出
     * @param dishList
     * @return
     */
    public static List<String> getDiskNamesByStream(List<Dish> dishList){
        return dishList.stream().filter(dish -> dish.getCalories() < 400)
                .sorted(Comparator.comparing(Dish::getCalories))
                .map(Dish::getName).collect(Collectors.toList());
    }           

复制

Java 8 - Stream基本实例及Stream的并行处理在线程上的表现什么是流流 VS 集合需求Java 7及之前的实现Java8中流的处理Java8中流的并行处理

处理流程如下:

Java 8 - Stream基本实例及Stream的并行处理在线程上的表现什么是流流 VS 集合需求Java 7及之前的实现Java8中流的处理Java8中流的并行处理

可以把几个基础操作链接起来,来表达复杂的数据处理流水线(在 filter 后面接上sorted 、 map 和 collect 操作,如上图所示),同时保持代码清晰可读。 filter 的结果被传给了 sorted 方法,再传给 map 方法,最后传给 collect 方法。

Java8中流的并行处理

为了利用多核架构并行执行这段代码,你只需要把 stream() 换成 parallelStream()

public static List<String> getDiskNamesByStream(List<Dish> dishList){
        return dishList.parallelStream().filter(dish -> dish.getCalories() < 400)
                .sorted(Comparator.comparing(Dish::getCalories))
                .map(Dish::getName).collect(Collectors.toList());
    }           

复制

为了方便观察,我们在获取卡路里这一步加个休眠 ,启动Jconsole 来 观察下线程情况

public static List<String> getDiskNamesByParallStream(List<Dish> dishList){
        return dishList.parallelStream().filter(dish -> {
            try {
                Thread.sleep(1000*1000); // 模拟休眠,观察parallelStream是否开启了多个线程计算
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return dish.getCalories() < 400 ;
        })
                .sorted(Comparator.comparing(Dish::getCalories))
                .map(Dish::getName).collect(Collectors.toList());
    }           

复制

Java 8 - Stream基本实例及Stream的并行处理在线程上的表现什么是流流 VS 集合需求Java 7及之前的实现Java8中流的处理Java8中流的并行处理