天天看点

Apache Flink SQL 详解与实践

问题导读

1.为何会有Flink SQL?

2.本文哪些地方涉及Flink 1.7?

4.如何定义源(sources )和接收器(sinks)?

5.Flink SQL本文介绍了哪些sql?

6.将数据格式化为正确的格式以便进一步处理?

7.如何监控Flink sql查询

8.使用Flink SQL中的视图的作用是什么?

9.本文使用Flink sql实现了什么案例?

虽然Flink SQL最初于2016年8月与Flink 1.1.0一起发布,但最近的Flink版本增加了相当多的功能,使Flink SQL更易于使用,无需会编写Java / Scala代码。 在这篇文章中,我们希望(重新)从这些变化所带来的新角度介绍Flink SQL,同时为经验丰富的用户提供一些额外的知识。

新添加的SQL命令行(SQL CLI)可以轻松快速浏览流中的数据或静态数据(例如,在数据库或HDFS中)。 它还可用于构建功能强大的数据转换管道或分析管道。 在这篇文章中,我们想要探索当前可用的功能,而后续文章将更详细地介绍特定功能,并介绍Flink 1.7即将推出的令人兴奋的新功能,例如使用MATCH_RECOGNIZE扩展的复杂event处理和改进 基于时间的enrichment(富集) join。

在我们深入研究一些实践实例之前,我们列出了Flink SQL的一些亮点:

  • Flink SQL是批处理和流处理的统一API:这允许使用相同的查询来处理历史数据和实时数据
  • 支持处理时间和事件时间语义
  • 支持使用嵌套的Avro和JSON数据
  • 用户定义的scalar,聚合和表值(table-valued)函数
  • 无需编码的SQL命令行(即没有Java / Scala编码)
  • 支持各种类型的流连接
  • 支持聚合,包括窗口和没有窗口

定义 Sources 和Sinks

使用Flink SQL的命令行客户端时,我们要做的第一件事就是定义源(sources )和接收器(sinks)。 否则,我们将无法读取或写入任何数据。 源和接收器在YAML配置文件中定义,以及其他配置设置。 YAML文件中的源和接收器配置类似于SQL DDL语句(Flink社区目前正在讨论对SQL DDL的支持)。 对于我们正在进行的示例,我们假设我们有一个Kafka主题(topic),其中存储了我们想要进一步处理和分析的出租车游乐设施的信息。 它的配置如下所示:

tables:
  - name: TaxiRides
    type: source
    update-mode: append
    schema:
    - name: rideId
      type: LONG
    - name: rowTime
      type: TIMESTAMP
      rowtime:
        timestamps:
          type: "from-field"
          from: "rideTime"
        watermarks:
          type: "periodic-bounded"
          delay: "60000"
    - name: isStart
      type: BOOLEAN
    - name: lon
      type: FLOAT
    - name: lat
      type: FLOAT
    - name: taxiId
      type: LONG
    - name: driverId
      type: LONG
    - name: psgCnt
      type: INT
    connector:
      property-version: 1
      type: kafka
      version: 0.11
      topic: TaxiRides
      startup-mode: earliest-offset
      properties:
      - key: zookeeper.connect
        value: zookeeper:2181
      - key: bootstrap.servers
        value: kafka:9092
      - key: group.id
        value: testGroup
    format:
      property-version: 1
      type: json
      schema: "ROW(rideId LONG, isStart BOOLEAN, 
rideTime TIMESTAMP, lon FLOAT, lat FLOAT, 
psgCnt INT, taxiId LONG, driverId LONG)"
           

在Flink SQL中,源,接收器以及介于两者之间的所有内容称为表。 在这里,我们基于包含JSON格式的事件的Kafka主题定义初始表。 我们定义Kafka配置设置,格式以及我们如何将其映射到模式,以及我们希望如何从数据中导出watermarks 。 除了JSON之外,Flink SQL还内置了对CSV和Avro格式的支持,并且还可以使用自定义格式对其进行扩展。 Flink SQL始终支持在JSON和Avro架构中处理嵌套数据。

Flink SQL的使用

