天天看點

RPC架構的異步處理

RPC異步調用(以tars rpc架構為示例說明)

引入工作線程池和io收發線程池将工作線程和io收發線程兩者的同步關系解除。RPC中的上下文十分重要,因為請求包的發送,響應包的callback回調不在同一個工作線程中完成,需要一個context來記錄一個請求的上下文,把請求-響應-回調等一些資訊比對起來。通過rpc架構的内部請求id作為key,來儲存調用開始時間time,逾時時間timeout,回調函數callback,逾時回調timeout_callback等資訊。注意:請求id由client端服務調用時生成,會序列化成位元組流發送給server端,server端會傳回該請求id。

RPC架構的異步處理

servantProxy和Objproxyd的關系

ObjectProxy:一個網絡線程上的某個服務實體A;ServantProxy:RPC服務句柄,所有網絡線程上的某ObjectProxy(服務實體)的總代理; ObjectProxy類是一個服務實體,注意與ServantProxy類是一個服務代理相差別,前者表示一個網絡線程上的某個服務實體A,後者表示對所有網絡線程上的某服務實體A的總代理。

ServantProxy内含多個服務實體ObjectProxy,能夠幫助使用者在同一個服務代理内進行負載均衡。ObjectProxy對象的個數,其個數由用戶端的網絡線程數決定,每個網絡線程有一個ObjectProxy。

舉例有一個Demo.StringServer.StringServantObj的服務,提供一個RPC接口是append,傳入兩個string類型的變量,傳回兩個string類型變量的拼接結果。而且假設有兩台伺服器,socket辨別分别是192.112.112.112:112與192.112.112.113:113,設定用戶端的網絡線程數為3,那麼執行如下代碼:

Communicator _comm;
StringServantPrx _proxy;
_comm.stringToProxy("[email protected] -h 192.112.112.113 -p 113:tcp -h 192.112.112.112 -p 112", _proxy);
           
RPC架構的異步處理
//communicatorepoll用戶端
class CommunicatorEpoll : public TC_Thread ,public TC_ThreadRecMutex{
	 /* * ObjectProxy的工廠類 */
    ObjectProxyFactory *   _objectProxyFactory; //用于建立objproxy
    /* * 異步線程數組*/
    AsyncProcThread *      _asyncThread[MAX_CLIENT_ASYNCTHREAD_NUM];

    /* * 異步線程數目  */
    size_t                 _asyncThreadNum;
	/* * 分發給異步線程的索引seq*/
    size_t                 _asyncSeq;
    /** 網絡線程的id号*/
    size_t                 _netThreadSeq;
};
ObjectProxy * CommunicatorEpoll::getObjectProxy(const string & sObjectProxyName,const string& setName)
{
    return _objectProxyFactory->getObjectProxy(sObjectProxyName,setName);
}
---
class ServantProxy : public TC_HandleBase, public TC_ThreadMutex
{
	/** * 通信器 */
    Communicator *            _communicator;
    /** * 儲存ObjectProxy對象的指針數組 */
    ObjectProxy **            _objectProxy;
}


//servant初始化
ServantPrx::element_type* ServantProxyFactory::getServantProxy(const string& name,const string& setName)
{
    TC_LockT<TC_ThreadRecMutex> lock(*this);
    string tmpObjName = name + ":" + setName;
    map<string, ServantPrx>::iterator it = _servantProxy.find(tmpObjName);
    if(it != _servantProxy.end())
    {
        return it->second.get();
    }
    ObjectProxy ** ppObjectProxy = new ObjectProxy * [_comm->getClientThreadNum()];
    assert(ppObjectProxy != NULL);
    for(size_t i = 0; i < _comm->getClientThreadNum(); ++i)
    {
        ppObjectProxy[i] = _comm->getCommunicatorEpoll(i)->getObjectProxy(name, setName);
    }
    ServantPrx sp = new ServantProxy(_comm, ppObjectProxy, _comm->getClientThreadNum());
	...
    _servantProxy[tmpObjName] = sp;
    return sp.get();
}
           
