天天看点

Flink常用的DataSet 和 DataStream API

声明:本系列博客为原创,最先发表在拉勾教育,其中一部分为免费阅读部分。被读者各种搬运至各大网站。所有其他的来源均为抄袭。

《2021年最新版大数据面试题全面开启更新》

一、说好的流批一体呢?

1、现状

     Flink并没有安全支持所谓的“流批一体”,即编写一套代码,可以同时支持流式计算和批量计算的场景。目前版本1.10依然采用DataSet和DataStream两套API来适配不同的应用场景。

2、DataSet和DataStream的区别和联系

      Flink诞生支持的设计哲学就是:用同一个引擎支持多种形式的计算,包括批处理、流处理和机器学习。尤其在流式计算方面,Flink实现了计算引擎级别的流批一体。对于普通开发者而言,如果要使用原生的Flink,直接的感受还是要编写两套代码。

整体架构如下:

Flink常用的DataSet 和 DataStream API

      在Flink源码中,可以在flink-java这个模块中找到所有关于DataSet的核心类,DataStream的核心类则在flink-streaming-java这个模块中。

Flink常用的DataSet 和 DataStream API
Flink常用的DataSet 和 DataStream API

      打开DataSet和DataStream两个类,二者支持的API都非常丰富且十分类似,比如常用的map、fliter、join等常见的transformation函数。

      对于DataSet而言,Source部分来源于文件、表或者Java集合;而DataStream的Source来源于一般都是消息中间件比如Kafka等。

      由于Flink DataSet和DataStream API的高度相似,并且Flink在实时计算领域中的应用更为广泛,下面主要讲解DataStream API 的使用。

二、DataStream

      Flink的基础构建模块就是流(Streams)和转换(Transformations),每一个数据流起始于一个或多个Source,并终止于一个或多个Sink,类似有向无环图(DAG)。

Flink常用的DataSet 和 DataStream API

1、自定义实时数据源

      利用Flink提供的自定义Source的功能来实现一个自定义的实时数据源,具体如下:

public class MyStreamingSource implements SourceFunction<MyStreamingSource.Item> {

    private boolean isRunning = true;

