以前要实时采集MySQL的表数据 要用canal 配置非常麻烦 有了flink cdc 只需要写几个SQL就能把MySQL的实时数据变动采集,并做加工
先在MySQL中授权
CREATE USER 'cdc'@'%' IDENTIFIED BY 'cdc';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc'@'%' IDENTIFIED BY 'cdc';
cdc这个用户可以做replicate
定义源头表,是从mysql binlog来.
CREATE TEMPORARY TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '10.66.1.49',
'port' = '3306',
'username' = 'cdc',
'password' = 'cdc',
'database-name' = 'core',
'table-name' = 'orders');
定义目的表,这儿我是直接打印.
CREATE TEMPORARY TABLE print_table (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN
) WITH (
'connector' = 'print'
);
直接插入
insert into print_table SELECT * FROM orders;
#flink# #mysql#