RPC架構的異步處理

1、在每一個網絡線程CommunicatorEpoll的初始化過程中,會建立_asyncThreadNum條異步線程,等待異步調用的時候處理響應資料。

CommunicatorEpoll::CommunicatorEpoll(Communicator * pCommunicator,size_t netThreadSeq)
{
 ......
   //異步線程數
    _asyncThreadNum = TC_Common::strto<size_t>(pCommunicator->getProperty("asyncthread", "3"));

    if(_asyncThreadNum == 0)
    {
        _asyncThreadNum = 3;
    }

    if(_asyncThreadNum > MAX_CLIENT_ASYNCTHREAD_NUM)
    {
        _asyncThreadNum = MAX_CLIENT_ASYNCTHREAD_NUM;
    }
 ......
    //異步隊列的大小
    size_t iAsyncQueueCap = TC_Common::strto<size_t>(pCommunicator->getProperty("asyncqueuecap", "10000"));
    if(iAsyncQueueCap < 10000)
    {
        iAsyncQueueCap = 10000;
    }
 ......
    //建立異步線程
    for(size_t i = 0; i < _asyncThreadNum; ++i)
    {
        _asyncThread[i] = new AsyncProcThread(iAsyncQueueCap);
        _asyncThread[i]->start();
    }
 ......
}
           

2、tars2cpp的檔案中定義了回調函數基類,要繼承回調函數基類實作自己的回調函數。

class ConfigAdminPrxCallback: public tars::ServantProxyCallback
{
    public:
        virtual ~ConfigAdminPrxCallback(){}
        virtual void callback_AddConfig(tars::Int32 ret,  const std::string& result)
        { throw std::runtime_error("callback_AddConfig() override incorrect."); }
        virtual void callback_AddConfig_exception(tars::Int32 ret)
        { throw std::runtime_error("callback_AddConfig_exception() override incorrect."); }
 }
           

3、用戶端樁函數代理調用異步請求函數,并傳入實作的回調函數的類指針。

class ConfigAdminProxy : public tars::ServantProxy
    {
    public:
        //同步調用
        tars::Int32 AddConfig(const tars::AddConfigInfo & config,std::string &result,const map<string, string> &context = TARS_CONTEXT(),map<string, string> * pResponseContext = NULL)
        {}
		//異步調用
        void async_AddConfig(ConfigAdminPrxCallbackPtr callback,const tars::AddConfigInfo &config,const map<string, string>& context = TARS_CONTEXT())
        {}
};
           

4、用戶端調用。 ServantProxy::invoke中進行負載均衡調用,并且執行個體化此次請求msg包,執行個體化該請求的objproxy。CommunicatorEpoll會中的

CommunicatorEpoll::handle

會調用msg->obj->invoke進行請求的發送。每個用戶端線程中含有跟用戶端網絡線程通信的隊列

ReqInfoQueue * _reqQueue[MAX_CLIENT_THREAD_NUM]; //隊列數組

。請求包在invoke中push到相應的隊列中,comunicatorEpoll解包出msg請求體進行資料傳輸。

_reqNo:每個主調線程會被配置設定一個唯一的空閑序列号,這個序列号對應到網絡線程的epoll事件資料通知fd。SeqManager類負責配置設定該序列号。

void ServantProxy::invoke(ReqMessage * msg, bool bCoroAsync)
{
   ...
    ObjectProxy * pObjProxy = NULL;
    ReqInfoQueue * pReqQ    = NULL;
    //選擇網絡線程
    selectNetThreadInfo(pSptd,pObjProxy,pReqQ);
    //調用發起時間
    msg->iBeginTime   = TNOWMS;
    msg->pObjectProxy = pObjProxy;//執行個體化msg請求包中的obj執行個體
  ...
    //通知網絡線程
    bool bEmpty = false;
    bool bSync  = (msg->eType == ReqMessage::SYNC_CALL);
    if(!pReqQ->push_back(msg,bEmpty))
    {
    ...
        pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);

        throw TarsClientQueueException("client queue full");
    }
    pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);
 ...
}
           

