天天看點

Schema RDD(DataFrame)----Spark SQL操作1. 使用Spark SQL2. 讀取和存儲資料3. 使用者自定義函數(UDF)

SchemaRDD是存放 Row 對象的 RDD,每個 Row 對象代表一行記錄。 SchemaRDD 還包含記錄的結構資訊(即資料字段)。 SchemaRDD 看起來和普通的 RDD 很像,但是在内部, SchemaRDD 可以利用結構資訊更加高效地存儲資料。 此外, SchemaRDD 還支援 RDD 上所沒有的一些新操作,比如運作 SQL 查詢。 SchemaRDD 可以從外部資料源建立,也可以從查詢結果或普通 RDD 中建立。

若要把 Spark SQL 連接配接到一個部署好的 Hive 上,你必須把 hive-site.xml 複制到Spark 的配置檔案目錄中( $SPARK_HOME/conf)。即使沒有部署好 Hive, Spark SQL 也可以運作。如果你沒有部署好 Hive, Spark SQL 會在目前的工作目錄中建立出自己的Hive 中繼資料倉庫, 叫作 metastore_db。此外,如果你嘗試使用 HiveQL 中的 CREATE TABLE(并非 CREATE EXTERNAL TABLE)語句來建立表,這些表會被放在你預設的檔案系統中的/user/hive/warehouse 目錄中( 如果你的 classpath 中有配好的 hdfs-site.xml,預設的檔案系統就是 HDFS,否則就是本地檔案系統)。

1. 使用Spark SQL

初始化

# 導入Spark SQL
from pyspark.sql import HiveContext, Row
# 當不能引入hive依賴時
from pyspark.sql import SQLContext, Row
hiveCtx = HiveContext(sc)
           

基本查詢

input = hiveCtx.jsonFile(inputFile)
# 注冊輸入的SchemaRDD
input.registerTempTable("tweets")
# 依據retweetCount ( 轉發計數)選出推文
topTweets = hiveCtx.sql("""SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10""")
           

讀取資料和執行查詢都會傳回 SchemaRDD。SchemaRDD 這個名字可能會被改為 DataFrame。SchemaRDD 仍然是 RDD, 是以你可以對其應用已有的 RDD 轉化操作,比如 map() 和filter()。然而, SchemaRDD 也提供了一些額外的功能支援。最重要的是,你可以把任意 SchemaRDD 注冊為臨時表,這樣就可以使用 HiveContext.sql 或 SQLContext.sql 來對它進行查詢了。 你可以通過 SchemaRDD 的 registerTempTable() 方法這麼做。(臨時表是目前使用的 HiveContext 或 SQLContext 中的臨時變量,在你的應用退出時這些臨時表就不再存在了)

SchemaRDD 可以存儲一些基本資料類型,也可以存儲由這些類型組成的結構體和數組。

Spark SQL/HiveQL類型----Scala類型----Java類型----Python

TINYINT----Byte----Byte/byte----int/long ( 在 -128 到 127 之間 )

SMALLINT----Short----Short/short----int/long ( 在 -32768 到 32767之間 )

INT----Int----Int/int----int 或 long

BIGINT----Long----Long/long----long

FLOAT----Float----Float /float----float

DOUBLE----Double----Double/double----float

DECIMAL----Scala.math.BigDecimal----java.math.BigDecimal----decimal.Decimal

STRING----String----String----string

BINARY----Array[Byte]----byte[]----bytearray

BOOLEAN----Boolean----Boolean/boolean----bool

TIMESTAMP----java.sql.TimeStamp----java.sql.TimeStamp----datetime.datetime

ARRAY<DATA_TYPE>----Seq----List----list、 tuple 或 array

MAP<KEY_TYPE, VAL_TYPE>----Map----Map----dict

STRUCT<COL1:COL1_TYPE, ...>----Row----Row----Row

