天天看點

FLINK Notebook 混合程式設計:PYTHON (一)

本文介紹了 Py4j的使用以及 Flink官方如何使用 Py4j進行混合語言程式設計,最後會介紹下我們會應用這種技術在我們的 Flink Notebook 服務,來建立一個混合語言程式設計環境。

Flink Notebook 服務是我司自研的基于Notebook方式的Flink 開發平台,他支援使用者通過SQL方式和JAR包方式進行混合程式設計,并通過一些配置,既可完全的在頁面上完成FLINK任務的開發工作,如圖:

FLINK Notebook 混合程式設計:PYTHON (一)

​ 通過不同的Notebook Type我們可以加載不同類型的元件,通過table結果集流轉方式,承接上下遊,以完成相應的功能。目前的插件類型主要是主要分:1.SQL組建:可以自由撰寫SQL,2.格式化組建:sink或者source,有具體的格式,标準的前端組建對應,3.JAR包自定義元件,通過使用者上傳自己開發的jar包完成對應的邏輯。

​ 對于Jar包自定義元件來說,他是為了解決1%的特異性需求的,但問題是其代碼不可見,邏輯也相對自由,有違Notebook的初衷,是以,我們想設計一種Notebook的Type,支援可視化的Python編寫,可以直接将代碼在頁面上進行開發。

​ Flink 本身來說,就有PyFlink 和 Python UDF support,是以python和 flink的耦合度應該很高,是以我們要了解Flink是怎麼做的,進而研究我們應該如何去做,是以本文會分成以下3個部分來介紹整個混編邏輯:

1. Java與Python 通信:Py4J
2. Py4j in Flink
3. Notebook with Python
           

Py4j 介紹

Py4j可以使運作于python解釋器的python程式動态的通路java虛拟機中的java對象。Java方法可以像java對象就在python解釋器裡一樣被調用, java collection也可以通過标準python collection方法調用。Py4j也可以使java程式回調python對象。

詳細說明可以參考官網 https://www.py4j.org/

安裝以及基本使用也可以參考官網

Py4j可以在系統中建立一個 java和python 之間通信的socket管道。

FLINK Notebook 混合程式設計:PYTHON (一)

我們可以通過一個例子來看整個Py4j是如何工作的。

我們先建立一個想讓python負責具體實作的Java 接口:

public interface TestEnterPoint {
    String gift(HashMap<String,String> a, String b);
}
           

在java 服務端,我們通過以下代碼可以啟動一個簡單的Py4j監聽:

public static void main(String[] args) {
        ListenerApplication application = new ListenerApplication();
        GatewayServer server = new GatewayServer(application);
        server.start(true);
    }
           

ListenerApplication 表示一個允許共享給python的類,她可以是任意java類,包括Map,List等複雜結構化資料:

public class ListenerApplication {
    TestEnterPoint enterPoint = new TestEnterPoint();
    public void setListener(TestEnterPoint enterPoint) {
        this.enterPoint = enterPoint;
    }
    public void notifyAllListeners() {
        HashMap<String,String> map = new HashMap<>();
        map.put("a","aaaa");
        Object returnValue = listener.gift(map,"a");
        System.out.println(returnValue);
    }
}
           

而在Python端,我們可以通過以下代碼運作一個python程式:

from py4j.java_gateway import JavaGateway, CallbackServerParameters

class TestEnterPoint(object):
    def gift(self, map, key):
        return map.get(key)
    class Java:
        implements = ["com.xxxx.xxx.test.py.TestEnterPoint"]

if __name__ == "__main__":
    gateway = JavaGateway(
        callback_server_parameters=CallbackServerParameters())
    listener = TestEnterPoint()
    gateway.entry_point.setListener(listener)
    gateway.entry_point.notifyAllListeners()
    gateway.shutdown()
           

這樣我們就通過Python來實作了一個 map.get(key) 的方法

整個過程中,我們看出幾點對于python來說比較基本的使用方式,那就是,第一,通過Python 中

implements = ["com.xxxx.xxx.test.py.TestEnterPoint"]

