天天看點

名額統計:基于流計算 Oceanus(Flink) 實作實時 UVPV 統計三 方案實作四 總結

作者:吳雲濤,騰訊 CSIG 進階工程師

導語 | 最近梳理了一下如何用 Flink 來實作實時的 UV、PV 名額的統計,并和公司内微視部門的同僚交流。然後針對該場景做了簡化,并發現使用 Flink SQL 來 實作這些名額的統計會更加便捷。

一、解決方案描述

1.1 概述

本方案結合本地自建 Kafka 叢集、騰訊雲流計算 Oceanus(Flink)、雲資料庫 Redis 對部落格、購物等網站 UV、PV 名額進行實時可視化分析。分析名額包含網站的獨立訪客數量(UV )、産品的點選量(PV)、轉化率(轉化率 = 成交次數 / 點選量)等。

相關概念介紹:UV(Unique Visitor):獨立訪客數量。通路您網站的一台用戶端為一個訪客,如使用者對同一頁面通路了 5 次,那麼該頁面的 UV 隻加 1,因為 UV 統計的是去重後的使用者數而不是通路次數。PV(Page View):點選量或頁面浏覽量。如使用者對同一頁面通路了 5 次,那麼該頁面的 PV 會加 5。

名額統計:基于流計算 Oceanus(Flink) 實作實時 UVPV 統計三 方案實作四 總結

1.2 方案架構及優勢

根據以上實時名額統計場景,設計了如下架構圖:

名額統計:基于流計算 Oceanus(Flink) 實作實時 UVPV 統計三 方案實作四 總結

涉及産品清單:

  • 本地資料中心(IDC)的自建 Kafka 叢集
  • 私有網絡 VPC
  • 專線接入/雲聯網/VPN連接配接/對等連接配接
  • 流計算 Oceanus (Flink)
  • 雲資料庫 Redis

二、前置準備

購買所需的騰訊雲資源,并打通網絡。自建的 Kafka 叢集需根據叢集所在區域需采用 VPN 連接配接、專線連接配接或對等連接配接的方式來實作網絡互通互聯。

2.1 建立私有網絡 VPC

私有網絡(VPC)是一塊在騰訊雲上自定義的邏輯隔離網絡空間,在建構 Oceanus 叢集、Redis 元件等服務時選擇的網絡建議選擇同一個 VPC,網絡才能互通。否則需要使用對等連接配接、NAT 網關、VPN 等方式打通網絡。私有網絡建立步驟請參考幫助文檔(https://cloud.tencent.com/document/product/215/36515)。

2.2 建立 Oceanus 叢集

流計算 Oceanus 是大資料産品生态體系的實時化分析利器,是基于 Apache Flink 建構的具備一站開發、無縫連接配接、亞秒延時、低廉成本、安全穩定等特點的企業級實時大資料分析平台。流計算 Oceanus 以實作企業資料價值最大化為目标,加速企業實時化數字化的建設程序。

在 Oceanus 控制台的【叢集管理】->【建立叢集】頁面建立叢集,選擇地域、可用區、VPC、日志、存儲,設定初始密碼等。VPC 及子網使用剛剛建立好的網絡。建立完後 Flink 的叢集如下:

名額統計:基于流計算 Oceanus(Flink) 實作實時 UVPV 統計三 方案實作四 總結

2.3 建立 Redis 叢集

在Redis 控制台(https://console.cloud.tencent.com/redis#/)的【建立執行個體】頁面建立叢集,選擇與其他元件同一地域,同區域的同一私有網絡 VPC,這裡還選擇同一子網。

名額統計:基于流計算 Oceanus(Flink) 實作實時 UVPV 統計三 方案實作四 總結

2.4 配置自建 Kafka 叢集

2.4.1 修改自建 Kafka 叢集配置

自建 Kafka 叢集連接配接時 bootstrap-servers 參數常常使用 hostname 而不是 ip 來連接配接。但用自建 Kafka 叢集連接配接騰訊雲上的 Oceanus 叢集為全托管叢集, Oceanus 叢集的節點上無法解析自建叢集的 hostname 與 ip 的映射關系,是以需要改監聽器位址由 hostname 為 ip 位址連接配接的形式。

将 config/server.properties 配置檔案中 advertised.listeners 參數配置為IP位址。示例:

# 0.10.X及以後版本
advertised.listeners=PLAINTEXT://10.1.0.10:9092
# 0.10.X之前版本
advertised.host.name=PLAINTEXT://10.1.0.10:9092           

複制

修改後重新開機 Kafka 叢集。

! 若在雲上使用到自建的zookeeper位址,也需要将zk配置中的hostname修改IP位址形式。

2.4.2 模拟發送資料到topic

本案例使用topic為topic為 uvpv-demo。

1)Kafka 用戶端

進入自建 Kafka 叢集節點,啟動 Kafka 用戶端,模拟發送資料。

./bin/kafka-console-producer.sh --broker-list 10.1.0.10:9092 --topic uvpv-demo
>{"record_type":0, "user_id": 2, "client_ip": "100.0.0.2", "product_id": 101, "create_time": "2021-09-08 16:20:00"}
>{"record_type":0, "user_id": 3, "client_ip": "100.0.0.3", "product_id": 101, "create_time": "2021-09-08 16:20:00"}
>{"record_type":1, "user_id": 2, "client_ip": "100.0.0.1", "product_id": 101, "create_time": "2021-09-08 16:20:00"}           

複制

2)使用腳本發送

