1. Topic
Building a real-time data warehouse on Flink, Kafka, and Hive (Flink SQL version)
2. Target audience
This article is intended for intermediate-to-advanced Flink learners in the Hadoop big-data ecosystem.
3. Environment

| Software | Version | Install path |
| --- | --- | --- |
| hadoop | 2.6.0-cdh5.14.4 | CDH default install |
| flink | flink-1.11.1 | /home/opt/flink/flink-1.11.1 |
| kafka | kafka_2.11-2.2.1-kafka-4.1.0 | CDH default install |
4. Background
As real-time requirements for big-data workloads keep rising, more and more business processes and event-style data sources demand low-latency handling, and real-time processing has moved from a supporting role to a central one. Traditional offline warehouses struggle to meet these latency requirements, which is how the real-time data warehouse came about.
5. Walkthrough
5.1 Data flow diagram
(Diagram not reproduced here; the flow is Kafka → Flink SQL → partitioned Hive tables, as built out below.)
5.2 Add dependency jars
Copy all of the following jars into ${FLINK_HOME}/lib (the lib directory under the Flink installation):

```
flink-connector-hive_2.11-1.11.1.jar
flink-connector-kafka_2.12-1.11.1.jar
flink-connector-kafka-base_2.12-1.11.1.jar
flink-csv-1.11.1.jar
flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar
hadoop-mapreduce-client-common-2.6.0-cdh5.14.4.jar
hadoop-mapreduce-client-core-2.6.0-cdh5.14.4.jar
hive-exec-1.1.0-cdh5.14.4.jar
hive-metastore-1.1.0-cdh5.14.4.jar
kafka_2.11-2.1.0.jar
kafka-clients-2.1.0.jar
```
5.3 Edit ${FLINK_HOME}/conf/flink-conf.yaml

```yaml
# A file moves from In-progress to Finished only after a checkpoint completes,
# and only then is the Hive partition commit triggered, so checkpointing must
# be configured sensibly.
classloader.resolve-order: parent-first
# Use event time as the job's time characteristic
pipeline.time-characteristic: EventTime
# Checkpoint interval (ms)
execution.checkpointing.interval: 30000
# Minimum pause between checkpoints (ms)
execution.checkpointing.min-pause: 10000
# Checkpoint timeout (ms)
execution.checkpointing.timeout: 60000
# Keep checkpoint files on HDFS after the job is cancelled
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# Connector dependency packages
flink.execution.packages: org.apache.flink:flink-connector-kafka_2.12:1.11.1,org.apache.flink:flink-connector-kafka-base_2.12:1.11.1
```
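The comment at the top of that file is the key operational point: because the streaming file sink only finalizes files when a checkpoint completes, new data becomes queryable in Hive at roughly checkpoint-interval granularity. A minimal sketch of that effect (illustrative Python, not Flink code; the function name is made up):

```python
import math

# Sketch: with checkpoints every 30 s, a record written at time t only
# becomes a Finished file -- and thus eligible for a Hive partition
# commit -- at the first checkpoint completing at or after t.
def visible_at(write_time_s: float, interval_s: float = 30.0) -> float:
    """Completion time of the first checkpoint at or after write_time_s."""
    return math.ceil(write_time_s / interval_s) * interval_s

print(visible_at(5))   # 30.0
print(visible_at(60))  # 60.0
```

This is why `execution.checkpointing.interval` above effectively bounds end-to-end freshness of the warehouse.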
5.4 Configure and start a Flink yarn-session
5.4.1 Edit ${FLINK_HOME}/bin/yarn-session.sh

```shell
# Add the HADOOP_CLASSPATH variable
export HADOOP_CLASSPATH=`hadoop classpath`
# Modify the CC_CLASSPATH variable
CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS:$HADOOP_CLASSPATH`
```
5.4.2 Start yarn-session mode

```shell
cd ${FLINK_HOME}/bin
./yarn-session.sh
```
5.5 Configure the Flink SQL client
5.5.1 Add to ${FLINK_HOME}/bin/sql-client.sh

```shell
export HADOOP_CLASSPATH=`hadoop classpath`
HADOOP_CONF_DIR=/etc/hadoop/conf
```
5.5.2 Edit ${FLINK_HOME}/conf/sql-client-defaults.yaml

```yaml
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /etc/hive/conf
    default-database: default

execution:
  current-catalog: myhive
  current-database: default
```
5.5.3 Start sql-client.sh (the Flink SQL CLI)

```shell
cd ${FLINK_HOME}/bin
./sql-client.sh embedded
```
5.6 Start the job
5.6.1 Run the following in the Flink SQL CLI

```sql
SET table.sql-dialect=hive;
DROP TABLE hive_table;
CREATE TABLE hive_table (
  `user` STRING,
  url STRING
) PARTITIONED BY (dt STRING, hr STRING, mi STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

SET table.sql-dialect=default;
DROP TABLE kafka_table;
CREATE TABLE kafka_table (
  `user` STRING,
  `cTime` TIMESTAMP(3),
  `url` STRING,
  WATERMARK FOR cTime AS cTime - INTERVAL '5' SECOND
) WITH (
  'connector'='kafka',
  'topic'='FlinkDynamicTableKafkaDemo',
  'properties.bootstrap.servers'='bigdata4.zhenglihan.com:9092,bigdata5.zhenglihan.com:9092,bigdata6.zhenglihan.com:9092',
  'properties.group.id'='FlinkDynamicTableKafkaDemoGroup',
  'scan.startup.mode'='latest-offset',
  'format'='csv'
);

SET table.sql-dialect=hive;
INSERT INTO TABLE hive_table
SELECT `user`, url,
       DATE_FORMAT(cTime, 'yyyy-MM-dd') as dt,
       DATE_FORMAT(cTime, 'HH') as hr,
       DATE_FORMAT(cTime, 'mm') as mi
FROM kafka_table;
```
Parameter notes:

- partition.time-extractor.timestamp-pattern: the partition-time extractor pattern; it must match the partition columns declared in the DDL.
- sink.partition-commit.trigger: the partition commit trigger, either process-time or partition-time.
  - process-time: needs neither the pattern above nor a watermark; a partition is committed once the current time exceeds the partition creation time plus sink.partition-commit.delay.
  - partition-time: requires a watermark defined on the source table; a partition is committed once the watermark exceeds the extracted partition time plus sink.partition-commit.delay.
- sink.partition-commit.delay: the commit delay.
- sink.partition-commit.policy.kind: how the commit is performed; in general the metastore must be notified after a successful commit so Hive can read the data in the new partition. If you need to merge small files, you can also plug in a custom class implementing the PartitionCommitPolicy interface.
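With the partition-time trigger and 1-hour delay configured above, the per-partition commit decision can be sketched as follows (illustrative Python; the function name is an assumption, not Flink API):

```python
from datetime import datetime, timedelta

def should_commit(partition_time: datetime, watermark: datetime,
                  delay: timedelta = timedelta(hours=1)) -> bool:
    """partition-time trigger: commit once the watermark exceeds the
    extracted partition time plus sink.partition-commit.delay."""
    return watermark > partition_time + delay

# Partition dt=2018-12-17/hr=12/mi=00 extracts to 12:00:00; with a 1 h
# delay it commits only after the watermark passes 13:00:00.
p = datetime(2018, 12, 17, 12, 0, 0)
print(should_commit(p, datetime(2018, 12, 17, 12, 59, 0)))  # False
print(should_commit(p, datetime(2018, 12, 17, 13, 1, 0)))   # True
```

This also makes clear why a stalled watermark (e.g. an idle Kafka partition) delays Hive partition commits indefinitely.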
5.7 Verification
5.7.1 Start a Kafka console producer and send the following records to topic FlinkDynamicTableKafkaDemo
```shell
cd /opt/cloudera/parcels/KAFKA
bin/kafka-console-producer --broker-list bigdata4.zhenglihan.com:9092,bigdata5.zhenglihan.com:9092,bigdata6.zhenglihan.com:9092 --topic FlinkDynamicTableKafkaDemo
```
```
Mary,2018-12-17 12:00:00,./home
Bob,2018-12-17 12:00:00,./cart
Mary,2018-12-17 12:02:00,./prod?id=1
Mary,2018-12-17 12:55:00,./prod?id=4
Bob,2018-12-17 13:01:00,./prod?id=5
Liz,2018-12-17 13:30:00,./home
Liz,2018-12-17 13:59:00,./prod?id=7
Mary,2018-12-17 14:00:00,./cart
Liz,2018-12-17 14:02:00,./home
Bob,2018-12-17 14:30:00,./prod?id=3
Bob,2018-12-17 14:40:00,./home
Mary,2018-12-18 12:00:00,./home
Bob,2018-12-18 12:00:00,./cart
Mary,2018-12-18 12:02:00,./prod?id=1
Mary,2018-12-18 12:55:00,./prod?id=4
Bob,2018-12-18 13:01:00,./prod?id=5
Liz,2018-12-18 13:30:00,./home
Liz,2018-12-18 13:59:00,./prod?id=7
Mary,2018-12-18 14:00:00,./cart
Liz,2018-12-18 14:02:00,./home
Bob,2018-12-18 14:30:00,./prod?id=3
Bob,2018-12-18 14:40:00,./home
```
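Each record above lands in a dt/hr/mi partition derived by the DATE_FORMAT calls in the INSERT statement. The mapping can be sketched in Python (strftime patterns substituted for DATE_FORMAT; illustrative only):

```python
from datetime import datetime

def to_partition(ctime: str):
    """Mimic DATE_FORMAT(cTime,'yyyy-MM-dd'), 'HH', 'mm' from the INSERT."""
    t = datetime.strptime(ctime, "%Y-%m-%d %H:%M:%S")
    return t.strftime("%Y-%m-%d"), t.strftime("%H"), t.strftime("%M")

# The third sample record goes to partition dt=2018-12-17/hr=12/mi=02:
print(to_partition("2018-12-17 12:02:00"))  # ('2018-12-17', '12', '02')
```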
5.7.2 Grab a cup of tea, wait a minute or so, then start a Hive client to verify the result

```shell
beeline
!connect jdbc:hive2://bigdata5.zhenglihan.com:10000 hive hive org.apache.hive.jdbc.HiveDriver
select * from hive_table;
```
6. Summary
This article integrated Flink, Kafka, and Hive and used Flink SQL to demonstrate data flowing in real time from Kafka into Hive. To truly grasp the core of a real-time Hive warehouse, readers should understand how Flink writes data into Hive and how and why Hive partitions are committed.