swoole伺服器開啟後,通過事件回調來處理對應的邏輯,easyswoole封裝了事件注冊類
1.先看看在服務啟動時,是如何注冊事件的,通過EventRegister類來管理監聽事件和對應的回調函數;
通過$events = $register->all();擷取所有的注冊事件
$this->finalHook中,先直接注冊預設的onWorkerStart回調,再由EventHelper注冊預設的OnTask、OnFinish、OnPipeMessage事件;當服務類型為WEB_SERVER或WEB_SOCKET_SERVER且尚未注冊onRequest時,才注冊預設的OnRequest事件
// Create the default event register.
$register = new EventRegister();
// Install the framework's built-in callbacks (see finalHook).
$this->finalHook($register);
// Hand the register to the user-defined event class so it can add or replace callbacks.
EasySwooleEvent::mainServerCreate($this,$register);
// Fetch every registered event; each entry maps an event name to the list of callbacks added for it.
$events = $register->all();
foreach ($events as $event => $callback){
// Bind a single wrapper per event; the wrapper fans the call out to every registered callback.
$this->mainServer->on($event, function () use ($callback) {
$ret = [];
// Forward whatever arguments this event was invoked with.
$args = func_get_args();
foreach ($callback as $item) {
array_push($ret,Invoker::callUserFuncArray($item, $args));
}
// With several callbacks, return the list of all results; otherwise return the
// single result (array_shift yields null when the list is empty).
if(count($ret) > 1){
return $ret;
}
return array_shift($ret);
});
// NOTE(review): the excerpt ends here; the closing brace of the foreach above is not shown.
/**
 * Registers the framework's default callbacks on the given register.
 *
 * Adds an onWorkerStart callback that runs the pool manager's worker-start hook
 * and (except on Darwin) renames the process, then installs the default
 * onTask / onFinish / onPipeMessage handlers. The default onRequest handler is
 * installed only for web / web-socket server types and only when nothing has
 * been registered for onRequest yet.
 *
 * @param EventRegister $register register that receives the default callbacks
 */
private function finalHook(EventRegister $register)
{
    // Instantiate the object pool manager up front.
    PoolManager::getInstance();

    $onWorkerStart = function (\swoole_server $server, int $workerId) {
        PoolManager::getInstance()->__workerStartHook($workerId);

        $workerNum = Config::getInstance()->getConf('MAIN_SERVER.SETTING.worker_num');
        $name = Config::getInstance()->getConf('SERVER_NAME');

        // The process title is left untouched on Darwin.
        if (PHP_OS == 'Darwin') {
            return;
        }

        // Ids up to worker_num - 1 are labelled as workers, higher ids as task workers.
        $role = $workerId <= ($workerNum - 1) ? 'Worker' : 'Task_Worker';
        cli_set_process_title("{$name}_{$role}_" . $workerId);
    };
    $register->add($register::onWorkerStart, $onWorkerStart);

    EventHelper::registerDefaultOnTask($register);
    EventHelper::registerDefaultOnFinish($register);
    EventHelper::registerDefaultOnPipeMessage($register);

    $conf = Config::getInstance()->getConf("MAIN_SERVER");
    $servesHttp = $conf['SERVER_TYPE'] == self::TYPE_WEB_SERVER
        || $conf['SERVER_TYPE'] == self::TYPE_WEB_SOCKET_SERVER;
    if (!$servesHttp) {
        return;
    }

    // Keep an onRequest handler that is already registered.
    if ($register->get($register::onRequest)) {
        return;
    }
    EventHelper::registerDefaultOnRequest($register);
}
2. 事件注冊類 \easyswoole\vendor\easyswoole\easyswoole\src\Core\Swoole\EventRegister.php
繼承了Event類,設定許可的事件
/**
 * Event register restricted to a fixed set of server event names.
 *
 * Each constant holds the event name string that is passed on to the server's
 * on() binding; the constructor hands exactly this set to the parent as the
 * allow-list, so add()/set() reject any other key.
 */
class EventRegister extends Event
{
    const onStart = 'start';
    const onShutdown = 'shutdown';
    const onWorkerStart = 'workerStart';
    const onWorkerStop = 'workerStop';
    const onWorkerExit = 'workerExit';
    const onTimer = 'timer';
    const onConnect = 'connect';
    const onReceive = 'receive';
    const onPacket = 'packet';
    const onClose = 'close';
    const onBufferFull = 'bufferFull';
    const onBufferEmpty = 'bufferEmpty';
    const onTask = 'task';
    const onFinish = 'finish';
    const onPipeMessage = 'pipeMessage';
    const onWorkerError = 'workerError';
    const onManagerStart = 'managerStart';
    const onManagerStop = 'managerStop';
    const onRequest = 'request';
    const onHandShake = 'handShake';
    const onMessage = 'message';
    const onOpen = 'open';

    /**
     * @param array|null $allowKeys accepted for signature compatibility only;
     *                              the value is ignored and the allow-list is
     *                              always the set of event constants above
     */
    public function __construct(?array $allowKeys = null)
    {
        // Build the allow-list from the constants so the two cannot drift apart.
        parent::__construct([
            self::onStart, self::onShutdown, self::onWorkerStart, self::onWorkerStop,
            self::onWorkerExit, self::onTimer, self::onConnect, self::onReceive,
            self::onPacket, self::onClose, self::onBufferFull, self::onBufferEmpty,
            self::onTask, self::onFinish, self::onPipeMessage, self::onWorkerError,
            self::onManagerStart, self::onManagerStop, self::onRequest,
            self::onHandShake, self::onMessage, self::onOpen,
        ]);
    }

    /**
     * Appends a callback for an event.
     *
     * Raises a PHP notice-level error via trigger_error() when the parent
     * rejects the pair (event not allowed or item not callable); the register
     * is returned either way so calls can be chained.
     *
     * @param string   $key  event name, normally one of the on* constants
     * @param callable $item callback to append
     */
    public function add($key, $item): EventRegister
    {
        if (!parent::add($key, $item)) {
            trigger_error("event {$key} invalid or not allow");
        }
        return $this;
    }

    /**
     * Replaces all callbacks of an event with a single callback.
     *
     * Same error reporting and chaining behaviour as add().
     *
     * @param string   $key  event name, normally one of the on* constants
     * @param callable $item callback that becomes the only one for the event
     * @return $this
     */
    public function set($key, $item)
    {
        if (!parent::set($key, $item)) {
            trigger_error("event {$key} invalid or not allow");
        }
        return $this;
    }
}
3. \easyswoole\vendor\easyswoole\easyswoole\src\Core\Component\Event.php
/**
 * Container of callbacks grouped by event name.
 *
 * Only callables are accepted; hook() invokes every callback stored for an event.
 */
class Event extends MultiContainer
{
    /**
     * Appends a callback for the event.
     *
     * @return mixed false when $item is not callable, otherwise the parent's result
     */
    function add($key, $item)
    {
        if (!is_callable($item)) {
            return false;
        }
        return parent::add($key, $item);
    }

    /**
     * Replaces the callbacks of the event with the given one.
     *
     * @return mixed false when $item is not callable, otherwise the parent's result
     */
    function set($key, $item)
    {
        if (!is_callable($item)) {
            return false;
        }
        return parent::set($key, $item);
    }

    /**
     * Invokes every callback registered for $event with the given arguments.
     *
     * A throwable raised by one callback is passed to Trigger::throwable() and
     * the remaining callbacks still run.
     */
    public function hook($event,...$args)
    {
        $listeners = $this->get($event);
        if (!is_array($listeners)) {
            return;
        }

        foreach ($listeners as $listener) {
            try {
                Invoker::callUserFunc($listener, ...$args);
            } catch (\Throwable $error) {
                Trigger::throwable($error);
            }
        }
    }
}
4. \easyswoole\vendor\easyswoole\easyswoole\src\Core\Component\MultiContainer.php
/**
 * Keyed container in which every key holds a list of items.
 *
 * An optional allow-list restricts which keys may be written.
 */
class MultiContainer
{
    private $container = [];
    private $allowKeys = null;

    /**
     * @param array|null $allowKeys keys that may be written; null means any key is accepted
     */
    public function __construct(?array $allowKeys = null)
    {
        $this->allowKeys = $allowKeys;
    }

    /**
     * Appends an item to the list stored under a key.
     *
     * @return $this|false false when the key is not in the allow-list
     */
    public function add($key, $item)
    {
        if (!$this->isAllowed($key)) {
            return false;
        }
        $this->container[$key][] = $item;
        return $this;
    }

    /**
     * Replaces the list stored under a key with a single item.
     *
     * @return $this|false false when the key is not in the allow-list
     */
    public function set($key, $item)
    {
        if (!$this->isAllowed($key)) {
            return false;
        }
        $this->container[$key] = [$item];
        return $this;
    }

    /**
     * Removes a key and its items; unknown keys are ignored.
     *
     * @return $this
     */
    public function delete($key)
    {
        unset($this->container[$key]);
        return $this;
    }

    /**
     * Returns the items stored under a key, or null when the key is absent.
     */
    public function get($key): ?array
    {
        return $this->container[$key] ?? null;
    }

    /**
     * Returns the whole container as key => list of items.
     */
    public function all(): array
    {
        return $this->container;
    }

    /**
     * Tells whether a key may be written.
     *
     * The comparison is strict: with a loose in_array() a key such as true
     * (or 0 on PHP 7) compares equal to allowed strings and slips through.
     */
    private function isAllowed($key): bool
    {
        return !is_array($this->allowKeys) || in_array($key, $this->allowKeys, true);
    }
}
5.預設注冊事件實作調用EventHelper注冊預設的onWorkerStart、OnTask、OnFinish、OnPipeMessage、OnRequest事件到注冊器中 \easyswoole\vendor\easyswoole\easyswoole\src\Core\Swoole\EventHelper.php
5.1 先看OnRequest事件
執行個體化路由排程器Dispatcher,把請求發送到對應的方法執行
執行個體化Request,Response對象;
調用EasySwooleEvent::onRequest 執行http控制器前執行邏輯
$dispatcher->dispatch() 執行http控制器
EasySwooleEvent::afterAction 執行http控制器後執行邏輯
/**
 * Installs the default onRequest handler.
 *
 * The handler wraps the raw swoole request/response in the framework's
 * Request/Response objects, calls EasySwooleEvent::onRequest, dispatches to the
 * controller and then calls EasySwooleEvent::afterAction. Any throwable is
 * given to the handler stored in Di under SysConst::HTTP_EXCEPTION_HANDLER when
 * it implements ExceptionHandlerInterface; otherwise a 500 page is written.
 * The response is sent in every case.
 *
 * @param EventRegister $register            register that receives the handler
 * @param string        $controllerNameSpace namespace prefix given to the dispatcher
 */
public static function registerDefaultOnRequest(EventRegister $register,$controllerNameSpace = 'App\\HttpController\\'):void
{
    $dispatcher = new Dispatcher($controllerNameSpace);
    $register->set($register::onRequest,function (\swoole_http_request $request,\swoole_http_response $response)use($dispatcher){
        $request_psr = new Request($request);
        $response_psr = new Response($response);
        try{
            EasySwooleEvent::onRequest($request_psr,$response_psr);
            $dispatcher->dispatch($request_psr,$response_psr);
            EasySwooleEvent::afterAction($request_psr,$response_psr);
        }catch (\Throwable $throwable){
            $handler = Di::getInstance()->get(SysConst::HTTP_EXCEPTION_HANDLER);
            if($handler instanceof ExceptionHandlerInterface){
                $handler->handle($throwable,$request_psr,$response_psr);
            }else{
                $response_psr->withStatus(Status::CODE_INTERNAL_SERVER_ERROR);
                $report = $throwable->getMessage() ."\n". $throwable->getTraceAsString();
                // Escape before nl2br(): the message may echo request data, and the
                // text is emitted as HTML.
                // NOTE(review): this still shows the stack trace to the client —
                // confirm that is acceptable outside development.
                $response_psr->write(nl2br(htmlspecialchars($report, ENT_QUOTES | ENT_SUBSTITUTE, 'UTF-8')));
            }
        }
        // Sent whether or not an exception occurred.
        $response_psr->response();
    });
}
5.2 預設OnTask事件
task回調函數接收到$server,$taskId,$fromWorkerId,$taskObj; 其中$taskObj就是我們的worker邏輯中,抛出task任務所傳的值,Task抽象類:easyswoole\vendor\easyswoole\easyswoole\src\Core\Swoole\Task\AbstractAsyncTask.php。我們可以通過繼承task抽象類,來實作自定義task任務。
如何投遞任務
/**
 * Installs the default onTask handler.
 *
 * The handler accepts three kinds of payload:
 *  - the name of an existing class, which is instantiated first;
 *  - an AbstractAsyncTask, which is run; when run() returns a value or the task
 *    holds a result, the result is stored on the task and the task is returned;
 *  - a SuperClosure, which is invoked and whose return value is returned.
 * Every other case, including a throwable raised while running, yields null.
 *
 * @param EventRegister $register register that receives the handler
 */
public static function registerDefaultOnTask(EventRegister $register):void
{
    $onTask = function (\swoole_server $server, $taskId, $fromWorkerId, $taskObj) {
        // A class name may be delivered instead of an instance.
        if (is_string($taskObj) && class_exists($taskObj)) {
            $taskObj = new $taskObj;
        }

        if ($taskObj instanceof AbstractAsyncTask) {
            try {
                $result = $taskObj->run($taskObj->getData(), $taskId, $fromWorkerId);
                if (is_null($result)) {
                    // Nothing returned: fall back to a result the task set on itself.
                    $result = $taskObj->getResult();
                }
                if (is_null($result)) {
                    return null;
                }
                // A non-null result means the task is handed back as the handler's return value.
                $taskObj->setResult($result);
                return $taskObj;
            } catch (\Throwable $error) {
                $taskObj->onException($error);
                return null;
            }
        }

        if ($taskObj instanceof SuperClosure) {
            try {
                return $taskObj($server, $taskId, $fromWorkerId);
            } catch (\Throwable $error) {
                Trigger::throwable($error);
            }
        }

        return null;
    };

    $register->set($register::onTask, $onTask);
}
5.3 預設onPipeMessage
也是傳遞一個序列化的Message對象
/**
 * Installs the default onPipeMessage handler.
 *
 * The handler unserializes the payload; a Message instance is forwarded to
 * PipeMessageEventRegister under its command, together with the sending
 * worker id and the message data. Anything else is reported through
 * Trigger::error().
 *
 * @param EventRegister $register register that receives the handler
 */
public static function registerDefaultOnPipeMessage(EventRegister $register):void
{
    $onPipeMessage = function (\swoole_server $server, $fromWorkerId, $data) {
        // NOTE(review): unserialize() is unsafe on untrusted input — confirm that
        // $data can only originate from this server's own workers.
        $decoded = unserialize($data);

        if (!($decoded instanceof Message)) {
            Trigger::error("data :{$data} not packet by swoole_serialize or not a Message Instance");
            return;
        }

        PipeMessageEventRegister::getInstance()->hook(
            $decoded->getCommand(),
            $fromWorkerId,
            $decoded->getData()
        );
    };

    $register->set($register::onPipeMessage, $onPipeMessage);
}
6. 注冊主服務回調事件 (自定義事件回調),除了預設的事件注冊,我們還可以 EasySwooleEvent::mainServerCreate($this,$register); 操作$register,增加事件回調處理邏輯
\easyswoole\EasySwooleEvent.php
從上面的預設OnRequest事件我們可以發現,EasySwooleEvent::onRequest(),EasySwooleEvent::afterAction()會在執行控制器方法前後調用。但是在這兩個方法中,執行$response->end('向用戶端發送消息')不會向用戶端發送消息
/**
 * Application-level event hooks: the place where user code customises the
 * framework start-up and the HTTP request life cycle.
 */
class EasySwooleEvent implements EventInterface
{
    /**
     * Framework initialisation hook; sets the default timezone.
     */
    public static function frameInitialize(): void
    {
        date_default_timezone_set('Asia/Shanghai');
    }

    /**
     * Main-server creation hook; demonstrates adding a callback, a custom
     * process and a sub-server.
     */
    public static function mainServerCreate(ServerManager $server,EventRegister $register): void
    {
        // Example: add an extra onWorkerStart callback to the main server.
        $dumpWorkerStart = function (\swoole_server $server, int $workerId) {
            var_dump("{$workerId}start");
        };
        $register->add($register::onWorkerStart, $dumpWorkerStart);

        // Add a custom process.
        ProcessManager::getInstance()->addProcess('test_process', Test::class);

        // Add a sub-server on port 9502 and set its onReceive callback on the
        // object addServer() returns.
        $tcp = $server->addServer('tcp', 9502);
        $dumpReceived = function (\swoole_server $server, int $fd, int $reactor_id, string $data) {
            var_dump("rec{$data}");
        };
        $tcp->set($tcp::onReceive, $dumpReceived);
    }

    /**
     * Hook called with the request/response pair before dispatch; empty here.
     */
    public static function onRequest(Request $request,Response $response): void
    {
        // Intentionally empty.
    }

    /**
     * Hook called with the request/response pair after dispatch; empty here.
     */
    public static function afterAction(Request $request,Response $response): void
    {
        // Intentionally empty.
    }
}