大家好,我是老兵。
本系列為大資料項目實戰系列,每期内容将講解
項目背景
、
技術架構
和核心
代碼
部分,幫助相關小夥伴快速了解大資料項目與技術。
在上期的基于Spark GraphFrame社交網絡實戰項目中,介紹了Spark圖計算與社交關系圖譜,文章反響很好。
本期将繼續介紹基于Spark和Grafana的
電商零售分析
項目,在文末附有電商資料集下載下傳位址,歡迎大家自行領取。話不多說,我們開始。
項目環境:JAVA、IDEA
項目技術:Spark、Grafana
技術難度:中等
1 項目介紹
網際網路背景下的大資料、AI領域不斷創新,衍生出多樣化的電商平台和商品推薦模式。
作為消費者,當我們打開某款購物APP時,随着你在平台上浏覽商品并點選,計算機在背景會記錄你的使用者行為,并為你生成專有的
客群畫像
,真正做到了
千人千面
、
精準推薦
的效果。
關于智能推薦和使用者畫像怎麼實作,我們将在後期系列中讨論。
本項目主要對零售商品進行資料分析,通過技術手段,分析哪幾款商品需求量最大(
購買排行top5
)、
熱門商品
每日變化趨勢、哪些省份是消費大省市(
消費省份分布
)、購買群體男女比率(
使用者群體分析
)。
項目基于
Spark
元件和
Grafana
工具,通過Spark資料分析,進行資料清洗、轉換、計算并儲存,最終Grafana進行可視化大屏展示。
2 系統介紹
項目程式采用Java語言編寫,技術元件采用Spark和Grafana工具。
系統整體分為資料采集、資料分析、資料可視化核心部分。
1)整體技術架構
資料采集層
通過
網絡爬蟲
或者下載下傳
公開資料集
的技術手段(文末提供免費資料集下載下傳)收集電商零售資料,形成結構化文本檔案、資料表。
-
資料分析層
基于微服務和Spark技術棧建構。微服務元件作為系統基礎底座(
非必須
),一般公司有專門的微服務團隊去做。
Spark作為資料分析元件,提供Spark記憶體計算、
資料查詢統計等功能,完成資料的加工、查詢和結果存儲。Spark SQL
-
資料存儲層
Spark計算後的資料落地到存儲媒體中,向外提供資料通路能力,項目中使用Mysql(redis、Hive也可)。
-
資料展示層
在展示層的技術選型方面,我選擇了
: 一個提供幾百種資料源、多種圖形樣式庫的可視化大屏元件,且支援SQL,比較友善。Grafana
2)資料流程設計
資料從源頭的
結構化
形式(csv/table)轉換為Spark的
RDD
形式,并最終流轉到資料庫中的
table表
形式存儲。
- Spark程式讀取源資料(csv、table)并轉成RDD(
)csvRDD
- 經過重複值、異常值、時間格式處理,形成中間RDD(
)transRDD
- Spark SQL進行資料名額計算(
),計算銷售排行、使用者省份分布等名額并儲存到Mysql中(top5xxRDD
)tb_xxAnalysis
- Grafana中進行SQL查詢展示,繪制大屏。
3)相關技術
-
Spark引擎
大資料生态圈常用
,記憶體級分布式分析架構,包含Spark Core、Spark SQL、Spark Streaming、Spark MLlib和Spark Graph等子產品。具體資料可以看我的相關文章,此處不再贅叙。計算引
-
Grafana元件
Grafana是一個開源的
和可視化
平台。提供分析
、查詢
、可視化
和告警
等功能。内部支援多種資料源,提供多種面闆、插件來快速将複雜的資料轉換為漂亮的圖形和可視化的工具,可自定義告警規則。 3 程式實作監控
前面鋪墊了這麼多,下面我們來看看代碼方面怎麼實作的。
先看下整體的架構,正如前面所說。我們在這裡分成了資料采集、資料分析和資料可視化三個部分。
3.1 系統環境
- Maven 3.5、Mysql 5.7
- jdk1.8、scala 2.12
- spark 3.0.2
- grafana 8.5
3.2 Spark初始化
1)環境jar包依賴
這裡使用了引入了spark-core和spark-sql的依賴包,且使用SparkSession方式建立SparkContext上下文。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.3</version>
</dependency>
2)Spark腳本
在系統入口這裡,為了友善本地和伺服器運作
靈活性
,可支援
批量
任務或者
單個
任務方式執行,通過參數傳入控制。
- 包括執行的程式類名、執行标志
- 執行日期範圍(不傳預設全量)
// 計算kpi清單(kpi.txt)
ProductAnalysis 1 2019-11-24 2019-12-22
RegionAnalysis 2 2019-11-24 2019-12-22
UserAnalysis 1 2019-11-24
// 伺服器執行腳本
#!/usr/bin/env bash
SPARK_HOME="/usr/hdp/xxxx/spark"
SPARK_MASTER="yarn"
MAIN_CLASS="com.demo.spark.analysis.launcher.AnalysisLauncher"
SPARK_SUBMIT_OPTS="--master yarn-client --driver-memory 20g --executor-cores 8 --executor-memory 40g --num-executors 5"
...
// 執行腳本指令
sh analysis.sh kpi.txt
3)Spark啟動
這裡為了友善觀察,統一改為Local運作模式;将執行類放入數組,使用
Java反射機制
動态執行分析子類。
// 解析傳遞的kpi.txt中的執行類參數
// PRODUCT_CLASS_NAME、USER_CLASS_NAME
// String[] classNames = parseArgs(args);
String[] classNames = {PRODUCT_CLASS_NAME, USER_CLASS_NAME, REGION_CLASS_NAME};
for (String className: classNames) {
Class c = Class.forName(PACKAGE_NAME + className);
BaseHandler handler = (BaseHandler) c.getConstructors()[0].newInstance();
logger.info("資料分析開始...");
handler.execute(spark, sparkContext);
}
3.3 資料采集子產品
使用SparkSession的
read()算子
讀取csv檔案,設定編碼格式,并進行簡單的
重複值
、
異常值
及
缺失值
處理; 結果儲存到資料庫表中。
// 讀取csv檔案并處理
Dataset productCsvDS = spark.read()
.format("csv")
.option("delimiter", ",")
.option("encoding", "gbk")
.schema(productStructType)
.option("header", "true")
.load(ORDER_FILE_PATH)
.na() // 空值删除
.drop( new String[] {"name", "price"});
// 寫入Mysql訂單表 (解決中文亂碼:設定Mysql 編碼utf8)
productCsvDS.write()
.mode(SaveMode.Overwrite)
.jdbc(JDBC_URL, DB_TABLE_PRODUCT, jdbcProperties);
// 定義StructType
private static StructType getProductStructType() {
StructType productStructType = DataTypes.createStructType(new StructField[]{
DataTypes.createStructField("id", DataTypes.StringType, false),
DataTypes.createStructField("name", DataTypes.StringType, false),
DataTypes.createStructField("price", DataTypes.DoubleType, false)
...
});
return productStructType;
}
3.4 資料分析子產品
使用
Spark SQL
和
Spark内置算子
進行資料統計,不同場景的分析子類均需要實作execute()方法。
// 子類繼承抽象父類execute()方法
public abstract class BaseService {
public void execute(SparkSession spark, JavaSparkContext sparkContext){
// TODO: 繼承子類方法
// 1. 使用者行為分析
// 2. 零售産品分析
}
- 使用者月度購買量省份分布分析(示例)
// 注冊臨時表
prodCsvDS.registerTempTable("product");
orderCsvDS.registerTempTable("orderinfo");
// 列換行;分割prod_ids
String prodSplitSQL = "select " +
" order_id,order_dt," +
" products, prod_view.prod_id," +
" user_id,user_region" +
" from orderinfo " +
" lateral view explode(
split(products, '-')) prod_view as prod_id";
Dataset prodSplitDS = spark.sql(prodSplitSQL);
prodSplitDS.registerTempTable("order_prod");
// 資料統計&結果儲存
String top5regionSQL =
" select " +
" a.dt as dt," +
" a.region as region, " +
" a.cnt as cnt" +
" from (" +
" select " +
" max(order_dt) dt," +
" user_id,region," +
" count(1) cnt " +
" from orderInfoDS " +
" group by user_id,region" +
" ) a " +
" order by a.cnt desc" +
" limit 5 ";
Dataset top5regionDS = spark.sql(top5regionSQL);
top5regionDS.write()
.mode(SaveMode.Overwrite)
.jdbc(JDBC_URL, DB_TABLE_REGION_TOP5, jdbcProperties);
更多代碼方面的問題以及擷取電商零售資料集,歡迎咨詢gzh: 大資料兵工廠
3.5 資料可視化子產品
經過資料分析計算後,名額落表到Mysql資料庫中。這時我們可以在
Grafana界面
中配置相應的圖表。
如圖所示省份狀态分布情況中,我選擇了儀表盤圖形,并且進行了簡單的頁面SQL編輯擷取資料(需要提前配置Mysql
資料源
)。在編輯頁面中支援對圖表的屬性進行微調,提供豐富多樣的圖表庫。
在經過了合适的圖表選擇和屬性配置,并且設定取數時間範圍和重新整理頻率以,最終得到完整的項目成果: