天天看點

Opensearch PHP SDK協程相容改造

摘要

本文簡單的介紹了協程的概念及基本原理,以及協程在PHP中的一種實作方案(PECL/Swoole)。最後,結合Opensearch PHP SDK的協程改造過程示範了具體的使用方法。

協程

與程序、線程一樣,協程是邏輯代碼線之間隔離的一種方法。隻不過程序和線程是由作業系統直接支援,并負責排程的;協程的粒度比線程更小,作業系統無法感覺,是以排程工作必須由程式自己完成。

從目标上來看,協程與epoll等模型基本一緻:都是為了降低程序(線程)排程引發的頻繁上下文切換的資源消耗,最終提高系統效率。使用epoll模型編寫的代碼大量使用回調函數(類似下面的僞代碼):

connect(uri, connected() {
    send(data, sent() {
        receive(received(response) {
            // ...
        });
    });
})           

在實際編寫中,一般不會使用這麼深層次的函數嵌套結構,但是上例從側面描述了異步代碼的編寫困境:效率高,閱讀難。

與epoll模型不同,協程代碼不需要編寫很多回調函數,代碼邏輯看起來和同步代碼一樣:

connect(uri);
send(data);
response = receive();
// ...           

協程排程器完成了其中的排程工作:感覺挂起,完成排程。

協程的概念提出的很早,隻是最近有些程式設計語言原生支援協程(如:Go)才使得其變得較為熱門。PHP解釋器對各種C類庫的依賴較為嚴重,代碼中大量使用同步方法。是以直接在Zend Engine中支援協程困難重重。好在有擴充開發人員編寫了大量的實作代碼,為我們解決了這個問題。

PECL/Swoole

PECL/Swoole是使用C/C++開發的PHP異步網絡通訊擴充,提供異步非阻塞網絡通訊支援。基于PECL/Swoole擴充,我們可以在PHP非線程安全模式下實作多線程的網絡通訊,提高PHP程式的吞吐能力。

自2.0開始,PECL/Swoole提供了原生的協程支援。開發者可以借助一整套新編寫的類和方法實作單線程的基于協程的網絡通訊。自4.0開始,PECL/Swoole重寫了協程部分全部的代碼,棄用了(未釋出的3.0版本)基于微信C++協程庫的對于協程的實作方案,自主實作了較為穩定的協程方案。

下面的代碼展示了如何通過PECL/Swoole實作簡單的HTTP用戶端請求(與PECL/Swoole版本無關):

go(function() {
    $cli = new \Swoole\Coroutine\Http\Client('127.0.0.1', 9501);
    
    $cli->setHeaders(['Host' => 'localhost']);
    $cli->set(['http_proxy_host' => HTTP_PROXY_HOST, 'http_proxy_port' => HTTP_PROXY_PORT]);
    
    $result = $cli->get('/get?json=true');
    var_dump($cli->body);
});           

代碼中的匿名函數首先通過IP位址和端口号建立了HTTP用戶端對象,然後分别設定了頭資訊和代理資訊,最後通過

GET

方法擷取URI的響應結果并輸出。

示例代碼中的

go()

函數是PECL/Swoole協程實作的核心:在其中執行的代碼全部受到協程排程器的管控,并在某個協程操作挂起時自動切換到其他協程待處理的代碼段中。下面的僞代碼展示了如何借助

go()

函數同時發出多個請求:

for ($i=0; $i<10; ++$i) {
    go(function() use($i) {
        $response = request('/region');
        echo "#{$i}: " . $response . PHP_EOL;
    });
}           

由于協程排程器的存在,代碼不會在

request()

函數處停留,全部請求幾乎同時發出。這就意味着獲得響應的順序也不會嚴格按照#0, #1, …的順序進行:哪個請求先傳回,哪個請求的的

echo

語句先被執行。

當然,PECL/Swoole目前隻支援其自制的、經過改造的網絡通訊類,其他尚未改造的阻塞函數(或方法)無法被支援。

改造手記

與大部分的PHP編寫的HTTP用戶端程式一樣,Opensearch PHP SDK使用cURL作為預設的HTTP請求工具。借助ext/curl,我們可以實作絕大多數的阻塞式的HTTP請求(包括HTTPS請求)。但是對于協程程式來說,這裡就是需要重點改造的地方。

