天天看點

Protobuf 在知乎大資料場景的應用

作者:閃念基因

1 背景

Protobuf(Protocol Buffers)是 Google 公司開發的一種跨語言和平台的序列化資料結構的方式,是一個靈活的、高效的用于序列化資料的協定。得益于其優秀的編解碼性能與簡單易用的特點,被廣泛應用于各大場景,如通信協定、資料存儲等。

Protobuf 在知乎大資料主要有以下兩種場景:

一是 Protobuf 資料入倉,知乎目前有很大一部分資料,如服務日志,特征資料等,都是采用 Protobuf 格式進行存儲。這些資料會被發送到 Kafka 或 Pulsar,再通過 Flink 進行消費,最後落入到 Hive 表中,供離線分析。但是因為 Flink 在早期版本(知乎目前的 Flink 版本是 1.13.2)沒有提供通用的 Protobuf Format 解決方案,是以使用者在使用 Flink 進行消費的時候,隻能自己寫 Jar 包程式來解析 Protobuf 的資料,再拼接成 Hive 表的資料寫入 HDFS 中。

二是 Protobuf 出倉,使用者将經過離線 ETL 處理的 Hive 表資料,組織成 Protobuf 的格式,再寫回到線上資料庫如 Redis,MySQL,TiDB或者是 Kafka/Pulsar 之類的消息系統。這種場景使用者也是自己寫 Spark 或者 Flink Jar 包程式來處理,使用者需要自己讀取 Hive 表的資料,再将表的資料組織成 Protobuf 格式。

這兩類場景在使用 Protobuf 的痛點如下:

  1. 都需要使用者自己寫 Jar 包來做 Protobuf 的編碼與解碼,開發門檻高;
  2. Jar 包程式不容易維護,不同的使用者有自己的 Protobuf 定義,每個 Protobuf 的解析都要寫一個程式,導緻項目備援;
  3. 每次 Protobuf 增加字段時,從釋出到上線流程過長,容易出錯;
  4. 代碼容易失傳,尤其展現在交接過程中。

基于以上痛點,我們決定開發通用的 Protobuf 處理工具。

2 Flink Protobuf Format

首先是針對 Protobuf 入倉的場景,我們開發了 Flink Protobuf Format,該 Format 基于 Protobuf Message 的 Java 反射 API 實作,利用使用者編譯出的 Java 類解碼 Protobuf Message,再拼接成 Flink 的資料結構。依托于 Flink SQL 豐富的 Connector,被解析後的 Protobuf 能夠落入下遊的任意資料源中,落入 Hive 自然也不在話下。

我們以下面的 Message 為例,簡單描述一下 Protobuf Format 的使用:

syntax = "proto3";
package com.zhihu.platform.proto.example;
option java_package = "com.zhihu.platform.proto.example";
option java_multiple_files = true;

message SimpleTest {
  optional int64 uid = 1;
  optional string name = 2;
  optional int32 int32_test = 3;
  optional bytes bytes_test = 4;
  optional double double_test = 5;
  optional float float_test = 6;
  repeated int32 int32_array_test = 7;
  optional InnerMessageTest innerMessage = 8;
  repeated InnerMessageTest innerMessage_arr = 9;
  optional State enum_test = 10;
  map<int64, int64> simple_map_test = 11;
  map<string, InnerMessageTest> nest_map_test = 12;

  message InnerMessageTest{
    optional int64 v1 = 1;
    optional int32 v2 = 2;
  }

  enum State {
    RUNNING = 0;
    FINISHED = 1;
  }
}           

Format 的使用方式如下,以 Kafka Connector 為例:

CREATE TABLE `test_catalog`.`test_database`.`test_table`
(
  `uid` BIGINT,
  `name` STRING,
  `int32_test` INT,
  `bytes_test` BYTES,
  `double_test` DOUBLE,
  `float_test` FLOAT,
  `int32_array_test` ARRAY<INT>,
  `innerMessage` ROW<`v1` BIGINT, `v2` INT>,
  `innerMessage_arr` ARRAY<ROW<`v1` BIGINT, `v2` INT>>,
  `enum_test` STRING,
  `simple_map_test` MAP<BIGINT, BIGINT>,
  `nest_map_test` MAP<STRING, ROW<`v1` BIGINT, `v2` INT>>
)
WITH (
  'properties.bootstrap.servers' = 'xxx',
  'connector' = 'kafka',
  'topic' = 'xxxx',
  'format' = 'protobufV1',
  'protobufV1.class-name' = 'com.zhihu.platform.proto.example.SimpleTest',
  'protobufV1.ignore-parse-errors' = 'true'
);           

