自己動手實作RAFT算法
前段時間學習了一下分布式系統的raft算法,相比較Paxos協定,了解起來确實容易多了,于是就産生了自己動手實作一套基于raft一緻性協定的分布式緩存的想法,經過大約兩個月的空閑時間,終于完成了一個可以運作的python版本aducode/simple-raft-py
子產品劃分
項目主要包括如下幾個子產品:
1. server:
- main loop所在,在main loop中處理IO事件和逾時事件(參考redis的實作)
def server_forever(self):
"""
啟動服務
:return:
"""
# init the server runtime env
self.initialise()
# Main Loop
while self.inputs:
# handle io
self.handle_io_event()
# handle timer
self.handle_timeout_event()
if not self.is_running():
# stoppde or stopping
break
# release the resources
self.realease()
print 'Server stopped!'
- 網絡模型采用IO多路複用模型,使用python的select子產品
- 将socket連接配接封裝成channel,channel中進行輸入輸出資料格式化處理,同時多個channel組成鍊式結構,按順序格式化資料
class Channel(object):
def __init__(self, server, client, _next):
self.server = server
self.client = client
self.next = _next
def input(self, data, recv):
"""
:param data 資料
:param recv 是否從socket接受來的消息
當為False時,說明資料是發送到其他server的,不是server接受來的
"""
pass
def output(self):
pass
def close(self):
"""
說明socket被關閉,傳遞到handler
"""
return self.next.close()
- handler進行真正的邏輯處理,位于channel鍊的最末端
class Handler(object):
def handle(self, server, client, request):
pass
def close(self, server, client):
"""
handler處理close
:return:
"""
pass
- server提供兩個最基本的channel:1 IO2Channel 作為channel鍊的head,負責調用socket.recv接收資料和socket.send發送資料;2 Channel2Handler 作為channel鍊的tail與handler連接配接,負責調用handler.handle方法并将傳回結果傳回到前一個channel
class IO2Channel(Channel):
"""
直接與IO關聯的Channel
"""
def __init__(self, server, client, _next):
super(IO2Channel, self).__init__(server, client, _next)
def input(self, data=None, recv=True):
client_close = False
try:
if recv:
request = self.client.recv()
else:
request = data
except Exception, e:
client_close = True
else:
if request:
self.next.input(request, recv)
else:
client_close = True
if client_close:
self.server.close(self.client)
def output(self):
response, end = self.next.output()
if response:
try:
self.client.send(response)
except (IOError, socket.error):
self.server.close(self.client)
if (not response or end) and self.client in self.server.outputs:
self.server.outputs.remove(self.client)
class Channel2Handler(Channel):
"""
與handler關聯起來
"""
def __init__(self, server, client, _next):
super(Channel2Handler, self).__init__(server, client, _next)
self.queue = Queue.Queue()
def input(self, data, recv):
response = None
if recv:
if isinstance(self.next, Handler):
response = self.next.handle(self.server, self.client, data)
elif isinstance(self.next, types.FunctionType):
response = self.next(self.server, self.client, data)
else:
# 說明直接發出去
response = data
if response:
self.queue.put(response)
if self.client not in self.server.outputs:
self.server.outputs.append(self.client)
def output(self):
try:
return self.queue.get_nowait(), self.queue.empty()
except Queue.Empty:
return None, True
def close(self):
return self.next.close(self.server, self.client)
2. protocol
- 主要負責分布式節點間通信的message定義與解析
- 消息主要分為ClientMessage & NodeMessage
- 其中ClientMessage為用戶端發來的操作,如set get del commit rollback
- NodeMessage為分布式節點之間傳遞的消息,如選舉消息、心跳消息等
3.db
- 主要負責資料存儲
- 定義一個db engine接口,用來處理client操作,将來隻要修改配置檔案,就可以選擇不同的存儲引擎了(參考mysql)
- 目前隻有一個簡單的db engine實作,所有資料儲存在python dict中
4.node & state
- node定義了一個邏輯上的節點,以(host, port)作為node的唯一辨別
- node中實作了MessageChannel用來序列化/反序列化,将str格式的message轉換成ClientMessage/NodeMessage
- node中實作了NodeHandler,用來路由不同類型的Message
- state用來定義節點狀态,主要狀态有Leader、Follower、Candidate
class State(object):
"""
節點狀态類
不同狀态有不同的表現
"""
def __init__(self, node):
self.node = node
def handle(self, client, message):
"""
處理其他node的消息
"""
assert isinstance(message, Message)
if isinstance(message, ClientMessage):
if message.op == 'get':
return self.on_get(client, message)
else:
return self.on_update(client, message)
elif isinstance(message, ClientCloseMessage):
return self.on_client_close(client, message)
elif isinstance(message, HeartbeatRequestMessage):
return self.on_heartbeat_request(client, message)
elif isinstance(message, HeartbeatResponseMessage):
return self.on_heartbeat_response(client, message)
elif isinstance(message, ElectRequestMessage):
return self.on_elect_request(client, message)
elif isinstance(message, ElectResponseMessage):
return self.on_elect_response(client, message)
elif isinstance(message, SyncRequestMessage):
return self.on_sync_request(client, message)
elif isinstance(message, SyncResponseMessage):
return self.on_sync_response(client, message)
else:
pass
def on_get(self, client, message):
"""
處理用戶端get請求
"""
return self.node.config.db.handle(client, message.op, message.key, message.value, message.auto_commit)
def on_update(self, client, message):
"""
處理用戶端除了get外的請求
"""
return '@%s:%[email protected]' % self.node.leader if self.node.leader \
else 'No Leader Elected, please wait until we have a leader...'
def on_client_close(self, client, message):
"""
處理用戶端關閉
"""
return self.node.config.db.release(message.client)
def on_heartbeat_request(self, client, message):
"""
接收到heartbeat請求的處理方法
"""
pass
def on_heartbeat_response(self, client, message):
"""
接收到heartbeata響應的處理方法
"""
pass
def on_elect_request(self, client, message):
"""
接收到選舉請求的處理方法
"""
pass
def on_elect_response(self, client, message):
"""
接收到選舉響應的處理方法
"""
pass
def on_sync_request(self, client, message):
"""
接收到同步請求
"""
pass
def on_sync_response(self, client, message):
"""
接收同步響應
"""
pass
def __str__(self):
return self.__class__.__name__.strip().upper()
目前實作的功能
- 多節點選舉Leader
- Leader節點下線重選
- Leader發送心跳,根據響應維護follower存活清單
- Node之間資料同步
- 簡單資料事物commit rollback
不足和想法
- 隻是用來學習raft的一個小項目,性能肯定不足,扛不住高并發的請求
- 節點之間的Request & Response 基本上都是異步處理的,異步程式設計的代碼風格對人不是很友好,好多on_xxxx方法,許多代碼邏輯是割裂開的(考慮用python中的yield實作協程把代碼風格程式設計同步風格)
- 異步的邏輯導緻調試起來比較痛苦
- 本來以為很簡單的功能,沒想到用了兩個月,才實作了最最最基礎的邏輯,從中我除了學到了raft一緻性協定的原理以外,還對IO多路複用更加的熟練了。