天天看点

teamtalk源码学习笔记——客户端(3)——逻辑队列和http队列、ui代理窗口

前面讲过,客户端主要框架是两个任务队列,四个线程,分别为逻辑任务队列(处理业务逻辑任务)、http任务队列(处理http有关的业务),每个队列有一个对应的线程,http队列有一个线程池,默认线程池线程个数为1,还有一个网络IO线程负责处理IO数据的读写操作,主线程做界面消息循环,用来与用户交互,之前分析过Io线程是进行数据下面就分析下这些队列和逻辑

一、逻辑队列实现方式

客户端概述的那一章讲过,逻辑队列的启动方式,OperationManager::startup()启动逻辑队列处理线程,进行while循环处理std::list

IMCoreErrorCode OperationManager::startup()
{
    m_operationThread = std::thread([&]
    {
        std::unique_lock <std::mutex> lck(m_cvMutex);
        Operation* pOperation = nullptr;
        while (m_bContinue)
        {
            if (!m_bContinue)
                break;
            if (m_vecRealtimeOperations.empty())
                m_CV.wait(lck);//没有任务,线程进行条件等待
            if (!m_bContinue)
                break;
            {
                std::lock_guard<std::mutex> lock(m_mutexOperation);
                if (m_vecRealtimeOperations.empty())
                    continue;
                pOperation = m_vecRealtimeOperations.front();
                m_vecRealtimeOperations.pop_front();
            }
            if (!m_bContinue)
                break;
            if (pOperation)
            {
                pOperation->process();//有任务,任务出队,调用任务的process函数处理
                pOperation->release();//处理完即自己释放掉
            }
        }
    });
    return IMCORE_OK;
}

void Operation::process()
{
    try
    {
        m_state = OPERATION_RUNNING;
        processOpertion();这是一个纯虚函数,该类是抽象类,不同任务类继承该类,并重写该函数,处理不同的逻辑任务
    }
    catch (Exception& exc)
    {
        assert(false);
        LOG__(ERR, _T("process exception,reason:%s"),exc.what());
    }
    catch (std::exception& exc)
    {
        assert(false);
        LOG__(ERR, _T("process exception,reason:%s"), exc.what());
    }
    catch (...)
    {
        assert(false);
        LOG__(ERR, _T("process unknown exception"));
    }
    m_state = OPERATION_FINISHED;
}
           

OperationManager作为任务管理类,拥有这个逻辑队列,并可以对逻辑任务队列进行添加删除任务,添加任务有两种方式,普通插入添加startOperation,和使用lambda表达式添加startOperationWithLambda

二、http队列实现方式

这个http队列线程启动方式在之前的概述中也没有讲到,因为http队列线程是在点击登录界面中的登录按钮的时候才会进行启动

void LoginDialog::OnClick(TNotifyUI& msg)
{
    PTR_VOID(msg.pSender);
    if (msg.pSender == m_pBtnLogin)
    {
        _DoLogin();//如果是登录按钮点击,则进行登录操作
        return;
    }
    else if (msg.pSender == m_pBtnSysSetting)
    {
        module::getSysConfigModule()->showServerConfigDialog(m_PaintManager.GetPaintWindow());
        return;
    }
    __super::OnClick(msg);
}
           
void LoginDialog::_DoLogin()
{
    LOG__(APP,_T("User Clicked LoginBtn"));
    m_ptxtTip->SetText(_T(""));
    CDuiString userName = m_pedtUserName->GetText();
    CDuiString password = m_pedtPassword->GetText();
    if (userName.IsEmpty())
    {
        CString csTip = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_USERNAME_EMPTY"));
        m_ptxtTip->SetText(csTip);
        return;
    }
    if (password.IsEmpty())
    {
        CString csTip = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_PASSWORD_EMPTY"));
        m_ptxtTip->SetText(csTip);
        return;
    }
    module::TTConfig* pCfg = module::getSysConfigModule()->getSystemConfig();
    pCfg->userName = userName;
    if (m_bPassChanged)
    {
        std::string sPass = util::cStringToString(CString(password));
        char* pOutData = ;
        uint32_t nOutLen = ;
        int retCode = EncryptPass(sPass.c_str(), sPass.length(), &pOutData, nOutLen);
        if (retCode ==  && nOutLen >  && pOutData != )
        {
            pCfg->password = std::string(pOutData, nOutLen);
            Free(pOutData);
        }
        else
        {
            LOG__(ERR, _T("EncryptPass Failed!"));
            CString csTip = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_LOGIN_ENCRYPT_PASE_FAIL"));
            m_ptxtTip->SetText(csTip);            
            return;
        }
    }
    pCfg->isRememberPWD = m_pChkRememberPWD->GetCheck();
    module::getSysConfigModule()->saveData();
    CString csTxt = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_BTN_DOLOGIN"));
    m_pBtnLogin->SetText(csTxt);
    m_pBtnLogin->SetEnabled(false);
   //上面进行用户名密码获取等一系列登录准备操作

    //连接登陆服务器
    DoLoginServerParam param;
   //DoLoginServerHttpOperation这个继承于IHttpOperation,IHttpOperation又继承于ICallbackOpertaion,ICallbackOpertaion继承自Operation
    DoLoginServerHttpOperation* pOper = new DoLoginServerHttpOperation(
        BIND_CALLBACK_1(LoginDialog::OnHttpCallbackOperation), param);
    module::getHttpPoolModule()->pushHttpOperation(pOper);//如果这时没有http任务队列处理线程,会进行启动对应数量的线程
}
           