5、用戶端負載均衡: 每個用戶端調用線程跟用戶端的網絡線程池通信的隊列 ,在業務用戶端根據負載均衡選擇網絡線程池中的網絡線程進行傳輸資料。每條caller線程與每條用戶端網絡線程CommunicatorEpoll進行資訊互動的橋梁——通信隊列ReqInfoQueue數組,數組中的每個ReqInfoQueue元素負責與一條網絡線程進行互動。數組裡面的ReqInfoQueue元素便是該數組對應的caller線程與兩條網絡線程的通信橋梁,一條網絡線程對應着數組裡面的一個元素,通過網絡線程ID進行數組索引。整個關系有點像生産者消費者模型,生産者Caller線程向自己的線程私有資料ReqInfoQueue[]中的第N個元素ReqInfoQueue[N] push請求包,消費者用戶端第N個網絡線程就會從這個隊列中pop請求包。

RPC架構的異步處理

(1)第一層負載均衡:輪詢選擇ObjectProxy(CommunicatorEpoll)和與之相對應的ReqInfoQueue

ServantProxy::invoke中進行負載均衡調用,并且執行個體化此次請求msg包,執行個體化該請求的objproxy。CommunicatorEpoll中的

CommunicatorEpoll::handle

會調用obj->invoke進行請求的發送。

//第一層:選取一個網絡線程對應的資訊
void ServantProxy::selectNetThreadInfo(ServantProxyThreadData * pSptd, ObjectProxy * & pObjProxy, ReqInfoQueue * & pReqQ)
{
    //指針為空 就new一個
    if(!pSptd->_queueInit)
    {
        for(size_t i=0;i<_objectProxyNum;++i)
        {
            pSptd->_reqQueue[i] = new ReqInfoQueue(_queueSize);
        }
        pSptd->_objectProxyNum = _objectProxyNum;
        pSptd->_objectProxy    = _objectProxy;
        pSptd->_queueInit      = true;
    }
    if(_objectProxyNum == 1)
    {
        pObjProxy = *_objectProxy;
        pReqQ     = pSptd->_reqQueue[0];
    }
    else
    {
        if(pSptd->_netThreadSeq >= 0)
        {
            //網絡線程發起的請求
            assert(pSptd->_netThreadSeq < static_cast<int>(_objectProxyNum));
            pObjProxy = *(_objectProxy + pSptd->_netThreadSeq);
            pReqQ     = pSptd->_reqQueue[pSptd->_netThreadSeq];
        }
        else
        {
            //用線程的私有資料來儲存選到的seq
            pObjProxy = *(_objectProxy + pSptd->_netSeq);//選取obj
            pReqQ     = pSptd->_reqQueue[pSptd->_netSeq];
            pSptd->_netSeq++;

            if(pSptd->_netSeq == _objectProxyNum)
                pSptd->_netSeq = 0;
        }
    }
}
           

(2)第二層負載均衡: 通過EndpointManager選擇AdapterProxy,負載均衡算法(Hash、權重、輪詢)

objproxy中的invoke調用selectAdapterProxy。

bool EndpointManager::selectAdapterProxy(ReqMessage * msg,AdapterProxy * & pAdapterProxy)
{
    pAdapterProxy = NULL;
    //重新整理主要
    refreshReg(E_DEFAULT,"");

    //無效的資料 傳回true
    if(!_valid)
    {
        return true;
    }

    //如果有hash,則先使用hash政策
    if (msg->bHash)
    {
        pAdapterProxy = getHashProxy(msg->iHashCode, msg->bConHash);

        return false;
    }
    
    if(_weightType == E_STATIC_WEIGHT)
    {
        //權重模式
        bool bStaticWeighted = false;
        if(_weightType == E_STATIC_WEIGHT || msg->eType == ReqMessage::ONE_WAY)
            bStaticWeighted = true;

        pAdapterProxy = getWeightedProxy(bStaticWeighted);
    }
    else
    {
        //普通輪詢模式
        pAdapterProxy = getNextValidProxy();
    }

    return false;
}
           

