天天看點

swoole_process實作程序池的方法示例

swoole —— 重新定義PHP

swoole 的程序之間有兩種通信方式,一種是消息隊列(queue),另一種是管道(pipe),對swoole_process 的研究在swoole中顯得尤為重要。

預備知識

IO多路複用

swoole 中的io多路複用表現為底層的 epoll程序模型,在C語言中表現為 epoll 函數。

  • epoll 模型下會持續監聽自己名下的素有socket 描述符 fd
  • 當觸發了 socket 監聽的事件時,epoll 函數才會響應,并傳回所有監聽該時間的 socket 集合
  • epoll 的本質是阻塞IO,它的優點在于能同僚處理大量socket連接配接

Event loop 事件循環

swoole 對 epoll 實作了一個Reactor線程模型封裝,設定了read事件和write事件的監聽回調函數。(詳見swoole_event_add)

  • Event loop 是一個Reactor線程,其中運作了一個epoll執行個體。
  • 通過swoole_event_add将socket描述符的一個事件添加到epoll監聽中,事件發生時将執行回調函數
  • 不可用于fpm環境下,因為fpm在任務結束時可能會關掉程序。

swoole_process

  • 基于C語言封裝的程序管理子產品,友善php來調用
  • 内置管道、消息隊列接口,友善實作程序間通信

我們在php-fpm.conf配置檔案中發現,php-fpm中有兩種程序池管理設定。

  • 靜态模式 即初始化固定的程序數,當來了一個請求時,從中選取一個程序來處理。
  • 動态模式 指定最小、最大程序數,當請求量過大,程序數不超過最大限制時,新增線程去處理請求

接下來用swoole代碼來實作,這裡隻是為了解swoole_process、程序間通信、定時器等使用,實際情況使用封裝好的swoole_server來實作task任務隊列池會更友善。

假如有個定時投遞的任務隊列:

<?php
/**
* 動态程序池,類似fpm
* 動态建立程序
* 有初始程序數,最小程序數,程序不夠處理時候建立程序,不超過最大程序數
*/
// 一個程序定時投遞任務
/**
* 1. tick
* 2. process及其管道通訊
* 3. event loop 事件循環
*/
class processPool
{
private $pool;
/**
* @var swoole_process[] 記錄所有worker的process對象
*/
private $workers = [];
/**
* @var array 記錄worker工作狀态
*/
private $used_workers = [];
/**
* @var int 最小程序數
*/
private $min_woker_num = 5;
/**
* @var int 初始程序數
*/
private $start_worker_num = 10;
/**
* @var int 最大程序數
*/
private $max_woker_num = 20;
/**
* 程序閑置銷毀秒數
* @var int
*/
private $idle_seconds = 5;
/**
* @var int 目前程序數
*/
private $curr_num;
/**
* 閑置程序時間戳
* @var array
*/
private $active_time = [];
public function __construct()
{
$this- pool = new swoole_process(function () {
// 循環建立worker程序
for ($i = 0; $i < $this- start_worker_num; $i++) {
$this- createWorker();
}
echo '初始化程序數:' . $this- curr_num . PHP_EOL;
// 每秒定時往閑置的worker的管道中投遞任務
swoole_timer_tick(1000, function ($timer_id) {
static $count = 0;
$count++;
$need_create = true;
foreach ($this- used_workers as $pid =  $used) {
if ($used == 0) {
$need_create = false;
$this- workers[$pid]- write($count . ' job');
// 标記使用中
$this- used_workers[$pid] = 1;
$this- active_time[$pid] = time();
break;
}
}
foreach ($this- used_workers as $pid =  $used)
// 如果所有worker隊列都沒有閑置的,則建立一個worker來處理
if ($need_create && $this- curr_num < $this- max_woker_num) {
$new_pid = $this- createWorker();
$this- workers[$new_pid]- write($count . ' job');
$this- used_workers[$new_pid] = 1;
$this- active_time[$new_pid] = time();
}
// 閑置超過一段時間則銷毀程序
foreach ($this- active_time as $pid =  $timestamp) {
if ((time() - $timestamp)   $this- idle_seconds && $this- curr_num   $this- min_woker_num) {
// 銷毀該程序
if (isset($this- workers[$pid]) && $this- workers[$pid] instanceof swoole_process) {
$this- workers[$pid]- write('exit');
unset($this- workers[$pid]);
$this- curr_num = count($this- workers);
unset($this- used_workers[$pid]);
unset($this- active_time[$pid]);
echo "{$pid} destroyed\n";
break;
}
}
}
echo "任務{$count}/{$this- curr_num}\n";
if ($count == 20) {
foreach ($this- workers as $pid =  $worker) {
$worker- write('exit');
}
// 關閉定時器
swoole_timer_clear($timer_id);
// 退出程序池
$this- pool- exit(0);
exit();
}
});
});
$master_pid = $this- pool- start();
echo "Master $master_pid start\n";
while ($ret = swoole_process::wait()) {
$pid = $ret['pid'];
echo "process {$pid} existed\n";
}
}
/**
* 建立一個新程序
* @return int 新程序的pid
*/
public function createWorker()
{
$worker_process = new swoole_process(function (swoole_process $worker) {
// 給子程序管道綁定事件
swoole_event_add($worker- pipe, function ($pipe) use ($worker) {
$data = trim($worker- read());
if ($data == 'exit') {
$worker- exit(0);
exit();
}
echo "{$worker- pid} 正在處理 {$data}\n";
sleep(5);
// 傳回結果,表示空閑
$worker- write("complete");
});
});
$worker_pid = $worker_process- start();
// 給父程序管道綁定事件
swoole_event_add($worker_process- pipe, function ($pipe) use ($worker_process) {
$data = trim($worker_process- read());
if ($data == 'complete') {
// 标記為空閑
//        echo "{$worker_process- pid} 空閑了\n";
$this- used_workers[$worker_process- pid] = 0;
}
});
// 儲存process對象
$this- workers[$worker_pid] = $worker_process;
// 标記為空閑
$this- used_workers[$worker_pid] = 0;
$this- active_time[$worker_pid] = time();
$this- curr_num = count($this- workers);
return $worker_pid;
}
}
new processPool();           

複制

以上就是本文的全部内容,希望對大家的學習有所幫助。