1.改造原有代碼

OpenSearch\Client\OpenSearchClient

類中,我們找到了前輩們提取出的公用請求方法

_curl()

private function _curl($url, $items) {
        $method = strtoupper($items['method']);
        $options = array(
            CURLOPT_HTTP_VERSION => 'CURL_HTTP_VERSION_1_1',
            CURLOPT_CONNECTTIMEOUT => $this->connectTimeout,
            CURLOPT_TIMEOUT => $this->timeout,
            CURLOPT_CUSTOMREQUEST => $method,
            CURLOPT_HEADER => false,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERAGENT => "opensearch/php sdk " . self::SDK_VERSION . "/" . PHP_VERSION,
            CURLOPT_HTTPHEADER => $this->_getHeaders($items),
        );

        if ($method == self::METHOD_GET) {
            $query = $this->_buildQuery($items['query_params']);
            $url .= preg_match('/\?/i', $url) ? '&' . $query : '?' . $query;
        } else{
            if(!empty($items['body_json'])){
                $options[CURLOPT_POSTFIELDS] = $items['body_json'];
            }
        }

        if ($this->gzip) {
            $options[CURLOPT_ENCODING] = 'gzip';
        }

        if ($this->debug) {
            $out = fopen('php://temp','rw');
            $options[CURLOPT_VERBOSE] = true;
            $options[CURLOPT_STDERR] = $out;
        }

        $session = curl_init($url);
        curl_setopt_array($session, $options);
        $response = curl_exec($session);
        curl_close($session);

        $openSearchResult = new OpenSearchResult();
        $openSearchResult->result = $response;

        if ($this->debug) {
            $openSearchResult->traceInfo = $this->getDebugInfo($out, $items);
        }

        return $openSearchResult;
    }           

上述代碼的大緻流程是:

  • 設定cURL請求參數;
  • 請求并擷取響應體;
  • 建構并傳回

    OpenSearch\Generated\Common\OpenSearchResult

    對象;

首先,我們需要提供一個可供使用者切換的開關,便于協程開發者從cURL模式切換為Swoole模式:

/** @var IHttpHandler */
    private $httpHandler = null;

    public function __construct($accessKey, $secret, $host, $options = array()) {
        // ...
        $this->httpHandler = new CUrlHttpHandler();
        // ...
    }

    public function setHttpHandler(IHttpHandler $httpHandler)
    {
        $this->httpHandler = $httpHandler;
    }           

其次,定義

IHttpHandler

接口:

interface IHttpHandler
{
    /**
     * Performs a HTTP request and returns response body
     *
     * @return string|false
     */
    public function request($url, $items, $connectTimeout, $timeout, $gzip, $debug);
}           

接口方法

request()

的參數和傳回值保持與原

_curl()

方法一緻,但是追加了一些原來可以通過

$this->

擷取到的配置參數。

注:如果深入改造的話,可以考慮将這些

$this->

參數移入

IHttpHandler

的抽象實作中。

使用該接口改造原

_curl()

方法:

private function _curl($url, $items) {
        $response = $this->httpHandler->request($url, $items
                , $this->connectTimeout, $this->timeout, $this->gzip, $this->debug);
        // ...
    }           

由于原

_curl()

方法中包含對

OpenSearchClient

類私有方法的調用,考慮建立

IHttpHandler

的抽象實作共享這部分方法:

abstract class AbstractHttpHandler implements IHttpHandler
{
    // Extract from OpenSearchClient
    public function _getHeaders($items) {
        // ...
    }

    // Extract from OpenSearchClient
    public function _buildQuery($params) {
        // ...
    }
}           

在改造原

_curl()

方法時,原有的代碼就可以拼接出

CUrlHttpHandler

