天天看點

Spark 2.0介紹:Spark SQL中的Time Window使用

文章目錄 [hide]

  • 1 Spark SQL中Window API
  • 2 時間序列資料
  • 3 将時間序列資料導入到DataFrame中
  • 4 計算2016年Apple股票周平均收盤價格
    • 4.1 步驟一:找出2016年的股票交易資料
    • 4.2 步驟二:計算平均值
    • 4.3 步驟三:列印window的值
  • 5 帶有開始時間的Time window
  • 6 資料檔案下載下傳

Spark SQL中Window API

  Spark SQL中的window API是從1.4版本開始引入的,以便支援更智能的分組功能。這個功能對于那些有SQL背景的人來說非常有用;但是在Spark 1.x中,window API一大缺點就是無法使用時間來建立視窗。時間在諸如金融、電信等領域有着非常重要的角色,基于時間來了解資料變得至關重要。

  不過值得高興的是,在Spark 2.0中,window API内置也支援time windows!Spark SQL中的time windows和Spark Streaming中的time windows非常類似。在這篇文章中,我将介紹如何在Spark SQL中使用time windows。

時間序列資料

  在我們介紹如何使用time window之前,我們先來準備一份時間序列資料。本文将使用Apple公司從1980年到2016年期間的股票交易資訊。如下(完整的資料點選這裡擷取):

Date,Open,High,Low,Close,Volume,Adj Close

2016-7-11,96.75,97.650002,96.730003,96.980003,23298900,96.980003

2016-7-8,96.489998,96.889999,96.050003,96.68,28855800,96.68

2016-7-7,95.699997,96.5,95.620003,95.940002,24280900,95.940002

2016-7-6,94.599998,95.660004,94.370003,95.529999,30770700,95.529999

2016-7-5,95.389999,95.400002,94.459999,95.040001,27257000,95.040001

2016-7-1,95.489998,96.470001,95.330002,95.889999,25872300,95.889999

2016-6-30,94.440002,95.769997,94.300003,95.599998,35836400,95.599998

2016-6-29,93.970001,94.550003,93.629997,94.400002,36531000,94.400002

2016-6-28,92.900002,93.660004,92.139999,93.589996,40444900,93.589996

2016-6-27,93,93.050003,91.5,92.040001,45489600,92.040001

2016-6-24,92.910004,94.660004,92.650002,93.400002,75311400,93.400002

2016-6-23,95.940002,96.290001,95.25,96.099998,32240200,96.099998

2016-6-22,96.25,96.889999,95.349998,95.550003,28971100,95.550003

2016-6-21,94.940002,96.349998,94.68,95.910004,35229500,95.910004

2016-6-20,96,96.57,95.029999,95.099998,33942300,95.099998

2016-6-17,96.620003,96.650002,95.300003,95.330002,60595000,95.330002

2016-6-16,96.449997,97.75,96.07,97.550003,31236300,97.550003

2016-6-15,97.82,98.410004,97.029999,97.139999,29445200,97.139999

2016-6-14,97.32,98.480003,96.75,97.459999,31931900,97.459999

2016-6-13,98.690002,99.120003,97.099998,97.339996,38020500,97.339996

2016-6-10,98.529999,99.349998,98.480003,98.830002,31712900,98.830002

2016-6-9,98.5,99.989998,98.459999,99.650002,26601400,99.650002

2016-6-8,99.019997,99.559998,98.68,98.940002,20848100,98.940002

2016-6-7,99.25,99.870003,98.959999,99.029999,22409500,99.029999

2016-6-6,97.989998,101.889999,97.550003,98.629997,23292500,98.629997

2016-6-3,97.790001,98.269997,97.449997,97.919998,28062900,97.919998

2016-6-2,97.599998,97.839996,96.629997,97.720001,40004100,97.720001

2016-6-1,99.019997,99.540001,98.330002,98.459999,29113400,98.459999

2016-5-31,99.599998,100.400002,98.82,99.860001,42084800,99.860001

股票資料一共有六列,但是這裡我們僅關心Date和Close兩列,它們分别代表股票交易時間和當天收盤的價格。

将時間序列資料導入到DataFrame中

  我們有了樣本資料之後,需要将它導入到DataFrame中以便下面的計算。所有的time window API需要一個類型為timestamp的列。我們可以使用spark-csv工具包來解析上面的Apple股票資料(csv格式),這個工具可以自動推斷時間類型的資料并自動建立好模式。代碼如下:

scala>

val

stocksDF

=

spark.read.option(

"header"

,

"true"

).option(

"inferSchema"

,

"true"

).csv(

"file:///user/iteblog/applestock.csv"

)

