天天看點

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

1 概覽

1.1 預定義的源和接收器

Flink内置了一些基本資料源和接收器,并且始終可用。該預定義的資料源包括檔案,目錄和插socket,并從集合和疊代器攝取資料。該預定義的資料接收器支援寫入檔案和标準輸入輸出及socket。

1.2 綁定連接配接器

連接配接器提供用于與各種第三方系統連接配接的代碼。目前支援這些系統:

Apache Kafka (source/sink)

Apache Cassandra (sink)

Amazon Kinesis Streams (source/sink)

Elasticsearch (sink)

Hadoop FileSystem (sink)

RabbitMQ (source/sink)

Apache NiFi (source/sink)

Twitter Streaming API (source)

Google PubSub (source/sink)

要在應用程式中使用其中一個連接配接器,通常需要其他第三方元件,例如資料存儲或消息隊列的伺服器。

雖然本節中列出的流連接配接器是Flink項目的一部分,并且包含在源版本中,但它們不包含在二進制分發版中。

1.3 Apache Bahir中的連接配接器

Flink的其他流處理連接配接器正在通過Apache Bahir釋出,包括:

Apache ActiveMQ (source/sink)

Apache Flume (sink)

Redis (sink)

Akka (sink)

Netty (source)

1.4 其他連接配接到Flink的方法

1.4.1 通過異步I / O進行資料渲染

使用連接配接器不是将資料輸入和輸出Flink的唯一方法。一種常見的模式是在一個Map或多個FlatMap 中查詢外部資料庫或Web服務以渲染主資料流。

Flink提供了一個用于異步I / O的API, 以便更有效,更穩健地進行這種渲染。

1.4.2 可查詢狀态

當Flink應用程式将大量資料推送到外部資料存儲時,這可能會成為I / O瓶頸。如果所涉及的資料具有比寫入更少的讀取,則更好的方法可以是外部應用程式從Flink擷取所需的資料。在可查詢的狀态界面,允許通過Flink被管理的狀态,按需要查詢支援這個。

2 HDFS連接配接器

此連接配接器提供一個Sink,可将分區檔案寫入任一Hadoop檔案系統支援的檔案系統 。

要使用此連接配接器,請将以下依賴項添加到項目中:

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器
Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

請注意,流連接配接器目前不是二進制釋出的一部分

2.1 Bucketing File Sink

可以配置分段行為以及寫入,但我們稍後會介紹。這是可以建立一個預設情況下彙總到按時間拆分的滾動檔案的存儲槽的方法

  • Java
  • Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器
  • Scala
Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

唯一必需的參數是存儲桶的基本路徑。可以通過指定自定義bucketer,寫入器和批量大小來進一步配置接收器。

預設情況下,當資料元到達時,分段接收器将按目前系統時間拆分,并使用日期時間模式"yyyy-MM-dd–HH"命名存儲區。這種模式傳遞給 DateTimeFormatter使用目前系統時間和JVM的預設時區來形成存儲桶路徑。使用者還可以為bucketer指定時區以格式化存儲桶路徑。每當遇到新日期時,都會建立一個新存儲桶。

例如,如果有一個包含分鐘作為最精細粒度的模式,将每分鐘獲得一個新桶。每個存儲桶本身都是一個包含多個部分檔案的目錄:接收器的每個并行執行個體将建立自己的部件檔案,當部件檔案變得太大時,接收器也會在其他檔案旁邊建立新的部件檔案。當存儲桶變為非活動狀态時,将重新整理并關閉打開的部件檔案。如果存儲桶最近未寫入,則視為非活動狀态。預設情況下,接收器每分鐘檢查一次非活動存儲桶,并關閉任何超過一分鐘未寫入的存儲桶。setInactiveBucketCheckInterval()并 setInactiveBucketThreshold()在一個BucketingSink。

也可以通過指定自定義bucketer setBucketer()上BucketingSink。如果需要,bucketer可以使用資料元或元組的屬性來确定bucket目錄。

預設編寫器是StringWriter。這将調用toString()傳入的資料元并将它們寫入部分檔案,由換行符分隔。在a setWriter() 上指定自定義編寫器使用BucketingSink。如果要編寫Hadoop SequenceFiles,可以使用提供的 SequenceFileWriter,也可以配置為使用壓縮。

有兩個配置選項指定何時應關閉零件檔案并啟動新零件檔案:

通過設定批量大小(預設部件檔案大小為384 MB)

