天天看點

Flink之Java lambda表達式

一.簡介

Java 8引入了一些新的語言功能,旨在更快,更清晰地編碼。它具有最重要的功能,即所謂的“ Lambda表達式”,為函數式程式設計打開了大門。Lambda表達式允許以直接方式實作和傳遞函數,而無需聲明其他(匿名)類。

注意: Flink支援對Java API的所有運算符使用lambda表達式,但是,每當lambda表達式使用Java泛型時,都需要顯式聲明類型資訊。

二.範例與限制

下面的示例說明如何實作一個簡單的内聯map()函數,該函數使用lambda表達式對輸入進行平方。函數的輸入i和輸出參數的類型map()無需聲明,因為它們是Java編譯器推斷的。

env.fromElements(1, 2, 3)
// returns the squared i
.map(i -> i*i)
.print();
           

Flink可以從方法簽名的實作中自動提取結果類型資訊,out map(in value)因為out【map的傳回值類型】它不是通用的,而是in類型是可以自動推導的。

三.異常分析及解決方案

不幸的是,flatMap()帶有簽名的功能flatMap(IN value, Collector out)是flatMap(IN value, Collector out)由Java編譯器編譯進來的。這使得Flink無法自動推斷輸出類型的類型資訊。

Flink很可能會引發類似于以下内容的異常:

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
    In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
    An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
    Otherwise the type has to be specified explicitly using type information.
           

在這種情況下,需要明确指定類型資訊,否則輸出将被視為Object導緻無效序列化的類型。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.util.Collector;

DataSet<Integer> input = env.fromElements(1, 2, 3);

// collector type must be declared
input.flatMap((Integer number, Collector<String> out) -> {
    StringBuilder builder = new StringBuilder();
    for(int i = 0; i < number; i++) {
        builder.append("a");
        out.collect(builder.toString());
    }
})
// provide type information explicitly
.returns(Types.STRING)
// prints "a", "a", "aa", "a", "aa", "aaa"
.print();
           

當使用map()具有通用傳回類型的函數時,也會發生類似的問題。在下面的示例中Tuple2<Integer, Integer> map(Integer value)是基于删除方法簽名Tuple2 map(Integer value)。

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

env.fromElements(1, 2, 3)
    .map(i -> Tuple2.of(i, i))    // no information about fields of Tuple2
    .print();
           

通常,這些問題可以通過多種方式解決:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;

// use the explicit ".returns(...)"
env.fromElements(1, 2, 3)
    .map(i -> Tuple2.of(i, i))
    .returns(Types.TUPLE(Types.INT, Types.INT))
    .print();

// use a class instead
env.fromElements(1, 2, 3)
    .map(new MyTuple2Mapper())
    .print();

public static class MyTuple2Mapper extends MapFunction<Integer, Tuple2<Integer, Integer>> {
    @Override
    public Tuple2<Integer, Integer> map(Integer i) {
        return Tuple2.of(i, i);
    }
}

// use an anonymous class instead
env.fromElements(1, 2, 3)
    .map(new MapFunction<Integer, Tuple2<Integer, Integer>> {
        @Override
        public Tuple2<Integer, Integer> map(Integer i) {
            return Tuple2.of(i, i);
        }
    })
    .print();

// or in this example use a tuple subclass instead
env.fromElements(1, 2, 3)
    .map(i -> new DoubleTuple(i, i))
    .print();

public static class DoubleTuple extends Tuple2<Integer, Integer> {
    public DoubleTuple(int f0, int f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}
           

繼續閱讀