作者:吳雲濤,騰訊 CSIG 進階工程師
導語 | 最近梳理了一下如何用 Flink 來實作實時的 UV、PV 名額的統計,并和公司内微視部門的同僚交流。然後針對該場景做了簡化,并發現使用 Flink SQL 來 實作這些名額的統計會更加便捷。
一、解決方案描述
1.1 概述
本方案結合本地自建 Kafka 叢集、騰訊雲流計算 Oceanus(Flink)、雲資料庫 Redis 對部落格、購物等網站 UV、PV 名額進行實時可視化分析。分析名額包含網站的獨立訪客數量(UV )、産品的點選量(PV)、轉化率(轉化率 = 成交次數 / 點選量)等。
相關概念介紹:UV(Unique Visitor):獨立訪客數量。通路您網站的一台用戶端為一個訪客,如使用者對同一頁面通路了 5 次,那麼該頁面的 UV 隻加 1,因為 UV 統計的是去重後的使用者數而不是通路次數。PV(Page View):點選量或頁面浏覽量。如使用者對同一頁面通路了 5 次,那麼該頁面的 PV 會加 5。
1.2 方案架構及優勢
根據以上實時名額統計場景,設計了如下架構圖:
涉及産品清單:
- 本地資料中心(IDC)的自建 Kafka 叢集
- 私有網絡 VPC
- 專線接入/雲聯網/VPN連接配接/對等連接配接
- 流計算 Oceanus (Flink)
- 雲資料庫 Redis
二、前置準備
購買所需的騰訊雲資源,并打通網絡。自建的 Kafka 叢集需根據叢集所在區域需采用 VPN 連接配接、專線連接配接或對等連接配接的方式來實作網絡互通互聯。
2.1 建立私有網絡 VPC
私有網絡(VPC)是一塊在騰訊雲上自定義的邏輯隔離網絡空間,在建構 Oceanus 叢集、Redis 元件等服務時選擇的網絡建議選擇同一個 VPC,網絡才能互通。否則需要使用對等連接配接、NAT 網關、VPN 等方式打通網絡。私有網絡建立步驟請參考幫助文檔(https://cloud.tencent.com/document/product/215/36515)。
2.2 建立 Oceanus 叢集
流計算 Oceanus 是大資料産品生态體系的實時化分析利器,是基于 Apache Flink 建構的具備一站開發、無縫連接配接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大資料分析平台。流計算 Oceanus 以實作企業資料價值最大化為目标,加速企業實時化數字化的建設程序。
在 Oceanus 控制台的【叢集管理】->【建立叢集】頁面建立叢集,選擇地域、可用區、VPC、日志、存儲,設定初始密碼等。VPC 及子網使用剛剛建立好的網絡。建立完後 Flink 的叢集如下:
2.3 建立 Redis 叢集
在Redis 控制台(https://console.cloud.tencent.com/redis#/)的【建立執行個體】頁面建立叢集,選擇與其他元件同一地域,同區域的同一私有網絡 VPC,這裡還選擇同一子網。
2.4 配置自建 Kafka 叢集
2.4.1 修改自建 Kafka 叢集配置
自建 Kafka 叢集連接配接時 bootstrap-servers 參數常常使用 hostname 而不是 ip 來連接配接。但用自建 Kafka 叢集連接配接騰訊雲上的 Oceanus 叢集為全托管叢集, Oceanus 叢集的節點上無法解析自建叢集的 hostname 與 ip 的映射關系,是以需要改監聽器位址由 hostname 為 ip 位址連接配接的形式。
将 config/server.properties 配置檔案中 advertised.listeners 參數配置為IP位址。示例:
# 0.10.X及以後版本
advertised.listeners=PLAINTEXT://10.1.0.10:9092
# 0.10.X之前版本
advertised.host.name=PLAINTEXT://10.1.0.10:9092
複制
修改後重新開機 Kafka 叢集。
! 若在雲上使用到自建的zookeeper位址,也需要将zk配置中的hostname修改IP位址形式。
2.4.2 模拟發送資料到topic
本案例使用topic為topic為 uvpv-demo。
1)Kafka 用戶端
進入自建 Kafka 叢集節點,啟動 Kafka 用戶端,模拟發送資料。
./bin/kafka-console-producer.sh --broker-list 10.1.0.10:9092 --topic uvpv-demo
>{"record_type":0, "user_id": 2, "client_ip": "100.0.0.2", "product_id": 101, "create_time": "2021-09-08 16:20:00"}
>{"record_type":0, "user_id": 3, "client_ip": "100.0.0.3", "product_id": 101, "create_time": "2021-09-08 16:20:00"}
>{"record_type":1, "user_id": 2, "client_ip": "100.0.0.1", "product_id": 101, "create_time": "2021-09-08 16:20:00"}
複制
2)使用腳本發送
腳本一:Java 代碼參考:https://cloud.tencent.com/document/product/597/54834
腳本二:Python 腳本。參考之前案例中 python 腳本進行适當修改即可:
《視訊直播:實時資料可視化分析》
2.5 打通自建 IDC 叢集到騰訊雲網絡通信
自建 Kafka 叢集聯通騰訊雲網絡,可通過以下前 3 種方式打通自建 IDC 到騰訊雲的網絡通信。
-
專線接入
https://cloud.tencent.com/document/product/216
适用于本地資料中心 IDC 與騰訊雲網絡打通。
-
雲聯網
https://cloud.tencent.com/document/product/877
适用于本地資料中心 IDC 與騰訊雲網絡打通,也可用于雲上不同地域間私有網絡 VPC 打通。
-
VPN連接配接
https://cloud.tencent.com/document/product/554
适用于本地資料中心 IDC 與騰訊雲網絡打通。
-
對等連接配接+ NAT網關
對等連接配接:https://cloud.tencent.com/document/product/553
NAT網關:https://cloud.tencent.com/document/product/552
适合雲上不同地域間私有網絡 VPC 打通,不适合本地 IDC 到騰訊雲網絡。
本方案中使用了 VPN 連接配接的方式,實作本地 IDC 和雲上網絡的通信。參考連結:
建立 VPC 到 IDC 的連接配接(路由表)(https://cloud.tencent.com/document/product/554/52854)
根據方案繪制了下面的網絡架構圖:
三 方案實作
3.1 業務目标
利用流計算 Oceanus 實作網站 UV、PV、轉化率名額的實時統計,這裡隻列取以下3種統計名額:
- 網站的獨立訪客數量 UV。Oceanus 處理後在 Redis 中通過 set 類型存儲獨立訪客數量,同時也達到了對同一訪客的資料去重的目的。
- 網站商品頁面的點選量 PV。Oceanus 處理後在 Redis 中使用 list 類型存儲頁面點選量。
- 轉化率(轉化率 = 成交次數 / 點選量)。Oceanus 處理後在 Redis 中用 String 存儲即可。
3.2 源資料格式
Kafka topic:uvpv-demo(浏覽記錄)
字段 | 類型 | 含義 |
---|---|---|
record_type | int | 客戶号 |
user_id | varchar | 客戶ip位址 |
client_ip | varchar | 房間号 |
product_id | Int | 進入房間時間 |
create_time | timestamp | 建立時間 |
Kafka 内部采用 json 格式存儲,資料格式如下:
# 浏覽記錄
{
"record_type":0, # 0 表示浏覽記錄
"user_id": 6,
"client_ip": "100.0.0.6",
"product_id": 101,
"create_time": "2021-09-06 16:00:00"
}
# 購買記錄
{
"record_type":1, # 1 表示購買記錄
"user_id": 6,
"client_ip": "100.0.0.8",
"product_id": 101,
"create_time": "2021-09-08 18:00:00"
}
複制
3.3 編寫 Flink SQL 作業
示例中實作了 UV、PV 和轉化率3個名額的擷取邏輯,并寫入 Sink 端。
1、定義 Source
CREATE TABLE `input_web_record` (
`record_type` INT,
`user_id` INT,
`client_ip` VARCHAR,
`product_id` INT,
`create_time` TIMESTAMP,
`times` AS create_time,
WATERMARK FOR times AS times - INTERVAL '10' MINUTE
) WITH (
'connector' = 'kafka', -- 可選 'kafka','kafka-0.11'. 注意選擇對應的内置 Connector
'topic' = 'uvpv-demo',
'scan.startup.mode' = 'earliest-offset',
--'properties.bootstrap.servers' = '82.157.27.147:9092',
'properties.bootstrap.servers' = '10.1.0.10:9092',
'properties.group.id' = 'WebRecordGroup', -- 必選參數, 一定要指定 Group ID
'format' = 'json',
'json.ignore-parse-errors' = 'true', -- 忽略 JSON 結構解析異常
'json.fail-on-missing-field' = 'false' -- 如果設定為 true, 則遇到缺失字段會報錯 設定為 false 則缺失字段設定為 null
);
複制
2、定義 Sink
-- UV sink
CREATE TABLE `output_uv` (
`userids` STRING,
`user_id` STRING
) WITH (
'connector' = 'redis',
'command' = 'sadd', -- 使用集合儲存uv(支援指令:set、lpush、sadd、hset、zadd)
'nodes' = '192.28.28.217:6379', -- redis連接配接位址,叢集模式多個節點使用'',''分隔。
-- 'additional-key' = '<key>', -- 用于指定hset和zadd的key。hset、zadd必須設定。
'password' = 'yourpassword'
);
-- PV sink
CREATE TABLE `output_pv` (
`pagevisits` STRING,
`product_id` STRING,
`hour_count` BIGINT
) WITH (
'connector' = 'redis',
'command' = 'lpush', -- 使用清單儲存pv(支援指令:set、lpush、sadd、hset、zadd)
'nodes' = '192.28.28.217:6379', -- redis連接配接位址,叢集模式多個節點使用'',''分隔。
-- 'additional-key' = '<key>', -- 用于指定hset和zadd的key。hset、zadd必須設定。
'password' = 'yourpassword'
);
-- 轉化率 sink
CREATE TABLE `output_conversion_rate` (
`conversion_rate` STRING,
`rate` STRING
) WITH (
'connector' = 'redis',
'command' = 'set', -- 使用清單儲存pv(支援指令:set、lpush、sadd、hset、zadd)
'nodes' = '192.28.28.217:6379', -- redis連接配接位址,叢集模式多個節點使用'',''分隔。
-- 'additional-key' = '<key>', -- 用于指定hset和zadd的key。hset、zadd必須設定。
'password' = 'yourpassword'
);
複制
3、業務邏輯
-- 加工得到 UV 名額,統計所有時間内的 UV
INSERT INTO output_uv
SELECT
'userids' AS `userids`,
CAST(user_id AS string) AS user_id
FROM input_web_record ;
-- 加工并得到 PV 名額,統計每 10 分鐘内的 PV
INSERT INTO output_pv
SELECT
'pagevisits' AS pagevisits,
CAST(product_id AS string) AS product_id,
SUM(product_id) AS hour_count
FROM input_web_record WHERE record_type = 0
GROUP BY
HOP(times, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
product_id,
user_id;
-- 加工并得到轉化率名額,統計每 10 分鐘内的轉化率
INSERT INTO output_conversion_rate
SELECT
'conversion_rate' AS conversion_rate,
CAST( (((SELECT COUNT(1) FROM input_web_record WHERE record_type=0)*1.0)/SUM(a.product_id)) as string)
FROM (SELECT * FROM input_web_record where record_type = 1) AS a
GROUP BY
HOP(times, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
product_id;
複制
3.4 結果驗證
通常情況,會通過 Web 網站來展示統計到的 UV、PV 名額,這裡為了簡單直接在Redis 控制台(https://console.cloud.tencent.com/redis#/)登入進行查詢:
userids: 存儲 UV
pagevisits: 存儲 PV
conversion_rate: 存儲轉化率,即購買商品次數/總頁面點選量。
四 總結
通過自建 Kafka 叢集采集資料,在流計算 Oceanus (Flink) 中實時進行字段累加、視窗聚合等操作,将加工後的資料存儲在雲資料庫Redis,統計到實時重新整理的 UV、PV 等名額。這個方案在 Kafka json 格式設計時為了簡便易懂做了簡化處理,将浏覽記錄和産品購買記錄都放在了同一個 topic 中,重點通過打通自建 IDC 和騰訊雲産品間的網絡來展現整個方案。針對超大規模的 UV 去重,微視的同僚采用了 Redis hyperloglog 方式來實作 UV 統計。相比直接使用 set 類型方式有極小的記憶體空間占用的優點,詳情見連結:https://cloud.tencent.com/developer/article/1889162。
流計算 Oceanus 限量秒殺專享活動火爆進行中↓↓
點選文末「閱讀原文」,了解騰訊雲流計算 Oceanus 更多資訊~
騰訊雲大資料
長按二維碼
關注我們