通過設定批次滾動時間間隔(預設滾動間隔為Long.MAX_VALUE)

當滿足這兩個條件中的任何一個時,将啟動新的部分檔案。看如下例子:

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器
Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

這将建立一個接收器,該接收器将寫入遵循此模式的存儲桶檔案:

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器
Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

生成結果

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器
  • date-time是我們從日期/時間格式擷取的字元串
  • parallel-task是并行接收器執行個體的索引
  • count是由于批處理大小或批處理翻轉間隔而建立的部分檔案的運作數
  • Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

然而這種方式建立了太多小檔案,不适合HDFS!僅供娛樂!

3 Apache Kafka連接配接器

3.1 簡介

此連接配接器提供對Apache Kafka服務的事件流的通路。

Flink提供特殊的Kafka連接配接器,用于從/向Kafka主題讀取和寫入資料。Flink Kafka Consumer內建了Flink的檢查點機制,可提供一次性處理語義。為實作這一目标,Flink并不完全依賴Kafka的消費者群體偏移跟蹤,而是在内部跟蹤和檢查這些偏移。

為用例和環境選擇一個包(maven artifact id)和類名。對于大多數使用者來說,FlinkKafkaConsumer08(部分flink-connector-kafka)是合适的。

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

然後,導入maven項目中的連接配接器:

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器
https://blog.csdn.net/qq_33589510/article/details/97064338

3.2 ZooKeeper安裝及配置

配置系統環境

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

修改配置資料存儲路徑

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器
Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

啟動

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

3.3 Kafka部署及測試

假設你剛剛開始并且沒有現有的Kafka或ZooKeeper資料

由于Kafka控制台腳本對于基于Unix和Windows的平台不同,是以在Windows平台上使用bin \ windows \而不是bin /,并将腳本擴充名更改為.bat。

Step 1:下載下傳代碼

下載下傳:

https://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz
Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

解壓

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

配置環境變量

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器
Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器
Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

配置伺服器屬性

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

修改日志存儲路徑

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

修改主機名

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

Step 2: 啟動伺服器

Kafka使用ZooKeeper,是以如果還沒有ZooKeeper伺服器,則需要先啟動它。

  • 背景模式啟動
  • Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

Step 3: 建立一個主題

  • 建立topic
  • Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

Step 4: 發送一些消息

Kafka附帶一個指令行用戶端,它将從檔案或标準輸入中擷取輸入,并将其作為消息發送到Kafka叢集。 預設情況下,每行将作為單獨的消息發送。

運作生産者,然後在控制台中鍵入一些消息以發送到伺服器。

  • 啟動生産者
Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

Step 5: 啟動一個消費者

Kafka還有一個指令行使用者,它會将消息轉儲到标準輸出。

  • 分屏,建立消費端
Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

在不同的終端中運作上述每個指令,那麼現在應該能夠在生産者終端中鍵入消息并看到它們出現在消費者終端中

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

所有指令行工具都有其他選項; 運作不帶參數的指令将顯示更詳細地記錄它們的使用資訊。

3.4 Kafka 1.0.0+ Connector

從Flink 1.7開始,有一個新的通用Kafka連接配接器,它不跟蹤特定的Kafka主要版本。 相反,它在Flink釋出時跟蹤最新版本的Kafka。

如果您的Kafka代理版本是1.0.0或更高版本,則應使用此Kafka連接配接器。 如果使用舊版本的Kafka(0.11,0.10,0.9或0.8),則應使用與代理版本對應的連接配接器。

相容性

通過Kafka用戶端API和代理的相容性保證,通用Kafka連接配接器與較舊和較新的Kafka代理相容。 它與版本0.11.0或更高版本相容,具體取決于所使用的功能。

将Kafka Connector從0.11遷移到通用(V1.10新增)

要執行遷移,請參閱更新作業和Flink版本指南和

在整個過程中使用Flink 1.9或更新版本。

不要同時更新Flink和操作符。

確定您作業中使用的Kafka Consumer和/或Kafka Producer配置設定了唯一辨別符(uid):

使用stop with savepoint功能擷取儲存點(例如,使用stop --withSavepoint)CLI指令。

用法

要使用通用Kafka連接配接器,請為其添加依賴關系:

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

然後執行個體化新源(FlinkKafkaConsumer)

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

Flink Kafka Consumer是一個流資料源,可以從Apache Kafka中提取并行資料流。 使用者可以在多個并行執行個體中運作,每個執行個體都将從一個或多個Kafka分區中提取資料。

