文章目錄 [hide]
- 1 Spark SQL中Window API
- 2 時間序列資料
- 3 将時間序列資料導入到DataFrame中
- 4 計算2016年Apple股票周平均收盤價格
- 4.1 步驟一:找出2016年的股票交易資料
- 4.2 步驟二:計算平均值
- 4.3 步驟三:列印window的值
- 5 帶有開始時間的Time window
- 6 資料檔案下載下傳
Spark SQL中Window API
Spark SQL中的window API是從1.4版本開始引入的,以便支援更智能的分組功能。這個功能對于那些有SQL背景的人來說非常有用;但是在Spark 1.x中,window API一大缺點就是無法使用時間來建立視窗。時間在諸如金融、電信等領域有着非常重要的角色,基于時間來了解資料變得至關重要。
不過值得高興的是,在Spark 2.0中,window API内置也支援time windows!Spark SQL中的time windows和Spark Streaming中的time windows非常類似。在這篇文章中,我将介紹如何在Spark SQL中使用time windows。
時間序列資料
在我們介紹如何使用time window之前,我們先來準備一份時間序列資料。本文将使用Apple公司從1980年到2016年期間的股票交易資訊。如下(完整的資料點選這裡擷取):
|
股票資料一共有六列,但是這裡我們僅關心Date和Close兩列,它們分别代表股票交易時間和當天收盤的價格。
将時間序列資料導入到DataFrame中
我們有了樣本資料之後,需要将它導入到DataFrame中以便下面的計算。所有的time window API需要一個類型為timestamp的列。我們可以使用spark-csv工具包來解析上面的Apple股票資料(csv格式),這個工具可以自動推斷時間類型的資料并自動建立好模式。代碼如下:
scala>
val
stocksDF
=
spark.read.option(
"header"
,
"true"
).option(
"inferSchema"
,
"true"
).csv(
"file:///user/iteblog/applestock.csv"
)
stocksDF
:
org.apache.spark.sql.DataFrame
=
[Date
:
timestamp, Open
:
double ...
5
more fields
計算2016年Apple股票周平均收盤價格
現在我們已經有了初始化好的資料,是以我們可以進行一些基于時間的視窗分析。在本例中我們将計算2016年Apple公司每周股票的收盤價格平均值。下面将一步一步進行介紹。
步驟一:找出2016年的股票交易資料
因為我們僅僅需要2016年的交易資料,是以我們可以對原始資料進行過濾,代碼如下:
|
上面代碼片段我們使用了内置的year函數來提取出日期中的年。
步驟二:計算平均值
現在我們需要對每個星期建立一個視窗,這種類型的視窗通常被稱為tumbling window,代碼片段如下:
|
上面代碼中展示了如何使用 time window API。window一般在group by語句中使用。window方法的第一個參數指定了時間所在的列;第二個參數指定了視窗的持續時間(duration),它的機關可以是seconds、minutes、hours、days或者weeks。建立好視窗之後,我們可以計算平均值。
步驟三:列印window的值
我們可以列印出window中的值,我們先定義好列印的公共函數,代碼片段如下:
|
然後我們列印出tumblingWindowDS中的值:
|
上面的輸出按照
window.start
進行了排序,這個字段标記了視窗的開始時間。上面的輸出你可能已經看到了第一行的開始時間是2015-12-31,結束時間是2016-01-07。但是你從原始資料可以得到:2016年Apple公司的股票交易資訊是從2016-01-04開始的;原因是2016-01-01是元旦,而2016-01-02和2016-01-03正好是周末,期間沒有股票交易。
我們可以手動指定視窗的開始時間來解決這個問題。
帶有開始時間的Time window
在前面的示例中,我們使用的是tumbling window。為了能夠指定開始時間,我們需要使用sliding window(滑動視窗)。到目前為止,沒有相關API來建立帶有開始時間的tumbling window,但是我們可以通過将視窗時間(window duration)和滑動時間(slide duration)設定成一樣來建立帶有開始時間的tumbling window。代碼如下:
|
上面的示例中,
4 days
參數就是開始時間的偏移量;前兩個參數分别代表視窗時間和滑動時間,我們列印出這個視窗的内容:
|
從上面的結果可以看出,我們已經有了一個從2016-01-04的結果;不過結果中還有2015年的資料。原因是我們的開始時間是
4 days
,2016-01-04之前的一周資料也會被顯示出,我們可以使用filter來過濾掉那行資料:
|
現在來看看輸出的結果:
|
到目前為止,我們已經了解了如何在Spark中使用Window了。
轉載自過往記憶(http://www.iteblog.com/)