這裡 Protobuf Format 我們命名為 protobufV1 而不用 protobuf,原因是我們期待 Flink 社群提供 Protobuf Format 的解決方案,是以将 protobuf 這個名字預留了。事實證明預留 protobuf 的名稱是明智的,Flink 在高版本果然提供了 Protobuf Format,與我們的方案比較類似,感興趣的話可以檢視 Flink 的最新文檔,裡面有相關介紹。

2.1 解碼方式的選擇

Protobuf 的解碼有以下兩種備選方案:

  1. Dynamic Message 的方式解碼,這種方式的好處在于不用将 proto 檔案真正編譯成某種程式設計語言,可以做到語言無關,但是解碼性能會有較大的損耗;
  2. 編譯 proto 檔案為 Java 類,利用 Java 類做解碼,這樣做的好處是能夠獲得最高的解碼效率和最大的解碼性能,缺點是需要使用者自行編譯 proto 檔案為 Java 類,對其他語言的使用者不友好,需要使用者有一定的 Java 基礎,另外不同的 proto 檔案對應的 Java 類也不同,如何讓程式以一種比較優雅的形式擷取到使用者編譯的 Protobuf Jar 包,也是一件比較困難的事情。

經過内部讨論,我們選擇了方案 2,也就是利用使用者編譯出來的 Protobuf Java 類來解碼 Protobuf 資料,主要考慮如下:

  1. Protobuf Java 類解碼效率最高,性能最好;
  2. 知乎内部的 Protobuf 釋出有比較嚴格的流程,CI 建構時,會同時将 proto 檔案編譯成 Golang 和 Java 兩種語言,并且會将 Java 版本釋出到内部 Maven 倉庫中,是以每一個線上的 Protobuf 格式,都能夠很輕易找到其對應的 Java Jar 包;
  3. 知乎内部有 Flink SQL Gateway,能夠很輕易的讓 Flink 程式擷取到使用者編譯的 Java 包(具體參考下面的章節)。

2.2 Protobuf Jar 包注入

Protobuf Jar 包在我們的設計中有兩種注入方式:

  1. 在 Flink 叢集啟動前,将 Jar 包下載下傳到 Flink 的 lib 目錄内,再啟動叢集,這樣 Flink 叢集内就存在 Protobuf 的 Jar 包了,我們也能很輕易的擷取到 Protobuf Message 對應的 Java 類;
  2. 通過 Flink SQL 裡 Table 的 properties 屬性,将 Jar 包的 Maven 位址傳給 Flink Format,Flink Format 在接受到 Jar 包的位址後,利用 URLClassloader 加載 Protobuf Message 對應的 Java 類。

2.2.1 利用 JuiceFS 動态注入 Jar 包

首先看 Jar 包注入的第一種方式,前一段時間我們分享過 利用 JuiceFS 給 Flink 容器啟動加速,簡單描述就是知乎的 Flink 叢集都是部署在 K8s 上的,Flink 的容器在啟動前,會讀取某些特定的環境變量的值,從 JuiceFS 動态下載下傳啟動所需的 Jar 包,Jar 包下載下傳完成後,再啟動 Flink 叢集。

知乎内部的 Flink SQL 都是通過 Flink SQL Gateway 送出的,我們在 Flink SQL Gateway 上擴充了 ADD JAR 文法,用于添加外部的 Jar 包到 Flink 容器的環境變量中,Flink 叢集在啟動時,就會下載下傳 ADD JAR 指定的 Jar 包,添加到叢集中。這樣使用者隻需要通過寫 ADD JAR 指令,就能把自己的 Protobuf Message 對應的 Jar 包注入到 Flink 叢集中。

具體的使用方式如下:

-- 添加自己的 protobuf 對應版本的包,防止不同版本的 Protobuf 沖突
ADD JAR 'https://zhihu-juicefs.com/protobuf-java-3.19.1.jar';
-- 添加自己編譯出來的 Message 包
ADD JAR 'https://zhihu-juicefs.com/simple-test.jar';
CREATE TABLE `test_catalog`.`test_database`.`test_table`
(
  `uid` BIGINT,
  `name` STRING,
  `int32_test` INT,
  `bytes_test` BYTES,
  `double_test` DOUBLE,
  `float_test` FLOAT,
  `int32_array_test` ARRAY<INT>,
  `innerMessage` ROW<`v1` BIGINT, `v2` INT>,
  `innerMessage_arr` ARRAY<ROW<`v1` BIGINT, `v2` INT>>,
  `enum_test` STRING,
  `simple_map_test` MAP<BIGINT, BIGINT>,
  `nest_map_test` MAP<STRING, ROW<`v1` BIGINT, `v2` INT>>
)
WITH (
  'properties.bootstrap.servers' = 'xxx',
  'connector' = 'kafka',
  'topic' = 'xxxx',
  'format' = 'protobufV1',
  'protobufV1.class-name' = 'com.zhihu.platform.proto.example.SimpleTest',
  'protobufV1.ignore-parse-errors' = 'true'
);           

Jar 包加載的流程圖如下:

Protobuf 在知乎大資料場景的應用

2.2.2 利用 URLClassloader 動态加載 JuiceFs 上的 Jar 包

其次再看 Jar 包注入的第二種方式,利用 URLClassloader 遠端加載 Jar 包。

這裡要求使用者在建表的時候,指定 Format 所使用的 Jar 包:

CREATE TABLE `test_catalog`.`test_database`.`test_table`
(
  `uid` BIGINT,
  `name` STRING,
  `int32_test` INT,
  `bytes_test` BYTES,
  `double_test` DOUBLE,
  `float_test` FLOAT,
  `int32_array_test` ARRAY<INT>,
  `innerMessage` ROW<`v1` BIGINT, `v2` INT>,
  `innerMessage_arr` ARRAY<ROW<`v1` BIGINT, `v2` INT>>,
  `enum_test` STRING,
  `simple_map_test` MAP<BIGINT, BIGINT>,
  `nest_map_test` MAP<STRING, ROW<`v1` BIGINT, `v2` INT>>
)
WITH (
  'properties.bootstrap.servers' = 'xxx',
  'connector' = 'kafka',
  'topic' = 'xxxx',
  'format' = 'protobufV1',
  'protobufV1.class-name' = 'com.zhihu.platform.proto.example.SimpleTest',
  'protobufV1.ignore-parse-errors' = 'true',
  'protobufV1.jar-urls' = 'https://zhihu-juicefs.com/protobuf-java-3.19.1.jar,https://zhihu-juicefs.com/simple-test.jar'
);           

Protobuf Format 在檢查到有 jar-urls 這個屬性時,會建立一個 URLClassloader,并且将該屬性對應 Jar 包連結注入到 classpath 中,以供加載。

這裡值得一提的是,一開始我們在 URLClassloader 裡傳入的是内部 Maven 上 Jar 的 URL,但是 Flink 程式在啟動時,經常出現 ClassNotFoundException 的錯誤,加載不到使用者的 Class。最後定位原因是内部的鏡像倉庫在負載比較高的情況下,偶爾會出現問題,而 URLClassloader 在通路這些 URL 失敗的情況下,就會跳過這些 URL 對應的 Jar 包,也不會報錯,等到真正要使用這些 URL 上的 Jar 包裡的 Class 時,就會報 ClassNotFoundException 的錯誤。

鏡像倉庫本意是用于 CI 建構以及公共類庫釋出,本身就不是為高并發設計的,是以我們需要尋找另一種解決方案。我們最開始想到的解決方案就是将使用者需要的 Jar 包從内部的 Maven 倉庫上緩存一份到 JuiceFS 上,然後再讓 URLClassloader 從 JuiceFS 上加載。因為我們之前用 JuiceFS 已經快一年了,沒有出過任何問題,是以對 JuiceFS 在這個場景的可用性是非常有信心的。

在 JuiceFS 社群的幫助下,我們選擇了以 webdav 的方式來通路 JuiceFS,這裡不選擇 s3 proxy 的方式是因為 URLClassloader 不适合使用需要認證的 URL,而 webdav 我們以 read-only 的方式啟動,就能輕易被 URLClassloader 通路到,無需擔心驗證的問題。

Jar 包加載的流程圖如下:

Protobuf 在知乎大資料場景的應用

2.3 建表工具