6、同步調用的話,建立完條件變量,通知網絡線程進行請求包的發送。

//通知網絡線程
    bool bEmpty = false;
    bool bSync  = (msg->eType == ReqMessage::SYNC_CALL);

    if(!pReqQ->push_back(msg,bEmpty))
    {
        TLOGERROR("[TARS][ServantProxy::invoke msgQueue push_back error num:" << pSptd->_netSeq << "]" << endl);

        delete msg;
        msg = NULL;

        pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);

        throw TarsClientQueueException("client queue full");
    }

    pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);
           

7、資料在用戶端epoll中監聽發送。 在CommunicatorEpoll::notify()中,caller線程往請求事件通知數組NotifyInfo _notify[]中添加請求事件,通知CommunicatorEpoll進行請求包的發送。注意了,這個函數的作用僅僅是通知網絡線程準備發送資料,通過TC_Epoller::mod()或者TC_Epoller::add()觸發一個EPOLLIN事件,進而促使阻塞在TC_Epoller::wait()(在CommunicatorEpoll::run()中阻塞)的網絡線程CommunicatorEpoll被喚醒,并設定喚醒後的epoll_event中的聯合體epoll_data變量為&_notify[iSeq].stFDInfo。

_iSeq為每個主調線程會被配置設定一個唯一的序列号,這個序列号對應到網絡線程的epoll事件資料清單。SeqManager類負責配置設定該序列号。

void CommunicatorEpoll::notify(size_t iSeq,ReqInfoQueue * msgQueue)
{
    assert(iSeq < MAX_CLIENT_NOTIFYEVENT_NUM);

    if(_notify[iSeq].bValid)
    {
        _ep.mod(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN);
        assert(_notify[iSeq].stFDInfo.p == (void*)msgQueue);
    }
    else
    {
        _notify[iSeq].stFDInfo.iType   = FDInfo::ET_C_NOTIFY;
        _notify[iSeq].stFDInfo.p       = (void*)msgQueue;
        _notify[iSeq].stFDInfo.fd      = _notify[iSeq].eventFd;
        _notify[iSeq].stFDInfo.iSeq    = iSeq;
        _notify[iSeq].notify.createSocket();
        _notify[iSeq].bValid           = true;
		//主調線程占用第iSeq個通知事件fd, 用于向communicatorEpoll注冊發送事件。
        _ep.add(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN);
    }
}
           

8、adapterProxy發送資料

Client發起請求時,如果發送緩沖區沒有資料,就直接從連接配接發送出去,如果發送緩沖區有資料,則将發送請求先放入逾時隊列,網絡線程從逾時隊列拉取請求進行發送。每個AdapterProxy有一個 逾時隊列 _timeoutQueue,存儲了原始的msg結構體資訊(含同步異步調用等資訊)。**

//交給連接配接發送資料,連接配接連上,buffer不為空,直接發送資料成功
    if(_timeoutQueue->sendListEmpty() && _trans->sendRequest(msg->sReqData.c_str(),msg->sReqData.size()) != Transceiver::eRetError)
    {
        //請求發送成功了,單向調用直接傳回
        if(msg->eType == ReqMessage::ONE_WAY)
        {
       ...
            return 0;
        }

        bool bFlag = _timeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout + msg->iBeginTime);
      ...
    }
    else
    {
 //請求發送失敗了,放入逾時隊列等待重發
        bool bFlag = _timeoutQueue->push(msg,msg->request.iRequestId, msg->request.iTimeout+msg->iBeginTime, false);
       ...
        }
    }
           

adapterproxy調用transceiver向sever發送資料,并在communicatorEpoll中注冊用戶端socket,監聽可讀可寫事件。

