天天看點

Glusterfs之rpc子產品源碼分析(中)之Glusterfs的rpc子產品實作(2)

第二節、rpc用戶端實作原理及代碼分析

rpc用戶端主要發起一個rpc請求,執行完rpc請求以後就退出rpc,下面分析用戶端rpc請求建立的整個過程。Rpc用戶端請求建立的第一步是執行cli_rpc_init函數,主要實作代碼如下:

        this = THIS;//取得本線程的xlator清單

        cli_rpc_prog = &cli_prog;//設定rpc調用過程集合(許多函數)

        options = dict_new ();//建立一個字典資料結構用于存放選項資訊

        ret = dict_set_str (options, “remote-host”, state->remote_host);//設定host

        if (state->remote_port)

                port = state->remote_port;

        ret = dict_set_int32 (options, “remote-port”, port);//設定端口号

        ret = dict_set_str (options, “transport.address-family”, “inet”);//設定協定族為inet

        rpc = rpc_clnt_new (options, this->ctx, this->name);//建立一個rpc用戶端對象

        ret = rpc_clnt_register_notify (rpc, cli_rpc_notify, this);//注冊rpc請求通知函數

        rpc_clnt_start (rpc);//開始rpc

這段代碼其實是glusterfs用戶端程式啟動時建立rpc請求的初始化過程函數,真正獨立開始建立一個rpc請求的過程是從函數rpc_clnt_new開始的,下面就分析這個函數的功能,先看主要代碼:

        rpc = GF_CALLOC (1, sizeof (*rpc), gf_common_mt_rpcclnt_t);//為用戶端rpc對象配置設定記憶體

        pthread_mutex_init (&rpc->lock, NULL);//初始化鎖

        rpc->ctx = ctx;//屬于哪一個ctx

//建立請求記憶體池

        rpc->reqpool = mem_pool_new (struct rpc_req, RPC_CLNT_DEFAULT_REQUEST_COUNT);

        //儲存幀資料的記憶體池

rpc->saved_frames_pool = mem_pool_new (struct saved_frame,

                                              RPC_CLNT_DEFAULT_REQUEST_COUNT);

        ret = rpc_clnt_connection_init (rpc, ctx, options, name);//初始化rpc請求連接配接

        rpc = rpc_clnt_ref (rpc);//用戶端對象引用計數加1

        INIT_LIST_HEAD (&rpc->programs);//初始化程式集的連結清單

rpc用戶端發送請求也需要裝載相應的協定庫,它主要使用協定庫裡面的初始化函數和連結函數,上面代碼中rpc_clnt_connection_init函數主要完成協定庫裝載功能和連結選項的一些初始化功能,具體實作的主要代碼如下:

  pthread_mutex_init (&clnt->conn.lock, NULL);//初始化鎖

conn->trans = rpc_transport_load (ctx, options, name);//裝載協定庫并執行初始化init

        rpc_transport_ref (conn->trans);//增加傳輸層的引用計數

        conn->rpc_clnt = clnt;//連接配接對象屬于哪一個用戶端對象

        ret = rpc_transport_register_notify (conn->trans, rpc_clnt_notify, conn);//注冊通知函數

conn->saved_frames = saved_frames_new ();//建立一個儲存幀資料的對象

通過上面代碼執行以後基本的初始化工作已經完畢,下一步就是建立于rpc服務的連結,此功能在函數rpc_clnt_start中實作,也就是在所有初始化工作完成以後調用此函數發起連結,主要代碼如下:

rpc_clnt_reconnect (conn->trans);

繼續跟蹤函數rpc_clnt_reconnect:

        pthread_mutex_lock (&conn->lock);//初始化鎖

        {

                if (conn->reconnect)//如果重新連結正在進行

                        gf_timer_call_cancel (clnt->ctx, conn->reconnect);//取消正在的連結

                conn->reconnect = 0;//初始化為0

                if (conn->connected == 0) {//還沒有完成連結

                        tv.tv_sec = 3;//時間三秒

//發起傳輸層的連結

                        ret = rpc_transport_connect (trans, conn->config.remote_port);

//設定重連結對象

                        conn->reconnect = gf_timer_call_after (clnt->ctx, tv,

                                                     rpc_clnt_reconnect, trans);

                } 

        }

        pthread_mutex_unlock (&conn->lock);//解鎖

        if ((ret == -1) && (errno != EINPROGRESS) && (clnt->notifyfn)) {

//建立連結失敗就通知rpc用戶端對象(調用通知函數)

                clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_DISCONNECT, NULL);

        }