Flink Kafka Consumer參與了檢查點,并保證在故障期間沒有資料丢失,并且計算處理元素“恰好一次”。(注意:這些保證自然會假設Kafka本身不會丢失任何資料。)

請注意,Flink在内部将偏移量作為其分布式檢查點的一部分進行快照。 承諾給Kafka的抵消隻是為了使外部的進展觀與Flink對進展的看法同步。 這樣,監控和其他工作可以了解Flink Kafka消費者在多大程度上消耗了一個主題。

和接收器(FlinkKafkaProducer)。

除了從子產品和類名中删除特定的Kafka版本之外,API向後相容Kafka 0.11連接配接器。

3.5 Kafka消費者

Flink的Kafka消費者被稱為FlinkKafkaConsumer08(或09Kafka 0.9.0.x等)。它提供對一個或多個Kafka主題的通路。

構造函數接受以下參數:

主題名稱/主題名稱清單

DeserializationSchema / KeyedDeserializationSchema用于反序列化來自Kafka的資料

Kafka消費者的屬性。需要以下屬性:

“bootstrap.servers”(以逗号分隔的Kafka經紀人名單)

“zookeeper.connect”(逗号分隔的Zookeeper伺服器清單)(僅Kafka 0.8需要)

“group.id”消費者群組的ID

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器
Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器
Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器
Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器
  • 上述程式注意配置ip主機映射
  • 虛拟機hosts
  • Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器
  • 本地機器 hosts
Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

發送消息

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

運作程式消費消息

Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器
  • Example:
  • Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器
  • Flink實戰(八) - Streaming Connectors 程式設計(上)1 概覽2 HDFS連接配接器3 Apache Kafka連接配接器

The DeserializationSchema

Flink Kafka Consumer需要知道如何将Kafka中的二進制資料轉換為Java / Scala對象。

在 DeserializationSchema允許使用者指定這樣的一個架構。T deserialize(byte[] message) 為每個Kafka消息調用該方法,從Kafka傳遞值。

從它開始通常很有幫助AbstractDeserializationSchema,它負責将生成的Java / Scala類型描述為Flink的類型系統。實作vanilla的使用者DeserializationSchema需要自己實作該getProducedType(…)方法。

為了通路Kafka消息的鍵和值,KeyedDeserializationSchema具有以下deserialize方法T deserialize(byte [] messageKey,byte [] message,String topic,int partition,long offset)。

為友善起見,Flink提供以下模式:

TypeInformationSerializationSchema(和TypeInformationKeyValueSerializationSchema)建立基于Flink的模式TypeInformation。如果Flink編寫和讀取資料,這将非常有用。此模式是其他通用序列化方法的高性能Flink替代方案。

JsonDeserializationSchema(和JSONKeyValueDeserializationSchema)将序列化的JSON轉換為ObjectNode對象,可以使用objectNode.get(“field”)作為(Int / String / …)()從中通路字段。KeyValue objectNode包含一個“key”和“value”字段,其中包含所有字段,以及一個可選的“中繼資料”字段,用于公開此消息的偏移量/分區/主題。

AvroDeserializationSchema它使用靜态提供的模式讀取使用Avro格式序列化的資料。它可以從Avro生成的類(AvroDeserializationSchema.forSpecific(…))中推斷出模式,也可以GenericRecords 使用手動提供的模式(with AvroDeserializationSchema.forGeneric(…))。此反序列化架構要求序列化記錄不包含嵌入式架構。

還有一個可用的模式版本,可以在Confluent Schema Registry中查找編寫器的模式(用于編寫記錄的 模式)。使用這些反序列化模式記錄将使用從模式系統資料庫中檢索的模式進行讀取,并轉換為靜态提供的模式(通過 ConfluentRegistryAvroDeserializationSchema.forGeneric(…)或ConfluentRegistryAvroDeserializationSchema.forSpecific(…))。

要使用此反序列化模式,必須添加以下附加依賴項:

當遇到因任何原因無法反序列化的損壞消息時,有兩個選項 - 從deserialize(…)方法中抛出異常将導緻作業失敗并重新啟動,或者傳回null以允許Flink Kafka使用者以靜默方式跳過損壞的消息。請注意,由于使用者的容錯能力(請參閱下面的部分以擷取更多詳細資訊),是以對損壞的消息執行失敗将使消費者嘗試再次反序列化消息。是以,如果反序列化仍然失敗,則消費者将在該損壞的消息上進入不間斷重新開機和失敗循環。