DoLoginServerHttpOperation继承自Operation,自然知道这个任务类可以重写了processOpertion(),暂时不看这个函数做了什么,去看下http任务处理线程怎么启动的

void HttpPoolModule_Impl::pushHttpOperation(module::IHttpOperation* pOperaion, BOOL bHighPriority /*= FALSE*/)
{
    if (NULL == pOperaion)
    {
        return;
    }
    CAutoLock lock(&m_mtxLock);
    if (bHighPriority)
        m_lstHttpOpers.push_front(pOperaion);
    else
        m_lstHttpOpers.push_back(pOperaion);
    _launchThread();//启动线程函数
    ::ReleaseSemaphore(m_hSemaphore, , NULL);
    return;
}

BOOL HttpPoolModule_Impl::_launchThread()
{
    if ((int)m_vecHttpThread.size() >= MAX_THEAD_COUNT)//每次入队任务的时候都会进行线程大小的检查,不够就进行增加,默认为1
    {
        return TRUE;
    }
    TTHttpThread* pThread = new TTHttpThread();
    PTR_FALSE(pThread);
    if (!pThread->create())//这里调用TTHttpThread的process函数,具体怎么启动的调试进入代码可以看到
    {
        return FALSE;
    }
    Sleep();
    m_vecHttpThread.push_back(pThread);
    return TRUE;
}

UInt32 TTHttpThread::process()
{
    module::IHttpOperation * pHttpOper = NULL;
    HttpPoolModule_Impl *pPool = m_pInstance;
    while (m_bContinue)
    {
        if (WAIT_OBJECT_0 != ::WaitForSingleObject(pPool->m_hSemaphore, INFINITE))
        {
            break;
        }
        if (!m_bContinue)
        {
            break;
        }
        {
            CAutoLock lock(&(pPool->m_mtxLock));
            if (pPool->m_lstHttpOpers.empty())
                pHttpOper = NULL;
            else
            {
                pHttpOper = pPool->m_lstHttpOpers.front();
                pPool->m_lstHttpOpers.pop_front();
            }
        }
        try
        {
            if (m_bContinue && pHttpOper)
            {
                pHttpOper->process();//进行具体任务的处理
                pHttpOper->release();
            }
        }
        catch (...)
        {
            LOG__(ERR, _T("TTHttpThread: Failed to execute opertaion(0x%p)"), pHttpOper);
        }
    }
    return ;
}
           

TTHttpThread::process()这里进行while循环处理http队列里的http任务

pHttpOper->process();这里最后会调用该操作类的processOpertion函数,回调之前那个

DoLoginServerHttpOperation* pOper = new DoLoginServerHttpOperation(BIND_CALLBACK_1(LoginDialog::OnHttpCallbackOperation), param);构造http任务类的地方,这里http任务入队后,最后会被http线程调用他的processOpertion函数