腳本一:Java 代碼參考:https://cloud.tencent.com/document/product/597/54834

腳本二:Python 腳本。參考之前案例中 python 腳本進行适當修改即可:

《視訊直播:實時資料可視化分析》

2.5 打通自建 IDC 叢集到騰訊雲網絡通信

自建 Kafka 叢集聯通騰訊雲網絡,可通過以下前 3 種方式打通自建 IDC 到騰訊雲的網絡通信。

  • 專線接入

    https://cloud.tencent.com/document/product/216

    适用于本地資料中心 IDC 與騰訊雲網絡打通。

  • 雲聯網

    https://cloud.tencent.com/document/product/877

    适用于本地資料中心 IDC 與騰訊雲網絡打通,也可用于雲上不同地域間私有網絡 VPC 打通。

  • VPN連接配接

    https://cloud.tencent.com/document/product/554

    适用于本地資料中心 IDC 與騰訊雲網絡打通。

  • 對等連接配接+ NAT網關

    對等連接配接:https://cloud.tencent.com/document/product/553

    NAT網關:https://cloud.tencent.com/document/product/552

    适合雲上不同地域間私有網絡 VPC 打通,不适合本地 IDC 到騰訊雲網絡。

本方案中使用了 VPN 連接配接的方式,實作本地 IDC 和雲上網絡的通信。參考連結:

建立 VPC 到 IDC 的連接配接(路由表)(https://cloud.tencent.com/document/product/554/52854)

根據方案繪制了下面的網絡架構圖:

名額統計:基于流計算 Oceanus(Flink) 實作實時 UVPV 統計三 方案實作四 總結

三 方案實作

3.1 業務目标

利用流計算 Oceanus 實作網站 UV、PV、轉化率名額的實時統計,這裡隻列取以下3種統計名額:

  • 網站的獨立訪客數量 UV。Oceanus 處理後在 Redis 中通過 set 類型存儲獨立訪客數量,同時也達到了對同一訪客的資料去重的目的。
  • 網站商品頁面的點選量 PV。Oceanus 處理後在 Redis 中使用 list 類型存儲頁面點選量。
  • 轉化率(轉化率 = 成交次數 / 點選量)。Oceanus 處理後在 Redis 中用 String 存儲即可。

3.2 源資料格式

Kafka topic:uvpv-demo(浏覽記錄)

字段 類型 含義
record_type int 客戶号
user_id varchar 客戶ip位址
client_ip varchar 房間号
product_id Int 進入房間時間
create_time timestamp 建立時間

Kafka 内部采用 json 格式存儲,資料格式如下:

# 浏覽記錄
{
 "record_type":0,  # 0 表示浏覽記錄
 "user_id": 6,
 "client_ip": "100.0.0.6",
 "product_id": 101,
 "create_time": "2021-09-06 16:00:00"
}

# 購買記錄
{
 "record_type":1, # 1 表示購買記錄
 "user_id": 6,
 "client_ip": "100.0.0.8",
 "product_id": 101,
 "create_time": "2021-09-08 18:00:00"
}           

複制

3.3 編寫 Flink SQL 作業

示例中實作了 UV、PV 和轉化率3個名額的擷取邏輯,并寫入 Sink 端。

1、定義 Source

CREATE TABLE `input_web_record` (
  `record_type` INT,
  `user_id` INT,
  `client_ip` VARCHAR,
  `product_id` INT,
  `create_time` TIMESTAMP,
  `times` AS create_time,
  WATERMARK FOR times AS times - INTERVAL '10' MINUTE
) WITH (
    'connector' = 'kafka',   -- 可選 'kafka','kafka-0.11'. 注意選擇對應的内置 Connector
    'topic' = 'uvpv-demo',  
    'scan.startup.mode' = 'earliest-offset',
    --'properties.bootstrap.servers' = '82.157.27.147:9092',
    'properties.bootstrap.servers' = '10.1.0.10:9092',  
    'properties.group.id' = 'WebRecordGroup',  -- 必選參數, 一定要指定 Group ID
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',     -- 忽略 JSON 結構解析異常
    'json.fail-on-missing-field' = 'false'   -- 如果設定為 true, 則遇到缺失字段會報錯 設定為 false 則缺失字段設定為 null
);           

複制

2、定義 Sink

-- UV sink
CREATE TABLE `output_uv` (  
`userids`   STRING,
`user_id` STRING
) WITH (
 'connector' = 'redis',          
 'command' = 'sadd',              -- 使用集合儲存uv(支援指令:set、lpush、sadd、hset、zadd)
 'nodes' = '192.28.28.217:6379',  -- redis連接配接位址,叢集模式多個節點使用'',''分隔。
 -- 'additional-key' = '<key>',   -- 用于指定hset和zadd的key。hset、zadd必須設定。
 'password' = 'yourpassword'  
);

-- PV sink
CREATE TABLE `output_pv` (  
`pagevisits`   STRING,
`product_id` STRING,
`hour_count` BIGINT
) WITH (
 'connector' = 'redis',          
 'command' = 'lpush',              -- 使用清單儲存pv(支援指令:set、lpush、sadd、hset、zadd)
 'nodes' = '192.28.28.217:6379',   -- redis連接配接位址,叢集模式多個節點使用'',''分隔。
 -- 'additional-key' = '<key>',   -- 用于指定hset和zadd的key。hset、zadd必須設定。
 'password' = 'yourpassword'  
);

-- 轉化率 sink
CREATE TABLE `output_conversion_rate` (  
`conversion_rate`   STRING,
`rate` STRING
) WITH (
 'connector' = 'redis',        
 'command' = 'set',              -- 使用清單儲存pv(支援指令:set、lpush、sadd、hset、zadd)
 'nodes' = '192.28.28.217:6379', -- redis連接配接位址,叢集模式多個節點使用'',''分隔。
 -- 'additional-key' = '<key>', -- 用于指定hset和zadd的key。hset、zadd必須設定。
 'password' = 'yourpassword'  
);           

複制

3、業務邏輯

-- 加工得到 UV 名額,統計所有時間内的 UV
INSERT INTO output_uv
SELECT
 'userids' AS `userids`,
CAST(user_id AS string) AS user_id
FROM input_web_record ;

-- 加工并得到 PV 名額,統計每 10 分鐘内的 PV
INSERT INTO output_pv
SELECT
 'pagevisits' AS pagevisits,
CAST(product_id AS string) AS product_id,
SUM(product_id) AS hour_count
FROM input_web_record WHERE record_type = 0
GROUP BY
HOP(times, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
product_id,
user_id;

-- 加工并得到轉化率名額,統計每 10 分鐘内的轉化率
INSERT INTO output_conversion_rate
SELECT
 'conversion_rate' AS conversion_rate,
CAST( (((SELECT COUNT(1) FROM input_web_record WHERE record_type=0)*1.0)/SUM(a.product_id)) as string)
FROM (SELECT * FROM input_web_record where record_type = 1) AS a
GROUP BY  
HOP(times, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
product_id;           

複制

3.4 結果驗證

通常情況,會通過 Web 網站來展示統計到的 UV、PV 名額,這裡為了簡單直接在Redis 控制台(https://console.cloud.tencent.com/redis#/)登入進行查詢:

名額統計:基于流計算 Oceanus(Flink) 實作實時 UVPV 統計三 方案實作四 總結

userids: 存儲 UV

pagevisits: 存儲 PV

conversion_rate: 存儲轉化率,即購買商品次數/總頁面點選量。

四 總結

通過自建 Kafka 叢集采集資料,在流計算 Oceanus (Flink) 中實時進行字段累加、視窗聚合等操作,将加工後的資料存儲在雲資料庫Redis,統計到實時重新整理的 UV、PV 等名額。這個方案在 Kafka json 格式設計時為了簡便易懂做了簡化處理,将浏覽記錄和産品購買記錄都放在了同一個 topic 中,重點通過打通自建 IDC 和騰訊雲産品間的網絡來展現整個方案。針對超大規模的 UV 去重,微視的同僚采用了 Redis hyperloglog 方式來實作 UV 統計。相比直接使用 set 類型方式有極小的記憶體空間占用的優點,詳情見連結:https://cloud.tencent.com/developer/article/1889162。

流計算 Oceanus 限量秒殺專享活動火爆進行中↓↓

名額統計:基于流計算 Oceanus(Flink) 實作實時 UVPV 統計三 方案實作四 總結

點選文末「閱讀原文」,了解騰訊雲流計算 Oceanus 更多資訊~

騰訊雲大資料

名額統計:基于流計算 Oceanus(Flink) 實作實時 UVPV 統計三 方案實作四 總結

長按二維碼

關注我們