天天看點

PhpRpc 從 0 到 0.7

1.什麼是RPC

RPC全稱Remote Procedure Call,中文譯為遠端過程調用,簡單了解就是 一種解決方案。

業務場景:

舉一個大部分phper都接觸過的商城開發,一般商城都有以下幾個子產品

  • 商品子產品
  • 訂單子產品
  • 會員子產品
  • XX子產品

在常見架構中的展現是:

PhpRpc 從 0 到 0.7

那麼在RPC架構中每個子產品就是一個服務提供者,架構展現:

在這套架構中業務機的職責就是把一個請求 ,拆分成N個小請求,分發到各個服務裡面,再整合各個服務的結果,傳回給使用者。

PhpRpc 從 0 到 0.7

例如在某次下單請求中,那麼大概 發送的邏輯如下:

1. 業務機接受請求

2. 業務機提取使用者參數,請求使用者服務,擷取使用者餘額等資訊,等待結果

3. 業務機提取商品參數,請求商品服務,擷取商品剩餘庫存和價格等資訊,等待結果。

4. 業務機融合使用者服務、商品服務的傳回結果,進行下一步調用(假設滿足購買條件)

5. 業務機調用使用者服務進行扣款,調用商品服務進行庫存扣減,調用訂單服務進行下單(事務邏輯和撤回可以用請求id保證,或者自己實作其他邏輯排程)

6. 業務機根據處理響應使用者

而在以上發生的行為,就稱為遠端過程調用。而調用過程實作的通訊協定可以有很多,比如常見的HTTP、TCP協定。

服務熔斷

某個服務故障或者異常時直接熔斷整個服務,而不是一直等到此服務逾時

服務降級

當某個服務熔斷之後,伺服器将不再被調用,此時用戶端可以自己準備一個本地的fallback回掉,傳回一個預設值 ,這樣做,雖然服務水準下降,但好歹,比直接挂掉要強。 服務降級處理是在用戶端實作完成的,與服務端沒有關系

服務限流

例如某個伺服器最多同時僅能處理100個請求, 或者是cpu負載達到百分之80的時候, 為了保護服務的穩定性,則不在希望繼續收到 新的連接配接。那麼此時就要求用戶端不再對其發起請求,例如 你可以以任何的形式來監控你的服務,當觸發某個條件時(CPU負載80%)下線此服務,業務機動态擷取服務節點時就可以知道此服務已限流則響應使用者[網絡繁忙,請稍後再試] 或者此服務有多台機提供則其他機可繼續提供服務,等被下線的機子恢複後又上線

2.Php Tcp通訊

源碼

https://github.com/ar414-com/RpcDemo

開發環境要求

  • 保證 PHP 版本大于等于 7.2
  • 保證 Swoole 拓展版本大于等于 4.3.5
  • 使用 Linux / FreeBSD / MacOS 這三類作業系統

作者開發環境

  • PHP 7.2
  • Swoole 4.3.5
  • CentOS 7.2
PhpRpc 從 0 到 0.7

建立一個最基本的TCP伺服器

<?php

//建立Server對象,監聽 0.0.0.0:20001端口
$serv = new Swoole\Server("0.0.0.0", 20001);

$serv->on('Start', function ($serv) {
 echo "服務已啟動,主程序PID:{$serv->master_pid}\n";
});

//監聽連接配接進入事件
$serv->on('Connect', function ($serv, $fd) {
 echo "Client: Connect.\n";});

//監聽資料接收事件
$serv->on('Receive', function ($serv, $fd, $from_id, $data) {
 echo "接收用戶端資料:{$data}\n";
  $serv->send($fd, "Server: ".$data);
});

//監聽連接配接關閉事件
$serv->on('Close', function ($serv, $fd) {
 echo "Client: Close.\n";});

//啟動伺服器
$serv->start();           
PhpRpc 從 0 到 0.7
<?php

//建立連接配接
$fp = stream_socket_client('tcp://127.0.0.1:20001');

//發送資料
fwrite($fp, 'Test');

//主動擷取響應
$data = fread($fp, 65533);

echo "服務端響應資料:{$data}\n";

//斷開連接配接
fclose($fp);           

用戶端

PhpRpc 從 0 到 0.7

服務端

PhpRpc 從 0 到 0.7

3.用戶端調用與服務端處理(提供思路)

用戶端與伺服器的資料傳輸約定

用戶端請求Rpc服務(以下并非完整代碼)
  • 場景:例如在一個商場系統中,我們将商品庫和使用者庫兩個服務切分開到不同的伺服器當中
  • 當使用者打開商場首頁的時候, 我們希望App向某個網關發起請求,
  • 該網關可以自動的幫我們請求商品清單和使用者資訊等資料
//商品清單
$data = [
 'service' => 'Goods',  //服務名稱
 'action'  => 'getList', //具體方法
 'arg'     => ['page' => 1] //請求參數
];
//使用者資訊
$data = [
 'service' => 'User',  //服務名稱
 'action'  => 'getUserInfoForToken', //具體方法
 'arg'     => ['token' => '6aa62603ef82b70597a90d93af04b542'] //請求參數
];
//打包資料
$dataStr = serialize($data); 
$dataStr = pack('N', strlen($str)).$str;           

請求API網關 API網關自動根據Service參數查詢出對應服務IP、PORT并進行調用傳回

本示例為了友善将Rpc服務配置寫入.env檔案 例:

//.env
RPC_GOODS_HOST=10.0.0.1
RPC_GOODS_PORT=8899
RPC_USER_HOST=10.0.0.2
RPC_USER_PORT=8899           
服務端處理請求( 完整代碼
//接受請求資料并解包
$data = substr($request,'4');
$data = unserialize($data);
//TODO 檢測必須參數 service action
//檢測服務是否存在
//$controllerNameSpace是你的控制器命名空間
$service = ucfirst($data['service']);
$class   = "{$controllerNameSpace}\\{$service}";
if(!class_exists($class))
{
 //TODO 服務不存在
 //設定響應狀态錯誤碼(需自行封裝)
 $response->setStatus(Response::STATUS_SERVICE_SERVICE_NOT_FOUND); //響應用戶端(需自行封裝)
 goto response;}

//檢測方法是否存在
$class  = new \ReflectionClass($class);
$action = $data['action'];
if(!$class->hasMethod($action))
{
 //action不存在
 //重新組裝參數
 //如果方法則調用魔術方法 比如調用一些PDO方法,如果無則調用時傳回方法不存在
 $request->proxyActionAssemblyArg(); $method = $class->getMethod('__call');}
else
{
 $method = $class->getMethod($action);}

//調用
$instance = $class->newInstance($request,$response);
$ret = $method->invokeArgs($instance,$request->getArg());
$response->setMessage($ret);
//響應用戶端(需自行封裝)
goto response;

//作者的響應封裝(僅供參考):
response:{
    if ($server->exist($fd))
    {
        $message = $response->getMessage();
        $responseData = [
            'status' => $response->getStatus(),
            'data'   => $message
        ];
        $responseData = serialize($responseData);
        $responseData = Request::pack($responseData);
        $server->send($fd,$responseData);
        //判斷用戶端是否需要長連接配接
        if(!$request->getIsKeep())
        {
            $server->close($fd);
        }
    }
}