Row 對象表示 SchemaRDD 中的記錄,其本質就是一個定長的字段數組。在 Scala/Java 中,Row 對象有一系列 getter 方法,可以通過下标擷取每個字段的值。在 Python 中,由于沒有顯式的類型系統, Row 對象變得稍有不同。我們使用 row[i] 來通路第 i 個元素。除此之外, Python 中的 Row 還支援以 row.column_name 的形式使用名字來通路其中的字段。

topTweetText = topTweets.map(lambda row: row.text)
           

Spark SQL 的緩存機制與 Spark 中的稍有不同。由于我們知道每個列的類型資訊,是以Spark 可以更加高效地存儲資料。 為了確定使用更節約記憶體的表示方式進行緩存而不是儲存整個對象, 應當使用專門的 hiveCtx.cacheTable("tableName") 方法。

2. 讀取和存儲資料

Spark SQL 支援很多種結構化資料源,可以讓你跳過複雜的讀取過程,輕松從各種資料源中讀取到 Row 對象。這些資料源包括 Hive 表、 JSON 和 Parquet 檔案。此外,當你使用SQL 查詢這些資料源中的資料并且隻用到了一部分字段時, Spark SQL 可以智能地隻掃描這些用到的字段,而不是像 SparkContext.hadoopFile 中那樣簡單粗暴地掃描全部資料。

2.1. Apache Hive

當從 Hive 中讀取資料時, Spark SQL 支援任何 Hive 支援的存儲格式( SerDe),包括文本檔案、 RCFiles、 ORC、 Parquet、 Avro,以及 Protocol Buffer。

from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT key, value FROM mytable")
keys = rows.map(lambda row: row[0])
           

2.2. Parquet

Parquet是一種流行的列式存儲格式,可以高效地存儲具有嵌套字段的記錄。 Parquet 格式經常在 Hadoop 生态圈中被使用,它也支援 Spark SQL 的全部資料類型。 Spark SQL 提供了直接讀取和存儲 Parquet 格式檔案的方法。

# 從一個有name和favouriteAnimal字段的Parquet檔案中讀取資料
rows = hiveCtx.parquetFile(parquetFile)
names = rows.map(lambda row: row.name)
print "Everyone"
print names.collect()
           

你也可以把 Parquet 檔案注冊為 Spark SQL 的臨時表,并在這張表上運作查詢語句。

# 尋找熊貓愛好者
tbl = rows.registerTempTable("people")
pandaFriends = hiveCtx.sql("SELECT name FROM people WHERE favouriteAnimal = \"panda\"")
print "Panda friends"
print pandaFriends.map(lambda row: row.name).collect()
           

2.3. JSON

如果你有一個 JSON 檔案,其中的記錄遵循同樣的結構資訊,那麼 Spark SQL 就可以通過掃描檔案推測出結構資訊, 并且讓你可以使用名字通路對應字段。要讀取 JSON 資料,隻要調用 hiveCtx 中的 jsonFile() 方法即可。

input = hiveCtx.jsonFile(inputFile)
           

2.4. 基于RDD

除了讀取資料,也可以基于 RDD 建立 SchemaRDD。在 Scala 中,帶有 case class 的 RDD可以隐式轉換成 SchemaRDD。在 Python 中, 可以建立一個由 Row 對象組成的 RDD,然後調用 inferSchema()。

happyPeopleRDD = sc.parallelize([Row(name="holden", favouriteBeverage="coffee")])
happyPeopleSchemaRDD = hiveCtx.inferSchema(happyPeopleRDD)
happyPeopleSchemaRDD.registerTempTable("happy_people")
           

3. 使用者自定義函數(UDF)

我們可以使用 Spark 支援的程式設計語言編寫好函數,然後通過 Spark SQL 内建的方法傳遞進來,非常便捷地注冊我們自己的 UDF。 在 Scala 和 Python 中,可以利用語言原生的函數和lambda 文法的支援, 而在 Java 中,則需要擴充對應的 UDF 類。 

# 寫一個求字元串長度的UDF
hiveCtx.registerFunction("strLenPython", lambda x: len(x), IntegerType())
lengthSchemaRDD = hiveCtx.sql("SELECT strLenPython('text') FROM tweets LIMIT 10")
           

繼續閱讀