Today I'll give a quick little share: four small examples, namely PV and UV, each in a Scala version and a Java version.
As Scala learners we should know that Scala and Java are interoperable.
Without further ado, let's look at today's requirements:
1. PV is the page view count; here it is simply the number of log records.
2. UV is the unique visitor count. Deduplicate the IP addresses in the log data, then count them.
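To make the difference between the two metrics concrete before any Spark code, here is a tiny plain-Scala sketch over three made-up log lines (the IPs and paths are hypothetical):

// Hypothetical mini log: three records from two distinct IPs
val logs = Seq(
  "1.1.1.1 - - [18/Sep/2013:06:49:18 +0000] \"GET /a HTTP/1.1\" 200 0",
  "1.1.1.1 - - [18/Sep/2013:06:49:19 +0000] \"GET /b HTTP/1.1\" 200 0",
  "2.2.2.2 - - [18/Sep/2013:06:49:20 +0000] \"GET /a HTTP/1.1\" 200 0"
)
val pv = logs.size                                // 3: every record is a page view
val uv = logs.map(_.split(" ")(0)).distinct.size  // 2: only distinct IPs count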
First we prepare the clickstream data that has already been collected.
It looks roughly like this. I'm pasting about 20 records here for reference, but to show off what Spark can do, the actual dataset I used is more like 10,000-plus records.
194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
183.49.46.228 - - [18/Sep/2013:06:49:23 +0000] "-" 400 0 "-" "-"
163.177.71.12 - - [18/Sep/2013:06:49:33 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
163.177.71.12 - - [18/Sep/2013:06:49:36 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
101.226.68.137 - - [18/Sep/2013:06:49:42 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
101.226.68.137 - - [18/Sep/2013:06:49:45 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
60.208.6.156 - - [18/Sep/2013:06:49:48 +0000] "GET /wp-content/uploads/2013/07/rcassandra.png HTTP/1.0" 200 185524 "http://cos.name/category/software/packages/" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
222.68.172.190 - - [18/Sep/2013:06:50:08 +0000] "-" 400 0 "-" "-"
183.195.232.138 - - [18/Sep/2013:06:50:16 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
183.195.232.138 - - [18/Sep/2013:06:50:16 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
66.249.66.84 - - [18/Sep/2013:06:50:28 +0000] "GET /page/6/ HTTP/1.1" 200 27777 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
221.130.41.168 - - [18/Sep/2013:06:50:37 +0000] "GET /feed/ HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
157.55.35.40 - - [18/Sep/2013:06:51:13 +0000] "GET /robots.txt HTTP/1.1" 200 150 "-" "Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)"
50.116.27.194 - - [18/Sep/2013:06:51:35 +0000] "POST /wp-cron.php?doing_wp_cron=1379487095.2510800361633300781250 HTTP/1.0" 200 0 "-" "WordPress/3.6; http://blog.fens.me"
58.215.204.118 - - [18/Sep/2013:06:51:35 +0000] "GET /nodejs-socketio-chat/ HTTP/1.1" 200 10818 "http://www.google.com/url?sa=t&rct=j&q=nodejs%20%E5%BC%82%E6%AD%A5%E5%B9%BF%E6%92%AD&source=web&cd=1&cad=rja&ved=0CCgQFjAA&url=%68%74%74%70%3a%2f%2f%62%6c%6f%67%2e%66%65%6e%73%2e%6d%65%2f%6e%6f%64%65%6a%73%2d%73%6f%63%6b%65%74%69%6f%2d%63%68%61%74%2f&ei=rko5UrylAefOiAe7_IGQBw&usg=AFQjCNG6YWoZsJ_bSj8kTnMHcH51hYQkAA&bvm=bv.52288139,d.aGc" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
58.215.204.118 - - [18/Sep/2013:06:51:36 +0000] "GET /wp-includes/js/jquery/jquery-migrate.min.js?ver=1.2.1 HTTP/1.1" 304 0 "http://blog.fens.me/nodejs-socketio-chat/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
58.215.204.118 - - [18/Sep/2013:06:51:35 +0000] "GET /wp-includes/js/jquery/jquery.js?ver=1.10.2 HTTP/1.1" 304 0 "http://blog.fens.me/nodejs-socketio-chat/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
58.215.204.118 - - [18/Sep/2013:06:51:36 +0000] "GET /wp-includes/js/comment-reply.min.js?ver=3.6 HTTP/1.1" 304 0 "http://blog.fens.me/nodejs-socketio-chat/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
58.215.204.118 - - [18/Sep/2013:06:51:36 +0000] "GET /wp-content/uploads/2013/08/chat.png HTTP/1.1" 200 48968 "http://blog.fens.me/nodejs-socketio-chat/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
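These records follow the common Apache access-log layout, in which the IP address is always the first space-separated field; that is the only structural fact the UV code below relies on. A quick sanity check on the first sample line:

val line = "194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] " +
  "\"GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/4.0 (compatible;)\""
val ip = line.split(" ")(0)  // "194.237.142.21"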
Next up is the Java version of PV:
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class PVJava {
    public static void main(String[] args) {
        // 1. Create a SparkConf object; set the appName and master URL
        SparkConf sparkConf = new SparkConf().setAppName("java-PV-APP").setMaster("local[2]");
        // 2. Create the JavaSparkContext
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        // 3. Read the data file
        JavaRDD<String> contentRDD = jsc.textFile("D:\\AA\\input\\access.log");
        // 4. Map each input line to the pair (pv, 1)
        JavaPairRDD<String, Integer> pvAndOneRDD = contentRDD.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>("pv", 1);
            }
        });
        // 5. Aggregate by key
        JavaPairRDD<String, Integer> resultRDD = pvAndOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // 6. Collect the result
        Map<String, Integer> finalResult = resultRDD.collectAsMap();
        System.out.println(finalResult);
        // 7. Release the SparkContext
        jsc.stop();
    }
}
Here is the test result.
And this is the Scala version of PV:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark clickstream log analysis: PV
 */
object PVScala {
  // Entry point
  def main(args: Array[String]): Unit = {
    // 1. Create a SparkConf object; set the appName and master URL
    val conf: SparkConf = new SparkConf().setAppName("scala-PV-APP").setMaster("local[2]")
    // 2. Create the SparkContext
    val context: SparkContext = new SparkContext(conf)
    context.setLogLevel("WARN")
    // 3. Read the data file
    val contentRDD: RDD[String] = context.textFile("D:\\AA\\input\\access.log")
    // 4. Map each input line to the pair (pv, 1)
    val pvAndOneRDD: RDD[(String, Int)] = contentRDD.map(x => ("pv", 1))
    // 5. Aggregate by key
    val resultRDD: RDD[(String, Int)] = pvAndOneRDD.reduceByKey(_ + _)
    // 6. Collect the result
    val finalRDD: Array[(String, Int)] = resultRDD.collect()
    println(finalRDD.toBuffer)
    // 7. Stop the SparkContext
    context.stop()
  }
}
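As a side note, since PV is just the total number of records, the same figure can be obtained more directly with count(), skipping the map and reduceByKey steps entirely. A minimal alternative sketch, reusing the contentRDD from above:

// Equivalent shortcut: PV equals the number of log records
val pv: Long = contentRDD.count()
println(("pv", pv))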
The test result is as follows:
As you can see, the Java and Scala versions produce the same result.
Next, let's look at UV.
First, the Java version:
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class UVJava {
    public static void main(String[] args) {
        // 1. Create a SparkConf object; set the appName and master URL
        SparkConf sparkConf = new SparkConf().setAppName("java-UV-APP").setMaster("local[2]");
        // 2. Create the JavaSparkContext
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        // 3. Read the data file
        JavaRDD<String> contentRDD = jsc.textFile("D:\\AA\\input\\access.log");
        // 4. Extract the IP address (the first field) from each line
        JavaRDD<String> ipsRDD = contentRDD.map(new Function<String, String>() {
            public String call(String v1) throws Exception {
                String[] arr = v1.split(" ");
                return arr[0];
            }
        });
        // 5. Deduplicate the IP addresses, then map each to the pair (uv, 1)
        JavaPairRDD<String, Integer> uvAndOneRDD = ipsRDD.distinct().mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>("uv", 1);
            }
        });
        // 6. Aggregate by key
        JavaPairRDD<String, Integer> resultRDD = uvAndOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // 7. Collect the result
        Map<String, Integer> stringIntegerMap = resultRDD.collectAsMap();
        System.out.println(stringIntegerMap);
        // 8. Release the SparkContext
        jsc.stop();
    }
}
And this gives the deduplicated total:
Next comes the Scala version:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object UVScala {
  // Entry point
  def main(args: Array[String]): Unit = {
    // 1. Create a SparkConf object; set the appName and master URL
    val conf: SparkConf = new SparkConf().setAppName("scala-UV-APP").setMaster("local[2]")
    // 2. Create the SparkContext
    val context: SparkContext = new SparkContext(conf)
    context.setLogLevel("WARN")
    // 3. Read the data file
    val contentRDD: RDD[String] = context.textFile("D:\\AA\\input\\access.log")
    // 4. Extract the IP address (the first field) from each line
    val ipsRDD: RDD[String] = contentRDD.map(_.split(" ")(0))
    // 5. Deduplicate the IP addresses, then map each to the pair (uv, 1)
    val uvAndOneRDD: RDD[(String, Int)] = ipsRDD.distinct().map(x => ("uv", 1))
    // 6. Aggregate by key
    val resultRDD: RDD[(String, Int)] = uvAndOneRDD.reduceByKey(_ + _)
    // 7. Collect the result
    val finalRDD: Array[(String, Int)] = resultRDD.collect()
    println(finalRDD.toBuffer)
    // 8. Stop the SparkContext
    context.stop()
  }
}
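Here too there is a shortcut: UV is simply the number of distinct IPs, so distinct().count() yields the same figure without building the ("uv", 1) pairs at all. A minimal alternative sketch, reusing the ipsRDD from above:

// Equivalent shortcut: UV equals the number of distinct IP addresses
val uv: Long = ipsRDD.distinct().count()
println(("uv", uv))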
Let's see whether the result matches the Java version:
The test results show that the two versions agree.
Looking at the code side by side, the Scala version is the more elegant of the two and a bit simpler to write; together with the magical "_" placeholder, it reads even more cleanly.
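For instance, reduceByKey(_ + _) is nothing more than shorthand for an explicit two-argument function; the two forms below behave identically:

// Underscore shorthand
val sum1 = pvAndOneRDD.reduceByKey(_ + _)
// Spelled out: same behavior
val sum2 = pvAndOneRDD.reduceByKey((v1, v2) => v1 + v2)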
That's it for today's simple share on PV and UV. Next time I'll probably give a simple walkthrough of TopN, that is, finding the N pages with the highest traffic.
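As a small preview, here is a rough sketch of what that might look like, assuming the request path sits at index 6 after splitting on spaces (true for well-formed lines in this log format, though malformed lines like the "-" 400 entries above would need filtering first):

// Rough TopN preview: count hits per request path, take the 5 most visited
val top5 = contentRDD
  .map(line => (line.split(" ")(6), 1))  // index 6 is the request path on well-formed lines
  .reduceByKey(_ + _)                    // hits per path
  .sortBy(_._2, ascending = false)       // most visited first
  .take(5)
top5.foreach(println)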