天天看點

《Ceph源碼分析》——第3章,第2節Simple實作

本節書摘來自華章出版社《ceph源碼分析》一書中的第3章,第3.2節simple實作,作者常濤,更多章節内容可以通路雲栖社群“華章計算機”公衆号檢視

3.2 simple實作

simple在ceph裡實作比較早,目前也比較穩定,是在生産環境中使用的網絡通信子產品。如其名字所示,實作相對比較簡單。下面具體分析一下,simple如何實作ceph網絡通信架構的各個子產品。

3.2.1 simplemessager

類simplemessager實作了messager接口。

class simplemessenger : public simplepolicymessenger {

accepter accepter; //用于接受用戶端的連結請求

dispatchqueue dispatch_queue; //接收到的請求的消息分發隊列

bool did_bind; //是否綁定

` //位址→pipe映射

ceph::unordered_map rank_pipe;

//正在處理的pipes

set accepting_pipes;

//所有的pipes

set pipes;

//準備釋放的pipes

list pipe_reap_queue;`

//内部叢集的協定版本

` int cluster_protocol;

}`

3.2.2 accepter

類accepter用來在server端監聽端口,接收連結,它繼承了thread類,本身是一個線程,來不斷地監聽server的端口:

`class accepter : public thread {

simplemessenger *msgr;

bool done;

int listen_sd; //監聽的端口

uint64_t nonce;

……

}

`

3.2.3 dispatchqueue

dispatchqueue類用于把接收到的請求儲存在内部,通過其内部的線程,調用simplemessenger類注冊的dispatch類的處理函數來處理相應的消息:

其内部的mqueue為優先級隊列,用來儲存消息,marrival儲存了接收到的消息。marrival_map儲存消息在集合中的位置。

函數dispatchqueue::enqueue用來把接收到的消息添加到消息隊列中,函數dispatchqueue::entry為線程的處理函數,用于處理消息。

3.2.4 pipe

類pipe實作了pipeconnection的接口,它實作了兩個端口之間的類似管道的功能。

對于每一個pipe,内部都有一個reader和一個writer線程,分别用來處理這個pipe有關的消息接收和請求的發送。線程delayeddelivery用于故障注入測試:

3.2.5 消息的發送

1)當發送一個消息時,首先要通過messenger類,擷取對應的connection:

<code>conn = messenger-&gt;get_connection(dest_server);</code>

具體到simplemessenger的實作如下所示:

a)首先比較,如果dest.addr是my_inst.addr,就直接傳回local_connection。

b)調用函數_lookup_pipe在已經存在的pipe中查找。如果找到,就直接傳回pipeconnectionref;否則調用函數connect_rank新建立一個pipe,并加入到msgr的register_pipe裡。

2)當獲得一個connection之後,就可以調用connection的發送函數來發送消息。

<code>conn-&gt;send_message(m);</code>

其最終調用了simplemessenger::submit_message函數:

a)如果pipe不為空,并且狀态不是pipe::state_closed狀态,調用函數pipe→_send把發送的消息添加到out_q發送隊列裡,觸發發送線程。

b)如果pipe為空,就調用connect_rank建立pipe,并把消息添加到out_q發送隊列中。

3)發送線程writer把消息發送出去。通過步驟2,要發送的消息已經儲存在相應pipe的out_q隊列裡,并觸發了發送線程。每個pipe的writer線程負責發送out_q的消息,其線程入口函數為pipe::writer,實作功能:

a)調用函數_get_next_outgoing從out_q中擷取消息。

b)調用函數write_message(header, footer, blist)把消息的header、footer、資料blist發送出去。

3.2.6 消息的接收

1)每個pipe對應的線程reader用于接收消息。入口函數為pipe::reader,其功能如下:

a)判斷目前的state,如果為state_accepting,就調用函數pipe::accept來接收連接配接,如果不是state_closed,并且不是state_connecting狀态,就接收消息。

b)先調用函數tcp_read來接收一個tag。

c)根據tag,來接收不同類型的消息如下所示:

`ceph_msgr_tag_keepalive消息。

ceph_msgr_tag_keepalive2,在ceph_msgr_tag_keepalive的基礎上,添加了時間。

ceph_msgr_tag_keepalive2_ack。

ceph_msgr_tag_ack。

ceph_msgr_tag_msg,這裡才是接收的消息。

ceph_msgr_tag_close。`

d)調用函數read_message來接收消息,當本函數傳回後,就完成了接收消息。

2)調用函數in_q-&gt;fast_preprocess(m)預處理消息。

3)調用函數in_q-&gt;can_fast_dispatch(m),如果可以進行fast_dispatch,就in_q-&gt;fast_dispatch(m)處理。fast_dispatch并不把消息加入到mqueue裡,而是直接調用msgr-&gt;ms_fast_dispatch函數,并最終調用注冊的fast_dispatcher函數處理。

4)如果不能fast_dispatch,就調用函數in_q-&gt;enqueue(m, m-&gt;get_priority(), conn_id) 把接收到的消息加入到dispatchqueue的mqueue隊列裡,由dispatchqueue的分發線程調用ms_dispatch處理。

ms_fast_dispath和ms_dispatch兩種處理的差別在于:ms_dispatch是由dispatchqueue的線程處理的,它是一個單線程;ms_fast_dispatch函數是由pipe的接收線程直接調用處理的,是以性能比前者要好。

3.2.7 錯誤處理

網絡子產品複雜的功能是如何處理網絡錯誤。無論是接收還是發送,會出現各種異常錯誤,包括傳回異常錯誤碼,接收資料的magic驗證不一緻,接收的資料的效驗驗證不一緻,等等。錯誤的原因主要是由于網絡本身的錯誤(實體鍊路等),或者位元組跳變引起的。

目前錯誤處理的方法比較簡單,處理流程如下:

1)關閉目前socket的連接配接。

2)重建立立一個socket連接配接。

3)重新發送沒有接受到ack應對的消息。

函數pipe::fault用來處理錯誤:

1)調用shutdown_socket關閉pipe的socket。

2)調用函數requeue_sent把沒有收到ack的消息重新加入發送隊列,當發送隊列有請求時,發送線程會不斷地嘗試重新連接配接。