现在我们讨论了源表的配置和格式,下面我们说说 Flink SQL的使用

从Flink SQL命令行客户端,我们可以列出我们定义的表:

Flink SQL> SHOW TABLES;
TaxiRides
TaxiRides_Avro
           

我们还可以检查任何表的schema :

Flink SQL> DESCRIBE TaxiRides;
root
|-- rideId: Long
|-- rowTime: TimeIndicatorTypeInfo(rowtime)
|-- isStart: Boolean
|-- lon: Float
|-- lat: Float
|-- taxiId: Long
|-- driverId: Long
|-- psgCnt: Integer
           

有了这个,让我们看看我们可以用我们的表做什么。

有关配置Flink SQL以及定义源,接收器及其格式的详细信息,请参阅文档(https://ci.apache.org/projects/f ... l#environment-files)。

格式化数据

我们可能想要做的最简单的事情之一是将数据格式化为正确的格式以便进一步处理。 这可能包括:

  • 在schema之间转换,例如将JSON事件流转换为Avro编码
  • 用SQL语句中删除字段或将其投影
  • 过滤掉我们不感兴趣的整个事件(events )

让我们看一下从架构转换开始我们将如何做到这些。 当我们想要从Kafka读取数据时,将数据转换为不同的格式,并将数据写回不同的Kafka主题以进行下游处理,我们所要做的就是定义源表(如上所述)然后定义 作为接收器的表格具有不同的格式:

tables:
  - name: TaxiRides_Avro0
    type: sink
    update-mode: append
    schema:
    - name: rideId
      type: LONG
    - name: rowTime
      type: TIMESTAMP
    - name: isStart
      type: BOOLEAN
    - name: lon
      type: FLOAT
    - name: lat
      type: FLOAT
    - name: taxiId
      type: LONG
    - name: driverId
      type: LONG
    - name: psgCnt
      type: INT
    connector:
      property-version: 1
      type: kafka
      version: 0.11
      topic: TaxiRides_Avro
      properties:
      - key: zookeeper.connect
        value: zookeeper:2181
      - key: bootstrap.servers
        value: kafka:9092
      - key: group.id
        value: trainingGroup
    format:
      property-version: 1
      type: avro
      avro-schema: >
          {
            "type": "record",
            "name": "test",
            "fields" : [
              {"name": "rideId", "type": "long"},
              {"name": "rowTime", "type": {"type": "long", "logicalType": "timestamp-millis"}},
              {"name": "isStart", "type": "boolean"},
              {"name": "lon", "type": "float"},
              {"name": "lat", "type": "float"},
              {"name": "taxiId", "type": "long"},
              {"name": "driverId", "type": "long"},
              {"name": "psgCnt", "type": "int"}
            ]
          }
           

通过我们定义的源和接收器转换数据变得如此简单:

Flink SQL> INSERT INTO TaxiRides_Avro SELECT * FROM TaxiRides;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Cluster ID: StandaloneClusterId
Job ID: ffa9109b9cad077ec83137f55ec6d1c5
Web interface: http://jobmanager:8081
           

我们的查询作为常设查询提交给Flink集群。可以通过访问http://localhost:8081来监视和控制来自Flink的WebUI的查询。

我们可以通过引入(projection)投影和(filtering)过滤来构建这个简单的模式。 如果我们只想在结果中包含某些字段,我们可以在SELECT查询中指定。 例如:

Flink SQL> INSERT INTO TaxiRides_Avro SELECT rideIdId, taxiId, driverId FROM TaxiRides;
           

这只会给我们events中的ID。 (请记住,需要调整接收器的格式才能使此查询起作用。)

基于此,我们可以做的另一件简单事情就是过滤掉整个事件。 考虑一下我们只对在某个城市发生的出租车乘坐感兴趣的情况。 事件具有lon和lat字段,分别给出事件发生的经度和纬度。 我们可以使用它们来确定事件是否发生在某个城市:

Flink SQL> SELECT * FROM TaxiRides WHERE isInNYC(lon, lat);
           

你可能会注意到,那就是isInNYC()。 这是我们在SQL客户端配置中定义的用户定义函数或UDF。 我们可以通过以下方式查看我们提供的用户功能:

Flink SQL> SHOW FUNCTIONS;
timeDiff
toCoords
isInNYC
toAreaId
           

就像在Flink SQL客户端配置文件中配置的其他内容一样:

functions:
- name: timeDiff
  from: class
  class: com.dataartisans.udfs.TimeDiff
- name: isInNYC
  from: class
  class: com.dataartisans.udfs.IsInNYC
- name: toAreaId
  from: class
  class: com.dataartisans.udfs.ToAreaId
- name: toCoords
  from: class
  class: com.dataartisans.udfs.ToCoords
           

UDF是实现特定接口并在客户端注册的Java类。 有不同类型的用户功能:(scalar )标量函数,表函数和聚合函数。 其中详细介绍了用户定义的函数,可以查看UDF文档。

使用Flink SQL中的视图构建查询

一旦我们有足够复杂的SQL查询,它们就会变得有点难以理解。 我们可以通过在Flink SQL中定义视图来缓解这种情况。 这类似于在编程语言中定义变量以给出某个名称的方式,以便以后能够重用它。 假设我们想要在早期的例子的基础上进行构建,并创建一个在给定日期之后在某个城市发生的游乐设施的视图。 我们会这样做:

Flink SQL> CREATE VIEW TaxiRides_NYC AS SELECT * FROM TaxiRides
  WHERE isInNYC(lon, lat)
  AND rowTime >= TIMESTAMP '2013-01-01 00:00:00';
[INFO] View has been created.
           

我们可以通过以下方式找出视图:

Flink SQL> SHOW TABLES;
TaxiRides
TaxiRides_Avro
TaxiRides_NYC
           

需要注意的一点是,创建视图实际上并不实例化任何常设查询或产生任何输出或中间结果。 视图只是可以重用的查询的逻辑名称,并允许更好地构建查询。 这与其他一些类似SQL的流式系统不同,在这些系统中,每个中间查询都会创建数据并使用资源。

视图是Flink 1.7的即将推出的功能,但它已经实现并合并到主分支中( master branch),这就是为什么我们已经在这里提到它。 另外,它非常有用。

基于事件时间的窗口化聚合

作为最后一步,我们希望展示一个更复杂的查询,它将我们到目前为止所解释的内容汇集在一起。 考虑一种情况,我们希望监控正在发生的游乐设施,并且需要知道某个城市某个特定区域的游乐设施数量何时超过阈值(比如说5)。 这是这样做的查询:

SELECT
  toAreaId(lon, lat) AS area,
  TUMBLE_END(rowTime, INTERVAL '5' MINUTE) AS t,
  COUNT(*) AS c
FROM TaxiRides_NYC
WHERE isStart = TRUE
GROUP BY
  toAreaId(lon, lat),
  TUMBLE(rowTime, INTERVAL '5' MINUTE)
HAVING COUNT(*) >= 5;
           

在上面的示例中,我们执行以下操作:

  • 我们使用之前创建的视图,其中包含在特定日期之后发生的某个城市的事件,
  • 我们过滤掉那些不是“开始事件”的事件,
  • 我们使用另一个用户定义的函数将lon,lat对转换为区域id和group by,
  • 我们指定我们想要有五分钟的窗口,最后
  • 我们过滤掉那些计数小于5的窗口。

在现实世界的用例中,我们现在将其写入Elasticsearch接收器并使用它为仪表板或通知系统供电。留给大家思考。

总结

在这篇博文中,我们解释了如何在不编写Java代码的情况下使用Flink SQL实现简单的数据转换和数据Massaging作业。 我们还解释了如何使用视图来构建更复杂的查询并使其易于理解。 最后,我们开发了一个更复杂的查询,它结合了用户定义的函数,窗口聚合和事件时间支持。

在后续文章中,我们将提供有关如何开发和使用用户定义函数的更多内容,我们将深入了解Flink SQL的强大连接以及如何使用它们来丰富数据。 在Flink 1.7.0发布之后使用Flink SQL的数据丰富,复杂事件处理和模式检测引入强大的新增功能。