
Flink Transformation Operators and Examples

Map

DataStream → DataStream: takes one element as input and produces one element as output.

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.generateSequence(1,10)
val streamMap = stream.map { x => x * 2 }
streamMap.print()

env.execute("FirstJob")
           
Note: with stream.print(), the number at the beginning of each output line indicates which parallel subtask (thread) produced that line.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;

import java.util.Random;


public class StuScore {
    private static Random rand = new Random();

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        DataSet<String> text;
        if (params.has("input")) {
            // e.g. --input F:\date\flinkdata\stu.txt
            text = env.readTextFile(params.get("input"));
        } else {
            System.out.println("Please check your input (missing --input parameter)");
            return;
        }

        // Map: attach a random score (1-100) to every student name
        MapOperator<String, Tuple2<String, Integer>> stuscore = text.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return new Tuple2<>(s, rand.nextInt(100) + 1);
            }
        });

        if (params.has("output")) {
            // e.g. --output F:\date\flinkdata\personinput\A
            stuscore.writeAsCsv(params.get("output"));
            // sinks other than print() need an explicit execute() in the DataSet API
            env.execute("StuScore");
        } else {
            System.out.println("Printing result to stdout");
            stuscore.print();
        }
    }
}
           

FlatMap

DataStream → DataStream: takes one element as input and produces zero, one, or more elements as output.

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("F:\\date\\flinkdata\\stu.tsv")
val streamFlatMap = stream.flatMap{
    x => x.split(" ")
}
streamFlatMap.print()

env.execute("FirstJob")
           
Filter      

DataStream → DataStream: evaluates a boolean for each element and keeps only the elements for which it returns true. The example below keeps the odd numbers:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.generateSequence(1,10)
val streamFilter = stream.filter{
  // keep only the odd numbers
    x => (x % 2 != 0)
}
streamFilter.print()

env.execute("FirstJob")
           

Connect


DataStream, DataStream → ConnectedStreams: connects two data streams while preserving their types. After the two streams are connected they are merely placed inside one shared stream; internally each keeps its own data and form unchanged, and the two streams remain independent of each other.

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("F:\\date\\flinkdata\\stu.tsv")

val streamMap = stream.flatMap(item => item.split(" ")).filter(item => item.equals("hadoop"))
val streamCollect = env.fromCollection(List(1,2,3,4))
// swapping streamMap and streamCollect does not change the result
val streamConnect = streamMap.connect(streamCollect)

streamConnect.map(item=>println(item), item=>println(item))

env.execute("FirstJob")
           

CoMap,CoFlatMap


ConnectedStreams → DataStream: operates on a ConnectedStreams; works just like map and flatMap, applying a separate map / flatMap function to each of the two streams inside the ConnectedStreams.

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream1 = env.readTextFile("F:\\date\\flinkdata\\stu.tsv")
val streamFlatMap = stream1.flatMap(x => x.split(" "))
val stream2 = env.fromCollection(List(1,2,3,4))
val streamConnect = streamFlatMap.connect(stream2)
val streamCoMap = streamConnect.map(
    (str) => str + "connect",
    (in) => in + 100
)

streamCoMap.print()

env.execute("FirstJob")

//========================

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream1 = env.readTextFile("test.txt")
val stream2 = env.readTextFile("test1.txt")
val streamConnect = stream1.connect(stream2)
val streamCoFlatMap = streamConnect.flatMap(
    (str1) => str1.split(" "),
    (str2) => str2.split(" ")
)
streamCoFlatMap.print()

env.execute("FirstJob")
           

Split


DataStream → SplitStream: splits one DataStream into two or more DataStreams according to some characteristic. Note: this code alone does not produce any output when run; add Select (next section) to retrieve and print the split streams.

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("F:\\date\\flinkdata\\stu.tsv")
val streamFlatMap = stream.flatMap(x => x.split(" "))
val streamSplit = streamFlatMap.split(
  num =>
  // strings equal to "hadoop" form one DataStream; everything else forms another
    (num.equals("hadoop")) match{
        case true => List("hadoop")
        case false => List("other")
    }
)

env.execute("FirstJob")
           

Select


SplitStream → DataStream: retrieves one or more DataStreams from a SplitStream.

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("F:\\date\\flinkdata\\stu.tsv")
val streamFlatMap = stream.flatMap(x => x.split(" "))
val streamSplit = streamFlatMap.split(
  num =>
    (num.equals("hadoop")) match{
        case true => List("hadoop")
        case false => List("other")
    }
)

val hadoop = streamSplit.select("hadoop")
val other = streamSplit.select("other")
other.print()

env.execute("FirstJob")
           

Union


DataStream → DataStream: unions two or more DataStreams, producing a new DataStream that contains the elements of all of them. Note: if you union a DataStream with itself, every element appears twice in the resulting DataStream.

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream1 = env.readTextFile("test.txt")
val streamFlatMap1 = stream1.flatMap(x => x.split(" "))
val stream2 = env.readTextFile("test1.txt")
val streamFlatMap2 = stream2.flatMap(x => x.split(" "))
val streamUnion = streamFlatMap1.union(streamFlatMap2)
streamUnion.print()

env.execute("FirstJob")
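
To make the self-union behaviour described above concrete, here is a minimal sketch (the element values are assumed for illustration, not from the original example): every element of the original stream appears twice in the result.

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val nums = env.fromCollection(List(1, 2, 3))
// union of a stream with itself: each of 1, 2, 3 is emitted twice
val selfUnion = nums.union(nums)
selfUnion.print()

env.execute("UnionSelfJob")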
           

KeyBy

DataStream → KeyedStream: the input must be of Tuple type (when keying by a field position). Logically splits a stream into disjoint partitions, each containing the elements with the same key; internally this is implemented with hash partitioning.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap{
    x => x.split(" ")
}
val streamMap = streamFlatMap.map{
    x => (x,1)
}
val streamKeyBy = streamMap.keyBy(0)
streamKeyBy.print()
env.execute("FirstJob")
           

Reduce

KeyedStream → DataStream: a rolling aggregation on a keyed data stream that merges the current element with the result of the previous aggregation and produces a new value. The returned stream contains the result of every aggregation step, not just the final result of the last aggregation.

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0)

val streamReduce = stream.reduce(
  (item1, item2) => (item1._1, item1._2 + item2._2)
)

streamReduce.print()

env.execute("FirstJob")
           

Fold

KeyedStream → DataStream: a rolling fold on a keyed data stream with an initial value, merging the current element with the result of the previous fold and producing a new value. The returned stream contains the result of every fold step, not just the final result of the last fold.

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0)

val streamFold = stream.fold(100)(
  (begin, item) => (begin + item._2)
)

streamFold.print()

env.execute("FirstJob")
           

Aggregations

KeyedStream → DataStream: rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns only the minimum value, whereas minBy returns the element whose field contains the minimum value (the same applies to max and maxBy). The returned stream contains the result of every aggregation step, not just the final result of the last aggregation.

keyedStream.sum(0) 
keyedStream.sum("key") 
keyedStream.min(0) 
keyedStream.min("key") 
keyedStream.max(0) 
keyedStream.max("key") 
keyedStream.minBy(0) 
keyedStream.minBy("key") 
keyedStream.maxBy(0) 
keyedStream.maxBy("key")

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("test02.txt").map(item => (item.split(" ")(0), item.split(" ")(1).toLong)).keyBy(0)

val streamReduce = stream.sum(1)

streamReduce.print()

env.execute("FirstJob")
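
To make the min / minBy difference above concrete, here is a minimal sketch on an assumed (id, timestamp, temperature) stream (the data and field layout are illustrative, not from the original article): min(2) only tracks the minimum of field 2 and keeps the other non-key fields from the first element, while minBy(2) returns the whole element that currently holds the minimum.

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// assumed sample data: (sensor id, timestamp, temperature)
val stream = env.fromCollection(List(
  ("sensor_1", 1L, 35.0),
  ("sensor_1", 2L, 31.0),
  ("sensor_1", 3L, 33.0)
)).keyBy(0)

// min(2): rolling minimum of the temperature; the timestamp stays at 1 (from the first element)
stream.min(2).print()

// minBy(2): the whole element holding the current minimum temperature, so the timestamp follows it
stream.minBy(2).print()

env.execute("MinVsMinBy")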
           

   All of the operators before section 2.3.10 can be applied directly to a Stream, because they are not aggregating operations. From section 2.3.10 onwards, however, you will notice that although aggregation operators can be applied directly to an unbounded stream, they emit the result of every intermediate aggregation, which is usually not what we want. In practice reduce, fold and the aggregation operators are meant to be used together with a Window; only combined with a Window do they produce the desired result.
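
As a hedged sketch of that combination (the socket source host/port and the 5-second window length are assumptions, not part of the original examples), a keyed reduce applied per tumbling window emits one aggregated count per key per window instead of every intermediate result:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment

// assumed source: words typed into `nc -lk 9999`
val windowedCounts = env.socketTextStream("localhost", 9999)
  .flatMap(item => item.split(" "))
  .map(item => (item, 1))
  .keyBy(0)
  // group the unbounded stream into 5-second tumbling windows before reducing
  .timeWindow(Time.seconds(5))
  .reduce((item1, item2) => (item1._1, item1._2 + item2._2))

windowedCounts.print()

env.execute("WindowWordCount")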
