天天看點

dataframe groupby_PySpark DataFrame方法:join()和groupBy()在PySpark中加入DataFrames内部聯接右,左和外連接配接交叉聯接彙總資料使用groupBy()按多列分組确定列相關聚合的資料塊可視化快樂足迹

最初于2019年6月24日釋出在https://hackersandslackers.com。

在PySpark DataFrame上執行類似SQL的聯接和聚合。

dataframe groupby_PySpark DataFrame方法:join()和groupBy()在PySpark中加入DataFrames内部聯接右,左和外連接配接交叉聯接彙總資料使用groupBy()按多列分組确定列相關聚合的資料塊可視化快樂足迹

我們一起經曆了一段探索PySpark神奇世界的旅程。 在介紹了DataFrame轉換,結構化流和RDD之後,在我們進行深入研究之前,剩下的事情還不多。

為了總結本系列的内容,我們将回顧一下我們錯過的一些強大的DataFrame操作。 特别是,我們将專注于整體修改DataFrame的操作,例如Join和Aggregation。 讓我們從Joins開始,然後我們可以通路Aggregation并以一些可視化的想法結束。

在PySpark中加入DataFrames

我假設您已經熟悉類似SQL的聯接的概念。 為了在PySpark中進行示範,我将建立兩個簡單的DataFrame:

· 客戶資料框(指定為資料框1);

· 訂單DataFrame(指定為DataFrame 2)。

我們建立兩個DataFrame的代碼如下

# DataFrame 1valuesA = [(1, 'bob', 3462543658686),           (2, 'rob', 9087567565439),           (3, 'tim', 5436586999467),           (4, 'tom', 8349756853250)]customersDF = spark.createDataFrame(valuesA,['id', 'name', 'credit_card_number']) # DataFrame 2valuesB = [(1, 'ketchup', 'bob', 1.20),           (2, 'rutabaga', 'bob', 3.35),           (3, 'fake vegan meat', 'rob', 13.99),           (4, 'cheesey poofs', 'tim', 3.99),           (5, 'ice cream', 'tim', 4.95),           (6, 'protein powder', 'tom', 49.95)]ordersDF = spark.createDataFrame(valuesB,['id', 'product_name', 'customer', 'price']) # Show tablescustomersDF.show()ordersDF.show()
           

它們的外觀如下:

dataframe groupby_PySpark DataFrame方法:join()和groupBy()在PySpark中加入DataFrames内部聯接右,左和外連接配接交叉聯接彙總資料使用groupBy()按多列分組确定列相關聚合的資料塊可視化快樂足迹

> The DataFrames we just created.

現在,我們有兩個簡單的資料表可以使用。

在聯接這兩個表之前,必須意識到Spark中的表聯接是相對"昂貴"的操作,也就是說,它們使用了大量的時間和系統資源。

内部聯接

在沒有指定我們要執行的聯接類型的情況下,PySpark将預設為内部聯接。 通過調用DataFrame上的join()方法可以進行聯接:

joinedDF = customersDF.join(ordersDF, customersDF.name == ordersDF.customer)

join()方法在現有的DataFrame上運作,我們将其他DataFrame聯接到現有的DataFrame上。 join()方法中的第一個參數是要添加或連接配接的DataFrame。

接下來,我們指定聯接的" on"。 在我們的示例中,我們告訴我們的聯接将customersDF的"名稱"列與ordersDF的"客戶"列進行比較。 結果是這樣的:

dataframe groupby_PySpark DataFrame方法:join()和groupBy()在PySpark中加入DataFrames内部聯接右,左和外連接配接交叉聯接彙總資料使用groupBy()按多列分組确定列相關聚合的資料塊可視化快樂足迹

右,左和外連接配接

我們可以将關鍵字參數" how"傳遞到join()中,該參數指定我們要執行的聯接的類型。 如您所想,how參數如何接受内部,外部,左側和右側。 我們還可以通過how參數傳遞一些備援類型,如leftOuter(與left相同)。

交叉聯接

我們可以執行的最後一種連接配接類型是交叉連接配接,也稱為笛卡爾連接配接。 交叉聯接與其他類型的聯接有些不同,是以交叉聯接具有自己的DataFrame方法:

joinedDF = customersDF.crossJoin(ordersDF)

交叉聯接在DataFrame#2中的每個記錄在DataFrame#1中建立一個新行:

dataframe groupby_PySpark DataFrame方法:join()和groupBy()在PySpark中加入DataFrames内部聯接右,左和外連接配接交叉聯接彙總資料使用groupBy()按多列分組确定列相關聚合的資料塊可視化快樂足迹

> Anatomy of a cross join.

通過我們的簡單示例,您可以看到PySpark支援與傳統持久資料庫系統(例如Oracle,IBM DB2,Postgres和MySQL)相同類型的聯接操作。 PySpark使用記憶體中方法建立彈性分布式資料幀(RDD)。 正如我們提到的那樣,在叢集中執行這些類型的聯接操作将既昂貴又耗時。 接下來,我們将讨論彙總資料,這是Spark的核心優勢。

彙總資料