Protobuf Message 可能出現多級嵌套的複雜情況,這個時候人力建表已經是幾乎不可能完成的事情,為此我們開發了專門的建表工具來輔助使用者建立與 Protobuf Message 結構相比對的 Hive 或 Flink 表。

以面給出的 proto 檔案為例:

Protobuf 在知乎大資料場景的應用

3 Protobuf Hive UDF

其次是針對 Protobuf 出倉的場景,這類場景一般是批處理,比較适合在 Hive 或 Spark 裡進行。

這裡我們開發了兩種 Hive UDF,用于将 Hive 表轉換成 Protobuf Message,使用者在獲得 Message 的二進制資料後,再使用我們内部的資料內建平台,将資料寫入到其他存儲中。

3.1 将 Message 拍平指派

還是以上文給出的 proto 檔案為例,我們開發的第一種 Protobuf UDF 使用方式如下:

SET protobuf.jar-urls=https://zhihu-juicefs.com/protobuf-java-3.19.1.jar,https://zhihu-juicefs.com/simple-test.jar;
SET protobuf.class-name=com.zhihu.platform.proto.example.SimpleTest;
SELECT to_protobuf(
               "uid", 1,
               "name", "zhangsan",
               "bytes_test", "12345",
               "double_test", cast(123.456 as double),
               "float_test", cast(123.456 as float),
               "enum_test", "running",
               "innerMessage.v1", 1,
               "innerMessage.v2", 2,
               "int32_array_test(0)", 0,
               "int32_array_test(1)", 1,
               "innerMessage_arr(0).v1", 3,
               "innerMessage_arr(0).v2", 4,
               "innerMessage_arr(1).v1", 5,
               "innerMessage_arr(1).v2", 6,
               "simple_map_test[1]", cast(1 as bigint),
               "simple_map_test[2]", cast(2 as bigint),
               "nest_map_test[a].v1", cast(7 as bigint),
               "nest_map_test[a].v2", 8,
               "nest_map_test[b].v1", cast(9 as bigint),
               "nest_map_test[b].v2", 10
           ) AS protobuf_bytes;           

這樣使用者可以直接指定 Message 内任意的屬性的值:

  1. 如果屬性之間具有層級/嵌套關系,則用 . 分隔;
  2. 數組用 () 将下标括起來,表示數組的第幾個元素;
  3. MAP 用 [] 将 key 括起來,表示這個 key 對應的 value。

但是這種寫法也有一些缺陷:

  1. 對于大數組或者大 MAP, UDF 寫起來會非常複雜;
  2. 對于不确定長度的數組,或者是 key 不固定的 MAP,無法指派。

是以此 UDF 使用起來簡單,但是隻适合比較簡單的 Protobuf Message。

3.2 利用 struct 指派

針對第一種 UDF 的限制,我們又開發了第二種 UDF,使用方式如下:

SET protobuf.jar-urls=https://zhihu-juicefs.com/protobuf-java-3.19.1.jar,https://zhihu-juicefs.com/simple-test.jar;
SET protobuf.class-name=com.zhihu.platform.proto.example.SimpleTest;
SELECT to_protobuf(
               named_struct(
                       "uid", 1,
                       "name", "zhangsan",
                       "bytes_test", "12345",
                       "double_test", cast(123.456 as double),
                       "float_test", cast(123.456 as float),
                       "enum_test", "running",
                       "innerMessage", named_struct("v1", 1, "v2", 2),
                       "innerMessage_arr", array(named_struct("v1", 1, "v2", 2), named_struct("v1", 3, "v2", 4))
                   )
           ) AS protobuf_bytes;           

這樣,使用者可以使用 Hive 内自帶的 named_struct 函數,将值拼接成與 Protobuf Message 相對應的 struct 結構,直接給整個 Message 指派。

比如現在有一張 Hive 表,結構與 Protobuf 的 Message 十分類似:

CREATE TABLE `simple_test`(
  `uid` bigint,
  `name` string,
  `int32_test` int,
  `bytes_test` binary,
  `double_test` double,
  `float_test` float,
  `int32_array_test` array<int>,
  `innermessage` struct<v1:bigint,v2:int>,
  `innermessage_arr` array<struct<v1:bigint,v2:int>>,
  `enum_test` string,
  `simple_map_test` map<bigint,bigint>,
  `nest_map_test` map<string,struct<v1:bigint,v2:int>>
);           

