天天看點

【最佳實踐】實時計算 Flink 版在金融行業的實時數倉建設實踐

行業背景

  • 行業現狀: 

金融是現代經濟的核心。我國金融業在市場化改革和對外開放中不斷發展,金融總量大幅增長。金融穩定直接關系到國家經濟發展的前途和命運,金融業是國民經濟發展的晴雨表。對我國金融業發展現狀進行客觀分析,對金融業發展趨勢進行探索,有助于消除金融隐患,使金融業朝着健康、有序方向發展。

  • 大資料在其行業中的作用:
    1. 金融服務和産品創新:借助社交網絡等平台産生的海量使用者和資料記錄着使用者群體的興趣和偏好情緒等資訊, 對客戶行為模式進行分析,可以帶來更貼近客戶需求的産品創新。
    2. 增強使用者體驗:通過大資料分析對客戶進行畫像,結合客戶畫像特征,為使用者提供個性化服務,增強使用者體驗。

業務場景

某保險公司開發了個金融類APP,該公司在APP中會投放保險廣告、釋出優惠活動,使用者通過APP進行投保等操作。

業務的建構涉及到幾個端:

  1. APP:應用程式,使用者通路入口,使用者通過APP點選浏覽保險廣告、優惠活動等,并進行投保下單。
  2. 背景系統:

a.營運人員:

(1)根據使用者送出的訂單統計指定時間段的總投保人數和總投保金額,輔助優化營運方案。

(2)對使用者的日常行為做出分析,分析出每個使用者比較關注的資訊,作為推薦系統的資料來源。

b:業務經理:

對重點客戶的投保金額變動進行監控,将投保金額變動較大的重點客戶推送給業務經理,業務經理針對性開展客戶挽留等操作。

技術架構

【最佳實踐】實時計算 Flink 版在金融行業的實時數倉建設實踐

架構解析:

資料采集:該場景中,數倉的資料主要來源于APP等系統的埋點資訊,被實時采集至DATAHUB作為Flink的輸入資料。

實時數倉架構:該場景中,整個實時數倉的ETL和BI部分的建構,全部通過Flink完成,Flink實時讀取DATAHUB的資料進行處理,并與維表進行關聯查詢等操作,最終實時統計的結果輸入到下遊資料庫RDS中。

業務名額

  • 營運資料分析
    • 每小時的投保人數
    • 每小時的總保費
    • 每小時總保單數
  • 使用者行為監控
    • 使用者原投保金額
    • 使用者現投保金額
  • 使用者行為分析
    • 使用者最後通路的頁面類型
    • 使用者最後通路的頁面位址

資料結構

場景一:營運資料分析

本場景用于計算每小時總投保人數和總保費。

使用者投保會生成一份訂單,訂單内容包括使用者id、使用者姓名、訂單号等。flink實時讀取訂單資訊,用where過濾出大于目前小時時間段的資料(資料過濾),然後根據使用者id做分組用last_value函數擷取每個使用者最終生成的訂單資訊(訂單去重),最後按照小時次元聚合統計目前小時的總保費和總投保人數。

輸入表

CREATE   TABLE  user_order
(
    id                          bigint    comment '使用者id'
    ,order_no                    varchar    comment '訂單号'
    ,order_type                  bigint    comment '訂單類型'
    ,pay_time                    bigint  comment '支付時間'
    ,order_price                 double    comment '訂單價格'
    ,customer_name               varchar    comment '使用者姓名'
    ,customer_tel                varchar    comment '使用者電話'
    ,certificate_no              varchar    comment '證件号碼'
    ,gmt_created                 bigint  comment '建立時間'
    ,gmt_modified                bigint  comment '修改時間'
    ,account_payble             double      comment '應付金額'

) WITH (
       type='datahub',
     topic='user_order'
       ...
)           

輸出表

CREATE    TABLE hs_order (
    biz_date              varchar COMMENT 'yyyymmddhh'
    ,total_premium         DOUBLE COMMENT '總保費'
    ,policy_cnt            BIGINT COMMENT '保單數'
    ,policy_holder_cnt     BIGINT COMMENT '投保人數'
    ,PRIMARY KEY (biz_date)
) WITH
 (
   type='rds',
   tableName='adm_pfm_zy_order_gmv_msx_hs'
   ...
 ) 
 ;           

業務代碼

1:資料清洗

create view  last_order
as 
select 
     id                                 as id               
    ,last_value(order_no)               as order_no                   
    ,last_value(customer_tel)           as customer_tel     
    ,last_value(gmt_modified)           as gmt_modified                      
    ,last_value(account_payble)         as account_payble   
    from user_order
    where gmt_modified  >= cast(UNIX_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd'), 'yyyy-MM-dd')*1000 as bigint)
    group by id
;           

2:資料彙總

insert into hs_order
select 
biz_date
,cast (total_premium as double) as total_premium
,cast (policy_cnt as bigint) as policy_cnt
,cast (policy_holder_cnt as bigint) as policy_holder_cnt
from (
select 
    from_unixtime(cast(gmt_modified/1000 as bigint),'yyyyMMddHH')      as biz_date
    ,sum(coalesce(account_payble,0))  as total_premium
    ,count(distinct order_no)   as policy_cnt
    ,count(distinct customer_tel)  as policy_holder_cnt
from  last_order a 
group by 
from_unixtime(cast(gmt_modified/1000 as bigint),'yyyyMMddHH')
)a 
;           

場景二:使用者行為監控

本場景對投保金額變動較大(總保額變動大于15%)的重點使用者進行監控。

Flink實時讀取使用者建立訂單資料,建立訂單包括使用者的id和現投保金額,通過where過濾沒有儲存成功的訂單。維表中存儲了業務經理關注的重點使用者資料(如原投保金額),通過建立訂單中的使用者id與維表進行關聯查詢,如果維表中存在此使用者且總保額下降15%以上,則将該使用者的詳細資訊推送給業務經理,并且在維表中更新該使用者投保金額及投保資訊。

create table update_info
(
 id             bigint      comment '使用者id'
,channel        varchar     comment '管道編号'
,open_id        varchar     comment '訂單id'
,event          varchar     comment '事件類型'
,now_premium  varchar     comment '現投保金額'
,creator        varchar     comment '建立人'
,modifier       varchar     comment '最後修改人'
,gmt_modified   bigint      comment '修改時間'
,now_info       varchar     comment '現投保資訊'
) with (
    type = 'datahub',
    topic = 'dh_prd_dm_account_wechat_trace'
    ...
   
);           

維表

create table raw_info
(
     id                 bigint  comment '使用者id'
    ,raw_premium      varchar  comment '原投保金額'
    ,raw_info           varchar  comment '原投保資訊'
    ,PRIMARY KEY(id)
    ,PERIOD FOR SYSTEM_TIME
) WITH (
    type='ots',
    tableName='adm_zy_acct_sub_wechat_list'
    ...
);
           

create table update_info
(
     id               bigint  comment '使用者id'
    ,raw_info         varchar comment '原投保資訊'
    ,now_info         varchar comment '現投保資訊'
    ,raw_premium      varchar comment '原投保金額'
    ,now_premium      varchar comment '現投保金額'
    ,PRIMARY KEY(id)
) WITH (
    type='rds',
    tableName='wechat_activity_user'
    ...
);
           

業務代碼:

create view info_join as 
select
      t1.id               as  id
    ,t2.raw_info          as  raw_info
    ,t1.now_info          as  now_info  
    ,t2.raw_premium     as raw_premium
    ,t1.now_premium     as now_premium
from update_info t1
inner join raw_info FOR SYSTEM_TIME AS OF PROCTIME() as t2
on t1.id = t2.id ;           
insert into update_info
select 
     id                        as id  
    ,raw_info                  as raw_info
    ,now_info                  as now_info
    ,raw_premium               as raw_premium  
    ,now_premium               as now_premium  
from info_join where now_premium<raw_premium*0.85
;           
insert into raw_info
select 
     id                        as id  
    ,now_premium               as raw_premium  
    ,now_info                  as raw_info
from info_join
;           

場景三:使用者行為分析

本場景記錄使用者最後通路的頁面名稱和類型,作為使用者畫像的特征值。

Flink讀取使用者浏覽APP頁面的日志資訊,如使用者id、頁面名稱和頁面類型等資訊,根據使用者id和裝置id進行分組,通過last_value函數擷取使用者最後一次通路頁面的名稱和類型,輸出到RDS作為推薦系統的輸入資料,在下次使用者登入的時候為其推送相關廣告資訊,提升使用者廣告點選率和下單的成功率。

create table user_log
(
 log_time                bigint  comment '日志采集日期(Linux時間)' 
,device_id               varchar  comment '裝置id'
,account_id              varchar  comment '賬戶id'
,bury_point_type         varchar  comment '頁面類型或埋點類型'
,page_name               varchar  comment '頁面名稱或埋點時一級目錄'
) WITH (
    type = 'datahub',
    topic = 'edw_zy_evt_bury_point_log'
    ...
);
           

create table user_last_log
(
     account_id         varchar  comment 'account_id'
    ,device_id          varchar    comment  '裝置id'
    ,bury_point_type    varchar  comment '頁面類型'
    ,page_name          varchar  comment '頁面名稱'
    ,primary key(account_id)
) WITH (
    type='rds',
    tableName='adm_zy_moblie_charge_exchg_rs'
    ...
    
);
           

insert into user_last_log
select
    account_id
    ,device_id
    ,last_value(bury_point_type)  as bury_point_type
    ,last_value(page_name)  as page_name
from user_log
where account_id is not null 
group by account_id,device_id
           

實時計算 Flink 版産品交流群

【最佳實踐】實時計算 Flink 版在金融行業的實時數倉建設實踐
阿裡雲實時計算Flink - 解決方案: https://developer.aliyun.com/article/765097 阿裡雲實時計算Flink - 場景案例: https://ververica.cn/corporate-practice 阿裡雲實時計算Flink - 産品詳情頁: https://www.aliyun.com/product/bigdata/product/sc

繼續閱讀