void DoLoginServerHttpOperation::processOpertion()
{
    module::TTConfig* pCfg = module::getSysConfigModule()->getSystemConfig();
    LOG__(APP, _T("loginAddr = %s"), pCfg->loginServIP);
    std::string& loginAddr = util::cStringToString(pCfg->loginServIP);
    std::string url = loginAddr;

    DoLoginServerParam* pPamram = new DoLoginServerParam();
    pPamram->resMsg = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_LOGIN_HTTP_DEFERROR"));
    Http::HttpResponse    response;
    Http::HttpClient    client;
    Http::HttpRequest    request("get", url);
    if (!client.execute(&request, &response))
    {
        CString csTemp = util::stringToCString(url);
        pPamram->result = DOLOGIN_FAIL;
        LOG__(ERR,_T("failed %s"), csTemp);
        asyncCallback(std::shared_ptr<void>(pPamram));
        client.killSelf();
        return;
    }
    std::string body = response.getBody();
    client.killSelf();
    //json解析
    try
    {
        Json::Reader reader;
        Json::Value root;
        if (!reader.parse(body, root))
        {
            CString csTemp = util::stringToCString(body);
            LOG__(ERR, _T("parse data failed,%s"), csTemp);
            pPamram->result = DOLOGIN_FAIL;
            pPamram->resMsg = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_LOGIN_HTTP_JSONERROR"));
            goto End;
        }
        int nCode = root.get("code", "").asInt();
        if ( == nCode)//登陆成功
        {
            LOG__(APP, _T("get msgSvr IP succeed!"));
            pCfg->msgSevPriorIP = root.get("priorIP", "").asString();
            pCfg->msgSevBackupIP = root.get("backupIP", "").asString();
            std::string strPort = root.get("port", "").asString();
            pCfg->msgServPort = util::stringToInt32(strPort);
            pCfg->fileSysAddr = util::stringToCString(root.get("msfsPrior", "").asString());
            pCfg->fileSysBackUpAddr = util::stringToCString(root.get("msfsBackup", "").asString());
            pPamram->result = DOLOGIN_SUCC;
        }
        else
        {
            LOG__(ERR, _T("get msgSvr IP failed! Code = %d"),nCode);
            pPamram->result = DOLOGIN_FAIL;
            CString csRetMsgTemp = util::stringToCString(root.get("msg", "").asString());
            if (!csRetMsgTemp.IsEmpty())
                pPamram->resMsg = csRetMsgTemp;
        }
    }
    catch (...)
    {
        CString csTemp = util::stringToCString(body);
        LOG__(ERR,_T("parse json execption,%s"), csTemp);
        pPamram->result = DOLOGIN_FAIL;
        pPamram->resMsg = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_LOGIN_HTTP_JSONERROR"));
    }
End:
    asyncCallback(std::shared_ptr<void>(pPamram));//异步通知ui代理窗口线程调用回调函数
}
           

这里这个函数的的具体作用就是对登录服务器发送http请求,获取到具体的消息服务器的ip和端口,然后根据这个ip和端口进行真正的登录消息服务器进行通信,这里不过多的讨论业务方面的东西,那些东西可以自行摸索,有些业务我不是参与者我只是学习者,也没有完全掌握,这里只是作为一个代码学习,对它的框架结构的一个分析,这个笔记在看源码的时候可以进行参考,并不能直接帮你解读

DoLoginServerHttpOperation::processOpertion()中成功获取到所想要的服务器的地址后,通过asyncCallback(std::shared_ptr(pPamram));来进行一个异步通知,这个异步通知最后会在ui代理窗口的过程函数中被处理,即之前的构造函数中注册的回调函数,会在ui线程中进行调用

void asyncCallback(std::shared_ptr<void> param)
    {
        CallbackOperationEvent* pEvent = new CallbackOperationEvent(m_callback, param);
        module::getEventManager()->asynFireUIEvent(pEvent);
    }
           

看CallbackOperationEvent中的process方法

virtual void process()
    {
        m_callback(m_param);
    }
           

而上述中的module::getEventManager()->asynFireUIEvent(pEvent);会给ui代理窗口发送消息,这个消息在消息队列中被ui线程进行获取处理

module::IMCoreErrorCode UIEventManager::asynFireUIEvent(IN const IEvent* const pEvent)
{
    assert(m_hWnd);
    assert(pEvent);
    if ( == m_hWnd ||  == pEvent)
        return IMCORE_ARGUMENT_ERROR;
    if (FALSE == ::PostMessage(m_hWnd, UI_EVENT_MSG, reinterpret_cast<WPARAM>(this), reinterpret_cast<WPARAM>(pEvent)))
        return IMCORE_WORK_POSTMESSAGE_ERROR;
    return IMCORE_OK;
}
//客户端概述中讲过ui代理窗口处理任务的过程,这里就带过
LRESULT _stdcall UIEventManager::_WindowProc(HWND hWnd
                                            , UINT message
                                            , WPARAM wparam
                                            , LPARAM lparam)
{
    switch (message)
    {
    case UI_EVENT_MSG:
        reinterpret_cast<UIEventManager*>(wparam)->_processEvent(reinterpret_cast<IEvent*>(lparam), TRUE);
        break;
    case WM_TIMER:
        reinterpret_cast<UIEventManager*>(wparam)->_processTimer();
        break;
    default:
        break;
    }
    return ::DefWindowProc(hWnd, message, wparam, lparam);
}

