行業背景
- 行業現狀:
金融是現代經濟的核心。我國金融業在市場化改革和對外開放中不斷發展,金融總量大幅增長。金融穩定直接關系到國家經濟發展的前途和命運,金融業是國民經濟發展的晴雨表。對我國金融業發展現狀進行客觀分析,對金融業發展趨勢進行探索,有助于消除金融隐患,使金融業朝着健康、有序方向發展。
- 大資料在其行業中的作用:
- 金融服務和産品創新:借助社交網絡等平台産生的海量使用者和資料記錄着使用者群體的興趣和偏好情緒等資訊, 對客戶行為模式進行分析,可以帶來更貼近客戶需求的産品創新。
- 增強使用者體驗:通過大資料分析對客戶進行畫像,結合客戶畫像特征,為使用者提供個性化服務,增強使用者體驗。
業務場景
某保險公司開發了個金融類APP,該公司在APP中會投放保險廣告、釋出優惠活動,使用者通過APP進行投保等操作。
業務的建構涉及到幾個端:
- APP:應用程式,使用者通路入口,使用者通過APP點選浏覽保險廣告、優惠活動等,并進行投保下單。
- 背景系統:
a.營運人員:
(1)根據使用者送出的訂單統計指定時間段的總投保人數和總投保金額,輔助優化營運方案。
(2)對使用者的日常行為做出分析,分析出每個使用者比較關注的資訊,作為推薦系統的資料來源。
b:業務經理:
對重點客戶的投保金額變動進行監控,将投保金額變動較大的重點客戶推送給業務經理,業務經理針對性開展客戶挽留等操作。
技術架構
架構解析:
資料采集:該場景中,數倉的資料主要來源于APP等系統的埋點資訊,被實時采集至DATAHUB作為Flink的輸入資料。
實時數倉架構:該場景中,整個實時數倉的ETL和BI部分的建構,全部通過Flink完成,Flink實時讀取DATAHUB的資料進行處理,并與維表進行關聯查詢等操作,最終實時統計的結果輸入到下遊資料庫RDS中。
業務名額
- 營運資料分析
- 每小時的投保人數
- 每小時的總保費
- 每小時總保單數
- 使用者行為監控
- 使用者原投保金額
- 使用者現投保金額
- 使用者行為分析
- 使用者最後通路的頁面類型
- 使用者最後通路的頁面位址
資料結構
場景一:營運資料分析
本場景用于計算每小時總投保人數和總保費。
使用者投保會生成一份訂單,訂單内容包括使用者id、使用者姓名、訂單号等。flink實時讀取訂單資訊,用where過濾出大于目前小時時間段的資料(資料過濾),然後根據使用者id做分組用last_value函數擷取每個使用者最終生成的訂單資訊(訂單去重),最後按照小時次元聚合統計目前小時的總保費和總投保人數。
輸入表
CREATE TABLE user_order
(
id bigint comment '使用者id'
,order_no varchar comment '訂單号'
,order_type bigint comment '訂單類型'
,pay_time bigint comment '支付時間'
,order_price double comment '訂單價格'
,customer_name varchar comment '使用者姓名'
,customer_tel varchar comment '使用者電話'
,certificate_no varchar comment '證件号碼'
,gmt_created bigint comment '建立時間'
,gmt_modified bigint comment '修改時間'
,account_payble double comment '應付金額'
) WITH (
type='datahub',
topic='user_order'
...
)
輸出表
CREATE TABLE hs_order (
biz_date varchar COMMENT 'yyyymmddhh'
,total_premium DOUBLE COMMENT '總保費'
,policy_cnt BIGINT COMMENT '保單數'
,policy_holder_cnt BIGINT COMMENT '投保人數'
,PRIMARY KEY (biz_date)
) WITH
(
type='rds',
tableName='adm_pfm_zy_order_gmv_msx_hs'
...
)
;
業務代碼
1:資料清洗
create view last_order
as
select
id as id
,last_value(order_no) as order_no
,last_value(customer_tel) as customer_tel
,last_value(gmt_modified) as gmt_modified
,last_value(account_payble) as account_payble
from user_order
where gmt_modified >= cast(UNIX_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd'), 'yyyy-MM-dd')*1000 as bigint)
group by id
;
2:資料彙總
insert into hs_order
select
biz_date
,cast (total_premium as double) as total_premium
,cast (policy_cnt as bigint) as policy_cnt
,cast (policy_holder_cnt as bigint) as policy_holder_cnt
from (
select
from_unixtime(cast(gmt_modified/1000 as bigint),'yyyyMMddHH') as biz_date
,sum(coalesce(account_payble,0)) as total_premium
,count(distinct order_no) as policy_cnt
,count(distinct customer_tel) as policy_holder_cnt
from last_order a
group by
from_unixtime(cast(gmt_modified/1000 as bigint),'yyyyMMddHH')
)a
;
場景二:使用者行為監控
本場景對投保金額變動較大(總保額變動大于15%)的重點使用者進行監控。
Flink實時讀取使用者建立訂單資料,建立訂單包括使用者的id和現投保金額,通過where過濾沒有儲存成功的訂單。維表中存儲了業務經理關注的重點使用者資料(如原投保金額),通過建立訂單中的使用者id與維表進行關聯查詢,如果維表中存在此使用者且總保額下降15%以上,則将該使用者的詳細資訊推送給業務經理,并且在維表中更新該使用者投保金額及投保資訊。
create table update_info
(
id bigint comment '使用者id'
,channel varchar comment '管道編号'
,open_id varchar comment '訂單id'
,event varchar comment '事件類型'
,now_premium varchar comment '現投保金額'
,creator varchar comment '建立人'
,modifier varchar comment '最後修改人'
,gmt_modified bigint comment '修改時間'
,now_info varchar comment '現投保資訊'
) with (
type = 'datahub',
topic = 'dh_prd_dm_account_wechat_trace'
...
);
維表
create table raw_info
(
id bigint comment '使用者id'
,raw_premium varchar comment '原投保金額'
,raw_info varchar comment '原投保資訊'
,PRIMARY KEY(id)
,PERIOD FOR SYSTEM_TIME
) WITH (
type='ots',
tableName='adm_zy_acct_sub_wechat_list'
...
);
create table update_info
(
id bigint comment '使用者id'
,raw_info varchar comment '原投保資訊'
,now_info varchar comment '現投保資訊'
,raw_premium varchar comment '原投保金額'
,now_premium varchar comment '現投保金額'
,PRIMARY KEY(id)
) WITH (
type='rds',
tableName='wechat_activity_user'
...
);
業務代碼:
create view info_join as
select
t1.id as id
,t2.raw_info as raw_info
,t1.now_info as now_info
,t2.raw_premium as raw_premium
,t1.now_premium as now_premium
from update_info t1
inner join raw_info FOR SYSTEM_TIME AS OF PROCTIME() as t2
on t1.id = t2.id ;
insert into update_info
select
id as id
,raw_info as raw_info
,now_info as now_info
,raw_premium as raw_premium
,now_premium as now_premium
from info_join where now_premium<raw_premium*0.85
;
insert into raw_info
select
id as id
,now_premium as raw_premium
,now_info as raw_info
from info_join
;
場景三:使用者行為分析
本場景記錄使用者最後通路的頁面名稱和類型,作為使用者畫像的特征值。
Flink讀取使用者浏覽APP頁面的日志資訊,如使用者id、頁面名稱和頁面類型等資訊,根據使用者id和裝置id進行分組,通過last_value函數擷取使用者最後一次通路頁面的名稱和類型,輸出到RDS作為推薦系統的輸入資料,在下次使用者登入的時候為其推送相關廣告資訊,提升使用者廣告點選率和下單的成功率。
create table user_log
(
log_time bigint comment '日志采集日期(Linux時間)'
,device_id varchar comment '裝置id'
,account_id varchar comment '賬戶id'
,bury_point_type varchar comment '頁面類型或埋點類型'
,page_name varchar comment '頁面名稱或埋點時一級目錄'
) WITH (
type = 'datahub',
topic = 'edw_zy_evt_bury_point_log'
...
);
create table user_last_log
(
account_id varchar comment 'account_id'
,device_id varchar comment '裝置id'
,bury_point_type varchar comment '頁面類型'
,page_name varchar comment '頁面名稱'
,primary key(account_id)
) WITH (
type='rds',
tableName='adm_zy_moblie_charge_exchg_rs'
...
);
insert into user_last_log
select
account_id
,device_id
,last_value(bury_point_type) as bury_point_type
,last_value(page_name) as page_name
from user_log
where account_id is not null
group by account_id,device_id
實時計算 Flink 版産品交流群
阿裡雲實時計算Flink - 解決方案: https://developer.aliyun.com/article/765097 阿裡雲實時計算Flink - 場景案例: https://ververica.cn/corporate-practice 阿裡雲實時計算Flink - 産品詳情頁: https://www.aliyun.com/product/bigdata/product/sc