大資料之電商分析系統(二)
本文承接上一篇(電商分析系統一)
一:項目需求具體實作5-10
-
頁面轉換率統計
計算頁面單跳轉化率, 什麼是頁面單跳轉換率, 比如一個使用者在一次 Session過程中通路的頁面路徑 3,5,7,9,10,21,那麼頁面 3 跳到頁面 5 叫一次單跳,7-9 也叫一次單跳,那麼單跳轉化率就是要統計頁面點選的機率,比如: 計算 3-5 的單跳轉化率,先擷取符合條件的 Session 對于頁面 3 的通路次數(PV)為 A,然後擷取符合條件的 Session 中通路了頁面 3 又緊接着通路了頁面 5 的次數為 B,那麼 B/A 就是 3-5 的頁面單跳轉化率,我們記為 C;那麼頁面 5-7 的轉化率怎麼求呢?先需要求出符合條件的 Session 中通路頁面 5 又緊接着通路了頁面 7 的次數為 D,那麼 D/B即為 5-7 的單跳轉化率。
産品經理,可以根據這個名額,去嘗試分析, 整個網站,産品, 各個頁面的表現怎麼樣,是不是需要去優化産品的布局;吸引使用者最終可以進入最後的支付頁面。資料分析師,可以此資料做更深一步的計算和分析。企業管理層, 可以看到整個公司的網站, 各個頁面的之間的跳轉的表現如何,可以适當調整公司的經營戰略或政策。
需要根據查詢對象中設定的 Session 過濾條件,先将對應得 Session過濾出來,然後根據查詢對象中設定的頁面路徑, 計算頁面單跳轉化率, 比如查詢的頁面路徑為:3、5、7、8,那麼就要計算 3-5、5-7、7-8 的頁面單跳轉化率。需要注意的一點是, 頁面的通路時有先後的。
資料源解析:
使用者通路資料表: UserVisitAction
// PageOneStepConverRate — requirement 5: page-slice (single-jump) conversion rate.
package com.ityouxin.page

import java.util.UUID

import com.ityouxin.commons.conf.ConfigurationManager
import com.ityouxin.commons.constant.Constants
import com.ityouxin.commons.model.UserVisitAction
import com.ityouxin.commons.utils.{DateUtils, NumberUtils, ParamUtils}
import com.ityouxin.session.PageSplitConvertRate
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}
import net.sf.json.JSONObject
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

/**
 * Computes, for a configured page flow such as "1,2,3,4,5,6,7", the conversion
 * rate of every single-jump slice ("1_2", "2_3", ...) and persists the result.
 */
object PageOneStepConverRate {

  def main(ags: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SessionAnalyzer")
    val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    val sparkContext = spark.sparkContext
    // Task parameters arrive as a JSON string in the configuration.
    val jsonStr = ConfigurationManager.config.getString("task.params.json")
    val taskParm = JSONObject.fromObject(jsonStr)
    // user_visit_action rows restricted to the configured date range.
    val userVisitActionRDD: RDD[UserVisitAction] = getActionRDDByDateRange(spark, taskParm)
    // Key every action by its session id.
    val sessionidActionRDD: RDD[(String, UserVisitAction)] =
      userVisitActionRDD.map(uva => (uva.session_id, uva))
    // Cached because the grouped RDD below is consumed by two computations.
    sessionidActionRDD.persist(StorageLevel.MEMORY_ONLY)
    // All actions of each session, grouped together.
    val sessionidActionsRDD: RDD[(String, Iterable[UserVisitAction])] =
      sessionidActionRDD.groupByKey()
    // (sliceFlag, 1) pairs for every single-jump that matches the page flow.
    val pageSplitRDD: RDD[(String, Int)] =
      generateAndMatchPageSplit(sparkContext, sessionidActionsRDD, taskParm)
    println("-------------------------------------")
    println(pageSplitRDD.collect().mkString(","))
    println("*************************************")
    // PV per slice flag, e.g. "3_5" -> 12.
    val pageSplitPvMap: collection.Map[String, Long] = pageSplitRDD.countByKey()
    pageSplitPvMap.foreach(println)
    // PV of the first page in the flow — denominator of the first slice.
    val startPagePv: Long = getStartPagePV(taskParm, sessionidActionsRDD)
    // Conversion rate per slice, in flow order.
    val convertRateMap: ListBuffer[(String, Double)] =
      computePageSplitConvertRate(taskParm, pageSplitPvMap, startPagePv)
    val taskUUID: String = UUID.randomUUID().toString
    persitConvertRate(spark, taskUUID, convertRateMap)
  }

  /**
   * Persists the slice conversion rates as a single "k=v|k=v|..." row keyed by
   * task id. (Method name keeps the original spelling for source compatibility.)
   */
  def persitConvertRate(spark: SparkSession,
                        taskUUID: String,
                        convertRateMap: ListBuffer[(String, Double)]): Unit = {
    // e.g. 5_6=0.95|2_3=0.92|6_7=1.01|3_4=1.08|4_5=1.02|1_2=0.1
    val convertRate: String = convertRateMap.map(item => item._1 + "=" + item._2).mkString("|")
    println(convertRate)
    val pageSplitConvertRateRDD: RDD[PageSplitConvertRate] =
      spark.sparkContext.makeRDD(Array(PageSplitConvertRate(taskUUID, convertRate)))
    import spark.implicits._
    pageSplitConvertRateRDD.toDF().write
      .format("jdbc")
      .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
      .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
      .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
      .option("dbtable", "page_split_convert_rate")
      .mode(SaveMode.Append)
      .save()
  }