真正的連結是在具體的協定中的連結函數中執行,下面以tcp為例,看看它的連結函數的實作,主要代碼如下:

//得到用戶端協定族相關資訊

        ret = socket_client_get_remote_sockaddr (this, SA (&sockaddr), &sockaddr_len, &sa_family)

        if (port > 0)

                ((struct sockaddr_in *) (&sockaddr))->sin_port = htons (port);//端口位元組序轉換

        pthread_mutex_lock (&priv->lock);

        {

                memcpy (&this->peerinfo.sockaddr, &sockaddr, sockaddr_len);//指派sockaddr資訊

                this->peerinfo.sockaddr_len = sockaddr_len;//位址長度儲存

                priv->sock = socket (sa_family, SOCK_STREAM, 0);//建立socket

                setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF, &priv->windowsize,

                                sizeof (priv->windowsize)) < 0) ;//設定接收的系統緩沖區

                setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF, &priv->windowsize,

                                sizeof (priv->windowsize)) < 0);//發送緩沖區設定

                if (priv->nodelay) {

                        ret = __socket_nodelay (priv->sock);//設定是否延遲發送(等待一個完整的)

                 }

                if (!priv->bio) {

                        ret = __socket_nonblock (priv->sock);//設定非阻塞

                }

                if (priv->keepalive) {

                        ret = __socket_keepalive (priv->sock, priv->keepaliveintvl,

                                                  priv->keepaliveidle);//儲存連結活動

                }

                SA (&this->myinfo.sockaddr)->sa_family =

                        SA (&this->peerinfo.sockaddr)->sa_family;//儲存協定族資訊

                ret = client_bind (this, SA (&this->myinfo.sockaddr),//根據協定族做适當的綁定

                                   &this->myinfo.sockaddr_len, priv->sock);

                ret = connect (priv->sock, SA (&this->peerinfo.sockaddr),//發起連結請求

                               this->peerinfo.sockaddr_len);

priv->connected = 0;

                rpc_transport_ref (this);//傳輸對象引用加1

                priv->idx = event_register (ctx->event_pool, priv->sock,//注冊epoll讀寫事件

                                            socket_event_handler, this, 1, 1);

        }

        pthread_mutex_unlock (&priv->lock);//解鎖

整個連結過程的建立需要注意一點的就是會根據協定族的類型做适當的綁定,當發起連結以後就開始注冊各種讀寫事件。處理這些事件發生的函數是socket_event_handler,主要代碼如下:

  THIS = this->xl;//取得xlator

        priv = this->private;//取得私有資料

        pthread_mutex_lock (&priv->lock);//加鎖

        {

                priv->idx = idx;//取得索引下表(對應的socket)

        }

        pthread_mutex_unlock (&priv->lock);

        if (!priv->connected) {//如果連結還沒有建立先完成連結的建立

                ret = socket_connect_finish (this);//完成連結建立

        }

        if (!ret && poll_out) {//可寫事件處理

                ret = socket_event_poll_out (this);

        }

        if (!ret && poll_in) {//可讀事件處理

                ret = socket_event_poll_in (this);

        }

到此用戶端rpc請求過程完全建立,當真正的發送一個rpc請求的時候就會響應相應的epoll的寫事件,把包裝好的資料幀發送到rpc伺服器端,rpc用戶端也會通過可讀事件來接收rpc伺服器端的響應資訊。

總結:同rpc伺服器端一眼,rpc用戶端的建立也是比較複雜的過程,還是通過流程圖加描述來展示一下整個rpc用戶端的初始化過程,圖如下:

Glusterfs之rpc子產品源碼分析(中)之Glusterfs的rpc子產品實作(2)