天天看点

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

可能会有一些截图中会有错误提示,是因为本地的包一直包下载有问题,截完图已经下载好了。

创建包结构

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

创建一个基础信息类

所有输出到mysql数据库中的自定义MR任务的自定义key均需要实现该抽象类

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

代码内容,主要是实现org.apache.hadoop.io.WritableComparable类,其它不需要写

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

创建平台信息类

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

创建全局配置常数类

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

后面慢慢添加内容

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

继续完成PlatformInfoKey,添加构造函数

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)
flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

常数类添加信息

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

根据给定的参数值,构建多个不同维度的平台维度对象

完成参数验证

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

然后构建平台信息

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

添加write和readFields方法

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

创建compareTo方法

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

添加get/set、hashCode、toString、equals方法,自动生成就可以

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

创建信息业务接口和实现类

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

完成接口

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

添加实现类

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

创建一个jdbc的管理器

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

常数类添加数据库配置

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

JdbcManager添加jdbc连接

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

关闭数据库连接

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

继续完成BaseInfoConverterImpl,添加默认构造函数

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

添加一个缓存数据类型

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

继续完成BaseInfoConverterImpl,添加方法创建cache key

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

创建获取ID的方法

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

继续编写

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

添加下面的代码

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

补全里面的sql语句

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

添加两个参数

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

完成getConnection()

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)
flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

完成executeSql

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)
flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)
flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)
flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

设置参数

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

添加关闭方法

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

创建一个UDF,模拟数据库平台数据记录

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

添加内容

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

添加evaluate方法

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

创建一个测试类

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

创建数据库

create database reportDROP TABLE IF EXISTS `platform`;CREATE TABLE `platform` (`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id',`platform_name` varchar(45) DEFAULT NULL COMMENT '平台名称',`platform_version` varchar(10) DEFAULT NULL COMMENT '平台版本',PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT COMMENT='平台信息表';
           
flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

Sql语句

String querySql = "SELECT `id` FROM `platform` WHERE `platform_name` = ? AND `platform_version` = ? order by `id`";String insertSql = "INSERT INTO `platform`(`platform_name`, `platform_version`) VALUES(?, ?)";
           
flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

修改配置:

连接本地数据库

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

运行结果

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

数据库中

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

本地测试成功开始打包项目放进集群进行测试

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)
flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

把打包好的jar包改名为hive_udf3.jar放到集群里

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

进入MySQL中创建report数据库

create database report;

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

创建表platform

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)
flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

进入hive

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

把jar包添加到hive当中

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

可以用list jar 和 delete jar 分别显示jar和删除jar

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

创建临时函数

create temporary function convert_bl as 'com.xlgl.wzy.hive.udf.PlatformConverterUDF';
           
flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

我们可以用show_functions查看函数

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

我们查看已有表的数据

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)
select ename,convert_bl(ename,0) lower_name from emp;
           
flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

这里报了一个异常

这个问题可能是出在maven给的版本和hive的版本不一样导致可以替换成自己的版本

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)
flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

然后重新打成jar包运行一下

再查询一下

select ename,convert_bl(ename,0) lower_name from emp;
           
flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)
flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

可以在mysql中查看数据

flink sql udf jar包_编写Hive的UDF(查询平台数据同时向mysql添加数据)

继续阅读