Spark允許我們對資料執行強大的聚合功能,類似于您可能已經在SQL或Pandas中使用的功能。 我要彙總的資料是紐約市機動車碰撞的資料集,因為我是一個悲傷而扭曲的人!

dataframe groupby_PySpark DataFrame方法:join()和groupBy()在PySpark中加入DataFrames内部聯接右,左和外連接配接交叉聯接彙總資料使用groupBy()按多列分組确定列相關聚合的資料塊可視化快樂足迹

我們将在這裡熟悉兩個函數:agg()和groupBy()。 這些通常串聯使用,但是agg()可以用于沒有groupBy()的資料集:

df.agg({"*": "count"}).show()

不執行groupBy()進行聚合通常并不完全有用:

+--------+

|count(1) |

+--------+

| 1000 |

+--------+

通過将agg()與groupby()結合起來,讓我們從資料中獲得更深層的含義。

使用groupBy()

讓我們看看哪個自治市鎮在事故數量上處于領先地位:

import pyspark.sql.functions as fdf.groupby('borough').agg(f.count('borough').alias('count')).show()

結果:

+-------------+-----+

| borough |count |

+-------------+-----+

| QUEENS | 241 |

| BROOKLYN | 182 |

| BRONX | 261 |

| MANHATTAN | 272 |

|STATEN ISLAND | 44 |

+-------------+-----+

曼哈頓以我們的樣本中的272起事故為首! 聚在一起,曼哈頓。 讓我們看看哪個區是最緻命的區:

df.groupby('borough').agg(f.sum('number_of_persons_injured').alias('injuries')).orderBy('injuries', ascending=False).show()

開始了:

+-------------+--------+

| borough |injuries|

+-------------+--------+

| MANHATTAN | 62 |

| QUEENS | 59 |

| BRONX | 57 |

| BROOKLYN | 47 |

|STATEN ISLAND| 14 |

+-------------+--------+

好吧…那好。 讓我們避開曼哈頓!

按多列分組

通常,我們會希望按多列進行分組,以檢視更複雜的細分。 在這裡,我們按自治市鎮和"主要貢獻因素"進行分組:

aggDF = df.groupby('borough', 'contributing_factor_vehicle_1').agg(f.sum('number_of_persons_injured').alias('injuries')).orderBy('borough', 'injuries', ascending=False)aggDF = aggDF.filter(aggDF.injuries > 1)display(aggDF)
           

這将向我們展示每個行政區最常見的事故類型:

dataframe groupby_PySpark DataFrame方法:join()和groupBy()在PySpark中加入DataFrames内部聯接右,左和外連接配接交叉聯接彙總資料使用groupBy()按多列分組确定列相關聚合的資料塊可視化快樂足迹

> Drivers in Manhattan need to pay attention! Get off your phones!!

到目前為止,我們已經使用count和sum函數進行了彙總。 如您所料,我們還可以使用min,max和avg函數進行聚合。 還有一個特别值得一提的功能,叫做corr()。 corr函數可幫助我們确定列之間的相關強度。

确定列相關

如果您是資料科學類型,那麼您會喜歡使用corr()進行聚合。 corr()确定兩列的相關強度,并輸出代表該相關的整數:

df.agg(corr("a", "b").alias('correlation')).collect()
           

輸出示例:

[Row(correlation=1.0)]

使用PySpark的Aggregation功能,您會發現您可以進入功能強大的聚合管道并真正回答複雜的問題。 這些問題的答案必須以令人愉悅且易于了解的視覺形式呈現。 讓我們考慮這些聚合的可視化。

聚合的資料塊可視化

如果您正在使用Databricks筆記本電腦,那麼display()指令會标配大量出色的可視化效果,以補充我們執行的所有彙總。 當試圖了解我們建立的聚合的分布時,這些功能特别有用。

我繼續前進,整理了以下事故中受傷人員的細目分類。 我們将結果劃分為Borough,然後檢視騎自行車的人和駕車者之間受傷人數的分布:

dataframe groupby_PySpark DataFrame方法:join()和groupBy()在PySpark中加入DataFrames内部聯接右,左和外連接配接交叉聯接彙總資料使用groupBy()按多列分組确定列相關聚合的資料塊可視化快樂足迹

> Creating a visualization in Databricks.

在建立條形圖時,"鍵"确定x軸上的值。 我在這裡通過多個"值"進行測量,也就是說,将顯示沿y軸的多次測量。

此特定的圖表非常适合堆積條形圖,我們通過将條形圖指定為顯示類型,然後在其他選項中指定堆積來建立堆積條形圖。 Databrick允許各種附加的酷視覺效果,例如地理圖表,散點圖等等。 似乎在曼哈頓散步要安全得多!

快樂足迹

我們在一起進行PySpark之旅中經曆了很多事情。 我很樂意将您永遠留在這裡,每個好父母都知道什麼時候該讓他們的孩子離開巢穴并獨自飛翔。 我會給你一些父母給我的建議:去找份工作,離開我該死的房子。

(本文翻譯自Todd Birchard的文章《PySpark Macro DataFrame Methods: join() and groupBy()》,參考:https://hackingandslacking.com/pyspark-macro-dataframe-methods-join-and-groupby-477a57836ff)

繼續閱讀