Flink 使用python连接mysql
mysql连接配置依赖包
1.下载flink-connector-jdbc_2.11 jar包 网络路径如下
2.将下载jar包 放到/flink-1.11.2/lib下
flink-connector-jdbc_2.11：flink-connector-jdbc_2.11-1.11.2.jar
mysql的驱动jar包： mysql-connector-java-5.1.47.jar
3.将下载jar包 放到/python/site-packages/pyflink/lib下
# Copy both connector jars into the pyflink/lib directory of the virtualenv.
pyflink_lib=/home/hadoop/.local/share/virtualenvs/pycharm_project_305-MIKSrtht/lib/python3.7/site-packages/pyflink/lib
for jar in flink-connector-jdbc_2.11-1.11.2.jar mysql-connector-java-5.1.47.jar; do
    cp "$jar" "$pyflink_lib"
done
我们来看个flink连接mysql 的例子
1.准备数据
-- MySQL table that the PyFlink script further down maps through the JDBC connector.
-- `word` is the primary key; `count` and `ts` are nullable.
CREATE TABLE `my_mysql_Sink` (
    `word`  VARCHAR(50) NOT NULL,
    `count` BIGINT(20)  DEFAULT NULL,
    `ts`    TIMESTAMP   NULL DEFAULT NULL,
    PRIMARY KEY (`word`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Seed three sample rows. Columns are named explicitly so the statement
-- does not depend on the physical column order of the table.
INSERT INTO `my_mysql_Sink` (`word`, `count`, `ts`)
VALUES
    ('ceshi', 30, '2020-11-02 01:43:33'),
    ('hed',   50, '2020-11-02 01:43:33'),
    ('hellp', 30, '2020-11-02 01:43:33');
2.pyflink脚本如下
from pyflink.table import TableConfig,DataTypes,BatchTableEnvironment,TableEnvironment,EnvironmentSettings
# Build batch-mode environment settings that use the Blink planner.
settings_builder = EnvironmentSettings.new_instance()
batch_settings = settings_builder.in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=batch_settings)

# DDL registering a Flink table named `sink` that maps onto the MySQL table
# my_mysql_Sink through the JDBC connector properties.
# NOTE(review): the JDBC URL and credentials are hard-coded here; acceptable
# for a demo only.
mysql_sink_ddl = """create table sink (`word` VARCHAR,`count` BIGINT,ts timestamp
) with (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://10.106.216.72:3306/test',
'connector.table' = 'my_mysql_Sink',
'connector.username' = 'goodhope',
'connector.password' = '123456',
'connector.write.flush.interval' = '1s')"""
t_env.execute_sql(mysql_sink_ddl)

# Query every row of the registered table and print the result.
query_result = t_env.execute_sql("select * from sink")
query_result.print()
# The explicit job trigger t_env.execute("python_job") is left disabled,
# as in the original script.
3.运行结果如下：