天天看點

高效使用 PyODPS 最佳實踐

PyODPS 是 MaxCompute(ODPS) Python SDK,為 MaxCompute 對象提供了 Python 端的操作接口,同時,對于熟悉 Pandas 的使用者來說,它提供了 DataFrame API 來用類似 Pandas 的接口進行大規模資料分析以及處理,并能夠友善的将 MaxCompute 的分布式 DataFrame 向本地 Pandas DataFrame 轉換。正因為如此的友善靈活,在實際使用過程中,不少使用者容易把 PyODPS 用成單機處理模式,沒有最大限度發揮 MaxCompute 分布式運算的性能。本文主要介紹如何高效地使用 PyODPS 處理資料。

概念

首先介紹基本概念。PyODPS 作為一個 SDK,本身運作于各種用戶端,如下圖所示,PyODPS 位于紫色部分,可以在是個人 PC,也可以是 DataWorks 的排程節點,或者 PAI Notebooks 的運作環境。需要注意的是,DataWorks 中的 PyODPS 節點也是一個資源非常受限的用戶端運作容器,内置了 PyODPS 包以及必要的 Python 環境,并不使用 MaxCompute 資源,有較強的記憶體限制。是以合理利用 PyODPS 提供的分布式 DataFrame 功能,将主要的計算送出到 MaxCompute 分布式執行而不是在 PyODPS 用戶端節點下載下傳處理,是正确使用 PyODPS 的關鍵。這篇文章詳細介紹了

PyODPS 代碼跑在哪裡

高效使用 PyODPS 最佳實踐

盡量避免資料下載下傳

很多使用者習慣于用 Pandas 處理資料。PyODPS 提供了 to_pandas 接口,可以直接将 MaxCompute 資料轉化成 Pandas DataFrame 資料結構,這個接口很受歡迎。但這個接口隻應該被用于擷取小規模資料做本地開發調試使用,而不是用來大規模處理資料。使用這個接口會觸發下載下傳行為,将位于 MaxCompute 中的海量資料下載下傳到本地,如果後續操作的都是本地的 DataFrame,則喪失了 MaxCompute 的大規模并行計算能力。而且,資料量稍大,單機記憶體就很容易産生 OOM。

正确的使用方式,是使用 PyODPS DataFrame 接口來完成資料處理。常見的需求,比如需要對每一行資料處理然後寫回表,或者一行資料要拆成多行,都可以通過 PyODPS DataFrame 中的 map 或者 apply 實作,有的甚至隻需要一行代碼,足夠高效與簡潔。使用這些接口我們最終都會翻譯成 SQL 到 MaxCompute 計算叢集做分布式計算,并且本地幾乎沒有任何的記憶體消耗,顯然相比于單機有很大的性能提升。下面我們以一個實際的例子來說明。

一個例子:分詞

使用者需要通過分析每天産生的日志字元串來提取一些資訊,有一個隻有一列的表,它的類型是 string,通過 jieba 分詞可以将中文語句分詞,然後再找到想要的關鍵詞存儲到資訊表裡。很多人會習慣于單機處理資料的思維,一行一行的讀出資料,然後一行一行地處理資料,然後再一行一行的寫入目标表,是以代碼會變成這樣:

import jieba
t = o.get_table('word_split')
out = []
with t.open_reader() as reader:
    for r in reader:
        words = list(jieba.cut(r[0]))
        #
        # 處理邏輯,産生出 processed_data
        #
        out.append(processed_data)
out_t = o.get_table('words')
with out_t.open_writer() as writer:
    writer.write(out)           

我們分析一下整個流程,下載下傳上傳資料消耗了大量的時間,并且在執行腳本的機器上需要很大的記憶體處理所有的資料,特别對于 DataWorks 使用者,很容易超過預設配置設定的記憶體值,導緻 OOM 運作報錯。是以這樣的問題應該怎麼解決呢?答案就是利用 MaxCompute 的分布式能力,PyODPS 就能幫你做到這一點。

高效的分詞

當我們提出用 PyODPS DataFrame 改寫時,很多人望而卻步,現實情況中,我上面代碼中隐去的邏輯可能非常非常複雜,有的甚至上千行,使用者會覺得改起來是不是太複雜了,改寫成本是不是太多,其實大可不必擔心,中間的處理過程大部分是不需要動的,我們利用

apply接口

就能做到分布式的執行:

from odps.df import output
out_table = o.get_table('words')
df = o.get_table('word_split').to_df()
@output(out_table.schema.names, out_table.schema.types)
def handle(row):
    import jieba
    words = list(jieba.cut(r[0]))
    #
    # 處理邏輯,産生出 processed_data
    #
    yield processed_data
df.apply(handle, axis=1).persist(out_table)           

我們可以看到複雜邏輯都可以放在

handle

這個函數裡,這個函數會被自動序列化到服務端作為 UDF 使用,在服務端調用執行,而且因為 handle 在服務端實際執行時也是對每一行進行處理的,是以邏輯上是沒有差別的,不同的是,這樣寫的程式在送出到 MaxCompute 端執行時可以有多台機器幫你同時處理資料,可想而知時間上會節約很多,最後調用 persist 接口會将産生的資料直接寫到另一張 MaxCompute 表中,所有的資料産生與消費都在 MaxCompute 叢集完成,也節約了本地的網絡與記憶體。另外在這個例子中我們也使用到了三方包,MaxCompute 是支援自定義函數中使用三方包的,可以參考 [文章] (

https://yq.aliyun.com/articles/591508)

。是以,使用者大可不必擔心代碼改動帶來的成本,事實上主要邏輯幾乎不要改動就可以享受到 MaxCompute 的大規模計算能力。

總結

利用 PyODPS,我們有很多種更高效操作 MaxCompute 資料的方式,大家有自己在實踐過程中獲得的感悟也可以積極分享出來,共同提升 MaxCompute Python 生态。最後,分享一句 Pandas 作者的一句話:精通面向數組的程式設計和思維方式是成為 Python 科學計算牛人的一大關鍵步驟。多多使用 apply, map 接口,讓我們的資料飛起來!