void Transceiver::connect()
{
    if(isValid())  {  return; }
    if(_connStatus == eConnecting || _connStatus == eConnected) {return;}
    int fd = -1;
    if (_ep.type() == EndpointInfo::UDP)
    {
        fd = NetworkUtil::createSocket(true, false, _ep.isIPv6());
        NetworkUtil::setBlock(fd, false);
        _connStatus = eConnected;
    }
    else
    {
        fd = NetworkUtil::createSocket(false, false, _ep.isIPv6());
        NetworkUtil::setBlock(fd, false);

        socklen_t len = _ep.isIPv6() ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in);
        bool bConnected = NetworkUtil::doConnect(fd, _ep.addrPtr(), len);
        if(bConnected)
        {
            setConnected();
        }
        else
        {
            _connStatus     = Transceiver::eConnecting;
            _conTimeoutTime = TNOWMS + _adapterProxy->getConTimeout();
        }
    }
    _fd = fd;
...
    //設定套接口選項
    vector<SocketOpt> &socketOpts = _adapterProxy->getObjProxy()->getSocketOpt();
    for(size_t i=0; i<socketOpts.size(); ++i)
    {
        if(setsockopt(_fd,socketOpts[i].level,socketOpts[i].optname,socketOpts[i].optval,socketOpts[i].optlen) == -1)
        {
            ...
    }

    _adapterProxy->getObjProxy()->getCommunicatorEpoll()->addFd(fd, &_fdInfo, EPOLLIN|EPOLLOUT);
}
           

9、發送和響應的資料包中含有請求ID字段,iRequestId是一個自增的id,用來關聯請求和響應包,另外ReqMessage結構體中,有調用的類型字段用于區分異步同步等。

struct ReqMessage : public TC_HandleBase
{
    //調用類型
    enum CallType
    {
        SYNC_CALL = 1, //同步
        ASYNC_CALL,    //異步
        ONE_WAY,       //單向
        THREAD_EXIT    //線程退出的辨別
    };
}



//請求包體
    struct RequestPacket
    {
        1  require short        iVersion;		//版本号
        2  require byte         cPacketType;	//包類型
        3  require int          iMessageType;//消息類型
        4  require int          iRequestId;	//請求ID
        5  require string       sServantName;	//servant名字
        6  require string       sFuncName;	//函數名稱
        7  require vector<byte> sBuffer;		//二進制buffer
        8  require int          iTimeout;		//逾時時間(毫秒)
        9  require map<string, string> context;	//業務上下文
        10 require map<string, string> status; 	//架構協定上下文
    };

    struct ResponsePacket
    {
        1 require short         iVersion;		//版本号
        2 require byte          cPacketType;	//包類型
        3 require int           iRequestId;		//請求ID
        4 require int           iMessageType;	//消息類型
        5 require int           iRet;			//傳回值
        6 require vector<byte>  sBuffer;		//二進制流
        7 require map<string, string> status; 	//協定上下文
        8 optional string        sResultDesc;   //描述
        9 optional map<string, string> context;   //業務上下文
    };
    
           

10、用戶端接收服務端發來的資訊包,并根據響應包的請求ID(iRequestId),從逾時隊列中取出原發送的msg結構體,查驗調用類型等。