  /**
   * Computes the conversion rate of every slice in the configured page flow.
   *
   * @param taskParm       task parameters holding the target page flow ("1,2,3,...")
   * @param pageSplitPvMap PV count per slice flag ("1_2" -> count)
   * @param startPagePv    PV of the first page (denominator of the first slice)
   * @return (sliceFlag, conversionRate) pairs in flow order
   */
  def computePageSplitConvertRate(taskParm: JSONObject,
                                  pageSplitPvMap: collection.Map[String, Long],
                                  startPagePv: Long): ListBuffer[(String, Double)] = {
    val targetPageFlow: String = ParamUtils.getParam(taskParm, Constants.PARAM_TARGET_PAGE_FLOW)
    val targetPages: List[String] = targetPageFlow.split(",").toList
    // "1,2,3" -> "1_2", "2_3"
    val targetPagePairs: List[String] =
      targetPages.slice(0, targetPages.length - 1).zip(targetPages.tail)
        .map { case (a, b) => a + "_" + b }
    val rates = new mutable.ListBuffer[(String, Double)]
    // The first slice divides by the start page's PV; every later slice divides
    // by the PV of the slice before it.
    var lastPageSplitPv: Double = startPagePv.toDouble
    for (targetPage <- targetPagePairs) {
      // NOTE(review): assumes every slice flag is present in the map; a missing
      // key would throw — confirm upstream always produces all slices.
      val targetPageSplitPv: Double = pageSplitPvMap(targetPage).toDouble
      val convertRate: Double = NumberUtils.formatDouble(targetPageSplitPv / lastPageSplitPv, 2)
      rates.append((targetPage, convertRate))
      // Next slice's denominator is this slice's PV.
      lastPageSplitPv = targetPageSplitPv
    }
    rates
  }

  /** Counts the PV of the first page of the configured page flow. */
  def getStartPagePV(taskParm: JSONObject,
                     sessionidActionsRDD: RDD[(String, Iterable[UserVisitAction])]): Long = {
    val targetPageFlow: String = ParamUtils.getParam(taskParm, Constants.PARAM_TARGET_PAGE_FLOW)
    val startPageId: Long = targetPageFlow.split(",")(0).toLong
    val startPageRDD: RDD[Long] = sessionidActionsRDD.flatMap { case (sid, uvas) =>
      uvas.filter(startPageId == _.page_id).map(_.page_id)
    }
    startPageRDD.count()
  }

  /**
   * Generates, per session, the (sliceFlag, 1) pairs for the single-jumps that
   * appear in the configured page flow. Actions are sorted by action_time so the
   * slices reflect the real visiting order.
   */
  def generateAndMatchPageSplit(sparkContext: SparkContext,
                                sessionidActionsRDD: RDD[(String, Iterable[UserVisitAction])],
                                taskParm: JSONObject): RDD[(String, Int)] = {
    val targetPageFlow = ParamUtils.getParam(taskParm, Constants.PARAM_TARGET_PAGE_FLOW)
    val targetPages: List[String] = targetPageFlow.split(",").toList
    // "1,2,...,7" -> "1_2","2_3",...,"6_7"
    val targetPagePairs: List[String] =
      targetPages.slice(0, targetPages.length - 1).zip(targetPages.tail)
        .map { case (a, b) => a + "_" + b }
    println(targetPagePairs.mkString(","))
    // Broadcast the wanted slice flags so every task filters cheaply.
    val targetPagePairsBroadcast: Broadcast[List[String]] = sparkContext.broadcast(targetPagePairs)
    sessionidActionsRDD.flatMap { case (sessionid, uvas) =>
      // BUG FIX: the original called sortWith but discarded its result, so the
      // actions were never actually ordered by time.
      val sortedUVAS: List[UserVisitAction] = uvas.toList.sortWith { (uva1, uva2) =>
        DateUtils.parseTime(uva1.action_time).getTime < DateUtils.parseTime(uva2.action_time).getTime
      }
      // BUG FIX: the original mapped with an if-without-else, yielding List[AnyVal].
      val sortedPages: List[Long] = sortedUVAS.map(_.page_id)
      println(sortedPages.mkString(","))
      // Adjacent page pairs of this session, as "a_b" flags.
      val sessionPagePairs: List[String] =
        sortedPages.slice(0, sortedPages.length - 1).zip(sortedPages.tail)
          .map { case (a, b) => a + "_" + b }
      // Keep only the slices the task asked for.
      sessionPagePairs.filter(targetPagePairsBroadcast.value.contains(_)).map((_, 1))
    }
  }

  /** Loads UserVisitAction rows whose date lies within [startDate, endDate]. */
  def getActionRDDByDateRange(spark: SparkSession, taskParm: JSONObject): RDD[UserVisitAction] = {
    val startDate: String = ParamUtils.getParam(taskParm, Constants.PARAM_START_DATE)
    val endDate = ParamUtils.getParam(taskParm, Constants.PARAM_END_DATE)
    import spark.implicits._
    // NOTE(review): dates are spliced into the SQL string; acceptable only
    // because they come from trusted task configuration, not user input.
    val SQLQuery = "select * from user_visit_action where date >= '" + startDate + "' and date <= '" + endDate + "'"
    val ds: Dataset[UserVisitAction] = spark.sql(SQLQuery).as[UserVisitAction]
    ds.rdd
  }
}
-
各區域Top3商品統計
根據使用者指定的日期查詢條件範圍,統計各個區域下的最熱門【點選】的 top3商品,區域資訊、各個城市的資訊在項目中用固定值進行配置,因為不怎麼變動。
區域等級表
A 華北華東
B 華南華中
C 西北西南
D 東北其他
// AreaTop3ProductApp — requirement 6: top-3 hottest clicked products per area.
package com.ityouxin.product

import java.util.UUID

import com.ityouxin.commons.conf.ConfigurationManager
import com.ityouxin.commons.constant.Constants
import com.ityouxin.commons.utils.ParamUtils
import net.sf.json.JSONObject
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}