stocksDF

:

org.apache.spark.sql.DataFrame

=

[Date

:

timestamp, Open

:

double ...

5

more fields

計算2016年Apple股票周平均收盤價格

  現在我們已經有了初始化好的資料,是以我們可以進行一些基于時間的視窗分析。在本例中我們将計算2016年Apple公司每周股票的收盤價格平均值。下面将一步一步進行介紹。

步驟一:找出2016年的股票交易資料

因為我們僅僅需要2016年的交易資料,是以我們可以對原始資料進行過濾,代碼如下:

scala>

val

stocks

2016

=

stocksDF.filter(

"year(Date)==2016"

)

stocks

2016

:

org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

=

[Date

:

timestamp, Open

:

double ...

5

more fields]

上面代碼片段我們使用了内置的year函數來提取出日期中的年。

步驟二:計算平均值

現在我們需要對每個星期建立一個視窗,這種類型的視窗通常被稱為tumbling window,代碼片段如下:

scala>

val

tumblingWindowDS

=

stocks

2016

.groupBy(window(stocks

2016

.col(

"Date"

),

"1 week"

)).agg(avg(

"Close"

).as(

"weekly_average"

))

tumblingWindowDS

:

org.apache.spark.sql.DataFrame

=

[window

:

struct<start

:

timestamp, end

:

timestamp>, weekly

_

average

:

double]

上面代碼中展示了如何使用 time window API。window一般在group by語句中使用。window方法的第一個參數指定了時間所在的列;第二個參數指定了視窗的持續時間(duration),它的機關可以是seconds、minutes、hours、days或者weeks。建立好視窗之後,我們可以計算平均值。

步驟三:列印window的值

我們可以列印出window中的值,我們先定義好列印的公共函數,代碼片段如下:

def

printWindow(windowDF

:

DataFrame, aggCol

:

String)

=

{

windowDF.sort(

"window.start"

).

select(

"window.start"

,

"window.end"

,s

"$aggCol"

).

show(truncate

=

false

)

}

然後我們列印出tumblingWindowDS中的值:

printWindow(tumblingWindowDS,

"weekly_average"

)

+---------------------+---------------------+------------------+

|start                |end                  |weekly

_

average    |

+---------------------+---------------------+------------------+

|

2015

-

12

-

31

08

:

00

:

00.0

|

2016

-

01

-

07

08

:

00

:

00.0

|

101.30249774999999

|

|

2016

-

01

-

07

08

:

00

:

00.0

|

2016

-

01

-

14

08

:

00

:

00.0

|

98.47199859999999

|

|

2016

-

01

-

14

08

:

00

:

00.0

|

2016

-

01

-

21

08

:

00

:

00.0

|

96.72000125000001

|

|

2016

-

01

-

21

08

:

00

:

00.0

|

2016

-

01

-

28

08

:

00

:

00.0

|

97.6719984

|

|

2016

-

01

-

28

08

:

00

:

00.0

|

2016

-

02

-

04

08

:

00

:

00.0

|

96.239999

|

|

2016

-

02

-

04

08

:

00

:

00.0

|

2016

-

02

-

11

08

:

00

:

00.0

|

94.39799819999999

|

|

2016

-

02

-

11

08

:

00

:

00.0

|

2016

-

02

-

18

08

:

00

:

00.0

|

96.2525005

|

|

2016

-

02

-

18

08

:

00

:

00.0

|

2016

-

02

-

25

08

:

00

:

00.0

|

96.09400000000001

|

|

2016

-

02

-

25

08

:

00

:

00.0

|

2016

-

03

-

03

08

:

00

:

00.0

|

99.276001

|

|

2016

-

03

-

03

08

:

00

:

00.0

|

2016

-

03

-

10

08

:

00

:

00.0

|

101.64000100000001

|

|

2016

-

03

-

10

08

:

00

:

00.0

|

2016

-

03

-

17

08

:

00

:

00.0

|

104.226001

|

|

2016

-

03

-

17

08

:

00

:

00.0

|

2016

-

03

-

24

08

:

00

:

00.0

|

106.0699996

|

|

2016

-

03

-

24

08

:

00

:

00.0

|

2016

-

03

-

31

08

:

00

:

00.0

|

107.8549995

|

|

2016

-

03

-

31

08

:

00

:

00.0

|

2016

-

04

-

07

08

:

00

:

00.0

|

110.08399979999999

|

|

2016

-

04

-

07

08

:

00

:

00.0

|

2016

-

04

-

14

08

:

00

:

00.0

|

110.4520004

|

|

2016

-

04

-

14

08

:

00

:

00.0

|

2016

-

04

-

21

08

:

00

:

00.0

|

107.46800060000001

|

|

2016

-

04

-

21

08

:

00

:

00.0

|

2016

-

04

-

28

08

:

00

:

00.0

|

101.5520004

|

|

2016

-

04

-

28

08

:

00

:

00.0

|

2016

-

05

-

05

08

:

00

:

00.0

|

93.9979994

|

|

2016

-

05

-

05

08

:

00

:

00.0

|

2016

-

05

-

12

08

:

00

:

00.0

|

92.35599959999999

|

|

2016

-

05

-

12

08

:

00

:

00.0

|

2016

-

05

-

19

08

:

00

:

00.0

|

93.3299974

|

+---------------------+---------------------+------------------+

only showing top

20

rows

上面的輸出按照

window.start

進行了排序,這個字段标記了視窗的開始時間。上面的輸出你可能已經看到了第一行的開始時間是2015-12-31,結束時間是2016-01-07。但是你從原始資料可以得到:2016年Apple公司的股票交易資訊是從2016-01-04開始的;原因是2016-01-01是元旦,而2016-01-02和2016-01-03正好是周末,期間沒有股票交易。

我們可以手動指定視窗的開始時間來解決這個問題。

帶有開始時間的Time window

  在前面的示例中,我們使用的是tumbling window。為了能夠指定開始時間,我們需要使用sliding window(滑動視窗)。到目前為止,沒有相關API來建立帶有開始時間的tumbling window,但是我們可以通過将視窗時間(window duration)和滑動時間(slide duration)設定成一樣來建立帶有開始時間的tumbling window。代碼如下:

val

iteblogWindowWithStartTime

=

stocks

2016

.groupBy(window(stocks

2016

.col(

"Date"

),

"1 week"

,

"1 week"

,

"4 days"

)).agg(avg(

"Close"

).as(

"weekly_average"

))

上面的示例中,

4 days

參數就是開始時間的偏移量;前兩個參數分别代表視窗時間和滑動時間,我們列印出這個視窗的内容:

printWindow(iteblogWindowWithStartTime,

"weekly_average"

)

+---------------------+---------------------+------------------+

|start                |end                  |weekly

_

average    |

+---------------------+---------------------+------------------+

|

2015

-

12

-

28

08

:

00

:

00.0

|

2016

-

01

-

04

08

:

00

:

00.0

|

105.349998

|

|

2016

-

01

-

04

08

:

00

:

00.0

|

2016

-

01

-

11

08

:

00

:

00.0

|

99.0699982

|

|

2016

-

01

-

11

08

:

00

:

00.0

|

2016

-

01

-

18

08

:

00

:

00.0

|

98.49999799999999

|

|

2016

-

01

-

18

08

:

00

:

00.0

|

2016

-

01

-

25

08

:

00

:

00.0

|

98.1220016

|

|

2016

-

01

-

25

08

:

00

:

00.0

|

2016

-

02

-

01

08

:

00

:

00.0

|

96.2539976

|

|

2016

-

02

-

01

08

:

00

:

00.0

|

2016

-

02

-

08

08

:

00

:

00.0

|

95.29199960000001

|

|

2016

-

02

-

08

08

:

00

:

00.0

|

2016

-

02

-

15

08

:

00

:

00.0

|

94.2374975

|

|

2016

-

02

-

15

08

:

00

:

00.0

|

2016

-

02

-

22

08

:

00

:

00.0

|

96.7880004

|

|

2016

-

02

-

22

08

:

00

:

00.0

|

2016

-

02

-

29

08

:

00

:

00.0

|

96.23000160000001

|

|

2016

-

02

-

29

08

:

00

:

00.0

|

2016

-

03

-

07

08

:

00

:

00.0

|

101.53200079999999

|

|

2016

-

03

-

07

08

:

00

:

00.0

|

2016

-

03

-

14

08

:

00

:

00.0

|

101.6199998

|

|

2016

-

03

-

14

08

:

00

:

00.0

|

2016

-

03

-

21

08

:

00

:

00.0

|

105.63600160000001

|

|

2016

-

03

-

21

08

:

00

:

00.0

|

2016

-

03

-

28

08

:

00

:

00.0

|

105.92749950000001

|

|

2016

-

03

-

28

08

:

00

:

00.0

|

2016

-

04

-

04

08

:

00

:

00.0

|

109.46799940000001

|

|

2016

-

04

-

04

08

:

00

:

00.0

|

2016

-

04

-

11

08

:

00

:

00.0

|

109.39799980000001

|

|

2016

-

04

-

11

08

:

00

:

00.0

|

2016

-

04

-

18

08

:

00

:

00.0

|

110.3820004

|

|

2016

-

04

-

18

08

:

00

:

00.0

|

2016

-

04

-

25

08

:

00

:

00.0

|

106.15400079999999

|

|

2016

-

04

-

25

08

:

00

:

00.0

|

2016

-

05

-

02

08

:

00

:

00.0

|

96.8759994

|

|

2016

-

05

-

02

08

:

00

:

00.0

|

2016

-

05

-

09

08

:

00

:

00.0

|

93.6240004

|

|

2016

-

05

-

09

08

:

00

:

00.0

|

2016

-

05

-

16

08

:

00

:

00.0

|

92.13399799999999

|

+---------------------+---------------------+------------------+

only showing top

20

rows

從上面的結果可以看出,我們已經有了一個從2016-01-04的結果;不過結果中還有2015年的資料。原因是我們的開始時間是

4 days

,2016-01-04之前的一周資料也會被顯示出,我們可以使用filter來過濾掉那行資料:

val

filteredWindow

=

iteblogWindowWithStartTime.filter(

"year(window.start)=2016"

)

現在來看看輸出的結果:

printWindow(filteredWindow,

"weekly_average"

)

+---------------------+---------------------+------------------+

|start                |end                  |weekly

_

average    |

+---------------------+---------------------+------------------+

|

2016

-

01

-

04

08

:

00

:

00.0

|

2016

-

01

-

11

08

:

00

:

00.0

|

99.0699982

|

|

2016

-

01

-

11

08

:

00

:

00.0

|

2016

-

01

-

18

08

:

00

:

00.0

|

98.49999799999999

|

|

2016

-

01

-

18

08

:

00

:

00.0

|

2016

-

01

-

25

08

:

00

:

00.0

|

98.1220016

|

|

2016

-

01

-

25

08

:

00

:

00.0

|

2016

-

02

-

01

08

:

00

:

00.0

|

96.2539976

|

|

2016

-

02

-

01

08

:

00

:

00.0

|

2016

-

02

-

08

08

:

00

:

00.0

|

95.29199960000001

|

|

2016

-

02

-

08

08

:

00

:

00.0

|

2016

-

02

-

15

08

:

00

:

00.0

|

94.2374975

|

|

2016

-

02

-

15

08

:

00

:

00.0

|

2016

-

02

-

22

08

:

00

:

00.0

|

96.7880004

|

|

2016

-

02

-

22

08

:

00

:

00.0

|

2016

-

02

-

29

08

:

00

:

00.0

|

96.23000160000001

|

|

2016

-

02

-

29

08

:

00

:

00.0

|

2016

-

03

-

07

08

:

00

:

00.0

|

101.53200079999999

|

|

2016

-

03

-

07

08

:

00

:

00.0

|

2016

-

03

-

14

08

:

00

:

00.0

|

101.6199998

|

|

2016

-

03

-

14

08

:

00

:

00.0

|

2016

-

03

-

21

08

:

00

:

00.0

|

105.63600160000001

|

|

2016

-

03

-

21

08

:

00

:

00.0

|

2016

-

03

-

28

08

:

00

:

00.0

|

105.92749950000001

|

|

2016

-

03

-

28

08

:

00

:

00.0

|

2016

-

04

-

04

08

:

00

:

00.0

|

109.46799940000001

|

|

2016

-

04

-

04

08

:

00

:

00.0

|

2016

-

04

-

11

08

:

00

:

00.0

|

109.39799980000001

|

|

2016

-

04

-

11

08

:

00

:

00.0

|

2016

-

04

-

18

08

:

00

:

00.0

|

110.3820004

|

|

2016

-

04

-

18

08

:

00

:

00.0

|

2016

-

04

-

25

08

:

00

:

00.0

|

106.15400079999999

|

|

2016

-

04

-

25

08

:

00

:

00.0

|

2016

-

05

-

02

08

:

00

:

00.0

|

96.8759994

|

|

2016

-

05

-

02

08

:

00

:

00.0

|

2016

-

05

-

09

08

:

00

:

00.0

|

93.6240004

|

|

2016

-

05

-

09

08

:

00

:

00.0

|

2016

-

05

-

16

08

:

00

:

00.0

|

92.13399799999999

|

|

2016

-

05

-

16

08

:

00

:

00.0

|

2016

-

05

-

23

08

:

00

:

00.0

|

94.77999880000002

|

+---------------------+---------------------+------------------+

only showing top

20

rows

到目前為止,我們已經了解了如何在Spark中使用Window了。

轉載自過往記憶(http://www.iteblog.com/)

繼續閱讀