void AdapterProxy::finishInvoke(ResponsePacket & rsp)
{
    ReqMessage * msg = NULL;

    //requestid 為0 是push消息
    if(rsp.iRequestId == 0)
    {
        msg               = new ReqMessage();
        msg->eStatus      = ReqMessage::REQ_RSP;
        msg->eType        = ReqMessage::ASYNC_CALL;
        msg->bFromRpc     = true;
        msg->bPush        = true;
        msg->proxy        = _objectProxy->getServantProxy();
        msg->pObjectProxy = _objectProxy;
        msg->adapter      = this;
        msg->callback     = _objectProxy->getPushCallback();
    }
    else
    {
        //這裡的隊列中的發送連結清單中的資料可能已經在timeout的時候删除了
        bool retErase = _timeoutQueue->erase(rsp.iRequestId, msg);

        //找不到此請求id資訊
        if (!retErase)
        {
            if(_timeoutLogFlag)
            {
                TLOGERROR("[TARS][AdapterProxy::finishInvoke(ResponsePacket) objname:"<< _objectProxy->name() << ",get req-ptr NULL,may be timeout,id:" << rsp.iRequestId 
                    << ",desc:" << _endpoint.desc() << endl);
            }
            return ;
        }

        assert(msg->eStatus == ReqMessage::REQ_REQ);

        msg->eStatus = ReqMessage::REQ_RSP;
    }

    msg->response = rsp;//完善msg的響應包

    finishInvoke(msg);
}
           

11、在函數

AdapterProxy::finishInvoke(ReqMessage)

中,程式在

CommunicatorEpoll::pushAsyncThreadQueue()

中,通過輪詢的方式選擇異步回調處理線程處理接收到的響應包,每個異步處理線程有一個任務處理隊列,通過以下代碼将資訊包msg(帶響應資訊)放到異步回調處理線程中的隊列中。異步處理線程數預設是3,最大是1024。

//異步回調,放入回調處理線程中
_objectProxy->getCommunicatorEpoll()->pushAsyncThreadQueue(msg);

void CommunicatorEpoll::pushAsyncThreadQueue(ReqMessage * msg)
{
    //輪詢的方式選擇異步處理線程。
    _asyncThread[_asyncSeq]->push_back(msg);
    _asyncSeq ++;

    if(_asyncSeq == _asyncThreadNum)
    {
        _asyncSeq = 0;
    }
}
           

12、選取之後,通過

AsyncProcThread::push_back()

,将msg包放在響應包隊列

AsyncProcThread::_msgQueue

中,然後通過

AsyncProcThread:: notify()

函數通知本異步回調處理線程進行處理,

AsyncProcThread:: notify()

函數可以令阻塞在

AsyncProcThread:: run()中的AsyncProcThread::timedWait()

的異步處理線程被喚醒。在

AsyncProcThread::run()

中,主要執行下面的程式進行函數回調:

if (_msgQueue->pop_front(msg))
{
 ......

    try
    {
        ReqMessagePtr msgPtr = msg;
        msg->callback->onDispatch(msgPtr);
    }
    catch (exception& e)
    {
        TLOGERROR("[TARS][AsyncProcThread exception]:" << e.what() << endl);
    }
    catch (...)
    {
        TLOGERROR("[TARS][AsyncProcThread exception.]" << endl);
    }
}
           

13、通過msg->callback,程式可以調用回調函數基類

ServantPrxCallback裡面的onDispatch(

)函數。在

ServantPrxCallback:: onDispatch()

中,分析此次響應所對應的RPC方法名,擷取響應結果,并通過動态多态,執行使用者所定義好的派生類的虛函數。通過ReqMessagePtr的引用計數,還可以将

ReqNessage msg

删除掉,與同步調用不同,同步調用的msg的建立與删除都在caller線程中,而異步調用的msg在caller線程上構造,在異步回調處理線程中析構。