/** Spark-SQL driven statistic: the three most-clicked products of every area. */
object AreaTop3ProductApp {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("AreaTop3ProductApp")
    val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    val sc: SparkContext = spark.sparkContext
    // Task parameters come as a JSON string from the configuration file.
    val jsonStr = ConfigurationManager.config.getString("task.params.json")
    val taskParm: JSONObject = JSONObject.fromObject(jsonStr)
    val startDate: String = ParamUtils.getParam(taskParm, Constants.PARAM_START_DATE)
    val endDate = ParamUtils.getParam(taskParm, Constants.PARAM_END_DATE)
    // (city_id, Row(city_id, click_product_id)) for every product click in range.
    val cityClickActionRDD: RDD[(Long, Row)] = getCityClickActionRDD(spark, startDate, endDate)
    // (city_id, Row(city_id, city_name, area)) reference data.
    val cityInfoRDD: RDD[(Long, Row)] = getCityInfoRDD(spark)
    // Temp view tmp_click_product_basic: city_id, city_name, area, product_id.
    generateTempClickProductBasicTable(spark, cityClickActionRDD, cityInfoRDD)
    // Temp view tmp_area_product_click_count: area, product_id, click_count, city_infos.
    generateTempTemAreaProductClickCountTable(spark)
    // Temp view tmp_area_fullprod_click_count: the above joined with product_info.
    generateTempAreaFullProDuctClickCountTable(spark)
    val areaTop3ProductDF: DataFrame = getAreaTop3ProductInfo(spark)
    // Persist the result keyed by a freshly generated task id.
    val taskUUID = UUID.randomUUID().toString
    import spark.implicits._
    val areaTop3RDD: RDD[AreaTop3Product] = areaTop3ProductDF.rdd.map(row =>
      AreaTop3Product(
        taskUUID,
        row.getAs[String]("area"),
        row.getAs[String]("area_level"),
        row.getAs[Long]("product_id"),
        row.getAs[String]("city_infos"),
        row.getAs[Long]("click_count"),
        row.getAs[String]("product_name"),
        row.getAs[String]("product_status")
      )
    )
    areaTop3RDD.toDF().write
      .format("jdbc")
      .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
      .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
      .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
      .option("dbtable", "area_top3_product")
      .mode(SaveMode.Append)
      .save()
    spark.close()
  }

  /** Top-3 products per area, with the area mapped to its level (A-D). */
  def getAreaTop3ProductInfo(spark: SparkSession) = {
    val sql = "select area," +
      " case " +
      " when area='華北' or area='華東' THEN 'A'" +
      " when area='華南' or area='華中' THEN 'B'" +
      " when area='西北' or area='西南' THEN 'C'" +
      " else 'D'" +
      " end area_level" +
      " ,product_id ,click_count ,city_infos ,product_name,product_status from " +
      "(select area ,product_id ,click_count ,city_infos ,product_name,product_status ," +
      " row_number() Over ( partition by area order by click_count desc ) rn " +
      " from tmp_area_fullprod_click_count ) t where t.rn<=3"
    spark.sql(sql)
  }

  /** Joins the per-area click counts with product_info (name + self/third-party flag). */
  def generateTempAreaFullProDuctClickCountTable(spark: SparkSession): Unit = {
    val sql = "select t.area , t.product_id,t.click_count,t.city_infos,p.product_name," +
      "if(get_json_object(p.extend_info,\"$.product_status\")='0','Self','Third Party') product_status" +
      " from tmp_area_product_click_count t join product_info p on t.product_id = p.product_id"
    val df: DataFrame = spark.sql(sql)
    df.show()
    df.createOrReplaceTempView("tmp_area_fullprod_click_count")
  }

  /** Aggregates clicks per (area, product) and collects the contributing cities. */
  def generateTempTemAreaProductClickCountTable(spark: SparkSession): Unit = {
    val sql = "select t.area,t.product_id,count(*) click_count ," +
      "concat_ws(\",\",collect_set(concat_ws(\":\",t.city_id,t.city_name))) city_infos" +
      " from tmp_click_product_basic t group by t.area,t.product_id"
    val df: DataFrame = spark.sql(sql)
    df.show()
    df.createOrReplaceTempView("tmp_area_product_click_count")
  }

  /** Joins click actions with city reference data into tmp_click_product_basic. */
  def generateTempClickProductBasicTable(spark: SparkSession,
                                         cityClickActionRDD: RDD[(Long, Row)],
                                         cityInfoRDD: RDD[(Long, Row)]) = {
    val joinRDD: RDD[(Long, (Row, Row))] = cityClickActionRDD.join(cityInfoRDD)
    val mappedRDD: RDD[(Long, String, String, Long)] = joinRDD.map {
      case (cityid, (action, cityInfo)) =>
        val productId: Long = action.getLong(1)
        val cityName: String = cityInfo.getString(1)
        val area: String = cityInfo.getString(2)
        (cityid, cityName, area, productId)
    }
    import spark.implicits._
    val df: DataFrame = mappedRDD.toDF("city_id", "city_name", "area", "product_id")
    df.show()
    df.createOrReplaceTempView("tmp_click_product_basic")
  }

  /** City reference data, fixed in code because it rarely changes; keyed by city_id. */
  def getCityInfoRDD(spark: SparkSession) = {
    val cityInfo = Array(
      (0L, "北京", "華北"),
      (1L, "上海", "華東"),
      (2L, "南京", "華東"),
      (3L, "廣州", "華南"),
      (4L, "三亞", "華南"),
      (5L, "武漢", "華中"),
      (6L, "長沙", "華中"),
      (7L, "西安", "西北"),
      (8L, "成都", "西南"),
      (9L, "哈爾濱", "東北"))
    import spark.implicits._
    val cityInfoRDD: RDD[(Long, String, String)] = spark.sparkContext.makeRDD(cityInfo)
    val cityInfoDF: DataFrame = cityInfoRDD.toDF("city_id", "city_name", "area")
    cityInfoDF.rdd.map(item => (item.getAs[Long]("city_id"), item))
  }

  /** Product-click actions (click_product_id set and != -1) in range, keyed by city_id. */
  def getCityClickActionRDD(spark: SparkSession, startDate: String, endDate: String) = {
    val clickActionRDDDF: DataFrame = spark.sql("select city_id,click_product_id from user_visit_action " +
      "where click_product_id is not null and click_product_id !=-1 " +
      "and date >= '" + startDate + "' and date <= '" + endDate + "'")
    clickActionRDDDF.rdd.map(item => {
      (item.getAs[Long]("city_id"), item)
    })
  }
}
// AreaTop3Product — result row for the per-area top-3 products statistic.
package com.ityouxin.product

/** One "top-3 product of an area" record destined for the area_top3_product table. */
case class AreaTop3Product(taskId: String,
                           area: String,
                           areaLevel: String,
                           productid: Long,
                           cityInfos: String,
                           clickCount: Long,
                           productName: String,
                           productStatus: String)
// HiveDB — ad-hoc inspection of the Hive tables used by the analyses.
package com.ityouxin.product

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

/** Prints schemas and sample rows of the source Hive tables (debug utility). */
object HiveDB {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HiveDB")
    val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    val sc: SparkContext = spark.sparkContext
    spark.sql("show tables").show()
    spark.sql("desc user_visit_action").show()
    // BUG FIX: "slect" -> "select"; the original statement could never parse.
    spark.sql("select * from user_visit_action").show(5, false)
    spark.sql("desc product_info").show(5, false)
    spark.sql("desc user_info").show()
    spark.sql("select * from user_info").show(5, false)
  }
}
- 廣告黑名單實時統計
實作實時的動态黑名單機制:将每天對某個廣告點選超過 100 次的使用者拉黑。
資料源解析
Kafka資料: timestamp province city userid adid
資料結構:
((0L, "北京", "華北"), (1L, "上海", "華東"), (2L, "南京", "華東"), (3L, "廣州", "華南"), (4L, "三亞", "華南"), (5L, "武漢", "華中"), (6L, "長沙", "華中"), (7L, "西安", "西北"), (8L, "成都", "西南"), (9L, "哈爾濱", "東北"))
- 廣告點選實時統計
每天各省各城市各廣告的點選流量實時統計。
資料源解析:
Kafka資料: timestamp province city userid adid
-
各省熱門廣告實時統計
統計每天各省 top3 熱門廣告
資料源解析:
資料來源于需求八 updateStateByKey 得到的Dstream
Dstream[( dateKey_province_city_adid , count)]
-
最近一個小時廣告點選實時統計
統計各廣告最近 1 小時内的點選量趨勢:各廣告最近 1 小時内各分鐘的點選量
資料源解析:
Kafka資料源 timestamp province city userid adid
需求7-10具體實作
AdClickRealTimeStat 類
package com.ityouxin.advertise
import java.util.Date
import com.ityouxin.commons.conf.ConfigurationManager
import com.ityouxin.commons.utils.DateUtils
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import scala.collection.mutable.ArrayBuffer
object AdClickRealTimeStat {
def main(args: Array[String]): Unit = {
  // Local Spark context with a 5-second micro-batch streaming context.
  val sparkConf: SparkConf = new SparkConf().setAppName("AdClickRealTimeStat").setMaster("local[*]")
  val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
  val sc: SparkContext = spark.sparkContext
  val ssc = new StreamingContext(sc, Seconds(5))
  // Checkpointing backs the stateful (updateStateByKey / windowed) operations below.
  ssc.checkpoint("./streaming_checkpoint")
  // Kafka connection settings from configuration.
  val brokerList = ConfigurationManager.config.getString("kafka.broker.list")
  val topics: String = ConfigurationManager.config.getString("kafka.topics")
  val kafkaParams = Map(
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList,
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.GROUP_ID_CONFIG -> "adverter",
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
  )
  // Direct stream over the configured topic; each record value is one log line
  // of the form "timestamp province city userid adid".
  val adRealTimeDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](Array(topics), kafkaParams)
  )
  val adRealTimeLogValueDS: DStream[String] = adRealTimeDS.map(record => record.value())
  adRealTimeLogValueDS.print()
  // Requirement 7: drop clicks made by blacklisted users ...
  val filteredAdRealTimeLogDS: DStream[(Long, String)] = filterByBlackList(spark, adRealTimeLogValueDS)
  filteredAdRealTimeLogDS.print()
  // ... and keep growing the blacklist from fresh click counts.
  generateDynamicBlackList(filteredAdRealTimeLogDS)
  // Requirement 8: running daily click totals per province/city/ad.
  val aggregatedDS: DStream[(String, Long)] = calculateRealTimeStat(filteredAdRealTimeLogDS)
  // Requirement 9: daily top-3 ads per province.
  calculateRealTimeProvinceTop3Ad(spark, aggregatedDS)
  // Requirement 10: per-minute click trend over the last hour.
  calculateAdClickCountByWindow(adRealTimeLogValueDS)
  ssc.start()
  ssc.awaitTermination()
}
// Requirement 10: tracks the per-minute click trend of every ad over a sliding
// one-hour window and writes each refresh to the database.
def calculateAdClickCountByWindow(adRealTimeLogValueDS: DStream[String]) = {
  // One (yyyyMMddHHmm_adid, 1) pair per raw log line.
  val minuteAdPairs: DStream[(String, Long)] = adRealTimeLogValueDS.map { log =>
    val fields = log.split(" ")
    // Minute-resolution key, format yyyyMMddHHmm.
    val minuteKey: String = DateUtils.formatTimeMinute(new Date(fields(0).toLong))
    val adid: Long = fields(4).toLong
    (minuteKey + "_" + adid, 1L)
  }
  // One-hour window recomputed every 10 seconds.
  val windowedCounts: DStream[(String, Long)] = minuteAdPairs.reduceByKeyAndWindow(
    (a: Long, b: Long) => a + b,
    Minutes(60L),
    Seconds(10L)
  )
  // Rebuild AdClickTrend rows from the window keys and persist them per partition.
  windowedCounts.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      val trends = partition.map { case (key, clickCount) =>
        val parts = key.split("_")
        val dateMinute = parts(0) // yyyyMMddHHmm
        val adid = parts(1).toLong
        // Split the compact key back into date (yyyy-MM-dd), hour and minute.
        val date = DateUtils.formatDate(DateUtils.parseDateKey(dateMinute.substring(0, 8)))
        val hour = dateMinute.substring(8, 10)
        val minute = dateMinute.substring(10)
        AdClickTrend(date, hour, minute, adid, clickCount)
      }.toArray
      AdClickTrendDAO.updateBatch(trends)
    }
  }
}
// Requirement 9: derives, per batch, the daily top-3 ads of every province from
// the running (date_province_city_adid -> count) state and persists them.
def calculateRealTimeProvinceTop3Ad(spark: SparkSession,
                                    aggregatedDS: DStream[(String, Long)]) = {
  val top3DStream: DStream[Row] = aggregatedDS.transform { rdd =>
    // Collapse the city dimension: date_province_city_adid -> date_province_adid.
    val provinceKeyed: RDD[(String, Long)] = rdd.map { case (key, count) =>
      val parts = key.split("_")
      val provKey = parts(0) + "_" + parts(1) + "_" + parts(3).toLong
      (provKey, count)
    }
    // Daily click total per (date, province, ad).
    val dailyAdClickCountByProvinceRDD: RDD[(String, Long)] = provinceKeyed.reduceByKey(_ + _)
    val rowRDD: RDD[(String, String, Long, Long)] = dailyAdClickCountByProvinceRDD.map {
      case (provKey, count) =>
        val parts = provKey.split("_")
        // yyyyMMdd -> yyyy-MM-dd for the report table.
        val formattedDate = DateUtils.formatDate(DateUtils.parseDateKey(parts(0)))
        (formattedDate, parts(1), parts(2).toLong, count)
    }
    import spark.implicits._
    val df: DataFrame = rowRDD.toDF("date", "province", "ad_id", "click_count")
    // Register this batch's counts so SQL can rank them.
    df.createOrReplaceTempView("tmp_daily_ad_click_count_by_prov")
    // row_number() per province, ordered by clicks, keeps the top 3.
    val top3DF: DataFrame = spark.sql("select date,province,ad_id,click_count from " +
      "(select date,province,ad_id,click_count," +
      "row_number() over( partition by province order by click_count desc ) rn " +
      "from tmp_daily_ad_click_count_by_prov ) t where t.rn <=3")
    top3DF.rdd
  }
  // Persist each batch's top-3 rows.
  top3DStream.foreachRDD { rdd =>
    rdd.foreachPartition { rows =>
      val adProvinceTop3s = ArrayBuffer[AdProvinceTop3]()
      // Row layout: date, province, ad_id, click_count.
      for (row <- rows) {
        adProvinceTop3s += AdProvinceTop3(
          row.getString(0),
          row.getString(1),
          row.getLong(2),
          row.getLong(3))
      }
      AdProvinceTop3DAO.updateBatch(adProvinceTop3s.toArray)
    }
  }
}
//動态統計每個省市的每天廣告的點選流量
def calculateRealTimeStat(filteredAdRealTimeLogDS: DStream[(Long, String)]): DStream[(String, Long)] = {
val mappedDS: DStream[(String, Long)] = filteredAdRealTimeLogDS.map {
case (userid, log) =>
val logSplited: Array[String] = log.split(" ")
val timeStamp: String = logSplited(0)
val date: String = DateUtils.formatDateKey(new Date(timeStamp.toLong))
val province: String = logSplited(1)
val city: String = logSplited(2)
val adid: Long = logSplited(4).toLong
val key = date + "_" + province + "_" + city + "_" + adid
(key, 1L)
}
//有狀态轉換DS
val aggregatedDS: DStream[(String, Long)] = mappedDS.updateStateByKey[Long] {
(values: Seq[Long], old: Option[Long]) => {
Some(values.sum + old.getOrElse(0L))
}
}
aggregatedDS.foreachRDD{
rdd =>
rdd.foreachPartition{
items =>
val adStats = ArrayBuffer[AdStat]()
//date + "_" + province + "_" + city + "_" + adid
for (item <- items){
val keySplited:Array[String] = item._1.split("_")
val date = keySplited(0)
val province = keySplited(1)
val city = keySplited(2)
val adid = keySplited(3).toLong
val clickCount = item._2
adStats += AdStat(date,province,city,adid,clickCount)
}
//入庫
AdStatDAO.updateBatch(adStats.toArray)
}
}
aggregatedDS
}
//動态添加黑名單
def generateDynamicBlackList(filteredAdRealTimeLogDS: DStream[(Long, String)]) = {
//每天每個使用者每個廣告對應的DS
val dailyUserAdClickDstream: DStream[(String, Long)] = filteredAdRealTimeLogDS.map {
//資料格式=timestamp province city userid adid
case (userid, log) =>
val logSplited: Array[String] = log.split(" ")
val timestamp: String = logSplited(0)
val date = new Date(timestamp.toLong)
//轉換日期的資料格式 yyyyMMdd
val dateKey: String = DateUtils.formatDateKey(date)
val adid: String = logSplited(4)
//拼接字元串
val key: String = dateKey + "_" + userid + "_" + key
(key, 1L)
}
//聚合計算每天每個使用者每個廣告 點選的總數
val dailyUserADClickCountDS: DStream[(String, Long)] = dailyUserAdClickDstream.reduceByKey(_+_)
dailyUserADClickCountDS.foreachRDD{
rdd =>
rdd.foreachPartition{items:Iterator[(String,Long)]=>
//定義容器 用來存放廣告點選數
val adUserClickCounts: ArrayBuffer[AdUserClickCount] = ArrayBuffer[AdUserClickCount]()
//item -> yyyyMMdd_userid_adid,count 周遊出日期和userid adid count 然後進行更新到容器中
for (item <- items){
//分割字元串
val keySplited: Array[String] = item._1.split("_")
//yyyy-MM-dd
val date = DateUtils.formatDate(DateUtils.parseDateKey(item._1.split("_")(0)))
//userid
val userid: Long = keySplited(1).toLong
//adid
val adid: Long = keySplited(2).toLong
//clickCount
val clickCount = item._2
adUserClickCounts += AdUserClickCount(date,userid,adid,clickCount)
}
//入庫
AdUserClickCountDAO.updateBatch(adUserClickCounts.toArray)
}
}
//判斷使用者的操作行為是否超過或者等于100
val blackListDS: DStream[(String, Long)] = dailyUserADClickCountDS.filter {
case (key, count) =>
val keySplited: Array[String] = key.split("_")
val date = DateUtils.formatDate(DateUtils.parseDateKey(keySplited(0)))
//userid
val userid: Long = keySplited(1).toLong
//adid
val adid: Long = keySplited(2).toLong
//查詢莫一天莫以使用者對某一廣告點選總數
val clickCount: Int = AdUserClickCountDAO.findClickCountByMultiKey(date, userid, adid)
if (clickCount >= 100) {
true
} else {
false
}
}
blackListDS
//有可能一個使用者會點選多個廣告,是以需要去重
val blackListUserDS: DStream[Long] = blackListDS.map(item => item._1.split("_")(1).toLong)
val distinctBlackUserDS: DStream[Long] = blackListUserDS.transform(rdd =>
rdd.distinct()
)
distinctBlackUserDS.foreachRDD{
rdd =>
rdd.foreachPartition{
items =>
val adBlackLists: ArrayBuffer[AdBlacklist] = ArrayBuffer[AdBlacklist]()
//userid
for (item <- items){
adBlackLists += AdBlacklist(item)
}
AdBlacklistDAO.insertBatch(adBlackLists.toArray)
}
}
}//end 動态入庫
// Filters the raw click-log stream down to clicks made by non-blacklisted
// users, re-keyed by user id.
def filterByBlackList(spark: SparkSession,
                      adRealTimeLogValueDS: DStream[String]): DStream[(Long, String)] = {
  adRealTimeLogValueDS.transform { rdd =>
    // Snapshot the blacklist once per batch and lift it into an RDD for the join.
    val blacklistRDD: RDD[(Long, Boolean)] = spark.sparkContext.makeRDD(
      AdBlacklistDAO.findAll().map(entry => (entry.userid, true))
    )
    // "timestamp province city userid adid" -> (userid, log)
    val userKeyedRDD: RDD[(Long, String)] = rdd.map { (log: String) =>
      val userid: Long = log.split(" ")(3).toLong
      (userid, log)
    }
    // Left outer join: Some(true) on the right marks a blacklisted user;
    // keep everything else.
    userKeyedRDD
      .leftOuterJoin(blacklistRDD)
      .filter { case (_, (_, blacklisted)) => !blacklisted.getOrElse(false) }
      .map { case (userid, (log, _)) => (userid, log) }
  }
}
}
DataModel 類 資料模型
package com.ityouxin.advertise
/**
 * Ad blacklist entry — a user excluded from further click counting.
 */
case class AdBlacklist(userid:Long)
/**
 * Daily click count of one user on one ad.
 */
case class AdUserClickCount(date:String,
userid:Long,
adid:Long,
clickCount:Long)
/**
 * Running daily click count per (date, province, city, ad).
 */
case class AdStat(date:String,
province:String,
city:String,
adid:Long,
clickCount:Long)
/**
 * Daily top-3 ad of a province.
 */
case class AdProvinceTop3(date:String,
province:String,
adid:Long,
clickCount:Long)
/**
 * Per-minute click count of an ad inside the sliding one-hour window.
 */
case class AdClickTrend(date:String,
hour:String,
minute:String,
adid:Long,
clickCount:Long)
JDBCHelper 入庫操作
package com.ityouxin.advertise
import java.sql.ResultSet
import com.ityouxin.commons.pool.{CreateMySqlPool, QueryCallback}
import scala.collection.mutable.ArrayBuffer
/**
 * DAO for the ad blacklist table (ad_blacklist).
 */
object AdBlacklistDAO {
  /**
   * Batch-inserts blacklisted users.
   *
   * @param adBlacklists blacklist entries to persist
   */
  def insertBatch(adBlacklists: Array[AdBlacklist]) {
    val sql = "INSERT INTO ad_blacklist VALUES(?)"
    val paramsList = new ArrayBuffer[Array[Any]]()
    // One single-column parameter row per user.
    for (adBlacklist <- adBlacklists) {
      val params: Array[Any] = Array(adBlacklist.userid)
      paramsList += params
    }
    // Borrow a pooled MySQL client.
    val mySqlPool = CreateMySqlPool()
    val client = mySqlPool.borrowObject()
    try {
      client.executeBatch(sql, paramsList.toArray)
    } finally {
      // BUG FIX: return the client even when the batch throws, so the pool
      // does not leak connections.
      mySqlPool.returnObject(client)
    }
  }
  /**
   * Loads every blacklisted user.
   *
   * @return all blacklist entries currently stored
   */
  def findAll(): Array[AdBlacklist] = {
    val sql = "SELECT * FROM ad_blacklist"
    val adBlacklists = new ArrayBuffer[AdBlacklist]()
    // Borrow a pooled MySQL client.
    val mySqlPool = CreateMySqlPool()
    val client = mySqlPool.borrowObject()
    try {
      // Collect every userid from the result set.
      client.executeQuery(sql, null, new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            val userid = rs.getInt(1).toLong
            adBlacklists += AdBlacklist(userid)
          }
        }
      })
    } finally {
      // BUG FIX: always return the client to the pool.
      mySqlPool.returnObject(client)
    }
    adBlacklists.toArray
  }
}
/**
 * DAO for the per-user, per-ad daily click counts (table ad_user_click_count).
 */
object AdUserClickCountDAO {
/**
 * Upserts a batch of click-count deltas: existing (date, userid, adid) rows are
 * incremented, missing ones are inserted.
 *
 * @param adUserClickCounts batch of per-key click deltas
 */
def updateBatch(adUserClickCounts: Array[AdUserClickCount]) {
// Borrow a pooled MySQL client.
val mySqlPool = CreateMySqlPool()
val client = mySqlPool.borrowObject()
// Split the batch into rows to insert vs. rows to update.
val insertAdUserClickCounts = ArrayBuffer[AdUserClickCount]()
val updateAdUserClickCounts = ArrayBuffer[AdUserClickCount]()
val selectSQL = "SELECT count(*) FROM ad_user_click_count WHERE date=? AND userid=? AND adid=? "
for (adUserClickCount <- adUserClickCounts) {
val selectParams: Array[Any] = Array(adUserClickCount.date, adUserClickCount.userid, adUserClickCount.adid)
// Probe for an existing row for this (date, userid, adid) key.
client.executeQuery(selectSQL, selectParams, new QueryCallback {
override def process(rs: ResultSet): Unit = {
// A positive count means the row already exists -> update; otherwise insert.
if (rs.next() && rs.getInt(1) > 0) {
updateAdUserClickCounts += adUserClickCount
} else {
insertAdUserClickCounts += adUserClickCount
}
}
})
}
// Batch-insert the new rows.
val insertSQL = "INSERT INTO ad_user_click_count VALUES(?,?,?,?)"
val insertParamsList: ArrayBuffer[Array[Any]] = ArrayBuffer[Array[Any]]()
for (adUserClickCount <- insertAdUserClickCounts) {
insertParamsList += Array[Any](adUserClickCount.date, adUserClickCount.userid, adUserClickCount.adid, adUserClickCount.clickCount)
}
client.executeBatch(insertSQL, insertParamsList.toArray)
// Batch-update existing rows; note the clickCount is ACCUMULATED
// (clickCount + ?), not overwritten.
val updateSQL = "UPDATE ad_user_click_count SET clickCount=clickCount + ? WHERE date=? AND userid=? AND adid=?"
val updateParamsList: ArrayBuffer[Array[Any]] = ArrayBuffer[Array[Any]]()
for (adUserClickCount <- updateAdUserClickCounts) {
updateParamsList += Array[Any](adUserClickCount.clickCount, adUserClickCount.date, adUserClickCount.userid, adUserClickCount.adid)
}
client.executeBatch(updateSQL, updateParamsList.toArray)
// Return the client to the pool.
mySqlPool.returnObject(client)
}
/**
 * Looks up the accumulated click count for one (date, userid, adid) key.
 *
 * @param date   day the clicks belong to
 * @param userid user id
 * @param adid   ad id
 * @return the stored click count, or 0 when no row exists
 */
def findClickCountByMultiKey(date: String, userid: Long, adid: Long): Int = {
// Borrow a pooled MySQL client.
val mySqlPool = CreateMySqlPool()
val client = mySqlPool.borrowObject()
val sql = "SELECT clickCount FROM ad_user_click_count " +
"WHERE date=? " +
"AND userid=? " +
"AND adid=?"
var clickCount = 0
val params = Array[Any](date, userid, adid)
// Read the single matching row, if any.
client.executeQuery(sql, params, new QueryCallback {
override def process(rs: ResultSet): Unit = {
if (rs.next()) {
clickCount = rs.getInt(1)
}
}
})
// Return the client to the pool.
mySqlPool.returnObject(client)
clickCount
}
}
/**
 * DAO for the daily per-province/city/ad click statistics (table ad_stat).
 *
 * Unlike ad_user_click_count, updates here OVERWRITE the stored clickCount
 * with the latest accumulated value (the state stream already carries running
 * totals) instead of adding to it.
 */
object AdStatDAO {
def updateBatch(adStats: Array[AdStat]) {
// Borrow a pooled MySQL client.
val mySqlPool = CreateMySqlPool()
val client = mySqlPool.borrowObject()
// Split the batch into rows to insert vs. rows to update.
val insertAdStats = ArrayBuffer[AdStat]()
val updateAdStats = ArrayBuffer[AdStat]()
val selectSQL = "SELECT count(*) " +
"FROM ad_stat " +
"WHERE date=? " +
"AND province=? " +
"AND city=? " +
"AND adid=?"
for (adStat <- adStats) {
val params = Array[Any](adStat.date, adStat.province, adStat.city, adStat.adid)
// Probe for an existing row to decide insert vs. update.
client.executeQuery(selectSQL, params, new QueryCallback {
override def process(rs: ResultSet): Unit = {
if (rs.next() && rs.getInt(1) > 0) {
updateAdStats += adStat
} else {
insertAdStats += adStat
}
}
})
}
// Batch-insert the new rows.
val insertSQL = "INSERT INTO ad_stat VALUES(?,?,?,?,?)"
val insertParamsList: ArrayBuffer[Array[Any]] = ArrayBuffer[Array[Any]]()
for (adStat <- insertAdStats) {
insertParamsList += Array[Any](adStat.date, adStat.province, adStat.city, adStat.adid, adStat.clickCount)
}
client.executeBatch(insertSQL, insertParamsList.toArray)
// Batch-update existing rows — clickCount is REPLACED with the new total.
val updateSQL = "UPDATE ad_stat SET clickCount=? " +
"WHERE date=? " +
"AND province=? " +
"AND city=? " +
"AND adid=?"
val updateParamsList: ArrayBuffer[Array[Any]] = ArrayBuffer[Array[Any]]()
for (adStat <- updateAdStats) {
updateParamsList += Array[Any](adStat.clickCount, adStat.date, adStat.province, adStat.city, adStat.adid)
}
client.executeBatch(updateSQL, updateParamsList.toArray)
// Return the client to the pool.
mySqlPool.returnObject(client)
}
}
/**
 * DAO for the per-province top-3 hot ads table `ad_province_top3`.
 */
object AdProvinceTop3DAO {
/**
 * Replace the stored top-3 rows for every (date, province) present in the
 * input: first delete all existing rows for those keys, then insert the
 * incoming rows.
 *
 * @param adProvinceTop3s rows to persist (typically 3 per date/province)
 */
def updateBatch(adProvinceTop3s: Array[AdProvinceTop3]) {
// Obtain the singleton connection-pool object and borrow a client from it
val mySqlPool = CreateMySqlPool()
val client = mySqlPool.borrowObject()
try {
// Collect the distinct (date, province) keys of the incoming rows.
// Tuples + distinct avoid the O(n^2) contains-scan of an ArrayBuffer and
// the fragile "date_province" string key, which would be corrupted by a
// province name containing an underscore when split back apart.
val dateProvinces: Seq[(String, String)] =
adProvinceTop3s.map(t => (t.date, t.province)).distinct.toSeq
// Delete all existing rows for those keys before re-inserting
val deleteSQL = "DELETE FROM ad_province_top3 WHERE date=? AND province=?"
val deleteParamsList: ArrayBuffer[Array[Any]] = ArrayBuffer[Array[Any]]()
for ((date, province) <- dateProvinces) {
deleteParamsList += Array[Any](date, province)
}
client.executeBatch(deleteSQL, deleteParamsList.toArray)
// Batch-insert every incoming row
val insertSQL = "INSERT INTO ad_province_top3 VALUES(?,?,?,?)"
val insertParamsList: ArrayBuffer[Array[Any]] = ArrayBuffer[Array[Any]]()
for (adProvinceTop3 <- adProvinceTop3s) {
insertParamsList += Array[Any](adProvinceTop3.date, adProvinceTop3.province, adProvinceTop3.adid, adProvinceTop3.clickCount)
}
client.executeBatch(insertSQL, insertParamsList.toArray)
} finally {
// Always return the client to the pool, even on SQL failure,
// so the pooled connection is not leaked.
mySqlPool.returnObject(client)
}
}
}
/**
 * DAO for the per-minute ad click trend table `ad_click_trend`
 * (one row per date / hour / minute / ad).
 */
object AdClickTrendDAO {
/**
 * Upsert a batch of [[AdClickTrend]] rows: existing rows are overwritten
 * (clickCount is replaced, not accumulated), new rows are inserted.
 *
 * @param adClickTrends rows to persist
 */
def updateBatch(adClickTrends: Array[AdClickTrend]) {
// Obtain the singleton connection-pool object and borrow a client from it
val mySqlPool = CreateMySqlPool()
val client = mySqlPool.borrowObject()
try {
// Split the incoming rows into those that must be inserted and those
// that must be updated, by probing for an existing row per key.
val updateAdClickTrends = ArrayBuffer[AdClickTrend]()
val insertAdClickTrends = ArrayBuffer[AdClickTrend]()
val selectSQL = "SELECT count(*) " +
"FROM ad_click_trend " +
"WHERE date=? " +
"AND hour=? " +
"AND minute=? " +
"AND adid=?"
for (adClickTrend <- adClickTrends) {
// count(*) > 0 means the row exists and should be updated
val params = Array[Any](adClickTrend.date, adClickTrend.hour, adClickTrend.minute, adClickTrend.adid)
client.executeQuery(selectSQL, params, new QueryCallback {
override def process(rs: ResultSet): Unit = {
if (rs.next() && rs.getInt(1) > 0) {
updateAdClickTrends += adClickTrend
} else {
insertAdClickTrends += adClickTrend
}
}
})
}
// Batch-update the existing rows.
// Note: this UPDATE OVERWRITES clickCount (it does not accumulate).
val updateSQL = "UPDATE ad_click_trend SET clickCount=? " +
"WHERE date=? " +
"AND hour=? " +
"AND minute=? " +
"AND adid=?"
val updateParamsList: ArrayBuffer[Array[Any]] = ArrayBuffer[Array[Any]]()
for (adClickTrend <- updateAdClickTrends) {
updateParamsList += Array[Any](adClickTrend.clickCount, adClickTrend.date, adClickTrend.hour, adClickTrend.minute, adClickTrend.adid)
}
client.executeBatch(updateSQL, updateParamsList.toArray)
// Batch-insert the new rows
val insertSQL = "INSERT INTO ad_click_trend VALUES(?,?,?,?,?)"
val insertParamsList: ArrayBuffer[Array[Any]] = ArrayBuffer[Array[Any]]()
for (adClickTrend <- insertAdClickTrends) {
insertParamsList += Array[Any](adClickTrend.date, adClickTrend.hour, adClickTrend.minute, adClickTrend.adid, adClickTrend.clickCount)
}
client.executeBatch(insertSQL, insertParamsList.toArray)
} finally {
// Always return the client to the pool, even on SQL failure,
// so the pooled connection is not leaked.
mySqlPool.returnObject(client)
}
}
}