void UIEventManager::_processEvent(IEvent* pEvent, BOOL bRelease)
{
    assert(pEvent);
    if ( == pEvent)
        return;
    try
    {
        pEvent->process();
        if (bRelease)
            pEvent->release();
    }
   ......
}
           

最终CallbackOperationEvent中的回调会在这个process中调用,而CallbackOperationEvent中的回调是之前构造的时候进行注册的OnHttpCallbackOperation

void LoginDialog::OnHttpCallbackOperation(std::shared_ptr<void> param)
{
    DoLoginServerParam* pParam = (DoLoginServerParam*)param.get();
    if (DOLOGIN_SUCC == pParam->result)
    {
        module::TTConfig* pCfg = module::getSysConfigModule()->getSystemConfig();
        PTR_VOID(pCfg);
        LoginParam loginparam;
        loginparam.csUserName = pCfg->userName;
        loginparam.password = pCfg->password;
        loginparam.csUserName.Trim();
        //将操作任务进行入逻辑队列,等待逻辑任务线程处理
        LoginOperation* pOperation = new LoginOperation(
            BIND_CALLBACK_1(LoginDialog::OnOperationCallback), loginparam);
        imcore::IMLibCoreStartOperation(pOperation);
    }
    else
    {
        m_ptxtTip->SetText(pParam->resMsg);
        module::TTConfig* pCfg = module::getSysConfigModule()->getSystemConfig();
        LOG__(ERR, _T("get MsgServer config faild,login server addres:%s:%d"), pCfg->loginServIP,pCfg->loginServPort);
        CString csTxt = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_BTN_LOGIN"));
        m_pBtnLogin->SetText(csTxt);
        m_pBtnLogin->SetEnabled(true);
    }
}
           

OnHttpCallbackOperation由ui代理窗口调用,然后注册的OnOperationCallback最后由任务逻辑队列进行调用,这里还是重复的或者业务逻辑的东西都进行跳过

这里总结下这个小结,这个小结是讲http任务队列的实现方式,主要是启动http任务线程进行循环处理队列中的http任务,而这个http任务可能是Ui线程投入的,有的是逻辑任务线程投入的,使用的任务类都是继承自ICallbackOpertaion,它会最后调用之前注册的回调函数,ui代理窗口线程处理任务的实现方式有一种跟这个类似,如CallbackOperationEvent继承自ICallbackOpertaion,然后ui代理窗口会调用之前注册的回调函数,只是和http队列不一样的是,入队是通过发送消息,队列是系统的消息队列,并没有额外启动线程进行循环处理,而是利用duilib的消息机制,另外ui窗口进行事件处理并不只是这一种异步通知的方式进行处理事件,下面接着看

三、ui代理窗口事件处理与模块通信

客户端将一些功能抽象成模块组件,如登录模块、组模块、会话模块等,有些模块是后台信息保存和处理的模块,逻辑任务线程进行对其操作,有些模块涉及界面的一些数据,ui代理线程对其操作,类似于mfc里的文档与视图,这里的后台模块与界面也会进行通信,但这个数据流通的方式比前面讲述的单独的asyncCallback异步通知要为复杂

下面是简单画的一个草图,用来理清类之间的关系

teamtalk源码学习笔记——客户端(3)——逻辑队列和http队列、ui代理窗口

模块基础类

1. IEvent : 事件基类

2. ITimerEvent : 定时器基类

3. IMKOEvent :观察事件基类

4. MKOEvent : 观察事件实现类,处理观察事件

5. ModuleObserverCtx : 观察者信息保存类,有回调函数

6. ModuleSubject : 通知者,通知所有观察者

7. CallbackOperationEvent : 继承于IEvent,封装回调函数的事件

8. ICallbackPertaion : 继承Operation , 封装对回调事件的操作

9. ModuleBase : 封装对ModuleSubject的操作

10. UIEventManager : UI事件管理类,负责事件的通知和事件处理

之前的CallbackOperationEvent和IMKOEvent都是继承于IEvent,并进行重写process方法,而各个后台模块都是ModuleBase的基类,每个模块都拥有ModuleSubject的指针,可以进行添加观察者,这里使用的是观察者模式,如后台数据的变化会导致多个界面或者多个地方会发生改变,此时ModuleBase类通过添加需要改变的类,然后进行通知,将MKOEvent_Impl事件给ui代理线程,ui线程将调用MKOEvent_Impl的process,在process中调用ModuleSubject中的每个ModuleObjectCtx中的回调函数,这样就完成了后台数据发生变化可以通知到多个地方的操作,上面讲的比较粗像,但这些只是作为笔记,用来类似于思路提示作用,看客户端代码时候许多业务都会涉及到这种通知方式,可以进行调试下即可

继续阅读