class CUrlHttpHandler extends AbstractHttpHandler
{
    public function request($url, $items, $connectTimeout, $timeout, $gzip, $debug)
    {
        $method = strtoupper($items['method']);
        $options = array(
            CURLOPT_HTTP_VERSION => 'CURL_HTTP_VERSION_1_1',
            CURLOPT_CONNECTTIMEOUT => $connectTimeout,
            CURLOPT_TIMEOUT => $timeout,
            CURLOPT_CUSTOMREQUEST => $method,
            CURLOPT_HEADER => false,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERAGENT => "opensearch/php sdk " . OpenSearchClient::SDK_VERSION . "/" . PHP_VERSION,
            CURLOPT_HTTPHEADER => $this->_getHeaders($items),
        );

        if ($method == OpenSearchClient::METHOD_GET) {
            $query = $this->_buildQuery($items['query_params']);
            $url .= preg_match('/\?/i', $url) ? '&' . $query : '?' . $query;
        } else{
            if(!empty($items['body_json'])){
                $options[CURLOPT_POSTFIELDS] = $items['body_json'];
            }
        }

        if ($gzip) {
            $options[CURLOPT_ENCODING] = 'gzip';
        }

        if ($debug) {
            $out = fopen('php://temp','rw');
            $options[CURLOPT_VERBOSE] = true;
            $options[CURLOPT_STDERR] = $out;
        }

        $session = curl_init($url);
        curl_setopt_array($session, $options);
        $response = curl_exec($session);
        curl_close($session);

        return $response;
    }
}           

隻是需要有兩點修改:

  • 原有的

    $this->

    對屬性的使用全部變更為局部變量,如:

    $this->debug

    更換為

    $debug

  • self::

    對常量的使用全部變更為

    OpenSearchClient::

最後,就是我們本次的重頭戲

SwooleHttpHandler

了。

2.新的方法

PECL/Swoole的更新疊代速度飛快,是以其文檔遠遠追不上最新的版本。很多時候,我們隻能夠靠分析其源代碼探尋可以使用屬性或者方法。

首先,建立請求類對象:

$host = parse_url($url, PHP_URL_HOST);
        $client = new \Swoole\Coroutine\Http\Client($host);           

然後,對應cURL配置各種參數:

// ...

        // 跳過CURLOPT_HTTP_VERSION(Swoole預設使用HTTP/1.1)
        // 跳過CURLOPT_CONNECTTIMEOUT(注意:暫無法設定連接配接逾時時間)
        // CURLOPT_TIMEOUT
        $client->set(['timeout' => $timeout]);
        // CURLOPT_CUSTOMREQUEST
        $client->setMethod($method);
        // 跳過CURLOPT_HEADER(Swoole預設将響應頭、體分離)
        // 跳過CURLOPT_RETURNTRANSFER(Swoole預設傳回響應體)
        // CURLOPT_USERAGENT
        $headers['User-Agent'] = "opensearch/php sdk " . OpenSearchClient::SDK_VERSION . "/" . PHP_VERSION;
        // CURLOPT_ENCODING
        if ($gzip) {
            $headers['Accept-Encoding'] = 'gzip';
        }
        // CURLOPT_HTTPHEADER
        $client->setHeaders($headers); // NAME => VALUE           

接下來,根據請求類型存放請求體:

if ($method == OpenSearchClient::METHOD_GET) {
            $query = $this->_buildQuery($items['query_params']);
            $url .= preg_match('/\?/i', $url) ? '&' . $query : '?' . $query;
        } else {
            if(!empty($items['body_json'])){
                $client->setData($items['body_json']); // Request body
            }
        }           

最後,請求并傳回結果:

$result = $client->execute($url); // Boolean
        if (!$result) {
            return false;
        }

        return $client->body;           

至此,改造完畢。

3.測試使用

注:下面的代碼隻是展示了改造後的用戶端類如何使用,并不涉及多請求的并行示範:

go(function() {

    $coClient = OpensearchClientBuilder::build();
    $coClient->setHttpHandler(new OpenSearch\Client\SwooleHttpHandler()); // 更換請求處理器
    $coClient = new OpensearchClientResponseParser($coClient);

    $result = $coClient->get('/region');
    fprintf(STDOUT, "name=%s" . PHP_EOL, $result['result']['name']);

});           

後記

雖然在Opensearch PHP SDK中支援協程并非使用者提出的需求,但是作為一家技術型公司,為使用者提供更多的技術選擇可能性也是我們應該提倡、做到的。

本文中提到的PHP協程并非隻有PECL/Swoole一種解決方案,PHP開發組也在考慮将協程内置的可能性。然而從功能完整性(即使存在上文中提到無法設定“連接配接逾時時間”等問題)和穩定性上來看,PECL/Swoole無疑是當下最出色的。

參考文檔