的使用方式,我們可以實作一個Java的Interface,第二,通過gateway.entry_point的方式,我們可以拿到java中設定的可共享變量,第三個我們在例子中并沒有呈現,但也是非常基礎的使用,就是通過在python中使用

gateway.jvm.com.xxxx.xxx.test.py.TestServer

的方式,允許python使用任何java的class,允許初始化,允許調用方法,但是他們如果想和java端進行資料通信,則必須通過entry_point來實作。

Py4j in Flink

講完Py4j并且如果把上面的代碼自己拿來試下,應該已經對整個python和java互通有一定了解了,那麼我們Flink中如何使用Py4J來進行混編,也就順理成章的很好了解了,在Flink中,有很多地方使用到了這種技術,包括PyFlink,以及Python UDF support,PyFlink 屬于Pyton為主,也比較複雜,這邊就先就以簡單的Python UDF為例,梳理下Flink的執行邏輯。

在Flink Java中如何使用Python UDF

在Flink 中使用Python的UDF相對來說非常簡單,建立一個Python代碼,比如:

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def func1(s: str):
   return s.replace('hello', 'ni hao')
           

在Flink Java中,需要配置Python環境變量,首先将Python檔案加到環境中去,如果是叢集送出,需要加到依賴中去(使用-pyfs 送出Python檔案),或者遠端的Hdfs檔案。其次需要配置Python的程式依賴環境路徑:

configuration.setString("python.files", "/Users/yourName/test.py");
configuration.setString("python.client.executable", "python3");
configuration.setString("python.executable", "/usr/bin/python3");
           

最後,我們在使用過程中,比如通過SQL使用時候,隻需要如下SQL語句即可:

create temporary system function func1 as 'test1.func1' language python
           

其中test1是python的檔案名"test1.py"而 func1就是上文中的那個python 的function name,如此既可以在java中使用python實作的UDF

Flink是如何實作這些的

​ 在追蹤Flink Sql是如何執行create function過程中,我們發現整個Flink的執行流程大緻如圖:

FLINK Notebook 混合程式設計:PYTHON (一)

​ Flink會通過文法解析後的通過create function的字尾“ language python”判斷是否是Python fuction,如果是,會調用PythonFunctionUtils來擷取function,而PythonFunctionUtil最終通過動态加載的PythonFunctionFactory來最終調用Py4j。這裡可以看見他的邏輯其實也比較簡單,首先就是啟動Py4j的Java端server,然後主要就是通過環境變量,以及configture 裡的各種參數,最終拼接出python的cmd 執行指令,運作指令并通過entryPoint擷取其中的貢獻類。最終生成我們在java端可以用的function。

​ 這塊如果有興趣,在Flink源碼中搜尋 PythonFunctionFactory 可以直接看見相關代碼。

Notebook 混編Python

我們平台是類似Zeeplin的可視化Notebook程式設計頁面,對于我們來說,要在頁面上支援Python程式設計,有幾種方案:
           
  • 隻支援Python UDF
  • 以PyFlink為基礎,配置混合程式設計方案
  • 以Java版為基礎,配置混合程式設計方案

方案一對于我們來說并不難,可以看到Flink官方既是支援Python UDF的,我們隻需要将這個Notebook Part裡的内容,生成Python檔案并添加到環境中一起送出即可,但這種方案沒法解決我們上面提出的一大痛點,使用者的1%需要Jar包開發的非标任務,不是單單可以通過UDF來實作的。

方案二對于我們來說,最大的問題是所有的優化,整個程式體系都是建立在Java 基礎上的,改動會非常巨大。

如此,隻能采取方案三,而方案三的問題是,Flink的原版PyFlink隻建立了 PythonFunctionFactory 和一個 心跳2個 entryPoint,這對我們來說比較局限。是以我們會采取模仿 PythonFunctionFactory 的方式,自己建立Py4j程序,來完成Notebook的混編實作

這裡的詳細設計以及Demo 我們會在下篇文章(二)中放出。謝謝各位。

繼續閱讀