    /**
     * 重写run方法产生一个源源不断的数据发送源
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<Item> ctx) throws Exception {
        while(isRunning){
            Item item = generateItem();
            ctx.collect(item);

            //每秒产生一条数据
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() {
        isRunning = false;
    }

    //随机产生一条商品数据
    private Item generateItem(){
        int i = new Random().nextInt(100);

        Item item = new Item();
        item.setName("name" + i);
        item.setId(i);
        return item;
    }

    class Item{
        private String name;
        private Integer id;

        Item() {
        }

        public String getName() {
            return name;
        }

        void setName(String name) {
            this.name = name;
        }

        private Integer getId() {
            return id;
        }

        void setId(Integer id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "Item{" +
                    "name='" + name + '\'' +
                    ", id=" + id +
                    '}';
        }
    }
}


class StreamingDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //获取数据源
        DataStreamSource<MyStreamingSource.Item> text = 
        //注意:并行度设置为1,我们会在后面的课程中详细讲解并行度
        env.addSource(new MyStreamingSource()).setParallelism(1); 
        DataStream<MyStreamingSource.Item> item = text.map(
                (MapFunction<MyStreamingSource.Item, MyStreamingSource.Item>) value -> value);

        //打印结果
        item.print().setParallelism(1);
        String jobName = "user defined streaming source";
        env.execute(jobName);
    }

}

           

      在自定义Source中,实现了Flink的SourceFunction接口,同时实现了其实的run方法,在run方法中每隔一秒随机发送一个自定义的Item。我们查看运行结果:

Flink常用的DataSet 和 DataStream API

从控制台中看到,数据不断输出。

2、Map

      Map接受一个元素作为输入,并且根据开发者自定义的逻辑处理后输出。

Flink常用的DataSet 和 DataStream API

      Map算子是最常用的算子之一。从源DataStream到目标DataStream的转换过程中,返回的是SingleOutputStreamOpeartor。我们也可以在重写的Map函数中使用lamba表达式。

SingleOutputStreamOperator<Object> mapItems = items.map(
      item -> item.getName()
);
           

      我们也可以自定义Map函数,通过重写MapFunction或RichMapFunction来自定义自己的Map函数

class StreamingDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //获取数据源
        DataStreamSource<MyStreamingSource.Item> items = env.addSource(new MyStreamingSource()).setParallelism(1);
        SingleOutputStreamOperator<String> mapItems = items.map(new MyMapFunction());
        //打印结果
        mapItems.print().setParallelism(1);
        String jobName = "user defined streaming source";
        env.execute(jobName);
    }

    static class MyMapFunction extends RichMapFunction<MyStreamingSource.Item,String> {

        @Override
        public String map(MyStreamingSource.Item item) throws Exception {
            return item.getName();
        }
    }
}
           

      RichMapFunction中还提供了open、close等函数方法,重写这些方法还能实现更为复杂的功能,比如获取累加器、计数器等。

3、FlatMap

      FlatMap接受一个元素,返回0到多个元素。FlatMap和Map有些类似,但是当返回值是列表的时候,FlatMap会将列表“平铺”,也就是以单个元素的形式进行输出。

SingleOutputStreamOperator<Object> flatMapItems = items.flatMap(new FlatMapFunction<MyStreamingSource.Item, Object>() {
    @Override
    public void flatMap(MyStreamingSource.Item item, Collector<Object> collector) throws Exception {
        String name = item.getName();
        collector.collect(name);
    }
});
           

4、Filter

      Filter的意思是过滤掉不需要的数据。每个元素都会被Filter函数处理,Filter函数返回True就保留,否则丢弃。

Flink常用的DataSet 和 DataStream API

      例如只保留id为偶数的那些Item。

SingleOutputStreamOperator<MyStreamingSource.Item> filterItems = items.filter(new FilterFunction<MyStreamingSource.Item>() {
    @Override
    public boolean filter(MyStreamingSource.Item item) throws Exception {

        return item.getId() % 2 == 0;
    }
});
           
Flink常用的DataSet 和 DataStream API

      也可以在Filter中使用lamba表达式:

SingleOutputStreamOperator<MyStreamingSource.Item> filterItems = items.filter( 
    item -> item.getId() % 2 == 0
);
           

5、KeyBy

      在介绍KeyBy之前,先理解一个概念:KeyedStream。在实际业务中,经常需要根据数据的某种属性或字段进行分组,然后对不同的组进行不同的处理。

Flink常用的DataSet 和 DataStream API

      我们在使用KeyBy函数的时候会把DataStream转换成KeyedStream,事实上KeyedStream继承了DataStream,KeyedStream中的元素会根据用户传入的参数进行分组。

// 将接收的数据进行拆分,分组,窗口计算并且进行聚合输出
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds
                ....
           

      在生产环境使用keyby函数要十分注意!这个函数会按照用户指定的key进行分组,那么相同分组的数据会被分发到一个subtask上进行处理,在大数据量和key分布不均匀的时候非常容易出现数据倾斜和反压,导致任务失败。

Flink常用的DataSet 和 DataStream API

      常见的解决方式是把所有的数据加上随机前缀。

6、Aggregations

      Aggregations为聚合函数的总称,常见的聚合函数包括但不限于sum、max、min等。Aggregations也需要指定一个Key进行聚合:

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
           

      在上面的这几个函数中,max、min、sum 会分别返回最大值、最小值和汇总值;而 minBy 和 maxBy 则会把最小或者最大的元素全部返回。

      用max和maxBy举例说明:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据源
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
data.add(new Tuple3<>(0,1,0));
data.add(new Tuple3<>(0,1,1));
data.add(new Tuple3<>(0,2,2));
data.add(new Tuple3<>(0,1,3));
data.add(new Tuple3<>(1,2,5));
data.add(new Tuple3<>(1,2,9));
data.add(new Tuple3<>(1,2,11));
data.add(new Tuple3<>(1,2,13));

DataStreamSource<MyStreamingSource.Item> items = env.fromCollection(data);
items.keyBy(0).max(2).printToErr();

//打印结果
String jobName = "user defined streaming source";
env.execute(jobName);
           

运行,发现奇怪的一幕:

Flink常用的DataSet 和 DataStream API

      我们希望按照Tuple3的第一个元素进行聚合,并且按照第三个元素取最大值。但是结果却不是这样。

      Flink官网中写道:

The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).
           

      意思是:min和minBy的区别在于,min会返回我们指定字段的最小值,minBy会返回对应元素。

min 和 minBy 都会返回整个元素,只是 min 会根据用户指定的字段取最小值,并且把这个值保存在对应的位置,而对于其他的字段,并不能保证其数值正确。max 和 maxBy 同理。

事实上,对于 Aggregations 函数,Flink 帮助我们封装了状态数据,这些状态数据不会被清理,所以在实际生产环境中应该尽量避免在一个无限流上使用 Aggregations。而且,对于同一个 keyedStream ,只能调用一次 Aggregation 函数。

List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
data.add(new Tuple3<>(0,1,0));
data.add(new Tuple3<>(0,1,1));
data.add(new Tuple3<>(0,2,2));
data.add(new Tuple3<>(0,1,3));
data.add(new Tuple3<>(1,2,5));
data.add(new Tuple3<>(1,2,9));
data.add(new Tuple3<>(1,2,11));
data.add(new Tuple3<>(1,2,13));

DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
//items.keyBy(0).max(2).printToErr();

SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> reduce = items.keyBy(0).reduce(new ReduceFunction<Tuple3<Integer, Integer, Integer>>() {
    @Override
    public Tuple3<Integer,Integer,Integer> reduce(Tuple3<Integer, Integer, Integer> t1, Tuple3<Integer, Integer, Integer> t2) throws Exception {
        Tuple3<Integer,Integer,Integer> newTuple = new Tuple3<>();

        newTuple.setFields(0,0,(Integer)t1.getField(2) + (Integer) t2.getField(2));
        return newTuple;
    }
});

reduce.printToErr().setParallelism(1);
           

继续阅读