天天看点

mooon-agent接收状态机代码摘要

<b>recv_machine.h</b>

#ifndef MOOON_AGENT_RECV_MACHINE_H  

#define MOOON_AGENT_RECV_MACHINE_H  

#include &lt;agent/message.h&gt;  

AGENT_NAMESPACE_BEGIN  

class CAgentThread;  

class CRecvMachine  

{  

private:  

    /***  

      * 接收状态值  

      */  

    typedef enum recv_state_t  

    {  

        rs_header, /** 接收消息头状态 */  

        rs_body /** 接收消息体状态 */  

    }TRecvState;  

      * 接收状态上下文  

    struct RecvStateContext  

        const char* buffer; /** 当前的数据buffer */  

        size_t buffer_size; /** 当前的数据字节数 */  

        RecvStateContext(const char* buf=NULL, size_t buf_size=0)  

         :buffer(buf)  

         ,buffer_size(buf_size)  

        {  

        }  

        RecvStateContext(const RecvStateContext&amp; other)  

         :buffer(other.buffer)  

         ,buffer_size(other.buffer_size)  

        RecvStateContext&amp; operator =(const RecvStateContext&amp; other)  

            buffer = other.buffer;  

            buffer_size = other.buffer_size;  

            return *this;  

    };  

public:  

    CRecvMachine(CAgentThread* thread);  

    util::handle_result_t work(const char* buffer, size_t buffer_size);  

    void reset();  

    void set_next_state(recv_state_t next_state)  

        _recv_state = next_state;  

        _finished_size = 0;  

    }  

    util::handle_result_t handle_header(const RecvStateContext&amp; cur_ctx, RecvStateContext* next_ctx);  

    util::handle_result_t handle_body(const RecvStateContext&amp; cur_ctx, RecvStateContext* next_ctx);  

    util::handle_result_t handle_error(const RecvStateContext&amp; cur_ctx, RecvStateContext* next_ctx);  

private:   

    CAgentThread* _thread; /** 需要通过CAgentThread取得CProcessorManager */  

    agent_message_header_t _header; /** 消息头,这个大小是固定的 */  

    recv_state_t _recv_state; /** 当前的接收状态 */  

    size_t _finished_size; /** 当前状态已经接收到的字节数,注意不是总的已经接收到的字节数,只针对当前状态 */  

};  

AGENT_NAMESPACE_END  

#endif // MOOON_AGENT_RECV_MACHINE_H  

recv_machine.cpp

#include "recv_machine.h" 

#include "agent_thread.h" 

CRecvMachine::CRecvMachine(CAgentThread* thread)  

 :_thread(thread)  

    set_next_state(rs_header);  

}  

// 状态机入口函数  

// 状态机工作原理:-&gt; rs_header -&gt; rs_body -&gt; rs_header  

// -&gt; rs_header -&gt; rs_error -&gt; rs_header  

// -&gt; rs_header -&gt; rs_body -&gt; rs_error -&gt; rs_header  

// 参数说明:  

// buffer - 本次收到的数据,注意不是总的  

// buffer_size - 本次收到的数据字节数  

util::handle_result_t CRecvMachine::work(const char* buffer, size_t buffer_size)  

{   

    RecvStateContext next_ctx(buffer, buffer_size);   

    util::handle_result_t hr = util::handle_continue;  

    // 状态机循环条件为:util::handle_continue == hr  

    while (util::handle_continue == hr)  

    {   

        RecvStateContext cur_ctx(next_ctx);  

        switch (_recv_state)  

        case rs_header:  

            hr = handle_header(cur_ctx, &amp;next_ctx);  

            break;  

        case rs_body:  

            hr = handle_body(cur_ctx, &amp;next_ctx);  

        default:  

            hr = handle_error(cur_ctx, &amp;next_ctx);  

    return hr;  

void CRecvMachine::reset()  

// 处理消息头部  

// cur_ctx - 当前上下文,  

// cur_ctx.buffer为当前收到的数据buffer,包含了消息头,但也可能包含了消息体。  

// cur_ctx.buffer_size为当前收到字节数  

// next_ctx - 下一步上下文,  

// 由于cur_ctx.buffer可能包含了消息体,所以在一次接收receive动作后,  

// 会涉及到消息头和消息体两个状态,这里的next_ctx实际为下一步handle_body的cur_ctx  

util::handle_result_t CRecvMachine::handle_header(const RecvStateContext&amp; cur_ctx, RecvStateContext* next_ctx)  

    if (_finished_size + cur_ctx.buffer_size &lt; sizeof(agent_message_header_t))  

        memcpy(reinterpret_cast&lt;char*&gt;(&amp;_header) + _finished_size  

              ,cur_ctx.buffer  

              ,cur_ctx.buffer_size);  

        _finished_size += cur_ctx.buffer_size;  

        return util::handle_continue;  

    else 

        size_t need_size = sizeof(agent_message_header_t) - _finished_size;  

              ,need_size);  

        // TODO: Check header here  

        size_t remain_size = cur_ctx.buffer_size - need_size;  

        if (remain_size &gt; 0)  

            next_ctx-&gt;buffer = cur_ctx.buffer + need_size;  

            next_ctx-&gt;buffer_size = cur_ctx.buffer_size - need_size;  

        // 只有当包含消息体时,才需要进行状态切换,  

        // 否则维持rs_header状态不变  

        if (_header.size &gt; 0)  

            // 切换状态  

            set_next_state(rs_body);  

        else 

        {   

            CProcessorManager* processor_manager = _thread-&gt;get_processor_manager();   

            if (!processor_manager-&gt;on_message(_header, 0, NULL, 0))  

            {  

                return util::handle_error;  

            }  

        return (remain_size &gt; 0)  

              ? util::handle_continue // 控制work过程是否继续循环  

              : util::handle_finish;  

// 处理消息体  

// cur_ctx.buffer为当前收到的数据buffer,包含了消息体,但也可能包含了消息头。  

// 由于cur_ctx.buffer可能包含了消息头,所以在一次接收receive动作后,  

// 会涉及到消息头和消息体两个状态,这里的next_ctx实际为下一步handle_header的cur_ctx  

util::handle_result_t CRecvMachine::handle_body(const RecvStateContext&amp; cur_ctx, RecvStateContext* next_ctx)  

    CProcessorManager* processor_manager = _thread-&gt;get_processor_manager();  

    if (_finished_size + cur_ctx.buffer_size &lt; _header.size)  

        if (!processor_manager-&gt;on_message(_header, _finished_size, cur_ctx.buffer, cur_ctx.buffer_size))  

            return util::handle_error;  

        size_t need_size = _header.size - _finished_size;  

        if (!processor_manager-&gt;on_message(_header, _finished_size, cur_ctx.buffer, need_size))  

        // 切换状态  

        set_next_state(rs_header);  

            return util::handle_continue;  

        return util::handle_finish;   

util::handle_result_t CRecvMachine::handle_error(const RecvStateContext&amp; cur_ctx, RecvStateContext* next_ctx)  

    //AGENT_LOG_ERROR("Network error.\n");  

    set_next_state(rs_header); // 无条件切换到rs_header,这个时候应当断开连接重连接  

    return util::handle_error;  

    本文转自eyjian 51CTO博客,原文链接:http://blog.51cto.com/mooon/910302,如需转载请自行联系原作者

继续阅读