天天看點

Flink實戰(四) - DataSet API程式設計1 你将學到2 Data Set API 簡介3 測試環境4 Data Sources簡介6 從檔案/檔案夾建立DataSet7 從csv檔案建立Dataset9從壓縮檔案中建立DataSet10 Transformation11 Data Sinks參考

1 你将學到

◆ DataSet API開發概述

◆ 計數器

◆ DataSource

◆ 分布式緩存

◆ Transformation

◆ Sink

2 Data Set API 簡介

Flink中的DataSet程式是實作資料集轉換(例如,過濾,映射,連接配接,分組)的正常程式.

最初從某些Source源建立資料集(例如,通過讀取檔案或從本地集合建立)

結果通過sink傳回,接收器可以例如将資料寫入(分布式)檔案或标準輸出(例如指令行終端)

Flink程式可以在各種環境中運作,單機運作或嵌入其他程式中

執行可以在本地JVM中執行,也可以在叢集機器上執行.

  • 有關Flink API基本概念的介紹,請參閱本系列的上一篇
Flink實戰(三) - 程式設計模型及核心概念

為了建立自己的Flink DataSet程式,鼓勵從Flink程式的解剖開始,逐漸添加自己的轉換!

3 測試環境

4 Data Sources簡介

資料源建立初始資料集,例如來自檔案或Java集合。建立資料集的一般機制是在InputFormat後面抽象的

Flink附帶了幾種内置格式,可以從通用檔案格式建立資料集。其中許多都在ExecutionEnvironment上有快捷方法。

4.1 基于檔案

  • readTextFile(path)/ TextInputFormat

    按行讀取檔案并将它們作為字元串傳回

  • readTextFileWithValue(path)/ TextValueInputFormat

    按行讀取檔案并将它們作為StringValues傳回。 StringValues是可變字元串

  • readCsvFile(path)/ CsvInputFormat

    解析逗号(或其他字元)分隔字段的檔案。傳回元組,案例類對象或POJO的DataSet。支援基本的java類型及其Value對應的字段類型

  • readFileOfPrimitives(path,delimiter)/ PrimitiveInputFormat

    使用給定的分隔符解析新行(或其他char序列)分隔的原始資料類型(如String或Integer)的檔案

  • readSequenceFile(Key,Value,path)/ SequenceFileInputFormat

    建立JobConf并從類型為SequenceFileInputFormat,Key class和Value類的指定路徑中讀取檔案,并将它們作為Tuple2 傳回。

4.2 基于集合

  • fromCollection(Iterable) - 從Iterable建立資料集。 Iterable傳回的所有元素必須屬于同一類型
  • fromCollection(Iterator) - 從疊代器建立資料集。該類指定疊代器傳回的元素的資料類型
  • fromElements(elements:_ *) - 根據給定的對象序列建立資料集。所有對象必須屬于同一類型
  • fromParallelCollection(SplittableIterator) - 并行地從疊代器建立資料集。該類指定疊代器傳回的元素的資料類型
  • generateSequence(from,to) - 并行生成給定時間間隔内的數字序列。

4.3 通用

  • readFile(inputFormat,path)/ FileInputFormat

    接受檔案輸入格式

  • createInput(inputFormat)/ InputFormat

    接受通用輸入格式5 從集合建立DataSet5.1 Scala實作

5.2 Java實作

6 從檔案/檔案夾建立DataSet

6.1 Scala實作

檔案

檔案夾

Java實作

7 從csv檔案建立Dataset

7.1 Scala實作

  • 注意忽略第一行
  • includedFields參數使用
  • 定義一個POJO8 從遞歸檔案夾的内容建立DataSet8.1 Scala實作

9從壓縮檔案中建立DataSet

Flink目前支援輸入檔案的透明解壓縮,如果它們标有适當的檔案擴充名。 特别是,這意味着不需要進一步配置輸入格式,并且任何FileInputFormat都支援壓縮,包括自定義輸入格式。

壓縮檔案可能無法并行讀取,進而影響作業可伸縮性。

下表列出了目前支援的壓縮方法

9.1 Scala實作

10 Transformation

10.1 map

Map轉換在DataSet的每個元素上應用使用者定義的map函數。 它實作了一對一的映射,也就是說,函數必須傳回一個元素。

以下代碼将Integer對的DataSet轉換為Integers的DataSet:

Scala實作

10.2 filter

10.3 mapPartition

MapPartition在單個函數調用中轉換并行分區。 map-partition函數将分區作為Iterable擷取,并且可以生成任意數量的結果值。 每個分區中的元素數量取決于并行度和先前的操作。

10.4 first

10.5 Cross

11 Data Sinks

11.1 Java描述

Data Sinks使用DataSet并用于存儲或傳回它們

使用OutputFormat描述資料接收器操作

Flink帶有各種内置輸出格式,這些格式封裝在DataSet上的操作後面:

  • writeAsText()/ TextOutputFormat

    将元素按行順序寫入字元串。通過調用每個元素的toString()方法獲得字元串。

  • writeAsFormattedText()/ TextOutputFormat

    按字元串順序寫入元素。通過為每個元素調用使用者定義的format()方法來擷取字元串。

  • writeAsCsv(...)/ CsvOutputFormat

    将元組寫為逗号分隔值檔案。行和字段分隔符是可配置的。每個字段的值來自對象的toString()方法。

  • print()/ printToErr()/ print(String msg)/ printToErr(String msg)

    列印标準輸出/标準錯誤流上每個元素的toString()值。可選地,可以提供字首(msg),其字首為輸出。這有助于區分不同的列印調用。如果并行度大于1,則輸出也将以生成輸出的任務的辨別符為字首。

  • write()/ FileOutputFormat

    自定義檔案輸出的方法和基類。支援自定義對象到位元組的轉換。

  • output()/ OutputFormat

    最通用的輸出方法,用于非基于檔案的資料接收器(例如将結果存儲在資料庫中)。

可以将DataSet輸入到多個操作。程式可以編寫或列印資料集,同時對它們執行其他轉換。

例子

标準資料接收方法:

// text data
DataSet<String> textData = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS");

// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);

// tuples as lines with pipe as the separator "a|b|c"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file");

// this writes values as strings using a user-defined TextFormatter object
values.writeAsFormattedText("file:///path/to/the/result/file",
    new TextFormatter<Tuple2<Integer, Integer>>() {
        public String format (Tuple2<Integer, Integer> value) {
            return value.f1 + " - " + value.f0;
        }
    });           

使用自定義輸出格式:

DataSet<Tuple3<String, Integer, Double>> myResult = [...]

// write Tuple DataSet to a relational database
myResult.output(
    // build and configure OutputFormat
    JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                    .setDBUrl("jdbc:derby:memory:persons")
                    .setQuery("insert into persons (name, age, height) values (?,?,?)")
                    .finish()
    );           

本地排序輸出

可以使用元組字段位置或字段表達式以指定順序在指定字段上對資料接收器的輸出進行本地排序。 這适用于每種輸出格式。

以下示例顯示如何使用此功能:

DataSet<Tuple3<Integer, String, Double>> tData = // [...]
DataSet<Tuple2<BookPojo, Double>> pData = // [...]
DataSet<String> sData = // [...]

// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print();

// sort output on Double field in descending and Integer field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print();

// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("f0.author", Order.DESCENDING).writeAsText(...);

// sort output on the full tuple in ascending order
tData.sortPartition("*", Order.ASCENDING).writeAsCsv(...);

// sort atomic type (String) output in descending order
sData.sortPartition("*", Order.DESCENDING).writeAsText(...);           

參考

DataSet Transformations

相關源碼