ETL 代表提取-轉換-加載,是将資料從一個源系統移動到另一個源系統的過程。下面将描述如何使用 Apache Kafka、Kafka Connect、Debezium 和 ksqlDB 建構實時流 ETL 流程。
建構業務應用程式時,會先根據應用程式的功能需求來設計資料模型。為了重塑我們的資料,需要将其移動到另一個資料庫。
在行業中,人們大多從源系統中批量提取資料,在合理的時間段内,主要是每天一次,但也可以是每小時一次,也可以是兩三天一次。保持較短的周期可能會導緻源系統的資源使用率更高,目标系統的中斷頻繁;但是,保持較長時間可能會導緻目标系統出現最新問題。是以,我們需要一些對源系統性能造成最小影響的東西,并且可以在更短的時間内或實時更新目标系統。
Debezium 不使用 SQL 提取資料。它使用資料庫日志檔案來跟蹤資料庫中的更改,是以它對源系統的影響最小。
提取資料後,需要 Kafka Connect 将其流式傳輸到 Apache Kafka 中,以便根據需要使用它并對其進行重塑。可以使用 ksqlDB 來以目标系統所需的方式重塑原始資料。下面考慮一個簡單的訂購系統資料庫,其中有一個客戶表、一個産品表和一個訂單表,如下所示。
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnLygzNhVDZ4MWZjNjNjZDN5IGZhRTOhJDNlBjN1IWO4Y2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
現在需要送出一份關于訂單的報告,可以看到購買者的電子郵件,并且在同一行中顯示了産品的名稱。如下圖:
客戶列将包含位于客戶表的電子郵件字段中的客戶的電子郵件,而産品列将包含位于産品表的名稱字段中的産品名稱。
首先,建立一個源連接配接器來從源資料庫中提取資料。源資料庫是 MySQL 資料庫,是以使用 Debezium MySQL Source Connector,如下所示:
CREATE SOURCE CONNECTOR `mysql-connector` WITH(
"connector.class"= 'io.debezium.connector.mysql.MySqlConnector',
"tasks.max"= '1',
"database.hostname"= 'mysql',
"database.port"= '3306',
"database.user"= 'root',
"database.password"= 'debezium',
"database.server.id"= '184054',
"database.server.name"= 'dbserver1',
"database.whitelist"= 'inventory',
"table.whitelist"= 'inventory.customers,inventory.products,inventory.orders',
"database.history.kafka.bootstrap.servers"= 'kafka:9092',
"database.history.kafka.topic"= 'schema-changes.inventory',
"transforms"= 'unwrap',
"transforms.unwrap.type"= 'io.debezium.transforms.ExtractNewRecordState',
"key.converter"= 'org.apache.kafka.connect.json.JsonConverter',
"key.converter.schemas.enable"= 'false',
"value.converter"= 'org.apache.kafka.connect.json.JsonConverter',
"value.converter.schemas.enable"= 'false');
現在擁有了來自源系統的表、客戶、産品和訂單的 Kafka 主題。
ksql> show topics;
Kafka Topic | Partitions | Partition Replicas
-----------------------------------------------------------------
dbserver1 | 1 | 1
dbserver1.inventory.customers | 1 | 1
dbserver1.inventory.orders | 1 | 1
dbserver1.inventory.products | 1 | 1
default_ksql_processing_log | 1 | 1
my_connect_configs | 1 | 1
my_connect_offsets | 25 | 1
my_connect_statuses | 5 | 1
schema-changes.inventory | 1 | 1
使用以下腳本,為訂單建立一個 ksqlDB 流,該流在訂單資料旁邊連接配接客戶和産品資料。
CREATE STREAM S_CUSTOMER (ID INT,
FIRST_NAME string,
LAST_NAME string,
EMAIL string)
WITH (KAFKA_TOPIC='dbserver1.inventory.customers',
VALUE_FORMAT='json');
CREATE TABLE T_CUSTOMER
AS
SELECT id,
latest_by_offset(first_name) as fist_name,
latest_by_offset(last_name) as last_name,
latest_by_offset(email) as email
FROM s_customer
GROUP BY id
EMIT CHANGES;
CREATE STREAM S_PRODUCT (ID INT,
NAME string,
description string,
weight DOUBLE)
WITH (KAFKA_TOPIC='dbserver1.inventory.products',
VALUE_FORMAT='json');
CREATE TABLE T_PRODUCT
AS
SELECT id,
latest_by_offset(name) as name,
latest_by_offset(description) as description,
latest_by_offset(weight) as weight
FROM s_product
GROUP BY id
EMIT CHANGES;
CREATE STREAM s_order (
order_number integer,
order_date timestamp,
purchaser integer,
quantity integer,
product_id integer)
WITH (KAFKA_TOPIC='dbserver1.inventory.orders',VALUE_FORMAT='json');
CREATE STREAM SA_ENRICHED_ORDER WITH (VALUE_FORMAT='AVRO') AS
select o.order_number, o.quantity, p.name as product, c.email as customer, p.id as product_id, c.id as customer_id
from s_order as o
left join t_product as p on o.product_id = p.id
left join t_customer as c on o.purchaser = c.id
partition by o.order_number
emit changes;
最後,在 JDBC 接收器連接配接器的幫助下,我們會将豐富的訂單表推送到 PostgreSQL 資料庫中。
CREATE SINK CONNECTOR `postgres-sink` WITH(
"connector.class"= 'io.confluent.connect.jdbc.JdbcSinkConnector',
"tasks.max"= '1',
"dialect.name"= 'PostgreSqlDatabaseDialect',
"table.name.format"= 'ENRICHED_ORDER',
"topics"= 'SA_ENRICHED_ORDER',
"connection.url"= 'jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw',
"auto.create"= 'true',
"insert.mode"= 'upsert',
"pk.fields"= 'ORDER_NUMBER',
"pk.mode"= 'record_key',
"key.converter"= 'org.apache.kafka.connect.converters.IntegerConverter',
"key.converter.schemas.enable" = 'false',
"value.converter"= 'io.confluent.connect.avro.AvroConverter',
"value.converter.schemas.enable" = 'true',
"value.converter.schema.registry.url"= 'http://schema-registry:8081'
);