天天看點

使用Coroutine\Channel實作一個簡單的MySQL連接配接池

Channel 通道,類似于 go 語言的 chan,支援多生産者協程和多消費者協程,Swoole 底層自動實作了協程的切換和排程

Channel 實作原理

  • 通道與 PHP 的 Array 類似,僅占用記憶體,沒有其他額外的資源申請,所有操作均為記憶體操作,無 IO 消耗
  • 底層使用 PHP 引用計數實作,無記憶體拷貝。即使是傳遞巨大字元串或數組也不會産生額外性能消耗

方法

  1. Channel->push :當隊列中有其他協程正在等待 pop 資料時,自動按順序喚醒一個消費者協程。當隊列已滿時自動 yield 讓出控制器,等待其他協程消費資料
  2. Channel->pop:當隊列為空時自動 yield,等待其他協程生産資料。消費資料後,隊列可寫入新的資料,自動按順序喚醒一個生産者協程

連接配接池

使用

Coroutine\Channel

來實作 MySQL 連接配接池可以使用 defer 特性來實作資源的回收,同時可以被協程排程,而且使用

channel->pop

方法的時候,可以設定逾時,減少一系列的心智負擔

代碼實作

namespace SwExample;

class MysqlPool
{
    private static $instance;
    private $pool;  //連接配接池容器,一個 channel
    private $config;

    /**
     * @param  null $config
     * @return MysqlPool
     * @desc   擷取連接配接池執行個體
     */
    public static function getInstance($config = null)
    {
        if (empty(self::$instance)) {
            if (empty($config)) {
                throw new \RuntimeException("mysql config empty");
            }
            self::$instance = new static($config);
        }

        return self::$instance;
    }

   /**
    * MysqlPool constructor.
    *
    * @param $config
    * @desc  初始化,自動建立執行個體,需要放在 workerstart 中執行
    */
   public function __construct($config)
   {
      if (empty($this->pool)) {
         $this->config = $config;
         $this->pool = new chan($config['pool_size']);
         for ($i = 0; $i < $config['pool_size']; $i++) {
            $mysql = new MySQL();
            $res = $mysql->connect($config);
            if ($res == false) {
               //連接配接失敗,抛異常
               throw new \RuntimeException("failed to connect mysql server.");
            } else {
               //mysql 連接配接存入 channel
               $this->put($mysql);
            }
         }
      }
   }

   /**
     * @param $mysql
     * @desc  放入一個 mysql 連接配接入池
     */
    public function put($mysql)
    {
        $this->pool->push($mysql);
    }

    /**
     * @return mixed
     * @desc   擷取一個連接配接,當逾時,傳回一個異常
     */
    public function get()
    {
        $mysql = $this->pool->pop($this->config['pool_get_timeout']);
        if (false === $mysql) {
            throw new \RuntimeException("get mysql timeout, all mysql connection is used");
        }
        return $mysql;
    }

    /**
     * @return mixed
     * @desc   擷取當時連接配接池可用對象
     */
    public function getLength()
    {
        return $this->pool->length();
    }

}           

複制

使用

<?php

require '../vendor/autoload.php';

use SwExample\MysqlPool;

$config = [
    'host' => '127.0.0.1',   //資料庫 ip
    'port' => 3306,          //資料庫端口
    'user' => 'root',        //資料庫使用者名
    'password' => 'root', //資料庫密碼
    'database' => 'wordpress',   //預設資料庫名
    'timeout' => 0.5,       //資料庫連接配接逾時時間
    'charset' => 'utf8mb4', //預設字元集
    'strict_type' => true,  //ture,會自動表數字轉為 int 類型
    'pool_size' => '3',     //連接配接池大小
    'pool_get_timeout' => 0.5, //當在此時間内未獲得到一個連接配接,會立即傳回。(表示所有的連接配接都已在使用中)
];

//建立 http server
$http = new Swoole\Http\Server("0.0.0.0", 9501);
$http->set([
      //"daemonize" => true, // 常駐程序模式運作
      "worker_num" => 1,
      "log_level" => SWOOLE_LOG_ERROR,
    ]);

$http->on('WorkerStart', function ($serv, $worker_id) use ($config) {
        //worker 啟動時,每個程序都初始化連接配接池,在 onRequest 中可以直接使用
        try {
            MysqlPool::getInstance($config);
        } catch (\Exception $e) {
            //初始化異常,關閉服務
            echo $e->getMessage() . PHP_EOL;
            $serv->shutdown();
        } catch (\Throwable $throwable) {
            //初始化異常,關閉服務
            echo $throwable->getMessage() . PHP_EOL;
            $serv->shutdown();
        }
    });

$http->on('request', function ($request, $response) {

        //浏覽器會自動發起這個請求 避免占用請求
        if ($request->server['path_info'] == '/favicon.ico') {
            $response->end('');
            return;
        }

        //擷取資料庫
        if ($request->server['path_info'] == '/list') {
            go(function () use ($request, $response) {
                    //從池子中擷取一個執行個體
                    try {
                        $pool = MysqlPool::getInstance();
                        $mysql = $pool->get();
                        defer(function () use ($mysql) {
                                //利用 defer 特性,可以達到協程執行完成,歸還$mysql 到連接配接池
                                //好處是 可能因為業務代碼很長,導緻亂用或者忘記把資源歸還
                                MysqlPool::getInstance()->put($mysql);
                                echo "目前可用連接配接數:" . MysqlPool::getInstance()->getLength() . PHP_EOL;
                            });
                        $result = $mysql->query("select * from wp_users");
                        $response->end(json_encode($result));
                    } catch (\Exception $e) {
                       $response->end($e->getMessage());
                    }
            });
            return;
        }

        echo "get request:".date('Y-m-d H:i:s').PHP_EOL;
        if ($request->server['path_info'] == '/timeout') {
            go(function () use ($request, $response) {
                    //從池子中擷取一個執行個體
                    try {
                        $pool = MysqlPool::getInstance();
                        $mysql = $pool->get();
                        defer(function () use ($mysql) {
                                //協程執行完成,歸還$mysql 到連接配接池
                                MysqlPool::getInstance()->put($mysql);
                                echo "目前可用連接配接數:" . MysqlPool::getInstance()->getLength() . PHP_EOL;
                        });
                        $result = $mysql->query("select * from wp_users");
                        \Swoole\Coroutine::sleep(10); //sleep 10 秒,模拟耗時操作
                        $response->end(date('Y-m-d H:i:s').PHP_EOL.json_encode($result));
                    } catch (\Exception $e) {
                       $response->end($e->getMessage());
                    }
            });
            return;
        }
    });

$http->start();           

複制

通路

http://127.0.0.1:9501/list

可以看到正常的結果輸出

通路

http://127.0.0.1:9501/timeout

示範連接配接池取和存的過程

模拟 timeout, 需要浏覽器打開 4 個 tab 頁面,都請求

http://127.0.0.1:9501/timeout

,前三個應該是等 10 秒出結果,第四個 500ms 後出逾時結果

如果是 chrome 浏覽器,會對完全一樣的 url 做并發請求限制需要加一個随機數,例如

http://127.0.0.1:9501/timeout?n=0

http://127.0.0.1:9501/timeout?n=1

沈唁志,一個PHPer的成長之路!

任何個人或團體,未經允許禁止轉載本文:《使用Coroutine\Channel實作一個簡單的MySQL連接配接池》,謝謝合作!