virtual int onDispatch(tars::ReqMessagePtr msg)
        {
            static ::std::string __ConfigAdmin_all[]=
            {
                "AddConfig",
                 ...
            };
            pair<string*, string*> r = equal_range(__ConfigAdmin_all, __ConfigAdmin_all+14, string(msg->request.sFuncName));
            if(r.first == r.second) return tars::TARSSERVERNOFUNCERR;
            switch(r.first - __ConfigAdmin_all)
            {
                case 0:
                {
                    if (msg->response.iRet != tars::TARSSERVERSUCCESS)
                    {
                        callback_AddConfig_exception(msg->response.iRet);

                        return msg->response.iRet;
                    }
                    tars::TarsInputStream<tars::BufferReader> _is;

                    _is.setBuffer(msg->response.sBuffer);
                    tars::Int32 _ret;
                    _is.read(_ret, 0, true);

                    std::string result;
                    _is.read(result, 2, true);
                    CallbackThreadData * pCbtd = CallbackThreadData::getData();
                    assert(pCbtd != NULL);

                    pCbtd->setResponseContext(msg->response.context);

                    callback_AddConfig(_ret, result);

                    pCbtd->delResponseContext();

                    return tars::TARSSERVERSUCCESS;

        }
  }
           

二、異步并發模式 promise和future

Future & Promise 模式,屬于一種實作異步調用的并發模式。 普通的異步調用是基于回調的異步,當服務規模和業務邏輯比較簡單的時候,基于回調的異步調用能簡單就搞定了,但是随着業務服務的規模擴大,業務邏輯慢慢變得複雜,一些需要進行多次異步串行調用和異步并行調用的業務需求慢慢顯現出來,雖然基于回調的異步調用能夠滿足這些需求,但是使得異步調用的邏輯流程邏輯分散在不同地方,這點造成了對業務開發提出了很大的挑戰,編碼代碼十分不便,代碼邏輯難于了解和維護。代碼采用同步模式編寫異步的業務邏輯。

Future表示一個調用結果,而這個結果可能不會立即給出,代表了未來某個時刻會得到結果,而這個結果是由Promise來保證的,可以通過get()得到這個結果,其成員函數then,表示注冊一個對get()得到的結果進行處理的函數。Promise表示給調用其成員函數getFuture()的Future的一個承諾,通過setValue設定承諾的結果。

異步串行化:

//服務對外接口,串行調用
taf::Int32 
AServantImp::queryResultSerial(const std::string& sIn, std::string &sOut, taf::JceCurrentPtr current)
{
    //設定異步回包
    current->setResponse(false);

    // 向服務B發送異步請求,傳回值的類型是
    // promise::Future<std::string>,
    // 意思就是服務B未來會傳回一個string類型的資料
    promise::Future<std::string> f = sendBReq(_pPrxB, sIn, current);

    // f調用其成員函數then,給未來要到達的string類型的
    // 傳回結果設定一個處理函數
    // 在handleBRspAndSendCReq中擷取傳回結果,
    // 并return sendCReq(),即f2,然後f2通過鍊式法則調用then
    f.then(promise::bind(&handleBRspAndSendCReq,_pPrxC,current))
    .then(promise::bind(&handleCRspAndReturnClient, current));

    return 0;
}
promise::Future<std::string> sendBReq(BServantPrx prx, const std::string& sIn, taf::JceCurrentPtr current)

{

//定義一個promise::Promise<std::string>類型的變量promise,

//其目的是承諾會在promise裡面存放一個string類型的資料,

//然後把這個變量傳到BServantCallback對象中,

//然後發起異步調用

//最後傳回promise.getFuture(),意思是promise承諾的string類型資料

//可以通過promise::Future<std::string>類型的promise.getFuture()來獲得

       promise::Promise<std::string> promise;

 

//其實這個的current可以不用傳遞給BServantCallback,當然傳也沒有影響

    Test::BServantPrxCallbackPtr cb = new BServantCallback(current, promise);

 

       prx->async_queryResult(cb, sIn);

 

    return promise.getFuture();

}
           

異步并行化:

taf::Int32 AServantImp::queryResultParallel(const std::string& sIn, std::string &sOut, taf::JceCurrentPtr current)

{
       current->setResponse(false);
       promise::Future<std::string> f1 = sendBReq(_pPrxB, sIn, current);
       promise::Future<std::string> f2 = sendCReq(_pPrxC, sIn, current); 

       promise::Future<promise::Tuple<promise::Future<std::string>, promise::Future<std::string> > > f_all = promise::whenAll(f1, f2);

       f_all.then(promise::bind(&handleBCRspAndReturnClient, current));
       return 0;

}