overview
本文檔以
TO_BASE64
函數為例,介紹如何實作、擴充Flink Table&SQL Scalar function。
使用方式
自定義的Scalar函數有多種使用方式,我們以測試代碼來了解一下,具體是如何使用的:
@Test
def testToBase64(): Unit = {
testAllApis(
'f0.toBase64(),
"f0.toBase64()",
"TO_BASE64(f0)",
"VGhpcyBpcyBhIHRlc3QgU3RyaW5nLg==")
testAllApis(
'f8.toBase64(),
"f8.toBase64()",
"TO_BASE64(f8)",
"IFRoaXMgaXMgYSB0ZXN0IFN0cmluZy4g")
//null test
testAllApis(
'f33.toBase64(),
"f33.toBase64()",
"TO_BASE64(f33)",
"null")
testAllApis(
"".toBase64(),
"''.toBase64()",
"TO_BASE64('')",
"")
testAllApis(
'f33.toBase64(),
"f33.toBase64()",
"to_base64(f33)",
"null")
}
從測試代碼可見,大緻上存在三種使用方式:
- Table API調用函數
- Table 字元串表達式使用函數
- SQL 中以字元串的形式使用函數
在SQL使用時函數不區分大小寫,在Table中使用,如果函數無參數,可以省略括号,這些能力由Flink Table&SQL 自身提供支援。
函數實作
一個函數的邏輯是它最核心的部分,這裡我們的示例函數
TO_BASE64
很簡單:
org.apache.flink.table.runtime.functions.ScalarFunctions
/**
* Returns a string's representation that encoded as base64.
*/
def toBase64(base: String): String = Base64.encodeBase64String(base.getBytes())
實作模闆
提供了函數的實作後,我們需要通過反射的方式給函數一個代号以便我們可以在其他地方引用它:
org.apache.flink.table.codegen.calls.BuiltInMethods
val TOBASE64 = Types.lookupMethod(classOf[ScalarFunctions], "toBase64", classOf[String])
然後為SQL定義函數:
org.apache.flink.table.functions.sql.ScalarSqlFunctions
val TO_BASE64 = new SqlFunction(
"TO_BASE64",
SqlKind.OTHER_FUNCTION,
ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
InferTypes.RETURN_TYPE,
OperandTypes.family(SqlTypeFamily.STRING),
SqlFunctionCategory.STRING
)
然後我們需要為SQL中使用該函數再代碼生成的時候如何找到它的實作提供一個綁定關系:
org.apache.flink.table.codegen.calls.FunctionGenerator
addSqlFunctionMethod(
TO_BASE64,
Seq(STRING_TYPE_INFO),
STRING_TYPE_INFO,
BuiltInMethods.TOBASE64)
當函數在字元串表達式中使用時,Flink首先要去正确地解析并驗證它,作為一個節點,這裡需要提供關于該函數的必要的中繼資料資訊:
org.apache.flink.table.expressions.stringExpressions.scala
case class ToBase64(child: Expression) extends UnaryExpression with InputTypeSpec {
override private[flink] def expectedTypes: Seq[TypeInformation[_]] = Seq(STRING_TYPE_INFO)
override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
override private[flink] def validateInput(): ValidationResult = {
if (child.resultType == STRING_TYPE_INFO) {
ValidationSuccess
} else {
ValidationFailure(s"ToBase64 operator requires String input, " +
s"but $child is of type ${child.resultType}")
}
}
override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
relBuilder.call(ScalarSqlFunctions.TO_BASE64, children.map(_.toRexNode))
}
override def toString: String = s"($child).toBase64"
}
接下來,我們為Table API定義函數API:
org.apache.flink.table.api.scala.ImplicitExpressionOperations
/**
* Returns a string's representation that encoded as base64.
*/
def toBase64() = ToBase64(expr)
最後,我們需要為該函數的校驗提供映射資訊:
org.apache.flink.table.validate.FunctionCatalog.scala
"toBase64" -> classOf[ToBase64],
ScalarSqlFunctions.TO_BASE64,
測試與文檔
每個函數實作完成後都必須經過充分的測試,以驗證邏輯的正确性。測試用例補充進
org.apache.flink.table.expressions.ScalarFunctionsTest
。
函數完成後,請補充相關文檔:
Table API: flink/docs/dev/table/tableApi.md
SQL: flink/docs/dev/table/sql.md
示例與參考
LOG2(X): https://github.com/apache/flink/pull/6404/files
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZwpmLulGepV2dftmbpxmRfVGajFGcB9CXt92Yu4GZkV3bsNmLix2ZuAjeuETbvNmL6FWYrh3Nvw1LcpDc0RHaiojIsJye.jpg)