天天看點

如何在 PyFlink 1.10 中自定義 Python UDF?

作者:孫金城(金竹)

我們知道 PyFlink 是在 Apache Flink 1.9 版新增的,那麼在 Apache Flink 1.10 中 Python UDF 功能支援的速度是否能夠滿足使用者的急切需求呢?

如何在 PyFlink 1.10 中自定義 Python UDF?

Python UDF 的發展趨勢

直覺的判斷,PyFlink Python UDF 的功能也可以如上圖一樣能夠迅速從幼苗變成大樹,為啥有此判斷,請繼續往下看…

Flink on Beam

我們都知道有 Beam on Flink 的場景,就是 Beam 支援多種 Runner,也就是說 Beam SDK 編寫的 Job 可以運作在 Flink 之上。如下圖所示:

如何在 PyFlink 1.10 中自定義 Python UDF?

上面這圖是 Beam Portability Framework 的架構圖,他描述了 Beam 如何支援多語言,如何支援多 Runner,單獨說 Apache Flink 的時候我們就可以說是 Beam on Flink,那麼怎麼解釋 Flink on Beam 呢?

如何在 PyFlink 1.10 中自定義 Python UDF?

在 Apache Flink 1.10 中我們所說的 Flink on Beam 更精确的說是 PyFlink on Beam Portability Framework。我們看一下簡單的架構圖,如下:

如何在 PyFlink 1.10 中自定義 Python UDF?

Beam Portability Framework 是一個成熟的多語言支援架構,架構高度抽象了語言之間的通信協定(gRPC),定義了資料的傳輸格式(Protobuf),并且根據通用流計算架構所需要的元件,抽象個各種服務,比如 DataService,StateService,MetricsService 等。在這樣一個成熟的架構下,PyFlink 可以快速的建構自己的 Python 算子,同時重用 Apache Beam Portability Framework 中現有 SDK harness 元件,可以支援多種 Python 運作模式,如:Process,Docker,etc.,這使得 PyFlink 對 Python UDF 的支援變得非常容易,在 Apache Flink 1.10 中的功能也非常的穩定和完整。那麼為啥說是 Apache Flink 和 Apache Beam 共同打造呢,是因為我發現目前 Apache Beam Portability Framework 的架構也存在很多優化的空間,是以我在 Beam 社群進行了

優化讨論

,并且在 Beam 社群也貢獻了

20+ 的優化更新檔

概要了解了 Apache Flink 1.10 中 Python UDF 的架構之後,我們還是切入的代碼部分,看看如何開發和使用 Python UDF。

如何定義 Python UDF

在 Apache Flink 1.10 中我們有多種方式進行 UDF 的定義,比如:

  • Extend ScalarFunction, e.g.:
class HashCodeMean(ScalarFunction):
   def eval(self, i, j):
       return (hash(i) + hash(j)) / 2           
  • Lambda Functio
lambda i, j: (hash(i) + hash(j)) / 2           
  • Named Function
def hash_code_mean(i, j):
   return (hash(i) + hash(j)) / 2           
  • Callable Function
class CallableHashCodeMean(object):
   def __call__(self, i, j):
       return (hash(i) + hash(j)) / 2           

我們發現上面定義函數除了第一個擴充 ScalaFunction 的方式是 PyFlink 特有的,其他方式都是 Python 語言本身就支援的,也就是說,在 Apache Flink 1.10 中 PyFlink 允許以任何 Python 語言所支援的方式定義 UDF。

如何使用 Python UDF

那麼定義完 UDF 我們應該怎樣使用呢?Apache Flink 1.10 中提供了 2 種 Decorators,如下:

  • Decorators - udf(), e.g. :
udf(lambda i, j: (hash(i) + hash(j)) / 2,
      [for input types], [for result types])           
  • Decorators - @udf, e.g. :
@udf(input_types=..., result_type=...) 
     def hash_code_mean(…):
               return …           

然後在使用之前進行注冊,如下:

st_env.register_function("hash_code", hash_code_mean)           

接下來就可以在 Table API/SQL 中進行使用了,如下:

my_table.select("hash_code_mean(a, b)").insert_into("Results")           

目前為止,我們已經完成了 Python UDF 的定義,聲明和注冊了。接下來我們還是看一個完整的示例吧:)

案例描述

  • 需求

假設蘋果公司要統計該公司産品在雙 11 期間各城市的銷售數量和銷售金額分布情況。

  • 資料格式

每一筆訂單是一個字元串,字段用逗号分隔, 例如:

ItemName, OrderCount, Price, City
-------------------------------------------
iPhone 11, 30, 5499, Beijing\n
iPhone 11 Pro,20,8699,Guangzhou\n           

案例分析

