天天看點

使用 Apache Kafka、Kafka Connect、Debezium 和 ksqlDB 的實時流 ETL

ETL 代表提取-轉換-加載,是将資料從一個源系統移動到另一個源系統的過程。下面将描述如何使用 Apache Kafka、Kafka Connect、Debezium 和 ksqlDB 建構實時流 ETL 流程。

建構業務應用程式時,會先根據應用程式的功能需求來設計資料模型。為了重塑我們的資料,需要将其移動到另一個資料庫。

在行業中,人們大多從源系統中批量提取資料,在合理的時間段内,主要是每天一次,但也可以是每小時一次,也可以是兩三天一次。保持較短的周期可能會導緻源系統的資源使用率更高,目标系統的中斷頻繁;但是,保持較長時間可能會導緻目标系統出現最新問題。是以,我們需要一些對源系統性能造成最小影響的東西,并且可以在更短的時間内或實時更新目标系統。

Debezium 不使用 SQL 提取資料。它使用資料庫日志檔案來跟蹤資料庫中的更改,是以它對源系統的影響最小。

提取資料後,需要 Kafka Connect 将其流式傳輸到 Apache Kafka 中,以便根據需要使用它并對其進行重塑。可以使用 ksqlDB 來以目标系統所需的方式重塑原始資料。下面考慮一個簡單的訂購系統資料庫,其中有一個客戶表、一個産品表和一個訂單表,如下所示。

使用 Apache Kafka、Kafka Connect、Debezium 和 ksqlDB 的實時流 ETL

現在需要送出一份關于訂單的報告,可以看到購買者的電子郵件,并且在同一行中顯示了産品的名稱。如下圖:

使用 Apache Kafka、Kafka Connect、Debezium 和 ksqlDB 的實時流 ETL

客戶列将包含位于客戶表的電子郵件字段中的客戶的電子郵件,而産品列将包含位于産品表的名稱字段中的産品名稱。

首先,建立一個源連接配接器來從源資料庫中提取資料。源資料庫是 MySQL 資料庫,是以使用 Debezium MySQL Source Connector,如下所示:

CREATE SOURCE CONNECTOR `mysql-connector` WITH(

    "connector.class"= 'io.debezium.connector.mysql.MySqlConnector',

    "tasks.max"= '1',

    "database.hostname"= 'mysql',

    "database.port"= '3306',

    "database.user"= 'root',

    "database.password"= 'debezium',

    "database.server.id"= '184054',

    "database.server.name"= 'dbserver1',

    "database.whitelist"= 'inventory',

    "table.whitelist"= 'inventory.customers,inventory.products,inventory.orders',

    "database.history.kafka.bootstrap.servers"= 'kafka:9092',

    "database.history.kafka.topic"= 'schema-changes.inventory',

    "transforms"= 'unwrap',

    "transforms.unwrap.type"= 'io.debezium.transforms.ExtractNewRecordState',

    "key.converter"= 'org.apache.kafka.connect.json.JsonConverter',

    "key.converter.schemas.enable"= 'false',

    "value.converter"= 'org.apache.kafka.connect.json.JsonConverter',

    "value.converter.schemas.enable"= 'false');
           

現在擁有了來自源系統的表、客戶、産品和訂單的 Kafka 主題。

ksql> show topics;

 

 Kafka Topic                   | Partitions | Partition Replicas

-----------------------------------------------------------------

 dbserver1                     | 1          | 1

 dbserver1.inventory.customers | 1          | 1

 dbserver1.inventory.orders    | 1          | 1

 dbserver1.inventory.products  | 1          | 1

 default_ksql_processing_log   | 1          | 1

 my_connect_configs            | 1          | 1

 my_connect_offsets            | 25         | 1

 my_connect_statuses           | 5          | 1

 schema-changes.inventory      | 1          | 1
           

使用以下腳本,為訂單建立一個 ksqlDB 流,該流在訂單資料旁邊連接配接客戶和産品資料。

CREATE STREAM S_CUSTOMER (ID INT,

                       FIRST_NAME string,

                       LAST_NAME string,

                       EMAIL string)

                 WITH (KAFKA_TOPIC='dbserver1.inventory.customers',

                       VALUE_FORMAT='json');

 

CREATE TABLE T_CUSTOMER

AS

    SELECT id,

           latest_by_offset(first_name) as fist_name,

           latest_by_offset(last_name) as last_name,

           latest_by_offset(email) as email

    FROM s_customer

    GROUP BY id

    EMIT CHANGES;

 

CREATE STREAM S_PRODUCT (ID INT,

                       NAME string,

                       description string,

                       weight DOUBLE)

                 WITH (KAFKA_TOPIC='dbserver1.inventory.products',

                       VALUE_FORMAT='json');

 

CREATE TABLE T_PRODUCT

AS

    SELECT id,

           latest_by_offset(name) as name,

           latest_by_offset(description) as description,

           latest_by_offset(weight) as weight

    FROM s_product

    GROUP BY id

    EMIT CHANGES;

 

CREATE STREAM s_order (

    order_number integer,

    order_date timestamp,

    purchaser integer,

    quantity integer,

    product_id integer) 

    WITH (KAFKA_TOPIC='dbserver1.inventory.orders',VALUE_FORMAT='json');

 

CREATE STREAM SA_ENRICHED_ORDER WITH (VALUE_FORMAT='AVRO') AS

   select o.order_number, o.quantity, p.name as product, c.email as customer, p.id as product_id, c.id as customer_id

     from s_order as o 

left join t_product as p on o.product_id = p.id

left join t_customer as c on o.purchaser = c.id

partition by o.order_number

emit changes;
最後,在 JDBC 接收器連接配接器的幫助下,我們會将豐富的訂單表推送到 PostgreSQL 資料庫中。

CREATE SINK CONNECTOR `postgres-sink` WITH(

    "connector.class"= 'io.confluent.connect.jdbc.JdbcSinkConnector',

    "tasks.max"= '1',

    "dialect.name"= 'PostgreSqlDatabaseDialect',

    "table.name.format"= 'ENRICHED_ORDER',

    "topics"= 'SA_ENRICHED_ORDER',

    "connection.url"= 'jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw',

    "auto.create"= 'true',

    "insert.mode"= 'upsert',

    "pk.fields"= 'ORDER_NUMBER',

    "pk.mode"= 'record_key',

    "key.converter"= 'org.apache.kafka.connect.converters.IntegerConverter',

    "key.converter.schemas.enable" = 'false',

    "value.converter"= 'io.confluent.connect.avro.AvroConverter',

    "value.converter.schemas.enable" = 'true',

    "value.converter.schema.registry.url"= 'http://schema-registry:8081'

);