
[Best Practices] Building a Real-Time Data Warehouse for the Gaming Industry with Realtime Compute for Apache Flink

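Industry background

  • Current state of the industry:
    • With the Internet and the mobile Internet reinforcing and converging with each other, and with PCs and mobile devices becoming increasingly alike in intelligence and portability, demand is emerging for games that run across platforms on all kinds of devices. In particular, social games and similar web-based titles already run across platforms on various mobile devices. As awareness of the value of intellectual property grows, game developers and operators that acquire rights are developing and reusing cultural content more intensively, aiming to maximize the value of those rights through products in multiple forms and media.
  • Role of big data in the gaming industry:
    • Analyze game data to identify product trends and enable precision marketing
    • Build player profiles from payment and activity data, design different monetization campaigns for different players, improve the experience of paying players, and increase in-game spending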

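Business scenario

A game company has developed a game app. The company publishes content such as game scenes, game characters, equipment, and premium skins in the app. Players play online and generate behavior such as top-ups and purchases.

The business involves the following sides:

  1. App: the application and the players' entry point. Players mainly do the following:
    1. Create an account
    2. Play online
    3. Top up
  2. Backend system: analyzes player behavior data and provides the results to operations and O&M staff to support company decisions.
    1. Real-time log archiving: for OLAP queries or offline data analysis
    2. Real-time KPI statistics: count game clicks in different time periods as the basis for scheduling game events, version updates, and server maintenance; use game revenue figures to design better monetization campaigns
    3. Real-time TopN game statistics: support decisions on allocating development and operations resources for the game app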

Technical architecture


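Architecture overview:

Data collection: in this scenario the data warehouse has two data sources. User operation logs are collected into Log Service (SLS), while purchase and top-up information is synchronized to DataHub from RDS binlogs.

Real-time data warehouse: in this scenario all aggregation for the real-time data warehouse is done by Flink. Flink reads data from SLS and DataHub in real time, processes it, joins it with dimension tables, and writes the real-time results to the downstream stores ODPS and RDS.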

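Business metrics

  • Log archiving
  • KPI statistics
    • Game UV
    • Cumulative revenue from newly created characters
    • Number of game comments
  • Top 3 popular games

Note: this case covers only the scenarios and metrics above. Real-world deployments also include other metrics, such as detection of game account logins from unusual locations and player profiling.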

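Business code

Scenario 1: Log archiving

This scenario synchronizes the logs generated when users click in the game app to ODPS in real time for archiving. It also extracts information such as the log time (at day and hour granularity) so that operations staff can run offline analysis.

Input table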

CREATE TABLE game_log_source (
  log_t BIGINT,
  app_id VARCHAR ,
  app_ver VARCHAR,
  body VARCHAR,
  param1 VARCHAR,
  param2 VARCHAR,
  param3 VARCHAR,
  param4 VARCHAR,
  param5 VARCHAR,
  device_id VARCHAR,
  lcmid BIGINT 
) WITH (
    type= 'sls',
  ...
);           

Output table

CREATE TABLE game_log (
  log_t bigint,
  app_ver VARCHAR,
  device_id VARCHAR,
  mbga_uid bigint,
  param1 VARCHAR,
  param2 VARCHAR,
  param3 VARCHAR,
  param4 VARCHAR,
  param5 VARCHAR,
  `user_id` VARCHAR,
  a_typ VARCHAR,
  `zone` VARCHAR,
  `ahour` bigint,
  `dt` bigint
) with (
  type = 'odps',
  ...
);           

INSERT INTO game_log
SELECT
  log_t,
  app_ver,
  device_id,
  lcmid as mbga_uid,
  param1,
  param2,
  param3,
  param4,
  param5,
  SPLIT_INDEX (JSON_VALUE (body, '$.a_usr'), '@', 1) AS user_id,
  JSON_VALUE (body, '$.a_typ') AS a_typ,
  concat ('', SPLIT_INDEX (JSON_VALUE (body, '$.a_usr'), '@', 0)) AS `zone`,
  cast (from_unixtime (log_t, 'yyyyMMddHH') as bigint) AS `ahour`,
  cast (from_unixtime (log_t, 'yyyyMMdd') as bigint) AS `dt`
FROM
  game_log_source;           
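
The field extraction in the statement above can be tried locally with a small Python sketch. It is an illustration only, not part of the original job: the helper names split_index and archive_row, the sample record, and the UTC+8 time zone are assumptions, and log_t is assumed to be a Unix timestamp in seconds.

```python
import json
from datetime import datetime, timedelta, timezone

# Assumption: the job formats timestamps in UTC+8; adjust to your deployment.
TZ = timezone(timedelta(hours=8))


def split_index(text, sep, index):
    """Mimic SPLIT_INDEX: return the index-th piece (0-based), or None if absent."""
    if text is None:
        return None
    parts = text.split(sep)
    return parts[index] if 0 <= index < len(parts) else None


def archive_row(log_t, body):
    """Derive the extra columns written to game_log from one source record."""
    payload = json.loads(body)
    a_usr = payload.get("a_usr")  # assumed layout: "<zone>@<user_id>"
    ts = datetime.fromtimestamp(log_t, TZ)  # log_t assumed to be in seconds
    return {
        "user_id": split_index(a_usr, "@", 1),
        "a_typ": payload.get("a_typ"),
        "zone": split_index(a_usr, "@", 0),
        "ahour": int(ts.strftime("%Y%m%d%H")),  # hour-level partition value
        "dt": int(ts.strftime("%Y%m%d")),       # day-level partition value
    }


# Hypothetical sample record: 1577836800 is 2020-01-01 00:00:00 UTC.
row = archive_row(1577836800, '{"a_usr": "zone01@u1001", "a_typ": "click"}')
print(row)
```

With the assumed UTC+8 zone, the sample timestamp falls in hour 08 of 2020-01-01, so ahour and dt come out as 2020010108 and 20200101.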

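Scenario 2: KPI statistics

This scenario computes the game UV for each hour of each day.

Login records are joined with the dimension table to enrich player information and grouped by server_date_day and game_id (together with the other keys listed in the GROUP BY clause). An expression such as count(distinct user_id) filter (where reg_hour=0) then yields the game UV for 00:00 to 00:59, and repeating it for each hour gives the hourly game UV for each day. Because the query filters on server_date_day=reg_date, only users who registered on that day are counted.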

CREATE TABLE agent_login (
  user_id                       VARCHAR,
    user_name                     VARCHAR,
    gender                        VARCHAR,
    birth                         VARCHAR,
    age                           VARCHAR,
    game_id                       VARCHAR,
    game_name                     VARCHAR,
    channel_id                    VARCHAR,
    game_channel_id               VARCHAR,
  os_type                       VARCHAR,
  server_date_day               VARCHAR,
  reg_date                      VARCHAR,
    reg_hour                      BIGINT,
    ad_id                         VARCHAR,
    reg_via                       VARCHAR,
    dt                            VARCHAR 
)WITH (
  type='datahub',
  ...
);           

Dimension table

CREATE TABLE advertising (
  id                                  INT,    
  ad_name                             VARCHAR,
  game_id                             INT,    
  game_name                           VARCHAR,
  media_id                            INT,
  media_account_id                    INT,
  package_id                          INT,
  ad_resource_id                      INT,
  ad_media_params                     VARCHAR,
  admin_id                            INT,
  create_time                         TIMESTAMP,
  PRIMARY KEY (package_id,ad_media_params),
  PERIOD FOR SYSTEM_TIME
)WITH (
  type='rds',
  ...
);           

CREATE TABLE hour_uv(
 `date`                         VARCHAR,
  ad_game_id                    VARCHAR,
  channel_id                    VARCHAR,
  package_id                    VARCHAR,
  ad_media_params               VARCHAR,
  hour_active_nuv_0             BIGINT,
  hour_active_nuv_1             BIGINT,
  hour_active_nuv_2             BIGINT,
  hour_active_nuv_3             BIGINT,
  hour_active_nuv_4             BIGINT,
  hour_active_nuv_5             BIGINT,
  hour_active_nuv_6             BIGINT,
  hour_active_nuv_7             BIGINT,
  hour_active_nuv_8             BIGINT,
  hour_active_nuv_9             BIGINT,
  hour_active_nuv_10            BIGINT,
  hour_active_nuv_11            BIGINT,
  hour_active_nuv_12            BIGINT,
  hour_active_nuv_13            BIGINT,
  hour_active_nuv_14            BIGINT,
  hour_active_nuv_15            BIGINT,
  hour_active_nuv_16            BIGINT,
  hour_active_nuv_17            BIGINT,
  hour_active_nuv_18            BIGINT,
  hour_active_nuv_19            BIGINT,
  hour_active_nuv_20            BIGINT,
  hour_active_nuv_21            BIGINT,
  hour_active_nuv_22            BIGINT,
  hour_active_nuv_23            BIGINT,
  create_time                   VARCHAR,
  via                           VARCHAR,
  media_id                      BIGINT,
  media_account_id              BIGINT,
  ad_resource_id                BIGINT,
  game_id                       BIGINT,
  admin_id                      BIGINT,
  ad_id                         BIGINT,
  os_type                       VARCHAR
  )WITH (
    type='rds',
    ...
  );           

INSERT INTO hour_uv
select 
server_date_day as server_date,
o.game_id,
o.channel_id,
o.game_channel_id,
o.ad_id,
count(distinct user_id) filter (where reg_hour=0) as hour_active_nuv_0,
count(distinct user_id) filter (where reg_hour=1) as hour_active_nuv_1,
count(distinct user_id) filter (where reg_hour=2) as hour_active_nuv_2,
count(distinct user_id) filter (where reg_hour=3) as hour_active_nuv_3,
count(distinct user_id) filter (where reg_hour=4) as hour_active_nuv_4,
count(distinct user_id) filter (where reg_hour=5) as hour_active_nuv_5,
count(distinct user_id) filter (where reg_hour=6) as hour_active_nuv_6,
count(distinct user_id) filter (where reg_hour=7) as hour_active_nuv_7,
count(distinct user_id) filter (where reg_hour=8) as hour_active_nuv_8,
count(distinct user_id) filter (where reg_hour=9) as hour_active_nuv_9,
count(distinct user_id) filter (where reg_hour=10) as hour_active_nuv_10,
count(distinct user_id) filter (where reg_hour=11) as hour_active_nuv_11,
count(distinct user_id) filter (where reg_hour=12) as hour_active_nuv_12,
count(distinct user_id) filter (where reg_hour=13) as hour_active_nuv_13,
count(distinct user_id) filter (where reg_hour=14) as hour_active_nuv_14,
count(distinct user_id) filter (where reg_hour=15) as hour_active_nuv_15,
count(distinct user_id) filter (where reg_hour=16) as hour_active_nuv_16,
count(distinct user_id) filter (where reg_hour=17) as hour_active_nuv_17,
count(distinct user_id) filter (where reg_hour=18) as hour_active_nuv_18,
count(distinct user_id) filter (where reg_hour=19) as hour_active_nuv_19,
count(distinct user_id) filter (where reg_hour=20) as hour_active_nuv_20,
count(distinct user_id) filter (where reg_hour=21) as hour_active_nuv_21,
count(distinct user_id) filter (where reg_hour=22) as hour_active_nuv_22,
count(distinct user_id) filter (where reg_hour=23) as hour_active_nuv_23,
dt,
reg_via,
cast(min(ad.media_id) as bigint),
cast(min(ad.media_account_id) as bigint),
cast(min(ad.ad_resource_id) as bigint),
cast(min(ad.game_id) as bigint),
cast(min(ad.admin_id) as bigint),
cast(min(ad.id) as bigint),
COALESCE((case when o.os_type = 'h5' then 'android' else o.os_type end),'android')
from agent_login AS o 
LEFT JOIN advertising FOR SYSTEM_TIME AS OF PROCTIME() AS ad 
on (o.ad_id=ad.ad_media_params and o.game_channel_id=ad.package_id)  
where server_date_day=reg_date 
group by server_date_day,o.game_id,o.channel_id,o.game_channel_id,o.ad_id,dt,reg_via,COALESCE((case when o.os_type = 'h5' then 'android' else o.os_type end),'android');           
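
The filtered distinct count used above can be pictured as one set of user IDs per hour and per group. The Python sketch below illustrates that idea under simplifying assumptions: the group key is reduced to (server_date_day, game_id), the dimension-table join is omitted, and the function name and sample records are hypothetical.

```python
from collections import defaultdict


def hourly_new_uv(logins):
    """Count distinct users per reg_hour for each (day, game_id) group.

    Mirrors count(distinct user_id) filter (where reg_hour = h) for h in 0..23,
    restricted to rows where server_date_day equals reg_date.
    """
    per_hour = defaultdict(lambda: [set() for _ in range(24)])
    for rec in logins:
        if rec["server_date_day"] != rec["reg_date"]:
            continue  # same predicate as the WHERE clause
        key = (rec["server_date_day"], rec["game_id"])
        per_hour[key][rec["reg_hour"]].add(rec["user_id"])
    return {key: [len(users) for users in hours] for key, hours in per_hour.items()}


def login(user_id, reg_date, reg_hour):
    """Build a hypothetical login record for game g1 on 2020-06-01."""
    return {"user_id": user_id, "game_id": "g1", "server_date_day": "2020-06-01",
            "reg_date": reg_date, "reg_hour": reg_hour}


sample = [
    login("u1", "2020-06-01", 0),
    login("u1", "2020-06-01", 0),   # repeated login, counted once
    login("u2", "2020-06-01", 0),
    login("u3", "2020-06-01", 13),
    login("u4", "2020-05-30", 0),   # registered earlier, filtered out
]
uv = hourly_new_uv(sample)
print(uv[("2020-06-01", "g1")])
```

Keeping a set per hour is what makes the count distinct; a streaming engine has to hold equivalent state for every group, which is worth bearing in mind when the number of group keys grows.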

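This scenario computes the total revenue generated by newly created game characters over different periods (the creation day, the creation day plus the next day, and the creation day through the third day).

Orders are deduplicated with TopN, joined with the dimension tables to enrich player information, and grouped by reg_date and game_id. An expression such as sum(money) filter (where reg_date>=server_date_day-14) then gives the revenue within 15 days of character creation. Applying different offsets in the same way gives the total revenue for each period after creation.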

CREATE TABLE `order` (
    `server`                      VARCHAR,
  os_type                       VARCHAR,
  create_time                   VARCHAR,
  update_time                   VARCHAR,
  money                         DOUBLE,
  user_id                       VARCHAR,
  id                            VARCHAR,
  channel_id                    VARCHAR,
  order_sn                      VARCHAR,
  status                        VARCHAR,
  game_id                       VARCHAR,
  game_channel_id               VARCHAR,
  first_order_date              VARCHAR,
  server_date_day               VARCHAR,  -- character login date
  reg_date                      VARCHAR,  -- character creation (registration) date
  ad_id                         VARCHAR,
  via                           VARCHAR,
  reg_via                       VARCHAR,
  server_ts                     VARCHAR,
  game_name                     VARCHAR,
  package_name                  VARCHAR,
  dt                            VARCHAR
) WITH (
  type = 'datahub',
  ...
);           

Dimension table A

CREATE TABLE advertising (
     id                                  INT,
  channel_id                          INT,
  game_id                             INT,
  game_name                           VARCHAR,
  media_id                            INT,
  media_account_id                    INT,
  package_id                          INT,
  package_name                        VARCHAR,
  ad_resource_id                      INT,
  ad_media_params                     VARCHAR,
  `type`                              TINYINT,
  status                              TINYINT,
  admin_id                            INT,
  create_time                         TIMESTAMP,
  update_time                         TIMESTAMP,
    PRIMARY KEY (package_id,ad_media_params),
    PERIOD FOR SYSTEM_TIME
) WITH (
  type= 'rds',
  ...
);           

Dimension table B

CREATE TABLE advertising_divided (
    id                          INT,
  ad_id                       INT,
  media_id                    INT,
  media_account_id            INT,
  ad_resource_id              INT,
  game_id                     INT,
  package_id                  INT,
  ad_media_params             VARCHAR,
  ratio                       decimal(10,2),
  divide_date                 VARCHAR,
  create_time                 TIMESTAMP,
  update_time                 TIMESTAMP,
    PRIMARY KEY (package_id,ad_media_params,divide_date),
    PERIOD FOR SYSTEM_TIME
) WITH (
  type= 'rds',
  ...
);           

CREATE TABLE total_revenue (
    `date`                        VARCHAR,
  ad_game_id                    VARCHAR,
  channel_id                    VARCHAR,
  package_id                    VARCHAR,
  ad_media_params               VARCHAR,
  pay_people_yet                BIGINT,
  pay_amount_yet                DECIMAL,
  pay_amount_1                  DECIMAL,
  pay_amount_2                  DECIMAL,
  pay_amount_3                  DECIMAL,
  split_share_rate              decimal(10,2),
  create_time                   VARCHAR,
  via                           VARCHAR,
  media_id                      BIGINT,
  media_account_id              BIGINT,
  ad_resource_id                BIGINT,
  game_id                       BIGINT,
  admin_id                      BIGINT,
  ad_id                         BIGINT,
  os_type                       VARCHAR,
    PRIMARY KEY (`date`,ad_game_id,channel_id,package_id,ad_media_params,create_time,via,os_type)
) WITH (
  type= 'rds',
  ...
);           

INSERT INTO total_revenue
select reg_date,o.game_id,o.channel_id,o.game_channel_id,o.ad_id,
count(distinct user_id) filter (where server_date_day=first_order_date) as pay_people_yet,
cast(sum(money) as decimal),
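-- For characters created on a given day: revenue generated within 1 day (the creation day)
cast(sum(money) filter (where reg_date>=server_date_day) as decimal) as pay_amount_1,
-- For characters created on a given day: revenue generated within 2 days (the creation day and the following day)
cast(sum(money) filter (where reg_date>=DATE_SUB(server_date_day,1)) as decimal) as pay_amount_2,
-- For characters created on a given day: revenue generated within 3 days (the creation day and the following 2 days)
cast(sum(money) filter (where reg_date>=DATE_SUB(server_date_day,2)) as decimal) as pay_amount_3,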
cast(max(COALESCE(ra.ratio,0)) as decimal(10,2)),dt,reg_via,cast(min(ad.media_id) as bigint),cast(min(ad.media_account_id) as bigint),cast(min(ad.ad_resource_id) as bigint),cast(min(ad.game_id) as bigint),cast(min(ad.admin_id) as bigint),cast(min(ad.id) as bigint),COALESCE((case when o.os_type = 'h5' then 'android' else o.os_type end),'android')
from 
(
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY order_sn ORDER BY server_date_day ASC) as rowNum
  FROM `order`
) AS o 
LEFT JOIN advertising FOR SYSTEM_TIME AS OF PROCTIME() AS ad on (o.ad_id=ad.ad_media_params and o.game_channel_id=ad.package_id)  
LEFT JOIN advertising_divided FOR SYSTEM_TIME AS OF PROCTIME() AS ra on (o.ad_id=ra.ad_media_params and o.game_channel_id=ra.package_id and DATE_FORMAT(o.server_date_day,'yyyy-MM-dd','yyyy-MM')=ra.`divide_date`)  
WHERE rowNum = 1 group by reg_date,o.game_id,o.channel_id,o.game_channel_id,o.ad_id,dt,reg_via,COALESCE((case when o.os_type = 'h5' then 'android' else o.os_type end),'android');           
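
The two key steps of the statement above, deduplicating by order_sn and summing revenue over a growing window after the creation date, can be sketched in Python as follows. This is an illustration under stated assumptions: dates are date objects rather than strings, the dimension-table joins are left out, and the function names and sample orders are hypothetical.

```python
from datetime import date


def dedupe_orders(orders):
    """Keep one row per order_sn: the one with the earliest server_date_day.

    Plays the role of ROW_NUMBER() OVER (PARTITION BY order_sn
    ORDER BY server_date_day ASC) followed by rowNum = 1.
    """
    first = {}
    for order in orders:
        kept = first.get(order["order_sn"])
        if kept is None or order["server_date_day"] < kept["server_date_day"]:
            first[order["order_sn"]] = order
    return list(first.values())


def revenue_within(orders, reg_date, days):
    """Sum money from characters created on reg_date within `days` days of creation."""
    total = 0.0
    for order in dedupe_orders(orders):
        if order["reg_date"] != reg_date:
            continue
        age = (order["server_date_day"] - order["reg_date"]).days
        if 0 <= age < days:  # days=1 covers the creation day only
            total += order["money"]
    return total


def make_order(order_sn, day, money):
    """Build a hypothetical order for a character created on 2020-06-01."""
    return {"order_sn": order_sn, "reg_date": date(2020, 6, 1),
            "server_date_day": date(2020, 6, day), "money": money}


orders = [
    make_order("A1", 1, 10.0),
    make_order("A1", 2, 10.0),   # duplicate of A1, dropped by deduplication
    make_order("A2", 2, 20.0),
    make_order("A3", 3, 5.0),
    make_order("A4", 5, 100.0),  # outside the 3-day window
]
for n in (1, 2, 3):
    print(n, revenue_within(orders, date(2020, 6, 1), n))
```

The condition age < days corresponds to reg_date >= DATE_SUB(server_date_day, days - 1) in the SQL, assuming that an order is never dated before the character was created.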

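Number of users commenting on games

This scenario counts the users who comment on games, using a three-minute tumbling window.

When a user comments on a game, log data is generated. Flink parses and cleanses the JSON-formatted logs to obtain app_id, the comment date (day), the ID of the commenting user, and other fields. The data is grouped by app_id and day and aggregated with a three-minute tumbling window function to obtain the number of commenting users.

Sample tracking data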

{
    "app_id":"",
    "body":{
        "lid":"",
        "affcode":""
    },
    "app_table":"",
    "log_t":""
}

CREATE TABLE log_input (
  `message` VARCHAR,
  ts AS case when JSON_VALUE(`message`, '$.log_t') is NULL then TO_TIMESTAMP('1970-01-01 00:00:00') else TO_TIMESTAMP(cast(JSON_VALUE(`message`, '$.log_t') as BIGINT)) end,
  WATERMARK wk1 FOR ts as withOffset(ts, 180000)  -- Watermark generation: offset of 180000 ms (3 minutes)
) WITH (
  type='sls',
  ...
);           

create table total_comments (
  app_id VARCHAR,
  comment_name VARCHAR,
  comment_type VARCHAR,
  kpi_type_val VARCHAR,
  comment_value bigint,
  `day` VARCHAR,
  createtime timestamp,
  PRIMARY KEY (app_id,comment_name,comment_type,kpi_type_val,`day`)
) with (
  type = 'rds',
  ...
);           

Parse and cleanse the JSON data

CREATE VIEW user_session AS 
SELECT CAST(TO_DATE(cast(now() as VARCHAR),'yyyyMMdd') as VARCHAR) as `day`,ts,
JSON_VALUE(`message`, '$.app_id') as app_id, 
JSON_VALUE(JSON_VALUE(`message`, '$.body'), '$.lid') as lid,  -- ID of the user who commented on the game
JSON_VALUE(JSON_VALUE(`message`, '$.body'), '$.affcode') as affcode 
from log_input  
where JSON_VALUE(`message`, '$.app_table') = 'user_session'
and JSON_VALUE(`message`, '$.body') is not null
and JSON_VALUE(`message`, '$.body') <> ''
and CHAR_LENGTH(cast(JSON_VALUE(`message`, '$.log_t') as varchar)) = 13
and JSON_VALUE(`message`, '$.app_id') is not NULL
and JSON_VALUE(`message`, '$.app_id') <> ''
and JSON_VALUE(JSON_VALUE(`message`, '$.body'), '$.affcode') is not null
and JSON_VALUE(JSON_VALUE(`message`, '$.body'), '$.affcode') <> 'PRESSURE_TEST'
and JSON_VALUE(JSON_VALUE(`message`, '$.body'), '$.lid') is not null
and JSON_VALUE(JSON_VALUE(`message`, '$.body'), '$.lid') <> '';           
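
The cleansing rules in the view above can be checked against sample messages with a short Python sketch. It is an approximation, not the job's code: the function name clean and the sample values are hypothetical, body is assumed to be a JSON object, and the digit check on log_t is an extra guard that the SQL does not perform explicitly.

```python
import json


def clean(message):
    """Return the parsed fields if the message passes the view's filters, else None."""
    try:
        rec = json.loads(message)
    except ValueError:
        return None
    body = rec.get("body")
    if rec.get("app_table") != "user_session" or not isinstance(body, dict):
        return None
    log_t = str(rec.get("log_t", ""))
    # 13 characters means a millisecond timestamp; isdigit() is an added guard.
    if len(log_t) != 13 or not log_t.isdigit():
        return None
    app_id, lid, affcode = rec.get("app_id"), body.get("lid"), body.get("affcode")
    if app_id in (None, "") or lid in (None, ""):
        return None
    if affcode is None or affcode == "PRESSURE_TEST":
        return None  # drop load-test traffic
    return {"app_id": app_id, "lid": lid, "affcode": affcode, "ts": int(log_t)}


def sample_message(affcode="store", log_t="1590969600000"):
    """Build a hypothetical raw log line."""
    return json.dumps({
        "app_id": "game01",
        "body": {"lid": "u1001", "affcode": affcode},
        "app_table": "user_session",
        "log_t": log_t,
    })


print(clean(sample_message()))
print(clean(sample_message(affcode="PRESSURE_TEST")))
print(clean(sample_message(log_t="1590969600")))
```

Only the first sample passes; the second is rejected as load-test traffic and the third because its timestamp is in seconds rather than milliseconds.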

Count commenting users in 3-minute windows

INSERT INTO total_comments
SELECT
app_id,
'comment_name' as comment_name,
'comment' as comment_type,
affcode as kpi_type_val,
count(DISTINCT lid) as comment_value,
`day`,
CURRENT_TIMESTAMP as createtime
from  user_session
GROUP BY `day`,TUMBLE(ts, INTERVAL '3' MINUTE),app_id,affcode;           
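
A tumbling window assigns each event to exactly one fixed-size, non-overlapping interval. The Python sketch below shows that assignment together with the distinct user count for a 3-minute window. It is a batch-style illustration with hypothetical names and sample events: it assumes event timestamps in milliseconds and windows aligned to the epoch, and it ignores watermarks and late data, which the streaming job has to handle.

```python
from collections import defaultdict

WINDOW_MS = 3 * 60 * 1000  # 3-minute tumbling window


def window_start(ts_ms):
    """Start of the epoch-aligned window that contains ts_ms (non-negative)."""
    return ts_ms - ts_ms % WINDOW_MS


def commenting_users(events):
    """Count distinct lid per (app_id, affcode, window start)."""
    seen = defaultdict(set)
    for event in events:
        key = (event["app_id"], event["affcode"], window_start(event["ts"]))
        seen[key].add(event["lid"])
    return {key: len(users) for key, users in seen.items()}


# Hypothetical events: three fall in the first window, one in the next.
events = [
    {"app_id": "game01", "affcode": "store", "lid": "u1", "ts": 0},
    {"app_id": "game01", "affcode": "store", "lid": "u1", "ts": 60_000},
    {"app_id": "game01", "affcode": "store", "lid": "u2", "ts": 179_999},
    {"app_id": "game01", "affcode": "store", "lid": "u3", "ts": 180_000},
]
counts = commenting_users(events)
print(counts)
```

Each window produces its own count, so the same user commenting in two consecutive windows is counted once in each.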

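Scenario 3: Top 3 popular games

This scenario computes the daily ranking of popular games.

Tracking events are recorded on the download page of the game store front end and synchronized to DataHub. The data is grouped by time and game_app to compute the total number of downloads of each game per day. The download counts are then ranked with TopN, and the three most downloaded games are reported as the most popular.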

CREATE TABLE source_table(
  game_app VARCHAR ,-- game name
  `time` VARCHAR    -- time (a day in this scenario)
)WITH (
  TYPE='datahub',
  ...
);           

CREATE TABLE result_table(
Ranking BIGINT,
`time` VARCHAR,
game_app VARCHAR,
number BIGINT,
primary key(`time`,game_app)
)WITH (
  TYPE='rds',
  ...
);           

INSERT INTO result_table
SELECT 
Ranking,
`time`,
game_app,
number
FROM (
  SELECT *,
     ROW_NUMBER() OVER (PARTITION BY `time` ORDER BY number desc) AS Ranking
  FROM (
        SELECT 
       `time` AS `time`,
        COUNT(game_app) AS number,
        game_app
        FROM  source_table
        GROUP BY `time`,game_app
    )a
) 
WHERE Ranking <= 3;
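
The count-then-rank pattern of the query above can be sketched in Python as follows. This is an illustration only: the function name top_games and the sample download events are hypothetical, and ties are broken by game name so that the output is deterministic, which the SQL does not specify.

```python
from collections import Counter, defaultdict


def top_games(downloads, n=3):
    """Return (ranking, time, game_app, number) rows for the top n games per day."""
    # Inner query: downloads per (time, game_app).
    counts = Counter((d["time"], d["game_app"]) for d in downloads)
    by_day = defaultdict(list)
    for (day, game), number in counts.items():
        by_day[day].append((game, number))
    rows = []
    for day in sorted(by_day):
        # Outer query: rank by download count, highest first.
        # Assumption: ties are broken by game name.
        ranked = sorted(by_day[day], key=lambda item: (-item[1], item[0]))
        for ranking, (game, number) in enumerate(ranked[:n], start=1):
            rows.append((ranking, day, game, number))
    return rows


# Hypothetical download events for one day.
names = ["A"] * 5 + ["B"] * 3 + ["C"] * 2 + ["D"]
downloads = [{"time": "2020-06-01", "game_app": name} for name in names]
ranking = top_games(downloads)
for row in ranking:
    print(row)
```

In the streaming job the ranking changes as new downloads arrive, so the result table is continuously updated rather than written once.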

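Alibaba Cloud Realtime Compute for Apache Flink - Solutions: https://developer.aliyun.com/article/765097
Alibaba Cloud Realtime Compute for Apache Flink - Use cases: https://ververica.cn/corporate-practice
Alibaba Cloud Realtime Compute for Apache Flink - Product page: https://www.aliyun.com/product/bigdata/product/sc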