我們就可以很簡單得到 Protobuf Message:

select to_protobuf(
               named_struct("uid", uid,
                            "name", name,
                            "bytes_test", bytes_test,
                            "double_test", double_test,
                            "float_test", float_test,
                            "enum_test", enum_test,
                            "innerMessage",innerMessage,
                            "int32_array_test",int32_array_test,
                            "innerMessage_arr",innerMessage_arr,
                            "simple_map_test",simple_map_test,
                            "nest_map_test",nest_map_test
                   )
           ) AS protobuf_bytes FROM simple_test;           

3.3 Hive Jar 包沖突解決

說完了 UDF 的設計,接下來說一個我們在開發中十分頭疼的問題,那就是 Jar 包沖突。因為 Hive 引入了 Protobuf 2 相關的依賴,是以在使用者傳入的 Protobuf 依賴為 Protobuf 3 時,會出現 NoSuchMehodError 等錯誤。

Jar 包沖突簡單來說就是在不同的 Jar 包内有相同名字的 Class,但是他們的成員屬性和方法不盡相同,程式在調用這個 Class 的方法時,可能因為加載到了另一個同名的 Class,導緻調用出錯。

Jar 包沖突最常見的解決方案是利用 Maven 的 shade 插件,将自身所引用的帶沖突的 package rename 到其他的 package 下,這樣原先有沖突的類因為被強制重命名了,就不是同名類,不會出現沖突了。這種方式簡單粗暴,但是不适用我們的場景,主要是這種方式需要将 UDF 的 Jar 包與使用者的 Protobuf 綁定到一起打包來進行統一的重命名,這樣會導緻每一個使用者都要重新打包自己的 Protobuf,不便于維護與管理。

最後,我們參考了 Flink 的 ChildFirstClassLoader,在加載使用者的 Jar 包時,使用 child-first 的加載方式。

在前面我們給出的樣例中,有這樣一段注入 Jar 包的 SQL:

SET protobuf.jar-urls=https://zhihu-juicefs.com/protobuf-java-3.19.1.jar,https://zhihu-juicefs.com/simple-test.jar;           

這裡不用 Hive 的 ADD JAR 文法注入 Jar 的原因就是因為 Jar 包沖突。是以我們定義了 protobuf.jar-urls 這個屬性,UDF 會從 Hive Server 的目前 Session 中讀取 SessionConf 中這個屬性的值,再将屬性值内的這些 URL 放入 ChildFirstClassLoader,以 child-first 的方式加載,成功解決了 Jar 包沖突的問題。

這裡我們 Jar 包依然采用 JuiceFS 做緩存。

3.4 Hive Server Metaspace OOM 解決

因為我們引入了 ChildFirstClassLoader,而 UDF 又沒有類似 close 的接口來供我們釋放 ClassLoader 的相關資源,這樣會導緻 Hive Server 每次在使用 Protobuf UDF 時加載進來的類都不會被釋放,進而導緻 Metaspace OOM。

這裡我們通過研究 Hive 的 ADD JAR 文法,發現 Hive Server 的每個連接配接都有自己的 SessionConf 以及自己的 Session ClassLoader,這些資訊都被封裝到了 SessionState 對象,在連接配接斷開時,SessionState 會釋放 Session ClassLoader。而所有的 SeesionState 都封裝在一個全局的 ThreadLocal 中,我們在 UDF 内能夠輕易的取到。這樣我們隻需要将我們建立的 ChildFirstClassLoader 作為 Session Classloader,将原來的 Session Classloader 作為 ChildFirstClassLoader 的Parent,這樣在連接配接關閉時,ChildFirstClassLoader 将會被自動關閉,釋放加載的使用者 Jar 包。

代碼大緻如下:

SessionState sessionState = SessionState.get();
ClassLoader parentClassLoader = Thread.currentThread().getContextClassLoader();
if (sessionState != null) {
  parentClassLoader = sessionState.getConf().getClassLoader();
}
ChildFirstClassLoader childFirstClassLoader = new ChildFirstClassLoader(urls, parentClassLoader,);
if (sessionState != null) {
  sessionState.getConf().setClassLoader(childFirstClassLoader);
}
           

作者:胡夢宇

出處:https://zhuanlan.zhihu.com/p/586120009

繼續閱讀