

Flink: Connecting to MySQL from Python (PyFlink)

Dependency JARs for the MySQL connection

1. Download the flink-connector-jdbc_2.11 JAR; the exact file names are listed below.

2. Copy the downloaded JARs into /flink-1.11.2/lib:

flink-connector-jdbc_2.11:flink-connector-jdbc_2.11-1.11.2.jar

MySQL driver JAR: mysql-connector-java-5.1.47.jar

3. Also copy the JARs into the pyflink/lib directory inside your Python environment's site-packages:

cp flink-connector-jdbc_2.11-1.11.2.jar /home/hadoop/.local/share/virtualenvs/pycharm_project_305-MIKSrtht/lib/python3.7/site-packages/pyflink/lib

cp mysql-connector-java-5.1.47.jar /home/hadoop/.local/share/virtualenvs/pycharm_project_305-MIKSrtht/lib/python3.7/site-packages/pyflink/lib
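The exact site-packages path in step 3 differs per environment (the paths above are from the author's virtualenv). A small standard-library sketch to print where the pyflink/lib directory sits for the current interpreter (pyflink itself need not be installed for this to run):

```python
import os
import sysconfig

# Resolve this interpreter's site-packages directory, then append
# pyflink/lib -- the destination for the connector and driver JARs.
site_packages = sysconfig.get_paths()["purelib"]
pyflink_lib = os.path.join(site_packages, "pyflink", "lib")
print(pyflink_lib)
```

Run this with the same interpreter (or virtualenv) you use for PyFlink to get the correct copy destination.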

Now let's walk through an example of Flink connecting to MySQL.

1. Prepare the data

CREATE TABLE `my_mysql_Sink` (
  `word` varchar(50) NOT NULL,
  `count` bigint(20) DEFAULT NULL,
  `ts` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`word`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

INSERT INTO `my_mysql_Sink` VALUES ('ceshi',30,'2020-11-02 01:43:33'),('hed',50,'2020-11-02 01:43:33'),('hellp',30,'2020-11-02 01:43:33');
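For later sanity checks, the three inserted rows can be mirrored as plain Python (word, count, ts) tuples; since `word` is the primary key, the words must be unique:

```python
from datetime import datetime

# The three rows inserted into my_mysql_Sink, as (word, count, ts) tuples
rows = [
    ("ceshi", 30, datetime(2020, 11, 2, 1, 43, 33)),
    ("hed",   50, datetime(2020, 11, 2, 1, 43, 33)),
    ("hellp", 30, datetime(2020, 11, 2, 1, 43, 33)),
]

# `word` is the primary key, so every word must appear exactly once
assert len({w for w, _, _ in rows}) == len(rows)
```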

2. The PyFlink script is as follows

from pyflink.table import EnvironmentSettings, BatchTableEnvironment

# Batch mode with the Blink planner (Flink 1.11)
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=env_settings)

# Register the MySQL table using the legacy JDBC connector options
mysql_sink_ddl = """
CREATE TABLE sink (
    `word` VARCHAR,
    `count` BIGINT,
    ts TIMESTAMP
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://10.106.216.72:3306/test',
    'connector.table' = 'my_mysql_Sink',
    'connector.username' = 'goodhope',
    'connector.password' = '123456',
    'connector.write.flush.interval' = '1s'
)"""
t_env.execute_sql(mysql_sink_ddl)

# Read the table back and print it. execute_sql submits the job itself,
# so no explicit t_env.execute("python_job") call is needed.
t_env.execute_sql("SELECT * FROM sink").print()
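The WITH clause above is just a list of string key/value options, so it can also be assembled programmatically. A sketch with a hypothetical `with_clause` helper building the same options used in `mysql_sink_ddl`:

```python
# Hypothetical helper: render a dict of Flink connector options
# into a DDL WITH clause string.
def with_clause(options):
    body = ",\n".join(f"    '{k}' = '{v}'" for k, v in options.items())
    return "WITH (\n" + body + "\n)"

# The same legacy JDBC connector options used in mysql_sink_ddl
jdbc_options = {
    "connector.type": "jdbc",
    "connector.url": "jdbc:mysql://10.106.216.72:3306/test",
    "connector.table": "my_mysql_Sink",
    "connector.username": "goodhope",
    "connector.password": "123456",
    "connector.write.flush.interval": "1s",
}

print(with_clause(jdbc_options))
```

Keeping the options in a dict makes it easy to swap in a different URL, table, or credentials without editing the DDL string by hand.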

3. Run the script; the query prints the rows currently stored in my_mysql_Sink.
