天天看点

flink source 同步_hadoop大数据Flink专题篇搭建基于Flink的实时数仓(一)

1.主题

     基于flink,kafka,hive搭建实时数仓(flink-sql版本)

flink source 同步_hadoop大数据Flink专题篇搭建基于Flink的实时数仓(一)

点击图片链接,从零开始学习大数据

2.适用读者对象

    本文适用于hadoop大数据Flink进阶学员

3.基础环境信息

软件 版本 部署路径
haoop 2.6.0-cdh5.14.4 cdh默认安装
flink flink-1.11.1 /home/opt/flink/flink-1.11.1
kafka kafka_2.11-2.2.1-kafka-4.1.0 cdh默认安装

4.主题描述

    随着大数据业务实时性要求的不断提高,实时的业务越来越多,事件化的数据源也越来越多,实时处理从次要部分变成了主要部分,传统的离线数仓很难满足数据实时需求,于是,实时数仓概念应运而生。

5.实战演练

    5.1 数据流图

flink source 同步_hadoop大数据Flink专题篇搭建基于Flink的实时数仓(一)

    5.2 添加依赖包

        把以下所有依赖包放到${FLINK_HOME}/lib目录(flink安装目录下的lib目录下)

flink-connector-hive_2.11-1.11.1.jar

flink-connector-kafka_2.12-1.11.1.jar

flink-connector-kafka-base_2.12-1.11.1.jar

flink-csv-1.11.1.jar

flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar

hadoop-mapreduce-client-common-2.6.0-cdh5.14.4.jar

hadoop-mapreduce-client-core-2.6.0-cdh5.14.4.jar

hive-exec-1.1.0-cdh5.14.4.jar

hive-metastore-1.1.0-cdh5.14.4.jar

kafka_2.11-2.1.0.jar

kafka-clients-2.1.0.jar

    5.3 修改配置文件${FLINK_HOME}/conf/flink-conf.yaml

#只有在完成 Checkpoint 之后,文件才会从 In-progress 状态变成 Finish 状态,才会触发提交hive分区操作,所以,我们需要合理的去配置 Checkpointclassloader.resolve-order: parent-first#设置任务使用的时间属性是eventtimepipeline.time-characteristic: EventTime#设置checkpoint的时间间隔execution.checkpointing.interval: 30000#确保检查点之间的间隔execution.checkpointing.min-pause: 10000#设置checkpoint的超时时间execution.checkpointing.timeout: 60000#设置任务取消后保留hdfs上的checkpoint文件execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION#设置checkpointing操作依赖包flink.execution.packages: org.apache.flink:flink-connector-kafka_2.12:1.11.1,org.apache.flink:flink-connector-kafka-base_2.12:1.11.1
           

    5.4 配置并启动flink yarn-session

        5.4.1 配置${FLINK_HOME}/bin/yarn-session.sh

#增加HADOOP_CLASSPATH变量export HADOOP_CLASSPATH=`hadoop classpath`#修改CC_CLASSPATH变量CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS:$HADOOP_CLASSPATH`
           

        5.4.2 启动yarn-session模式

cd ${FLINK_HOME}/bin./yarn-session.sh
           
flink source 同步_hadoop大数据Flink专题篇搭建基于Flink的实时数仓(一)
flink source 同步_hadoop大数据Flink专题篇搭建基于Flink的实时数仓(一)

    5.5 配置flink sql-client

        5.5.1 添加配置${FLINK_HOME}/bin/sql-client.sh 

export HADOOP_CLASSPATH=`hadoop classpath`HADOOP_CONF_DIR=/etc/hadoop/conf
           

        5.5.2 配置${FLINK_HOME}/conf/sql-client-defaults.yaml

catalogs:  - name: myhivetype: hive    hive-conf-dir: /etc/hive/confdefault-database: defaultexecution:  current-catalog: myhive  current-database: default
           

        5.5.3 启动sql-client.sh (Flink SQL命令行)

cd ${FLINK_HOME}/bin/./sql-client.sh embedded
           
flink source 同步_hadoop大数据Flink专题篇搭建基于Flink的实时数仓(一)
flink source 同步_hadoop大数据Flink专题篇搭建基于Flink的实时数仓(一)

    5.6 启动应用

        5.6.1 在Flink SQL命令行模式下执行以下代码