根據案例的需求和資料結構分析,我們需要對原始字元串進行結構化解析,那麼需要一個按“,”号分隔的 UDF(split) 和一個能夠将各個列資訊展平的 DUF(get)。同時我們需要根據城市進行分組統計。

核心實作

UDF 定義

  • Split UDF
@udf(input_types=[DataTypes.STRING()],
           result_type=DataTypes.ARRAY(DataTypes.STRING()))
  def split(line):
       return line.split(",")           
  • Get UDF
@udf(input_types=[DataTypes.ARRAY(DataTypes.STRING()), DataTypes.INT()], result_type=DataTypes.STRING())
def get(array, index):
       return array[index]           

注冊 UDF

  • 注冊 Split UDF
t_env.register_function("split", split)           
  • 注冊 Get UDF
t_env.register_function("get", get)           

核心實作邏輯

如下代碼我們發現核心實作邏輯非常簡單,隻需要對資料進行解析和對資料進行集合計算:

t_env.from_table_source(SocketTableSource(port=9999))\        .alias("line")\        .select("split(line) as str_array")\        .select("get(str_array, 3) as city, "                     "get(str_array, 1).cast(LONG) as count, "                     "get(str_array, 2).cast(LONG) as unit_price")\        .select("city, count, count * unit_price as total_price")\       
        .group_by("city")\        .select("city, sum(count) as sales_volume, sum(total_price)   
         as sales")\
       .insert_into("sink")
t_env.execute("Sales Statistic")           

上面的代碼我們假設是一個 Socket 的 Source,Sink 是一個 Chart Sink,那麼最終運作效果圖,如下:

如何在 PyFlink 1.10 中自定義 Python UDF?

我總是認為在部落格中隻是文本描述而不能讓讀者真正的在自己的機器上運作起來的部落格,不是好部落格,是以接下來我們看看按照我們下面的操作,是否能在你的機器上也運作起來?:)

環境

因為目前 PyFlink 還沒有部署到 PyPI 上面,在 Apache Flink 1.10 釋出之前,我們需要通過建構 Flink 的 master 分支源碼來建構運作我們 Python UDF 的 PyFlink 版本。

源代碼編譯

在進行編譯代碼之前,我們需要你已經安裝了

JDK8

Maven3x
  • 下載下傳解壓
tar -xvf apache-maven-3.6.1-bin.tar.gz
mv -rf apache-maven-3.6.1 /usr/local/           
  • 修改環境變量(~/.bashrc)
MAVEN_HOME=/usr/local/apache-maven-3.6.1
export MAVEN_HOME
export PATH=${PATH}:${MAVEN_HOME}/bin           

除了 JDK 和 MAVEN 完整的環境依賴性如下:

  • JDK 1.8+ (1.8.0_211)
  • Maven 3.x (3.2.5)
  • Scala 2.11+ (2.12.0)
  • Python 3.6+ (3.7.3)
  • Git 2.20+ (2.20.1)
  • Pip3 19+ (19.1.1)

我們看到基礎環境安裝比較簡單,我這裡就不每一個都貼出來了。如果大家有問題歡迎郵件或者部落格留言。

  • 下載下傳 Flink 源代碼:
git clone https://github.com/apache/flink.git           
  • 編譯
