天天看點

easyswoole2.X源碼解讀-事件注冊EventRegister

swoole伺服器開啟後,通過事件回調來處理對應的邏輯,easyswoole封裝了事件注冊類

1. 先看看在服務啟動時是如何注冊事件的:通過EventRegister類來管理監聽事件和對應的回調函數。

通過$events = $register->all();擷取所有的注冊事件

$this->finalHook中,EventHelper注冊預設的onWorkerStart、OnTask、OnFinish、OnPipeMessage、OnRequest事件到注冊器中

// Create the default event register
        $register = new EventRegister();
        // Let the framework add its built-in default callbacks first
        $this->finalHook($register);
        // Then hand the register to user code so it can add or replace callbacks
        EasySwooleEvent::mainServerCreate($this,$register);
        // all() returns the whole container: event name => list of callbacks
        $events = $register->all();
        foreach ($events as $event => $callback){
            // Bind one closure per event on the main server; the closure forwards
            // the arguments it receives to every callback registered for that event.
            $this->mainServer->on($event, function () use ($callback) {
                $ret = [];
                $args = func_get_args();
                foreach ($callback as $item) {
                    array_push($ret,Invoker::callUserFuncArray($item, $args));
                }
                // More than one callback: return all collected results as an array
                if(count($ret) > 1){
                    return $ret;
                }
                // Otherwise return the single result (null when there is none)
                return array_shift($ret);
            });
            // NOTE(review): the excerpt ends here; the closing brace of the foreach is not shown.
           
private function finalHook(EventRegister $register)
    {
        // Touch the pool manager singleton up front so it exists before workers start.
        PoolManager::getInstance();

        // Default workerStart callback: run the pool manager's worker hook and,
        // except on macOS ("Darwin"), rename the process after its role and id.
        $onWorkerStart = function (\swoole_server $server,int $workerId){
            PoolManager::getInstance()->__workerStartHook($workerId);
            $workerCount = Config::getInstance()->getConf('MAIN_SERVER.SETTING.worker_num');
            $title = Config::getInstance()->getConf('SERVER_NAME');
            if(PHP_OS == 'Darwin'){
                return;
            }
            // Ids 0 .. worker_num-1 are labelled as workers, higher ids as task workers.
            $role = ($workerId > ($workerCount - 1)) ? '_Task_Worker_' : '_Worker_';
            cli_set_process_title("{$title}{$role}{$workerId}");
        };
        $register->add($register::onWorkerStart,$onWorkerStart);

        // Built-in task / finish / pipeMessage callbacks.
        EventHelper::registerDefaultOnTask($register);
        EventHelper::registerDefaultOnFinish($register);
        EventHelper::registerDefaultOnPipeMessage($register);

        // For web and websocket servers, install the default HTTP request
        // callback unless one is already registered.
        $mainServerConf = Config::getInstance()->getConf("MAIN_SERVER");
        $servesHttp = $mainServerConf['SERVER_TYPE'] == self::TYPE_WEB_SERVER
            || $mainServerConf['SERVER_TYPE'] == self::TYPE_WEB_SOCKET_SERVER;
        if($servesHttp && !$register->get($register::onRequest)){
            EventHelper::registerDefaultOnRequest($register);
        }
    }
           

2. 事件注冊類  \easyswoole\vendor\easyswoole\easyswoole\src\Core\Swoole\EventRegister.php  

繼承了Event類,設定許可的事件

/**
 * Event container restricted to a fixed set of server event names.
 *
 * Each constant holds the event name used as the container key. Only these
 * names are accepted by add()/set(); anything else raises a notice through
 * trigger_error().
 */
class EventRegister extends Event
{
    const onStart = 'start';
    const onShutdown = 'shutdown';
    const onWorkerStart = 'workerStart';
    const onWorkerStop = 'workerStop';
    const onWorkerExit = 'workerExit';
    const onTimer = 'timer';
    const onConnect = 'connect';
    const onReceive = 'receive';
    const onPacket = 'packet';
    const onClose = 'close';
    const onBufferFull = 'bufferFull';
    const onBufferEmpty = 'bufferEmpty';
    const onTask = 'task';
    const onFinish = 'finish';
    const onPipeMessage = 'pipeMessage';
    const onWorkerError = 'workerError';
    const onManagerStart = 'managerStart';
    const onManagerStop = 'managerStop';
    const onRequest = 'request';
    const onHandShake = 'handShake';
    const onMessage = 'message';
    const onOpen = 'open';

    /**
     * Build the register with the fixed whitelist of event names.
     *
     * The whitelist is assembled from the class constants so the two cannot
     * drift apart.
     *
     * @param array|null $allowKeys Accepted for signature compatibility only;
     *                              the value is ignored and the whitelist is
     *                              always the constants declared above.
     */
    public function __construct(?array $allowKeys = null)
    {
        parent::__construct([
            self::onStart, self::onShutdown, self::onWorkerStart, self::onWorkerStop,
            self::onWorkerExit, self::onTimer, self::onConnect, self::onReceive,
            self::onPacket, self::onClose, self::onBufferFull, self::onBufferEmpty,
            self::onTask, self::onFinish, self::onPipeMessage, self::onWorkerError,
            self::onManagerStart, self::onManagerStop, self::onRequest,
            self::onHandShake, self::onMessage, self::onOpen,
        ]);
    }

    /**
     * Append a callback to an event.
     *
     * A rejected registration (parent returns false) only triggers a notice;
     * the register itself is returned either way so calls can be chained.
     *
     * @param string   $key  One of the event-name constants.
     * @param callable $item Callback to append.
     * @return EventRegister
     */
    public function add($key, $item): EventRegister
    {
        if(!parent::add($key,$item)){
            trigger_error("event {$key} invalid or not allow");
        }
        return $this;
    }

    /**
     * Replace all callbacks of an event with a single one.
     *
     * A rejected registration (parent returns false) only triggers a notice;
     * the register itself is returned either way so calls can be chained.
     *
     * @param string   $key  One of the event-name constants.
     * @param callable $item Callback that becomes the only one for the event.
     * @return $this
     */
    public function set($key, $item)
    {
        if(!parent::set($key,$item)){
            trigger_error("event {$key} invalid or not allow");
        }
        return $this;
    }
}
           

3.  \easyswoole\vendor\easyswoole\easyswoole\src\Core\Component\Event.php

/**
 * A MultiContainer that only stores callables and can fire them by event name.
 */
class Event extends MultiContainer
{
    /**
     * Append a callback under $key.
     *
     * @param mixed $key  Event name.
     * @param mixed $item Candidate callback.
     * @return mixed false when $item is not callable, otherwise whatever the parent add() returns.
     */
    function add($key, $item)
    {
        if(!is_callable($item)){
            return false;
        }
        return parent::add($key, $item);
    }

    /**
     * Make $item the only callback under $key.
     *
     * @param mixed $key  Event name.
     * @param mixed $item Candidate callback.
     * @return mixed false when $item is not callable, otherwise whatever the parent set() returns.
     */
    function set($key, $item)
    {
        if(!is_callable($item)){
            return false;
        }
        return parent::set($key, $item);
    }

    /**
     * Invoke every callback registered for $event with the given arguments.
     *
     * A throwable raised by one callback is handed to Trigger::throwable() and
     * does not stop the remaining callbacks from running.
     *
     * @param mixed $event   Event name.
     * @param mixed ...$args Arguments forwarded to each callback.
     */
    public function hook($event,...$args)
    {
        $listeners = $this->get($event);
        if(!is_array($listeners)){
            // Nothing registered for this event.
            return;
        }
        foreach ($listeners as $listener){
            try{
                Invoker::callUserFunc($listener,...$args);
            }catch (\Throwable $error){
                Trigger::throwable($error);
            }
        }
    }
}
           

4.  \easyswoole\vendor\easyswoole\easyswoole\src\Core\Component\MultiContainer.php

/**
 * Keyed container that holds a list of items per key, optionally restricted
 * to a whitelist of keys.
 */
class MultiContainer
{
    /** @var array key => list of items stored under that key */
    private $container = [];
    /** @var array|null permitted keys, or null when every key is permitted */
    private $allowKeys = null;

    /**
     * @param array|null $allowKeys Whitelist of permitted keys; null disables the check.
     */
    public function __construct(?array $allowKeys = null)
    {
        $this->allowKeys = $allowKeys;
    }

    /**
     * Append $item to the list stored under $key.
     *
     * @param mixed $key
     * @param mixed $item
     * @return $this|false false when $key is not on the whitelist.
     */
    public function add($key,$item)
    {
        if(!$this->isAllowedKey($key)){
            return false;
        }
        $this->container[$key][] = $item;
        return $this;
    }

    /**
     * Replace whatever is stored under $key with a one-element list holding $item.
     *
     * @param mixed $key
     * @param mixed $item
     * @return $this|false false when $key is not on the whitelist.
     */
    public function set($key,$item)
    {
        if(!$this->isAllowedKey($key)){
            return false;
        }
        $this->container[$key] = [$item];
        return $this;
    }

    /**
     * Remove $key and everything stored under it; a missing key is a no-op.
     *
     * @param mixed $key
     * @return $this
     */
    public function delete($key)
    {
        unset($this->container[$key]);
        return $this;
    }

    /**
     * @param mixed $key
     * @return array|null The items stored under $key, or null when the key is absent.
     */
    public function get($key):?array
    {
        return $this->container[$key] ?? null;
    }

    /**
     * @return array The whole container, key => list of items.
     */
    public function all():array
    {
        return $this->container;
    }

    /**
     * Tell whether $key may be written to.
     *
     * The whitelist comparison is strict: a loose in_array() lets values such
     * as boolean true match any non-empty string on the whitelist, which would
     * let unlisted keys through. Keys must therefore match a whitelist entry
     * in both value and type.
     *
     * @param mixed $key
     * @return bool
     */
    private function isAllowedKey($key): bool
    {
        return !is_array($this->allowKeys) || in_array($key,$this->allowKeys,true);
    }
}
           

5.預設注冊事件實作調用EventHelper注冊預設的onWorkerStart、OnTask、OnFinish、OnPipeMessage、OnRequest事件到注冊器中 \easyswoole\vendor\easyswoole\easyswoole\src\Core\Swoole\EventHelper.php

5.1 先看OnRequest事件

執行個體化路由排程器Dispatcher,把請求發送到對應的方法執行

執行個體化Request,Response對象;

調用EasySwooleEvent::onRequest 執行http控制器前執行邏輯

$dispatcher->dispatch() 執行http控制器

EasySwooleEvent::afterAction 執行http控制器後執行邏輯

/**
 * Install the default HTTP "request" callback, replacing any existing one.
 *
 * The callback wraps the raw swoole request/response, runs
 * EasySwooleEvent::onRequest, dispatches to a controller, runs
 * EasySwooleEvent::afterAction and finally sends the response.
 *
 * @param EventRegister $register            Register that receives the callback.
 * @param string        $controllerNameSpace Namespace prefix handed to the Dispatcher.
 */
public static function registerDefaultOnRequest(EventRegister $register,$controllerNameSpace = 'App\\HttpController\\'):void
    {
        $dispatcher = new Dispatcher($controllerNameSpace);
        $onRequest = function (\swoole_http_request $request,\swoole_http_response $response)use($dispatcher){
            $psrRequest = new Request($request);
            $psrResponse = new Response($response);
            try{
                EasySwooleEvent::onRequest($psrRequest,$psrResponse);
                $dispatcher->dispatch($psrRequest,$psrResponse);
                EasySwooleEvent::afterAction($psrRequest,$psrResponse);
            }catch (\Throwable $error){
                // Prefer the handler registered in the DI container, if it has the right type.
                $customHandler = Di::getInstance()->get(SysConst::HTTP_EXCEPTION_HANDLER);
                if(!($customHandler instanceof ExceptionHandlerInterface)){
                    // Fallback: answer with status 500 and the message plus stack trace.
                    // NOTE(review): this writes the trace into the response body — confirm that is wanted in production.
                    $psrResponse->withStatus(Status::CODE_INTERNAL_SERVER_ERROR);
                    $report = $error->getMessage() ."\n". $error->getTraceAsString();
                    $psrResponse->write(nl2br($report));
                }else{
                    $customHandler->handle($error,$psrRequest,$psrResponse);
                }
            }
            // Always flush the response, whether or not an error occurred.
            $psrResponse->response();
        };
        $register->set($register::onRequest,$onRequest);
    }
           

5.2 預設OnTask事件 

task回調函數接收到$server、$taskId、$fromWorkerId、$taskObj,其中$taskObj就是我們在worker邏輯中抛出task任務時所傳的值。Task抽象類位於:easyswoole\vendor\easyswoole\easyswoole\src\Core\Swoole\Task\AbstractAsyncTask.php。我們可以通過繼承task抽象類,來實作自定義task任務。

如何投遞任務

/**
 * Install the default "task" callback, replacing any existing one.
 *
 * The callback accepts a class name, an AbstractAsyncTask instance or a
 * SuperClosure as task payload; any other payload yields null.
 *
 * @param EventRegister $register Register that receives the callback.
 */
public static function registerDefaultOnTask(EventRegister $register):void
    {
        $onTask = function (\swoole_server $server, $taskId, $fromWorkerId,$taskObj)
        {
            // A payload that names an existing class is instantiated first.
            if(is_string($taskObj) && class_exists($taskObj)){
                $taskObj = new $taskObj;
            }

            if($taskObj instanceof AbstractAsyncTask){
                try{
                    $outcome = $taskObj->run($taskObj->getData(),$taskId,$fromWorkerId);
                    // No return value from run(): fall back to a result stored on the task.
                    if($outcome === null){
                        $outcome = $taskObj->getResult();
                    }
                    // Still nothing: there is no result to hand back.
                    if($outcome === null){
                        return null;
                    }
                    // A result exists: store it on the task and return the task itself.
                    $taskObj->setResult($outcome);
                    return $taskObj;
                }catch (\Throwable $error){
                    $taskObj->onException($error);
                    return null;
                }
            }

            if($taskObj instanceof SuperClosure){
                try{
                    return $taskObj( $server, $taskId, $fromWorkerId);
                }catch (\Throwable $error){
                    Trigger::throwable($error);
                }
            }
            return null;
        };
        $register->set($register::onTask,$onTask);
    }
           

5.3 預設onPipeMessage 

也是傳遞一個序列化的Message對象

/**
 * Install the default "pipeMessage" callback, replacing any existing one.
 *
 * The callback unserializes the payload; a Message instance is forwarded to
 * PipeMessageEventRegister by its command, anything else is reported through
 * Trigger::error().
 *
 * @param EventRegister $register Register that receives the callback.
 */
public static function registerDefaultOnPipeMessage(EventRegister $register):void
    {
        $onPipeMessage = function (\swoole_server $server,$fromWorkerId,$data){
            // NOTE(review): unserialize() is only safe on trusted input — confirm that
            // pipe messages can originate from this server's own processes only.
            $decoded = unserialize($data);
            if(!($decoded instanceof Message)){
                Trigger::error("data :{$data} not packet by swoole_serialize or not a Message Instance");
                return;
            }
            PipeMessageEventRegister::getInstance()->hook($decoded->getCommand(),$fromWorkerId,$decoded->getData());
        };
        $register->set($register::onPipeMessage,$onPipeMessage);
    }
           

6. 注冊主服務回調事件  (自定義事件回調),除了預設的事件注冊,我們還可以 EasySwooleEvent::mainServerCreate($this,$register); 操作$register,增加事件回調處理邏輯

\easyswoole\EasySwooleEvent.php

從上面的預設OnRequest事件我們可以發現,EasySwooleEvent::onRequest()、EasySwooleEvent::afterAction()會在執行控制器方法前後調用。但是在這兩個方法中,執行$response->end('向用戶端發送消息')不會向用戶端發送消息。

/**
 * Application-level hook class: the framework calls these static methods at
 * fixed points so the application can customise start-up and request handling.
 */
class EasySwooleEvent implements EventInterface
{
    /**
     * Framework initialisation hook; sets the default timezone.
     */
    public static function frameInitialize(): void
    {
        date_default_timezone_set('Asia/Shanghai');
    }

    /**
     * Called with the main server's event register so extra callbacks,
     * processes and sub-servers can be added.
     *
     * @param ServerManager $server   Server manager used to add sub-servers.
     * @param EventRegister $register Event register of the main server.
     */
    public static function mainServerCreate(ServerManager $server,EventRegister $register): void
    {
        // Example: append an extra workerStart callback on the main server.
        $register->add($register::onWorkerStart,function (\swoole_server $swooleServer,int $workerId){
            var_dump($workerId.'start');
        });

        // Example: add a custom process.
        ProcessManager::getInstance()->addProcess('test_process',Test::class);

        // Example: add a TCP sub-server on port 9502 and set its receive callback.
        // NOTE(review): addServer() presumably returns the sub-server's event register — confirm.
        $tcpEvents = $server->addServer('tcp',9502);
        $tcpEvents->set($tcpEvents::onReceive,function (\swoole_server $swooleServer, int $fd, int $reactor_id, string $data){
            var_dump('rec'.$data);
        });
    }

    /**
     * Hook invoked by the default request callback before dispatching to a
     * controller. Intentionally empty.
     */
    public static function onRequest(Request $request,Response $response): void
    {
        // No pre-dispatch logic yet.
    }

    /**
     * Hook invoked by the default request callback after the controller has
     * run. Intentionally empty.
     */
    public static function afterAction(Request $request,Response $response): void
    {
        // No post-dispatch logic yet.
    }
}