SOFAJRaft 是基于 Raft 算法的生産級高性能 Java 實作,支援 MULTI-RAFT-GROUP。應用場景有 Leader 選舉、分布式鎖服務、高可靠的元資訊管理、分布式存儲系統。
如果不了解Raft算法的朋友可以去看看這篇文章:Raft 為什麼是更易了解的分布式一緻性算法,寫的很詳細了。
這張圖是SOFAJRaft的設計圖,其中Node 代表了一個 SOFAJRaft Server 節點。
由于SOFAJRaft的Node節點是一個分布式的結構,是以Node節點需要将資訊傳遞給其他Node,是以Replicator的作用就是用來複制資訊給其他的Node。多個Replicator共同組成一個ReplicatorGroup。
Snapshot是表示一個快照,就是對資料目前值的一個記錄,會存盤儲存,提供冷備資料功能。
Leader 生成快照有這麼幾個作用:
- 當有新的 Node 加入叢集的時候,不用隻靠日志複制、回放去和 Leader 保持資料一緻,而是通過安裝 Leader 的快照來跳過早期大量日志的回放;
- Leader 用快照替代 Log 複制可以減少網絡上的資料量;
- 用快照替代早期的 Log 可以節省存儲空間;
StateMachine 接口是用來給使用者去實作的部分。通過使用者實作具體的業務邏輯進而在分布式系統中達成共識。
在 StateMachine 上,我們要去實作狀态機暴露給我們待實作的幾個接口,最重要的是 onApply 接口,要在這個接口裡将 Cilent 的請求指令進行運算,轉換成具體的計數器值。而 onSnapshotSave 和 onSnapshotLoad 接口則是負責快照的生成和加載。
Client也是需要使用者去實作的部分,使用者需要去定義不同的消息類型和用戶端的處理邏輯。
實作Counter分布式計數器
下面我們給出個需求: 提供一個 Counter,Client 每次計數時可以指定步幅,也可以随時發起查詢。
将它翻譯成具體的功能點,主要有三部分:
- 實作:Counter server,具備計數功能,具體運算公式為:Cn = Cn-1 + delta;
- 提供寫服務,寫入 delta 觸發計數器運算;
- 提供讀服務,讀取目前 Cn 值;
具體代碼:Counter
在這個demo中,我們啟動三個server作為一個group,傳入下面的參數:
/tmp/server1 counter 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
/tmp/server2 counter 127.0.0.1:8082 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
/tmp/server3 counter 127.0.0.1:8083 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
表示使用/tmp/server1 ,/tmp/server2,/tmp/server3三個目錄用來存儲資料,raft group名稱為 counter,節點ip也分别為
127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
然後啟動用戶端,并傳入下面參數:
counter 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
表示綁定的raft group名稱為 counter,叢集為:
127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
服務端
CounterServer
public CounterServer(final String dataPath, final String groupId, final PeerId serverId,
final NodeOptions nodeOptions) throws IOException {
// 初始化路徑
FileUtils.forceMkdir(new File(dataPath));
// 這裡讓 raft RPC 和業務 RPC 使用同一個 RPC server, 通常也可以分開
final RpcServer rpcServer = new RpcServer(serverId.getPort());
RaftRpcServerFactory.addRaftRequestProcessors(rpcServer);
// 注冊業務處理器
rpcServer.registerUserProcessor(new GetValueRequestProcessor(this));
rpcServer.registerUserProcessor(new IncrementAndGetRequestProcessor(this));
// 初始化狀态機
this.fsm = new CounterStateMachine();
// 設定狀态機到啟動參數
nodeOptions.setFsm(this.fsm);
// 設定存儲路徑
// 日志, 必須
nodeOptions.setLogUri(dataPath + File.separator + "log");
// 元資訊, 必須
nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta");
// snapshot, 可選, 一般都推薦
nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
// 初始化 raft group 服務架構
this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer);
// 啟動
this.node = this.raftGroupService.start();
}
服務端CounterServer在執行個體化的時候會設定相應的處理器,這裡設定了GetValueRequestProcessor和 IncrementAndGetRequestProcessor。
GetValueRequestProcessor用來提供讀服務,讀取目前 Cn 值;
IncrementAndGetRequestProcessor提供寫服務,寫入 delta 觸發計數器運算;
GetValueRequestProcessor
@Override
public Object handleRequest(final BizContext bizCtx, final GetValueRequest request) throws Exception {
if (!this.counterServer.getFsm().isLeader()) {
return this.counterServer.redirect();
}
final ValueResponse response = new ValueResponse();
response.setSuccess(true);
response.setValue(this.counterServer.getFsm().getValue());
return response;
}
GetValueRequestProcessor的處理非常的簡單,直接擷取狀态機的值然後傳回。
IncrementAndGetRequestProcessor
public void handleRequest(final BizContext bizCtx, final AsyncContext asyncCtx,
final IncrementAndGetRequest request) {
//判斷目前節點是否是leader
if (!this.counterServer.getFsm().isLeader()) {
asyncCtx.sendResponse(this.counterServer.redirect());
return;
}
//設定響應資料
final ValueResponse response = new ValueResponse();
//封裝請求資料,并回調響應結果
final IncrementAndAddClosure closure = new IncrementAndAddClosure(counterServer, request, response,
status -> {
//響應成功
if (!status.isOk()) {
response.setErrorMsg(status.getErrorMsg());
response.setSuccess(false);
}
//發送響應請求
asyncCtx.sendResponse(response);
});
try {
final Task task = new Task();
task.setDone(closure);
//序列化請求
task.setData(ByteBuffer
.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(request)));
//調用node處理請求
// apply task to raft group.
counterServer.getNode().apply(task);
} catch (final CodecException e) {
LOG.error("Fail to encode IncrementAndGetRequest", e);
//請求失敗,則立即響應
response.setSuccess(false);
response.setErrorMsg(e.getMessage());
asyncCtx.sendResponse(response);
}
}
這裡使用IncrementAndAddClosure來封裝響應和請求,并通過回調的方式進行異步回寫資料到client。然後執行個體化Task執行個體,序列化請求資料,調用node的apply方法。
然後設定了CounterStateMachine狀态機,并設值了日志,元資訊和快照的存儲路徑。
CounterStateMachine實作了StateMachineAdapter抽象類,并重寫了3個方法:
onApply用來處理具體的業務
onSnapshotSave儲存快照
onSnapshotLoad加載快照
在儲存和加載快照的地方使用了CounterSnapshotFile類來進行輔助。
CounterStateMachine
public class CounterStateMachine extends StateMachineAdapter {
...
private final AtomicLong value = new AtomicLong(0);
public void onApply(final Iterator iter) {
//擷取processor中封裝的資料
while (iter.hasNext()) {
long delta = 0;
//用于封裝請求資料和回調結果
IncrementAndAddClosure closure = null;
if (iter.done() != null) {
// This task is applied by this node, get value from closure to avoid additional parsing.
closure = (IncrementAndAddClosure) iter.done();
delta = closure.getRequest().getDelta();
} else {
// Have to parse FetchAddRequest from this user log.
final ByteBuffer data = iter.getData();
try {
final IncrementAndGetRequest request = SerializerManager.getSerializer(SerializerManager.Hessian2)
.deserialize(data.array(), IncrementAndGetRequest.class.getName());
delta = request.getDelta();
} catch (final CodecException e) {
LOG.error("Fail to decode IncrementAndGetRequest", e);
}
}
//擷取目前值
final long prev = this.value.get();
//将目前值加上delta
final long updated = value.addAndGet(delta);
//設定響應,并調用run方法回寫響應方法
if (closure != null) {
closure.getResponse().setValue(updated);
closure.getResponse().setSuccess(true);
closure.run(Status.OK());
}
LOG.info("Added value={} by delta={} at logIndex={}", prev, delta, iter.getIndex());
iter.next();
}
}
}
這裡的onApply方法首先會擷取processor中封裝的資料,然後擷取processor中傳入的closure執行個體,然後處理好業務邏輯後調用closure的run進行回調傳回資料到用戶端。
用戶端
CounterClient
public static void main(final String[] args) throws Exception {
if (args.length != 2) {
System.out.println("Useage : java com.alipay.sofa.jraft.example.counter.CounterClient {groupId} {conf}");
System.out
.println("Example: java com.alipay.sofa.jraft.example.counter.CounterClient counter 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083");
System.exit(1);
}
final String groupId = args[0];
final String confStr = args[1];
final Configuration conf = new Configuration();
if (!conf.parse(confStr)) {
throw new IllegalArgumentException("Fail to parse conf:" + confStr);
}
// 更新raft group配置
RouteTable.getInstance().updateConfiguration(groupId, conf);
//接下來初始化 RPC 用戶端并更新路由表
final BoltCliClientService cliClientService = new BoltCliClientService();
cliClientService.init(new CliOptions());
if (!RouteTable.getInstance().refreshLeader(cliClientService, groupId, 1000).isOk()) {
throw new IllegalStateException("Refresh leader failed");
}
//擷取 leader 後發送請求
final PeerId leader = RouteTable.getInstance().selectLeader(groupId);
System.out.println("Leader is " + leader);
final int n = 1000;
final CountDownLatch latch = new CountDownLatch(n);
final long start = System.currentTimeMillis();
for (int i = 0; i < n; i++) {
incrementAndGet(cliClientService, leader, i, latch);
}
latch.await();
System.out.println(n + " ops, cost : " + (System.currentTimeMillis() - start) + " ms.");
System.exit(0);
}
用戶端先是根據groupId和IP綁定server,然後更新路由表,擷取leader
private static void incrementAndGet(final BoltCliClientService cliClientService, final PeerId leader,
final long delta, CountDownLatch latch) throws RemotingException,
InterruptedException {
final IncrementAndGetRequest request = new IncrementAndGetRequest();
request.setDelta(delta);
cliClientService.getRpcClient().invokeWithCallback(leader.getEndpoint().toString(), request,
new InvokeCallback() {
@Override
public void onResponse(Object result) {
latch.countDown();
System.out.println("incrementAndGet result:" + result);
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
latch.countDown();
}
@Override
public Executor getExecutor() {
return null;
}
}, 5000);
}
然後調用incrementAndGet方法。incrementAndGet方法中使用cliClientService擷取client然後傳入request請求并設值回調函數。
總體流程
這裡總結一下整個server和client的調用流程
首先是CounterClient綁定server後,擷取server的leader節點,然後發送一個IncrementAndGetRequest的request請求到server。
Server接收到請求後根據請求的類型交給IncrementAndGetRequestProcessor處理,并調用handleRequest方法。
然後handleRequest會将資料封裝調用狀态機的onApply方法,處理業務資料後調用closure進行回調。
closure回調後會封裝一個ValueResponse發送響應請求給用戶端。
用戶端會回調onResponse方法。
到這裡整個counter的例子就講解完畢了