cd flink
mvn clean install -DskipTests
...
...
[INFO] flink-walkthrough-datastream-scala ................. SUCCESS [  0.192 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  18:34 min
[INFO] Finished at: 2019-12-04T23:03:25+08:00
[INFO] ------------------------------------------------------------------------           
  • 建構 PyFlink 釋出包
cd flink-python; python3 setup.py sdist bdist_wheel
...
...
adding 'apache_flink-1.10.dev0.dist-info/WHEEL'
adding 'apache_flink-1.10.dev0.dist-info/top_level.txt'
adding 'apache_flink-1.10.dev0.dist-info/RECORD'
removing build/bdist.macosx-10.14-x86_64/wheel           
  • 安裝 PyFlink(PyFlink 1.10 需要 Python3.6+)
pip3 install dist/*.tar.gz
...
...
Successfully installed apache-beam-2.15.0 apache-flink-1.10.dev0 avro-python3-1.9.1 cloudpickle-1.2.2 crcmod-1.7 dill-0.2.9 docopt-0.6.2 fastavro-0.21.24 future-0.18.2 grpcio-1.25.0 hdfs-2.5.8 httplib2-0.12.0 mock-2.0.0 numpy-1.17.4 oauth2client-3.0.0 pbr-5.4.4 protobuf-3.11.1 pyarrow-0.14.1 pyasn1-0.4.8 pyasn1-modules-0.2.7 pydot-1.4.1 pymongo-3.9.0 pyyaml-3.13 rsa-4.0           

也可以檢視一下,我們核心需要 apache-beam 和 apache-flink,如下指令:

jincheng:flink-python jincheng.sunjc$ pip3 list
Package                       Version  
----------------------------- ---------
alabaster                     0.7.12   
apache-beam                   2.15.0   
apache-flink                  1.10.dev0
atomicwrites                  1.3.0           

如上資訊證明你我們所需的 Python 依賴已經沒問題了,接下來回過頭來在看看如何進行業務需求的開發。

PyFlinlk 的 Job 結構

如何在 PyFlink 1.10 中自定義 Python UDF?

一個完成的 PyFlink 的 Job 需要有外部資料源的定義,有業務邏輯的定義和最終計算結果輸出的定義。也就是 Source connector, Transformations, Sink connector,接下來我們根據這個三個部分進行介紹來完成我們的需求。

Source Connector

我們需要實作一個 Socket Connector,首先要實作一個 StreamTableSource, 核心代碼是實作 getDataStream,代碼如下:

@Override
  public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
    return env.socketTextStream(hostname, port, lineDelimiter, MAX_RETRY)
      .flatMap(new Spliter(fieldNames.length, fieldDelimiter, appendProctime))
      .returns(getReturnType());
  }           

上面代碼利用了 StreamExecutionEnvironment 中現有 socketTextStream 方法接收資料,然後将業務訂單資料傳個一個 FlatMapFunction, FlatMapFunction 主要實作将資料類型封裝為 Row,詳細代碼查閱

Spliter

同時,我們還需要在 Python 封裝一個 SocketTableSource,詳情查閱

socket_table_source.py

Sink Connector

我們預期要得到的一個效果是能夠将結果資料進行圖形化展示,簡單的思路是将資料寫到一個本地的檔案,然後在寫一個 HTML 頁面,使其能夠自動更新結果檔案,并展示結果。是以我們還需要自定義一個 Sink 來完成該功能,我們的需求計算結果是會不斷的更新的,也就是涉及到 Retraction(如果大家不了解這個概念,可以查閱我以前的部落格),目前在 Flink 裡面還沒有預設支援 Retract 的 Sink,是以我們需要自定義一個 RetractSink,比如我們實作一下 CsvRetractTableSink。

CsvRetractTableSink 的核心邏輯是緩沖計算結果,每次更新進行一次全量(這是個純 demo,不能用于生産環境)檔案輸出。源代碼查閱

CsvRetractTableSink

同時我們還需要利用 Python 進行封裝,詳見 chart_table_sink.py。

在 chart_table_sink.py 我們封裝了一個 http server,這樣我們可以在浏覽器中查閱我們的統計結果。

業務邏輯

完成自定義的 Source 和 Sink 之後我們終于可以進行業務邏輯的開發了,其實整個過程自定義 Source 和 Sink 是最麻煩的,核心計算邏輯似乎要簡單的多。

  • 設定 Python 版本(很重要)

如果你本地環境 python 指令版本是 2.x,那麼需要對 Python 版本進行設定,如下:

t_env.get_config().set_python_executable("python3")           

PyFlink 1.10 之後支援 Python 3.6+ 版本。

  • 讀取資料源

PyFlink 讀取資料源非常簡單,如下:

...
...
t_env.from_table_source(SocketTableSource(port=9999)).alias("line")           

上面這一行代碼定義了監聽端口 9999 的資料源,同時結構化 Table 隻有一個名為 line 的列。

  • 解析原始資料

我們需要對上面列進行分析,為了示範 Python UDF,我們在 SocketTableSource中并沒有對資料進行預處理,是以我們利用上面 UDF 定義 一節定義的 UDF,來對原始資料進行預處理。

...
...
.select("split(line) as str_array")        
.select("get(str_array, 3) as city, " "get(str_array, 1).cast(LONG) as count, " "get(str_array, 2).cast(LONG) as unit_price")        
.select("city, count, count * unit_price as total_price")           
  • 統計分析

核心的統計邏輯是根據 city 進行分組,然後對 銷售數量和銷售金額進行求和,如下:

...
...
.group_by("city")
.select("city, sum(count) as sales_volume, sum(total_price)   
         as sales")\           
  • 計算結果輸出

計算結果寫入到我們自定義的 Sink 中,如下:

...
...
.insert_into("sink")           
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.demo import ChartConnector, SocketTableSource
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.descriptors import Schema
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
    env,
    environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
t_env.connect(ChartConnector())\
    .with_schema(Schema()
                 .field("city", DataTypes.STRING())
                 .field("sales_volume", DataTypes.BIGINT())
                 .field("sales", DataTypes.BIGINT()))\
    .register_table_sink("sink")


@udf(input_types=[DataTypes.STRING()],
     result_type=DataTypes.ARRAY(DataTypes.STRING()))
def split(line):
    return line.split(",")


@udf(input_types=[DataTypes.ARRAY(DataTypes.STRING()), DataTypes.INT()],
     result_type=DataTypes.STRING())
def get(array, index):
    return array[index]

t_env.get_config().set_python_executable("python3")

t_env.register_function("split", split)
t_env.register_function("get", get)
t_env.from_table_source(SocketTableSource(port=6666))\
    .alias("line")\
    .select("split(line) as str_array")\
    .select("get(str_array, 3) as city, "
            "get(str_array, 1).cast(LONG) as count, "
            "get(str_array, 2).cast(LONG) as unit_price")\
    .select("city, count, count * unit_price as total_price")\
    .group_by("city")\
    .select("city, "
            "sum(count) as sales_volume, "
            "sum(total_price) as sales")\
    .insert_into("sink")

t_env.execute("Sales Statistic")           

上面代碼中大家會發現一個陌生的部分,就是 from pyflink.demo import ChartConnector, SocketTableSource. 其中 pyflink.demo 是哪裡來的呢?其實就是包含了上面我們介紹的 自定義 Source/Sink(Java&Python)。下面我們來介紹如何增加這個 pyflink.demo 子產品。

安裝 pyflink.demo

為了大家友善我把自定義 Source/Sink(Java&Python)的源代碼放到了這裡 ,大家可以進行如下操作:

  • 下載下傳源碼
git clone https://github.com/sunjincheng121/enjoyment.code.git           
  • 編譯源碼
cd enjoyment.code/PyUDFDemoConnector/; mvn clean install           
  • 建構釋出包
python3 setup.py sdist bdist_wheel
...
...
adding 'pyflink_demo_connector-0.1.dist-info/WHEEL'
adding 'pyflink_demo_connector-0.1.dist-info/top_level.txt'
adding 'pyflink_demo_connector-0.1.dist-info/RECORD'
removing build/bdist.macosx-10.14-x86_64/wheel           
  • 安裝 Pyflink.demo
pip3 install dist/pyflink-demo-connector-0.1.tar.gz
...
...
Successfully built pyflink-demo-connector
Installing collected packages: pyflink-demo-connector
Successfully installed pyflink-demo-connector-0.1           

出現上面資訊證明已經将 PyFlink.demo 子產品成功安裝。接下來我們可以運作我們的示例了 :)

運作示例

示例的代碼在上面下載下傳的源代碼裡面已經包含了,為了簡單,我們利用 PyCharm 打開enjoyment.code/myPyFlink。同時在 Terminal 啟動一個端口:

nc -l 6666           

啟動 blog_demo,如果一切順利,啟動之後,控制台會輸出一個 web 位址,如下所示:

如何在 PyFlink 1.10 中自定義 Python UDF?

我們打開這個頁面,開始是一個空白頁面,如下:

如何在 PyFlink 1.10 中自定義 Python UDF?

我們嘗試将下面的資料,一條,一條的發送給 Source Connector:

iPhone 11,30,5499,Beijing
iPhone 11 Pro,20,8699,Guangzhou
MacBook Pro,10,9999,Beijing
AirPods Pro,50,1999,Beijing
MacBook Pro,10,11499,Shanghai
iPhone 11,30,5999,Shanghai
iPhone 11 Pro,20,9999,Shenzhen
MacBook Pro,10,13899,Hangzhou
iPhone 11,10,6799,Beijing
MacBook Pro,10,18999,Beijing
iPhone 11 Pro,10,11799,Shenzhen
MacBook Pro,10,22199,Shanghai
AirPods Pro,40,1999,Shanghai           

當輸入第一條訂單 iPhone 11,30,5499,Beijing,之後,頁面變化如下:

如何在 PyFlink 1.10 中自定義 Python UDF?

随之訂單資料的不斷輸入,統計圖不斷變化。一個完整的 GIF 示範如下:

如何在 PyFlink 1.10 中自定義 Python UDF?

小結

本篇從架構到 UDF 接口定義,再到具體的執行個體,向大家介紹了在 Apache Flink 1.10 釋出之後,如何利用 PyFlink 進行業務開發,其中 使用者自定義 Source 和 Sink部分比較複雜,這也是目前社群需要進行改進的部分(Java/Scala)。真正的核心邏輯部分其實比較簡單,為了大家按照本篇進行實戰操作有些成就感,是以我增加了自定義 Source/Sink 和圖形化部分。但如果大家想簡化執行個體的實作也可以利用 Kafka 作為 Source 和 Sink,這樣就可以省去自定義的部分,做起來也會簡單一些。

閱讀原文可點選:

原文連結