作者 | 春哥大魔王
來源 |
Serverless 公衆号寫在前面
在 SaaS 領域 Salesforce 是佼佼者,其 CRM 的概念已經擴充到了 Marketing、Sales、Service 等領域。那麼 Salesforce 靠什麼變成了這三個行業的解決方案呢?得益于 Salesforce 強大的
aPaaS 平台。
ISV、内部實施、客戶均可以從自己的次元基于 aPaaS 平台建構自己的行業,實作業務定制,甚至是行業定制。因為在此之前隻有在 Sales 方向有專門的 SaaS 産品,而 Marketing 和 Service 都是由自己的 ISV 在各自行業的解決方案。是以 Salesforce 已經從一家 SaaS 公司變成了一家 aPaaS 平台公司了。
搭建一個 aPaaS 平台是需要很長時間的,當然也可以基于一些公有雲産品的 Serverless 方案實作現有系統的靈活性與擴充性,進而實作針對于不同客戶的定制。
什麼是 Serverless
Serverless 由兩部分組成,Server 和 Less。
- 前者可以了解為其解決方案範圍處在服務端;
- 後者可以譯為少量的;
組合起來就是較少服務端幹預的服務端解決方案。
與 Serverless 相對的是 Serverfull,比較下對應的概念可能更便于了解。
在 Serverfull 時代,研發傳遞流程一般有三個角色:RD,PM,QA。
RD 根據 PM 的 PRD 進行功能開發,傳遞到 QA 進行測試,測試完成之後釋出到伺服器。由運維人員規劃伺服器規格、數量、機房部署、節點擴縮容等,這種更多由人力處理的時代就是 Serverfull 時代。
之後進入了 DevOps 時代。這個時代運維自己開發一套運維控制台,可以讓研發同學在控制台上自己進行服務觀測、資料查詢、運維處理等,運維同學的工作輕松了不少,這個階段主要釋放了運維同學的人力。
而到了 Serverless 時代,這套運維控制台能力越來越豐富,可以實作按配置的自動擴縮容、性能監控、DevOps 流水線等,同時侵入到研發流程側,比如自動釋出流水線、編譯打包、代碼品質監測、灰階釋出、彈性擴縮等流程基本不需要人力處理了,這就是 Serverless 時代。
Serverless 怎麼用
相信你有過這樣的經曆,在一個 Web 界面上,左側寫代碼,右側展示執行效果。
- 寫的是代碼塊,代碼數量不會特别大;
- 代碼運作速度快;
- 支援多種程式設計語言;
- 可以支援不可預計的流量洪峰沖擊。
以阿裡雲解決方案看下如何支援多語言架構:
抽象來說,前端隻需要将代碼片段和程式設計語言的辨別傳給 Server 端即可,等待響應結果。Server 端可以針對于不同程式設計語言進行 runtime 分類、預處理等工作。
Serverless 怎麼做
很多人把 Serverless 看做是 FC(function compute:函數計算),使用函數計算,無需業務自己搭建 IT 基礎設施,隻需要編碼并上傳代碼。函數計算會按需為你準備好計算資源,彈性、可靠地運作,并提供 trace、日志查詢、監控告警等治理能力。
比如:
在 FC 中有服務和函數之分。一個服務可以包含多個函數。我們可以用微服務了解,我們通過 golang 或 java 搭建了一個微服務架構,而 FC 服務就是其中的類,FC 函數是類中的一個方法:
差別在于 Java 搭建的微服務隻能運作 java 類代碼,golang 的類隻能運作 go 寫的代碼,而 FC 函數可以安裝不同語言的 runtime,支援運作不同語言程式。
類比了解之後,我們再看下如何調用 FC 的函數,一般的 FC 解決方案裡面都有一個觸發器的概念。比如 HTTP 觸發器、對象存儲觸發器、日志服務觸發器、定時任務觸發器、CDN 觸發器、消息隊列觸發器等。觸發器是對于 FC 函數調用的抽象收口,比如 HTTP 觸發器一般都類比網關的一個 http 請求事件,或是指定對象存儲路徑下上傳了一個圖檔,這些觸發事件的入口都可以是觸發器。
觸發器産生事件之後可以調用 FC 函數,函數執行的邏輯可以是下載下傳一張圖檔或是注冊一個使用者。
這樣從觸發器到 FC 函數邏輯處理就是一個 FC 的生命周期了。
那麼 FC 是如何實作高可用的呢?
其實每個函數底層代碼都是運作在一套 IaaS 平台上,使用 IaaS 資源,我們可以為每個函數設定運作代碼時需要的記憶體配置即可,比如最小 128M,最大 3G 等。研發人員不需要關心代碼運作在什麼樣的伺服器上,不需要關心啟動了多少函數執行個體支援目前場景,不需要關注背後的彈性擴縮問題,這些都被收斂在 FC 之後。
如圖有兩種高可用政策:
- 給函數設定并發執行個體數,比如 3 個,那麼當有三個請求進來時,該函數隻啟動一個執行個體,但是會啟動三個線程來運作邏輯;
- 線程達到上限後,會再拉起一個函數執行個體。
類似于線程池的方案。
那麼 Serverless 如何提效呢?
- 效率高:如果新加了語言,隻需要建立一個對應的 Runtime 的 FC 函數即可;
- 高可用:通過多線程、多執行個體兩種方式保障高可用,且函數執行個體擴縮容完全由 FC 自助處理,不需要運維做任何配置;
- 成本低:在沒有觸發器請求時,函數執行個體不會被拉起,也不會計費,是以在流量低谷期間或者夜間時,FC 消耗的成本是非常低的。
如何在雲平台建立一個 FC
1. 建立服務
- 首先建立一個服務名稱;
- 標明服務部署的地區(背後幫助你就近部署在目标機房);
- 選擇是否打開調試日志(開發過程開啟,線上運作時可關閉)。
2. 建立函數
有了服務之後就可以建立函數了,比如選擇基于 http 請求的函數。
- 選擇函數綁定的服務;
- 設定函數名稱;
- 選擇 runtime 環境;
- 是否要求函數執行個體彈性;
- 函數入口(觸發器直接調用的目标方法);
- 函數執行記憶體;
- 函數執行逾時時間;
- 設定執行個體并發度。
配置觸發器,比如選擇了 HTTP 觸發器,然後在觸發器上綁定函數名稱,由于是 http 通路,可以選擇通路的鑒權、認證方式,以及請求方式 POST or GET。
3. 代碼編寫
當函數建立好了之後,進入函數,可以看到描述、代碼執行曆史、觸發器類型、日志查詢頁等。
如果是 HTTP 觸發器,需要配置 http 觸發路徑。
可以看到就如前面介紹的那種,類似于類裡面的一個函數,上下文請求會打到這裡,直接執行。
Python 代碼為例:
# -*- coding: utf-8 -*-
import logging
import urllib.parse
import time
import subprocess
def handler(environ, start_response):
context = environ['fc.context']
request_uri = environ['fc.request_uri']
for k, v in environ.items():
if k.startswith('HTTP_'):
pass
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except (ValueError):
request_body_size = 0
# 擷取使用者傳入的code
request_body = environ['wsgi.input'].read(request_body_size)
codeStr = urllib.parse.unquote(request_body.decode("GBK"))
# 因為body裡的對象裡有code和input兩個屬性,這裡分别擷取使用者code和使用者輸入
codeArr = codeStr.split('&')
code = codeArr[0][5:]
inputStr = codeArr[1][6:]
# 将使用者code儲存為py檔案,放/tmp目錄下,以時間戳為檔案名
fileName = '/tmp/' + str(int(time.time())) + '.py'
f = open(fileName, "w")
# 這裡預置引入了time庫
f.write('import time \r\n')
f = open(fileName, "a")
f.write(code)
f.close()
# 建立子程序,執行剛才儲存的使用者code py檔案
p = subprocess.Popen("python " + fileName, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, encoding='utf-8')
# 通過标準輸入傳入使用者的input輸入
if inputStr != '' :
p.stdin.write(inputStr + "\n")
p.stdin.flush()
# 通過标準輸出擷取代碼執行結果
r = p.stdout.read()
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
return [r.encode('UTF-8')]
流程如下:
- 前端傳入代碼片段,格式是字元串;
- 在 FC 函數中擷取到傳入的代碼字元串,截取 code 内容和 input 内容;
- 将代碼儲存為一個 py 檔案,以時間戳為檔案命名,儲存在 FC 函數的 /tmp 目錄下,每個函數有自己獨立的 /tmp 目錄;
- import time 庫代碼;
- 通過 subprocess 建立子流程,以 shell 方式通過 py 指令執行儲存在 /tmp 目錄下的 py 檔案;
- 最後讀取執行結果傳回給前端。
前端調用 FC 函數:
整個過程隻需要前端将代碼傳入到 FC 函數裡面,整個 Server 端各個環節都不需要研發與運維同學關心,展現了 Serverless 的精髓。
用 Serverless 協調工作流
工作流可以用順序、分支、并行等方式來編排任務執行,之後流程會按照設定好的步驟可靠地協調任務執行,跟蹤每個任務的狀态切換,并在必要時執行定義的重試邏輯,確定流程順利執行。
工作流流程通過記錄日志和審計方式來監視工作流的執行,便于流程的診斷與調試。
系統靈活性與擴充性的核心是服務可編排,是以我們需要做的是将現有系統内部使用者希望定制的功能進行梳理、拆分、抽離、結合 FC 提供的無狀态能力,将這些功能點進行編排,實作業務流程的定制。
需靈活配置工作流的業務
舉個例子,比如餐飲場景下不同商家可以配置不同的支付方式,可以走微信支付、銀聯支付、支付寶支付。可以同時支援三家,也可以某一家,可以到付,也可以積分兌換等。如果沒有一個好的配置化流程解決方案的話,系統中會出現大量寫死規則判斷條件,系統疊代疲于奔命,是個不可持續的過程。
有了 FC 搭建的工作流就可以很優雅地解決這種問題,比如規整流程如下:
上面的流程是使用者側的流程,接下來需要轉換成程式側的流程,通過限制的 FDL 建立工作流,如圖:
FDL 代碼如下:
version: v1beta1
type: flow
timeoutSeconds: 3600
steps:
- type: task
name: generateInfo
timeoutSeconds: 300
resourceArn: acs:mns:::/topics/generateInfo-fnf-demo-jiyuan/messages
pattern: waitForCallback
inputMappings:
- target: taskToken
source: $context.task.token
- target: products
source: $input.products
- target: supplier
source: $input.supplier
- target: address
source: $input.address
- target: orderNum
source: $input.orderNum
- target: type
source: $context.step.name
outputMappings:
- target: paymentcombination
source: $local.paymentcombination
- target: orderNum
source: $local.orderNum
serviceParams:
MessageBody: $
Priority: 1
catch:
- errors:
- FnF.TaskTimeout
goto: orderCanceled
-type: task
name: payment
timeoutSeconds: 300
resourceArn: acs:mns:::/topics/payment-fnf-demo-jiyuan/messages
pattern: waitForCallback
inputMappings:
- target: taskToken
source: $context.task.token
- target: orderNum
source: $local.orderNum
- target: paymentcombination
source: $local.paymentcombination
- target: type
source: $context.step.name
outputMappings:
- target: paymentMethod
source: $local.paymentMethod
- target: orderNum
source: $local.orderNum
- target: price
source: $local.price
- target: taskToken
source: $input.taskToken
serviceParams:
MessageBody: $
Priority: 1
catch:
- errors:
- FnF.TaskTimeout
goto: orderCanceled
- type: choice
name: paymentCombination
inputMappings:
- target: orderNum
source: $local.orderNum
- target: paymentMethod
source: $local.paymentMethod
- target: price
source: $local.price
- target: taskToken
source: $local.taskToken
choices:
- condition: $.paymentMethod == "zhifubao"
steps:
- type: task
name: zhifubao
resourceArn: acs:fc:cn-hangzhou:your_account_id:services/FNFDemo-jiyuan/functions/zhifubao-fnf-demo
inputMappings:
- target: price
source: $input.price
- target: orderNum
source: $input.orderNum
- target: paymentMethod
source: $input.paymentMethod
- target: taskToken
source: $input.taskToken
- condition: $.paymentMethod == "weixin"
steps:
- type: task
name: weixin
resourceArn: acs:fc:cn-hangzhou:your_account_id:services/FNFDemo-jiyuan.LATEST/functions/weixin-fnf-demo
inputMappings:
- target: price
source: $input.price
- target: orderNum
source: $input.orderNum
- target: paymentMethod
source: $input.paymentMethod
- target: taskToken
source: $input.taskToken
- condition: $.paymentMethod == "unionpay"
steps:
- type: task
name: unionpay
resourceArn: acs:fc:cn-hangzhou:your_account_id:services/FNFDemo-jiyuan.LATEST/functions/union-fnf-demo
inputMappings:
- target: price
source: $input.price
- target: orderNum
source: $input.orderNum
- target: paymentMethod
source: $input.paymentMethod
- target: taskToken
source: $input.taskToken
default:
goto: orderCanceled
- type: task
name: orderCompleted
resourceArn: acs:fc:cn-hangzhou:your_account_id:services/FNFDemo-jiyuan.LATEST/functions/orderCompleted
end: true
- type: task
name: orderCanceled
resourceArn: acs:fc:cn-hangzhou:your_account_id:services/FNFDemo-jiyuan.LATEST/functions/cancerOrder
示例展現了基于 Serverless 的 FC 可實作靈活工作流。
流程如何觸發的呢?
在使用者選擇完商品、填完位址之後,通過拉取商品、訂單上下文,可以自動化觸發流程了。
在微服務背景下,很多能力不是閉環在單體代碼邏輯之内,很多時候是多個業務系統的連接配接,比如串聯多個 OpenAPI 接口實作全流程:
如想使用流程引擎需要進行相關的備案鑒權:
@Configuration
public class FNFConfig {
@Bean
public IAcsClient createDefaultAcsClient(){
DefaultProfile profile = DefaultProfile.getProfile(
"cn-xxx", // 地域ID
"ak", // RAM 賬号的AccessKey ID
"sk"); // RAM 賬号Access Key Secret
IAcsClient client = new DefaultAcsClient(profile);
return client;
}
}
startFNF 代碼裡面流程如何串聯起來:
- 輸入要啟動的流程名稱,比如每次訂單編号作為啟動流程執行個體名稱;
- 流程啟動後的流程執行個體名稱;
- 啟動輸入參數,比如業務參數,比如一個 json 裡面有商品、商家、位址、訂單等上下文資訊。
@GetMapping("/startFNF/{fnfname}/{execuname}/{input}")
public StartExecutionResponse startFNF(@PathVariable("fnfname") String fnfName,
@PathVariable("execuname") String execuName,
@PathVariable("input") String inputStr) throws ClientException {
JSONObject jsonObject = new JSONObject();
jsonObject.put("fnfname", fnfName);
jsonObject.put("execuname", execuName);
jsonObject.put("input", inputStr);
return fnfService.startFNF(jsonObject);
}
再看下 fnfService.startFNF:
@Override
public StartExecutionResponse startFNF(JSONObject jsonObject) throws ClientException {
StartExecutionRequest request = new StartExecutionRequest();
String orderNum = jsonObject.getString("execuname");
request.setFlowName(jsonObject.getString("fnfname"));
request.setExecutionName(orderNum);
request.setInput(jsonObject.getString("input"));
JSONObject inputObj = jsonObject.getJSONObject("input");
Order order = new Order();
order.setOrderNum(orderNum);
order.setAddress(inputObj.getString("address"));
order.setProducts(inputObj.getString("products"));
order.setSupplier(inputObj.getString("supplier"));
orderMap.put(orderNum, order);
return iAcsClient.getAcsResponse(request);
}
- 第一部分是啟動流程;
- 第二部分是建立訂單對下,并模拟入庫。
前端如何調用?
在前端當點選選擇商品和商家頁面中的下一步後,通過 GET 方式調用 HTTP 協定的接口 /startFNF/{fnfname}/{execuname}/{input}。和上面的 Java 方法對應。
- fnfname:要啟動的流程名稱;
- execuname:随機生成 uuid,作為訂單的編号,也作為啟動流程執行個體的名稱;
- input:将商品、商家、訂單号、位址建構為 JSON 字元串傳入流程。
submitOrder(){
const orderNum = uuid.v1()
this.$axios.$get('/startFNF/OrderDemo-Jiyuan/'+orderNum+'/{\n' +
' "products": "'+this.products+'",\n' +
' "supplier": "'+this.supplier+'",\n' +
' "orderNum": "'+orderNum+'",\n' +
' "address": "'+this.address+'"\n' +
'}' ).then((response) => {
console.log(response)
if(response.message == "success"){
this.$router.push('/orderdemo/' + orderNum)
}
})
}
1. generateInfo 節點
先看下第一個 FDL 節點定義:
- type: task
name: generateInfo
timeoutSeconds: 300
resourceArn: acs:mns:::/topics/generateInfo-fnf-demo-jiyuan/messages
pattern: waitForCallback
inputMappings:
- target: taskToken
source: $context.task.token
- target: products
source: $input.products
- target: supplier
source: $input.supplier
- target: address
source: $input.address
- target: orderNum
source: $input.orderNum
- target: type
source: $context.step.name
outputMappings:
- target: paymentcombination
source: $local.paymentcombination
- target: orderNum
source: $local.orderNum
serviceParams:
MessageBody: $
Priority: 1
catch:
- errors:
- FnF.TaskTimeout
goto: orderCanceled
```
- name:節點名稱;
- timeoutSeconds:逾時時間,節點等待時長,超過時間後跳轉到 goto 分支指向的 orderCanceled 節點;
- pattern:設定為 waitForCallback,表示需要等待确認;
- inputMappings:該節點入參;
- taskToken:Serverless 工作流自動生成的 Token;
- products:選擇的商品;
- supplier:選擇的商家;
- address:送餐位址;
- orderNum:訂單号;
- outputMappings:該節點的出參;
- paymentcombination:該商家支援的支付方式;
- orderNum:訂單号;
- catch:捕獲異常,跳轉到其他分支。
Serverless 工作流支援多個雲服務內建,将其他服務作為任務步驟的執行單元。服務內建方式通過 FDL 表達式實作,在任務步驟中,可以使 用resourceArn 來定義內建的目标服務,使用 pattern 定義內建模式。
在 resourceArn 中配置 /topics/generateInfo-fnf-demo-jiyuan/messages 資訊,就是內建了 MNS 消息隊列服務,當 generateInfo 節點觸發後會向 generateInfo-fnf-demo-jiyuanTopic 中發送一條消息。消息的正文和參數在 serviceParams 對象中 zhi'd 指定。MessageBody 是消息正文,配置 $ 表示通過輸入映射 inputMappings 産生消息正文。
generateInfo-fnf-demo 函數:
向 generateInfo-fnf-demo-jiyuanTopic 中發送的這條消息包含了商品資訊、商家資訊、位址、訂單号,表示一個下訂單流程的開始,既然有發消息,那麼必然有接受消息進行後續處理。在函數計算控制台,建立服務,在服務下建立名為 generateInfo-fnf-demo 的事件觸發器函數,這裡選擇 Python Runtime:
![17.png](https://ucc.alicdn.com/pic/developer-ecology/89d43991349e422e8bda26f905cce0c4.png)
建立 MNS 觸發器,選擇監聽 generateInfo-fnf-demo-jiyuanTopic:
![18.png](https://ucc.alicdn.com/pic/developer-ecology/4612a6cd4ba54dc38a79cebf7cacc3ac.png)
打開消息服務 MNS 控制台,建立 generateInfo-fnf-demo-jiyuanTopic:
![19.png](https://ucc.alicdn.com/pic/developer-ecology/f51714a2ee594dcba4bbbc888c42288c.png)
接下來寫函數代碼:
-- coding: utf-8 --
import logging
import json
import time
import requests
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkfnf.request.v20190315 import ReportTaskSucceededRequest
from aliyunsdkfnf.request.v20190315 import ReportTaskFailedRequest
def handler(event, context):
1. 建構Serverless工作流Client
region = "cn-hangzhou"
account_id = "XXXX"
ak_id = "XXX"
ak_secret = "XXX"
fnf_client = AcsClient(
ak_id,
ak_secret,
region
)
logger = logging.getLogger()
# 2. event内的資訊即接受到Topic generateInfo-fnf-demo-jiyuan中的消息内容,将其轉換為Json對象
bodyJson = json.loads(event)
logger.info("products:" + bodyJson["products"])
logger.info("supplier:" + bodyJson["supplier"])
logger.info("address:" + bodyJson["address"])
logger.info("taskToken:" + bodyJson["taskToken"])
supplier = bodyJson["supplier"]
taskToken = bodyJson["taskToken"]
orderNum = bodyJson["orderNum"]
# 3. 判斷什麼商家使用什麼樣的支付方式組合,這裡的示例比較簡單粗暴,正常情況下,應該使用中繼資料配置的方式擷取
paymentcombination = ""
if supplier == "haidilao":
paymentcombination = "zhifubao,weixin"
else:
paymentcombination = "zhifubao,weixin,unionpay"
# 4. 調用Java服務暴露的接口,更新訂單資訊,主要是更新支付方式
url = "http://xx.xx.xx.xx:8080/setPaymentCombination/" + orderNum + "/" + paymentcombination + "/0"
x = requests.get(url)
# 5. 給予generateInfo節點響應,并傳回資料,這裡傳回了訂單号和支付方式
output = "{\"orderNum\": \"%s\", \"paymentcombination\":\"%s\" " \
"}" % (orderNum, paymentcombination)
request = ReportTaskSucceededRequest.ReportTaskSucceededRequest()
request.set_Output(output)
request.set_TaskToken(taskToken)
resp = fnf_client.do_action_with_exception(request)
return 'hello world'
代碼分五部分:
- 建構 Serverless 工作流 Client;
- event 内的資訊即接受到 TopicgenerateInfo-fnf-demo-jiyuan 中的消息内容,将其轉換為 Json 對象;
- 判斷什麼商家使用什麼樣的支付方式組合,這裡的示例比較簡單粗暴,正常情況下,應該使用中繼資料配置的方式擷取。比如在系統内有商家資訊的配置功能,通過在界面上配置該商家支援哪些支付方式,形成中繼資料配置資訊,提供查詢接口,在這裡進行查詢;
- 調用 Java 服務暴露的接口,更新訂單資訊,主要是更新支付方式;
- 給予 generateInfo 節點響應,并傳回資料,這裡傳回了訂單号和支付方式。因為該節點的 pattern 是 waitForCallback,是以需要等待響應結果。
generateInfo-fnf-demo 函數配置了 MNS 觸發器,當 TopicgenerateInfo-fnf-demo-jiyuan 有消息後就會觸發執行 generateInfo-fnf-demo 函數。
### 2. payment 節點
接下來是 payment 的 FDL 代碼定義:
-
type: task
name: payment
timeoutSeconds: 300
resourceArn: acs:mns:::/topics/payment-fnf-demo-jiyuan/messages
pattern: waitForCallback
inputMappings:
-
target: taskToken
source: $context.task.token
-
target: orderNum
source: $local.orderNum
- target: paymentcombination
source: $local.paymentcombination
-
target: type
source: $context.step.name
outputMappings:
- target: paymentMethod
source: $local.paymentMethod
-
- target: price
source: $local.price
-
source: $input.taskToken
serviceParams:
MessageBody: $
Priority: 1
catch:
- errors:
-
FnF.TaskTimeout
goto: orderCanceled
-
-
當流程流轉到 payment 節點後,使用者就可以進入到支付頁面。
payment 節點會向 MNS 的 Topicpayment-fnf-demo-jiyuan 發送消息,會觸發 payment-fnf-demo 函數。
payment-fnf-demo 函數:
payment-fnf-demo 函數的建立方式和 generateInfo-fnf-demo 函數類似。
# -*- coding: utf-8 -*-
import logging
import json
import os
import time
import logging
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.client import AcsClient
from aliyunsdkfnf.request.v20190315 import ReportTaskSucceededRequest
from aliyunsdkfnf.request.v20190315 import ReportTaskFailedRequest
from mns.account import Account # pip install aliyun-mns
from mns.queue import *
def handler(event, context):
logger = logging.getLogger()
region = "xxx"
account_id = "xxx"
ak_id = "xxx"
ak_secret = "xxx"
mns_endpoint = "http://your_account_id.mns.cn-hangzhou.aliyuncs.com/"
queue_name = "payment-queue-fnf-demo"
my_account = Account(mns_endpoint, ak_id, ak_secret)
my_queue = my_account.get_queue(queue_name)
# my_queue.set_encoding(False)
fnf_client = AcsClient(
ak_id,
ak_secret,
region
)
eventJson = json.loads(event)
isLoop = True
while isLoop:
try:
recv_msg = my_queue.receive_message(30)
isLoop = False
# body = json.loads(recv_msg.message_body)
logger.info("recv_msg.message_body:======================" + recv_msg.message_body)
msgJson = json.loads(recv_msg.message_body)
my_queue.delete_message(recv_msg.receipt_handle)
# orderCode = int(time.time())
task_token = eventJson["taskToken"]
orderNum = eventJson["orderNum"]
output = "{\"orderNum\": \"%s\", \"paymentMethod\": \"%s\", \"price\": \"%s\" " \
"}" % (orderNum, msgJson["paymentMethod"], msgJson["price"])
request = ReportTaskSucceededRequest.ReportTaskSucceededRequest()
request.set_Output(output)
request.set_TaskToken(task_token)
resp = fnf_client.do_action_with_exception(request)
except Exception as e:
logger.info("new loop")
return 'hello world'
上面代碼核心思路是等待使用者在支付頁面選擇某個支付方式确認支付。使用了 MNS 的隊列來模拟等待。循環等待接收隊列 payment-queue-fnf-demo 中的消息,當收到消息後将訂單号和使用者選擇的具體支付方式以及金額傳回給 payment 節點。
前端選擇支付方式頁面:
經過 generateInfo 節點後,該訂單的支付方式資訊已經有了,是以對于使用者而言,當填完商品、商家、位址後,跳轉到的頁面就是該确認支付頁面,并且包含了該商家支援的支付方式。
進入該頁面後,會請求 Java 服務暴露的接口,擷取訂單資訊,根據支付方式在頁面上顯示不同的支付方式。
代碼片段如下:
當使用者標明某個支付方式點選送出訂單按鈕後,向 payment-queue-fnf-demo 隊列發送消息,即通知 payment-fnf-demo 函數繼續後續的邏輯。
使用了一個 HTTP 觸發器類型的函數,用于實作向 MNS 發消息的邏輯,paymentMethod-fnf-demo 函數代碼:
# -*- coding: utf-8 -*-
import logging
import urllib.parse
import json
from mns.account import Account # pip install aliyun-mns
from mns.queue import *
HELLO_WORLD = b'Hello world!\n'
def handler(environ, start_response):
logger = logging.getLogger()
context = environ['fc.context']
request_uri = environ['fc.request_uri']
for k, v in environ.items():
if k.startswith('HTTP_'):
# process custom request headers
pass
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except (ValueError):
request_body_size = 0
request_body = environ['wsgi.input'].read(request_body_size)
paymentMethod = urllib.parse.unquote(request_body.decode("GBK"))
logger.info(paymentMethod)
paymentMethodJson = json.loads(paymentMethod)
region = "cn-xxx"
account_id = "xxx"
ak_id = "xxx"
ak_secret = "xxx"
mns_endpoint = "http://your_account_id.mns.cn-hangzhou.aliyuncs.com/"
queue_name = "payment-queue-fnf-demo"
my_account = Account(mns_endpoint, ak_id, ak_secret)
my_queue = my_account.get_queue(queue_name)
output = "{\"paymentMethod\": \"%s\", \"price\":\"%s\" " \
"}" % (paymentMethodJson["paymentMethod"], paymentMethodJson["price"])
msg = Message(output)
my_queue.send_message(msg)
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
return [HELLO_WORLD]
函數的邏輯很簡單,就是向 MNS 的隊列 payment-queue-fnf-demo 發送使用者選擇的支付方式和金額。
3. paymentCombination 節點
paymentCombination 節點是一個路由節點,通過判斷某個參數路由到不同的節點,以 paymentMethod 作為判斷條件:
- type: choice
name: paymentCombination
inputMappings:
- target: orderNum
source: $local.orderNum
- target: paymentMethod
source: $local.paymentMethod
- target: price
source: $local.price
- target: taskToken
source: $local.taskToken
choices:
- condition: $.paymentMethod == "zhifubao"
steps:
- type: task
name: zhifubao
resourceArn: acs:fc:cn-hangzhou:your_account_id:services/FNFDemo-jiyuan/functions/zhifubao-fnf-demo
inputMappings:
- target: price
source: $input.price
- target: orderNum
source: $input.orderNum
- target: paymentMethod
source: $input.paymentMethod
- target: taskToken
source: $input.taskToken
- condition: $.paymentMethod == "weixin"
steps:
- type: task
name: weixin
resourceArn: acs:fc:cn-hangzhou:your_account_id:services/FNFDemo-jiyuan.LATEST/functions/weixin-fnf-demo
inputMappings:
- target: price
source: $input.price
- target: orderNum
source: $input.orderNum
- target: paymentMethod
source: $input.paymentMethod
- target: taskToken
source: $input.taskToken
- condition: $.paymentMethod == "unionpay"
steps:
- type: task
name: unionpay
resourceArn: acs:fc:cn-hangzhou:your_account_id:services/FNFDemo-jiyuan.LATEST/functions/union-fnf-demo
inputMappings:
- target: price
source: $input.price
- target: orderNum
source: $input.orderNum
- target: paymentMethod
source: $input.paymentMethod
- target: taskToken
source: $input.taskToken
default:
goto: orderCanceled
流程是,使用者選擇支付方式後,通過消息發送給 payment-fnf-demo 函數,然後将支付方式傳回,于是流轉到 paymentCombination 節點通過判斷支付方式流轉到具體處理支付邏輯的節點和函數。
4. zhifubao 節點
看一個 zhifubao 節點:
choices:
- condition: $.paymentMethod == "zhifubao"
steps:
- type: task
name: zhifubao
resourceArn: acs:fc:cn-hangzhou:your_account_id:services/FNFDemo-jiyuan/functions/zhifubao-fnf-demo
inputMappings:
- target: price
source: $input.price
- target: orderNum
source: $input.orderNum
- target: paymentMethod
source: $input.paymentMethod
- target: taskToken
source: $input.taskToken
節點的 resourceArn 和之前兩個節點的不同,這裡配置的是函數計算中函數的 ARN,也就是說當流程流轉到這個節點時會觸發 zhifubao-fnf-demo 函數,該函數是一個事件觸發函數,但不需要建立任何觸發器。流程将訂單金額、訂單号、支付方式傳給 zhifubao-fnf-demo 函數。
zhifubao-fnf-demo 函數:
# -*- coding: utf-8 -*-
import logging
import json
import requests
import urllib.parse
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkfnf.request.v20190315 import ReportTaskSucceededRequest
from aliyunsdkfnf.request.v20190315 import ReportTaskFailedRequest
def handler(event, context):
region = "cn-xxx"
account_id = "xxx"
ak_id = "xxx"
ak_secret = "xxx"
fnf_client = AcsClient(
ak_id,
ak_secret,
region
)
logger = logging.getLogger()
logger.info(event)
bodyJson = json.loads(event)
price = bodyJson["price"]
taskToken = bodyJson["taskToken"]
orderNum = bodyJson["orderNum"]
paymentMethod = bodyJson["paymentMethod"]
logger.info("price:" + price)
newPrice = int(price) * 0.8
logger.info("newPrice:" + str(newPrice))
url = "http://xx.xx.xx.xx:8080/setPaymentCombination/" + orderNum + "/" + paymentMethod + "/" + str(newPrice)
x = requests.get(url)
return {"Status":"ok"}
代碼邏輯很簡單,接收到金額後,将金額打 8 折,然後将價格更新回訂單。其他支付方式的節點和函數如法炮制,變更實作邏輯就可以。在這個示例中,微信支付打了 5 折,銀聯支付打 7 折。
完整流程
流程中的 orderCompleted 和 orderCanceled 節點沒做什麼邏輯,流程如下:
從 Serverless 工作流中看到的節點流轉是這樣的:
寫在後面
以上是一個基于 Serverless 的 FC 實作的工作流,模拟建構了一個訂單子產品,規則包括:
- 配置商家和支付方式的中繼資料規則;
- 确認支付頁面的中繼資料規則。
在實際項目中,需要将可定制的部分抽象為中繼資料描述,需要有配置界面供營運或商家定制支付方式也就是中繼資料規則,然後前後端頁面基于中繼資料資訊展示相應的内容。
如果之後需要接入新的支付方式,隻需要在 paymentCombination 路由節點中确定好路由規則,之後增加對應的支付方式函數即可,通過增加中繼資料配置項,就可以在頁面展示新加的支付方式,并路由到新的支付函數中。
經過整篇文章相信很多人對于 Serverless 的定義,以及如何基于現有的公有雲系統的 Serverless 功能實作商業能力已經有了一定的了解,甚至基于此有實力的公司可以自研一套 Serverless 平台。當然思想是相同的,其實文中很多邏輯與理論不止适用于 Serverless,就是我們日常基于微服務的平台化/中台化解決方案,都可以從中擷取設計營養在工作中應用。