SET table.sql-dialect=hive;drop table hive_table;CREATE TABLE hive_table (`user` STRING,url STRING) PARTITIONED BY (dt STRING, hr STRING,mi STRING) STORED AS parquet TBLPROPERTIES ('partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00','sink.partition-commit.trigger'='partition-time','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='metastore,success-file');SET table.sql-dialect=default;drop table kafka_table;CREATE TABLE kafka_table (`user` STRING,`cTime` TIMESTAMP(3),`url` STRING,   WATERMARK FOR cTime AS cTime - INTERVAL '5' SECOND) WITH ('connector'='kafka','topic'='FlinkDynamicTableKafkaDemo','properties.bootstrap.servers'='bigdata4.zhenglihan.com:9092,bigdata5.zhenglihan.com:9092,bigdata6.zhenglihan.com:9092','properties.group.id'='FlinkDynamicTableKafkaDemoGroup','scan.startup.mode'='latest-offset','format'='csv');SET table.sql-dialect=hive;INSERT INTO TABLE hive_table SELECT `user`, url, DATE_FORMAT(cTime, 'yyyy-MM-dd') as dt, DATE_FORMAT(cTime, 'HH') as hr, DATE_FORMAT(cTime, 'mm') as mi  FROM kafka_table;
           

        参数解析:

partition.time-extractor.timestamp-pattern :分区时间抽取器,与 DDL 中的分区字段保持一致;sink.partition-commit.trigger :分区触发器类型,可选 process-time 或partition-time。process-time:不需要上面的参数,也不需要水印,当当前时间大于分区创建时间 +sink.partition-commit.delay 中定义的时间,提交分区;partition-time:需要 Source 表中定义 watermark,当 watermark > 提取到的分区时间 +sink.partition-commit.delay 中定义的时间,提交分区;sink.partition-commit.delay :相当于延时时间;sink.partition-commit.policy.kind :怎么提交,一般提交成功之后,需要通知 metastore,这样 Hive 才能读到你最新分区的数据;如果需要合并小文件,也可以自定义 Class,通过实现 PartitionCommitPolicy 接口。
           
flink source 同步_hadoop大数据Flink专题篇搭建基于Flink的实时数仓(一)

    5.7 验证

        5.7.1 启动kafka生产者往topic FlinkDynamicTableKafkaDemo 发送以下数据

cd /opt/cloudera/parcels/KAFKAbin/kafka-console-producer --broker-list bigdata4.zhenglihan.com:9092,bigdata5.zhenglihan.com:9092,bigdata6.zhenglihan.com:9092 --topic FlinkDynamicTableKafkaDemo
           
flink source 同步_hadoop大数据Flink专题篇搭建基于Flink的实时数仓(一)
flink source 同步_hadoop大数据Flink专题篇搭建基于Flink的实时数仓(一)

Mary,2018-12-17 12:00:00,./home

Bob,2018-12-17 12:00:00,./cart

Mary,2018-12-17 12:02:00,./prod?id=1

Mary,2018-12-17 12:55:00,./prod?id=4

Bob,2018-12-17 13:01:00,./prod?id=5

Liz,2018-12-17 13:30:00,./home

Liz,2018-12-17 13:59:00,./prod?id=7

Mary,2018-12-17 14:00:00,./cart

Liz,2018-12-17 14:02:00,./home

Bob,2018-12-17 14:30:00,./prod?id=3

Bob,2018-12-17 14:40:00,./home

Mary,2018-12-18 12:00:00,./home

Bob,2018-12-18 12:00:00,./cart

Mary,2018-12-18 12:02:00,./prod?id=1

Mary,2018-12-18 12:55:00,./prod?id=4

Bob,2018-12-18 13:01:00,./prod?id=5

Liz,2018-12-18 13:30:00,./home

Liz,2018-12-18 13:59:00,./prod?id=7

Mary,2018-12-18 14:00:00,./cart

Liz,2018-12-18 14:02:00,./home

Bob,2018-12-18 14:30:00,./prod?id=3

Bob,2018-12-18 14:40:00,./home

        5.7.2 去喝杯茶,等待一分钟,启动hive客户端验证结果

beeline !connect jdbc:hive2://bigdata5.zhenglihan.com:10000 hive hive org.apache.hive.jdbc.HiveDriverselect * from hive_table;
           
flink source 同步_hadoop大数据Flink专题篇搭建基于Flink的实时数仓(一)

6.总结:

    本文集成了flink、kafka和hive三种组件,以flink-sql方式演示了数据实时从kafka到hive的过程,读者应掌握flink写入数据到hive以及提交hive分区的过程及其意义,才能真正掌握hive实时数仓的核心原理。

    更为深度的讲解请扫描底部二维码关注公众号,关注后续博文,一起学习hadoop大数据!

flink source 同步_hadoop大数据Flink专题篇搭建基于Flink的实时数仓(一)

继续阅读