天天看點

使用 Spark DataFrame 進行大資料分析

目錄

1. 什麼是 spark dataframe

2. 為什麼要用 spark dataframe

3. 建立 dataframe

4. 操作 dataframe

4.1 在 dataframe 上執行 sql 語句

4.2 spark dataframe 與 pandas dataframe 轉換

5. 一些經驗

5.1 spark json 格式問題

5.2 spark dataframe 和 pandas dataframe 選擇問題

1. 什麼是 spark dataframe

先來看看官方原汁原味的文檔是怎麼介紹的:

A DataFrame is 

a distributed collection of data

 organized into named columns. It is conceptually equivalent to a 

table in a relational database

 or a data frame in R/Python, but with 

richer optimizations

 under the hood. DataFrames can be constructed from a wide array of sources such as: 

structured data files, tables in Hive, external databases, or existing RDDs

.

我們可以看到 spark dataframe 的幾個關鍵點:

  • 分布式的資料集
  • 類似關系型資料庫中的table,或者 excel 裡的一張 sheet,或者 python/R 裡的 dataframe
  • 擁有豐富的操作函數,類似于 rdd 中的算子
  • 一個 dataframe 可以被注冊成一張資料表,然後用 sql 語言在上面操作
  • 豐富的建立方式
    • 已有的RDD
    • 結構化資料檔案
    • JSON資料集
    • Hive表
    • 外部資料庫

2. 為什麼要用 spark dataframe

為什麼要用 dataframe,從細節實作上來說,這個問題比較複雜,不過,基本上下面這張圖就能說明所有問題了:

使用 Spark DataFrame 進行大資料分析

但是,本文是從基礎角度來說 spark dataframe,先不糾結這些細節問題,先了解一些基礎的原理和優勢,關于上面那張圖裡面的内容,看後期安排,也許在之後第 15 篇左右會專門講。

DataFrame API 是在 R 和 Python data frame 的設計靈感之上設計的,具有以下功能特性:

  • 從KB到PB級的資料量支援;
  • 多種資料格式和多種存儲系統支援;
  • 通過Spark SQL 的 Catalyst優化器進行先進的優化,生成代碼;
  • 通過Spark無縫內建所有大資料工具與基礎設施;
  • 為Python、Java、Scala和R語言(SparkR)API;

簡單來說,dataframe 能夠更友善的操作資料集,而且因為其底層是通過 spark sql 的 Catalyst優化器生成優化後的執行代碼,是以其執行速度會更快。總結下來就是,使用 spark dataframe 來建構 spark app,能:

  • write less : 寫更少的代碼
  • do more : 做更多的事情
  • faster : 以更快的速度

3. 建立 dataframe

因為 spark sql,dataframe,datasets 都是共用 spark sql 這個庫的,三者共享同樣的代碼優化,生成以及執行流程,是以 sql,dataframe,datasets 的入口都是 sqlContext。可用于建立 spark dataframe 的資料源有很多,我們就講最簡單的從結構化檔案建立 dataframe。

使用 Spark DataFrame 進行大資料分析

下面是我自己建立 spark sc 都模版:

sc_conf = SparkConf()
sc_conf.setAppName("03-DataFrame-01")
sc_conf.setMaster(SPARK_MASTER)
sc_conf.set('spark.executor.memory', '2g')
sc_conf.set('spark.logConf', True)
sc_conf.getAll()
try:
    sc.stop()
    time.sleep(1)
except:
    sc = SparkContext(conf=sc_conf)
    sqlContext = SQLContext(sc)
           
  • step 2 : 建立 dataframe,從 json 檔案

資料檔案說明:中國 A 股上市公司基本資訊,可以在這裡取到:stock_5.json

使用 Spark DataFrame 進行大資料分析

注:這裡的 json 檔案并不是标準的 json 檔案,spark 目前也不支援讀取标準的 json 檔案。你需要預先把标準的 json 檔案處理成 spark 支援的格式: 每一行是一個 json 對象。

比如說,官網的 

people.json

 這個例子,它要求的格式是:

{"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}}
{"name":"Michael", "address":{"city":null, "state":"California"}}
           

但對這個檔案來看,标準的 json 格式隻有下面兩種:

{"name": ["Yin", "Michael"],
 "address":[ 
    {"city":"Columbus","state":"Ohio"}, 
    {"city":null, "state":"California"} 
  ]
}
### 或者
[ 
{"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}},
{"name":"Michael", "address":{"city":null, "state":"California"}}
]
           

是以在用 spark sql 來讀取一個 json 檔案的時候,務必要提前處理好 json 的檔案格式,這裡我們已經提前處理好了,檔案如下所示:

{"ticker":"000001","tradeDate":"2016-03-30","exchangeCD":"XSHE","secShortName":"\u5e73\u5b89\u94f6\u884c","preClosePrice":10.43,"openPrice":10.48,"dealAmount":19661,"turnoverValue":572627417.1299999952,"highestPrice":10.7,"lowestPrice":10.47,"closePrice":10.7,"negMarketValue":126303384220.0,"marketValue":153102835340.0,"isOpen":1,"secID":"000001.XSHE","listDate":"1991-04-03","ListSector":"\u4e3b\u677f","totalShares":14308676200},
{"ticker":"000002","tradeDate":"2016-03-30","exchangeCD":"XSHE","secShortName":"\u4e07\u79d1A","preClosePrice":24.43,"openPrice":0.0,"dealAmount":0,"turnoverValue":0.0,"highestPrice":0.0,"lowestPrice":0.0,"closePrice":24.43,"negMarketValue":237174448154.0,"marketValue":269685994760.0,"isOpen":0,"secID":"000002.XSHE","listDate":"1991-01-29","ListSector":"\u4e3b\u677f","totalShares":11039132000}
           
### df is short for dataframe
df = sqlContext.read.json('hdfs://10.21.208.21:8020/user/mercury/stock_5.json')
print df.printSchema()
print df.select(['ticker', 'secID', 'tradeDate', 'listDate', 'openPrice', 'closePrice', 
                 'highestPrice', 'lowestPrice', 'isOpen']).show(n=5)
           
使用 Spark DataFrame 進行大資料分析

4. 操作 dataframe

同 rdd 一樣,dataframe 也有很多專屬于自己的算子,用于操作整個 dataframe 資料集,我們以後都簡稱為 dataframe api 吧,用 

算子

, 

DSL

 這類的稱呼對不熟悉的人來說不易了解,下面這裡是完整的 api 清單:spark dataframe api

4.1 在 dataframe 上執行 sql 語句

使用 Spark DataFrame 進行大資料分析

4.2 spark dataframe 與 pandas dataframe 轉換

一圖勝千言啊:

使用 Spark DataFrame 進行大資料分析

縱觀 spark 的誕生和發展,我覺得 spark 有一點做得非常明智:對同類産品的相容。從大的方面來說,就像 spark 官網的這段話一樣: Runs Everywhere: Spark runs on Hadoop, Mesos, standalone, or in the cloud. It can access diverse data sources including HDFS, Cassandra, HBase, and S3.,spark 對 hadoop 系産品的相容,讓 hadoop 系的開發人員可以輕松的從 hadoop 轉到 spark;從小的方面來說,spark 對一些細分工具也照顧 [相容] 得很好,比如說 spark 推出了 dataframe,人家就可以支援 spark dataframe 和 pandas dataframe 的轉換。

熟悉 pandas dataframe 的都了解,pandas 裡的 dataframe 可以做很多事情,比如說畫圖,儲存為各種類型的檔案,做資料分析什麼的。我覺得,可以在 spark 的 dataframe 裡做資料處理,分析的整個邏輯,然後可以把最後的結果轉化成 pandas 的 dataframe 來展示。當然,如果你的資料量小,也可以直接用 pandas dataframe 來做。

使用 Spark DataFrame 進行大資料分析

5. 一些經驗

5.1 spark json 格式問題

spark 目前也不支援讀取标準的 json 檔案。你需要預先把标準的 json 檔案處理成 spark 支援的格式: 每一行是一個 json 對象。

5.2 spark dataframe 和 pandas dataframe 選擇問題

如果資料量小,結構簡單,可以直接用 pandas dataframe 來做分析;如果資料量大,結構複雜 [嵌套結構],那麼推薦用 spark dataframe 來做資料分析,然後把結果轉成 pandas dataframe,用 pandas dataframe 來做展示和報告。