我們這次依然用上次的例子CounterServer來進行講解:
我這裡就不貼整個代碼了
public static void main(final String[] args) throws IOException {
if (args.length != 4) {
System.out
.println("Useage : java com.alipay.sofa.jraft.example.counter.CounterServer {dataPath} {groupId} {serverId} {initConf}");
System.out
.println("Example: java com.alipay.sofa.jraft.example.counter.CounterServer " +
"/tmp/server1 " +
"counter " +
"127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083");
System.exit(1);
}
//日志存儲的路徑
final String dataPath = args[0];
//SOFAJRaft叢集的名字
final String groupId = args[1];
//目前節點的ip和端口
final String serverIdStr = args[2];
//叢集節點的ip和端口
final String initConfStr = args[3];
final NodeOptions nodeOptions = new NodeOptions();
// 為了測試,調整 snapshot 間隔等參數
// 設定選舉逾時時間為 1 秒
nodeOptions.setElectionTimeoutMs(1000);
// 關閉 CLI 服務。
nodeOptions.setDisableCli(false);
// 每隔30秒做一次 snapshot
nodeOptions.setSnapshotIntervalSecs(30);
// 解析參數
final PeerId serverId = new PeerId();
if (!serverId.parse(serverIdStr)) {
throw new IllegalArgumentException("Fail to parse serverId:" + serverIdStr);
}
final Configuration initConf = new Configuration();
//将raft分組加入到Configuration的peers數組中
if (!initConf.parse(initConfStr)) {
throw new IllegalArgumentException("Fail to parse initConf:" + initConfStr);
}
// 設定初始叢集配置
nodeOptions.setInitialConf(initConf);
// 啟動
final CounterServer counterServer = new CounterServer(dataPath, groupId, serverId, nodeOptions);
System.out.println("Started counter server at port:"
+ counterServer.getNode().getNodeId().getPeerId().getPort());
}
我們在啟動server的main方法的時候會傳入日志存儲的路徑、SOFAJRaft叢集的名字、目前節點的ip和端口、叢集節點的ip和端口并設值到NodeOptions中,作為目前節點啟動的參數。
這裡會将目前節點初始化為一個PeerId對象
PeerId
//存放目前節點的ip和端口号
private Endpoint endpoint = new Endpoint(Utils.IP_ANY, 0);
//預設是0
private int idx;
//是一個ip:端口的字元串
private String str;
public PeerId() {
super();
}
public boolean parse(final String s) {
final String[] tmps = StringUtils.split(s, ':');
if (tmps.length != 3 && tmps.length != 2) {
return false;
}
try {
final int port = Integer.parseInt(tmps[1]);
this.endpoint = new Endpoint(tmps[0], port);
if (tmps.length == 3) {
this.idx = Integer.parseInt(tmps[2]);
} else {
this.idx = 0;
}
this.str = null;
return true;
} catch (final Exception e) {
LOG.error("Parse peer from string failed: {}", s, e);
return false;
}
}
PeerId的parse方法會将傳入的ip:端口解析之後對變量進行一些指派的操作。
然後會調用到CounterServer的構造器中:
CounterServer
public CounterServer(final String dataPath, final String groupId, final PeerId serverId,
final NodeOptions nodeOptions) throws IOException {
// 初始化路徑
FileUtils.forceMkdir(new File(dataPath));
// 這裡讓 raft RPC 和業務 RPC 使用同一個 RPC server, 通常也可以分開
final RpcServer rpcServer = new RpcServer(serverId.getPort());
RaftRpcServerFactory.addRaftRequestProcessors(rpcServer);
// 注冊業務處理器
rpcServer.registerUserProcessor(new GetValueRequestProcessor(this));
rpcServer.registerUserProcessor(new IncrementAndGetRequestProcessor(this));
// 初始化狀态機
this.fsm = new CounterStateMachine();
// 設定狀态機到啟動參數
nodeOptions.setFsm(this.fsm);
// 設定存儲路徑
// 日志, 必須
nodeOptions.setLogUri(dataPath + File.separator + "log");
// 元資訊, 必須
nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta");
// snapshot, 可選, 一般都推薦
nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
// 初始化 raft group 服務架構
this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer);
// 啟動
this.node = this.raftGroupService.start();
}
這個方法主要是調用NodeOptions的各種方法進行設定,然後調用raftGroupService的start方法啟動raft節點。
RaftGroupService
我們來到RaftGroupService的start方法:
RaftGroupService#start
public synchronized Node start(final boolean startRpcServer) {
//如果已經啟動了,那麼就傳回
if (this.started) {
return this.node;
}
//校驗serverId和groupId
if (this.serverId == null || this.serverId.getEndpoint() == null
|| this.serverId.getEndpoint().equals(new Endpoint(Utils.IP_ANY, 0))) {
throw new IllegalArgumentException("Blank serverId:" + this.serverId);
}
if (StringUtils.isBlank(this.groupId)) {
throw new IllegalArgumentException("Blank group id" + this.groupId);
}
//Adds RPC server to Server.
//設定目前node的ip和端口
NodeManager.getInstance().addAddress(this.serverId.getEndpoint());
//建立node
this.node = RaftServiceFactory.createAndInitRaftNode(this.groupId, this.serverId, this.nodeOptions);
if (startRpcServer) {
//啟動遠端服務
this.rpcServer.start();
} else {
LOG.warn("RPC server is not started in RaftGroupService.");
}
this.started = true;
LOG.info("Start the RaftGroupService successfully.");
return this.node;
}
這個方法會在一開始的時候對RaftGroupService在構造器執行個體化的參數進行校驗,然後把目前節點的Endpoint添加到NodeManager的addrSet變量中,接着調用RaftServiceFactory#createAndInitRaftNode執行個體化Node節點。
每個節點都會啟動一個rpc的服務,因為每個節點既可以被選舉也可以投票給其他節點,節點之間需要互相通信,是以需要啟動一個rpc服務。
RaftServiceFactory#createAndInitRaftNode
public static Node createAndInitRaftNode(final String groupId, final PeerId serverId, final NodeOptions opts) {
//執行個體化一個node節點
final Node ret = createRaftNode(groupId, serverId);
//為node節點初始化
if (!ret.init(opts)) {
throw new IllegalStateException("Fail to init node, please see the logs to find the reason.");
}
return ret;
}
public static Node createRaftNode(final String groupId, final PeerId serverId) {
return new NodeImpl(groupId, serverId);
}
createAndInitRaftNode方法首先調用createRaftNode執行個體化一個Node的執行個體NodeImpl,然後調用其init方法進行初始化,主要的配置都是在init方法中完成的。
NodeImpl
public NodeImpl(final String groupId, final PeerId serverId) {
super();
if (groupId != null) {
//檢驗groupId是否符合格式規範
Utils.verifyGroupId(groupId);
}
this.groupId = groupId;
this.serverId = serverId != null ? serverId.copy() : null;
//一開始的設定為未初始化
this.state = State.STATE_UNINITIALIZED;
//設定新的任期為0
this.currTerm = 0;
//設定最新的時間戳
updateLastLeaderTimestamp(Utils.monotonicMs());
this.confCtx = new ConfigurationCtx(this);
this.wakingCandidate = null;
final int num = GLOBAL_NUM_NODES.incrementAndGet();
LOG.info("The number of active nodes increment to {}.", num);
}
NodeImpl會在構造器中初始化一些參數。
Node的初始化
Node節點的所有的重要的配置都是在init方法中完成的,NodeImpl的init方法比較長是以分成代碼塊來進行講解。
NodeImpl#init
//非空校驗
Requires.requireNonNull(opts, "Null node options");
Requires.requireNonNull(opts.getRaftOptions(), "Null raft options");
Requires.requireNonNull(opts.getServiceFactory(), "Null jraft service factory");
//目前就一個實作:DefaultJRaftServiceFactory
this.serviceFactory = opts.getServiceFactory();
this.options = opts;
this.raftOptions = opts.getRaftOptions();
//基于 Metrics 類庫的性能名額統計,具有豐富的性能統計名額,預設不開啟度量工具
this.metrics = new NodeMetrics(opts.isEnableMetrics());
if (this.serverId.getIp().equals(Utils.IP_ANY)) {
LOG.error("Node can't started from IP_ANY.");
return false;
}
if (!NodeManager.getInstance().serverExists(this.serverId.getEndpoint())) {
LOG.error("No RPC server attached to, did you forget to call addService?");
return false;
}
//定時任務管理器
this.timerManager = new TimerManager();
//初始化定時任務管理器的内置線程池
if (!this.timerManager.init(this.options.getTimerPoolSize())) {
LOG.error("Fail to init timer manager.");
return false;
}
//定時任務管理器
this.timerManager = new TimerManager();
//初始化定時任務管理器的内置線程池
if (!this.timerManager.init(this.options.getTimerPoolSize())) {
LOG.error("Fail to init timer manager.");
return false;
}
這段代碼主要是給各個變量指派,然後進行校驗判斷一下serverId不能為0.0.0.0,目前的Endpoint必須要在NodeManager裡面設定過等等(NodeManager的設定是在RaftGroupService的start方法裡)。
然後會初始化一個全局的的定時排程管理器TimerManager:
TimerManager
private ScheduledExecutorService executor;
@Override
public boolean init(Integer coreSize) {
this.executor = Executors.newScheduledThreadPool(coreSize, new NamedThreadFactory(
"JRaft-Node-ScheduleThreadPool-", true));
return true;
}
TimerManager的init方法就是初始化一個線程池,如果目前的伺服器的cpu線程數3 大于20 ,那麼這個線程池的coreSize就是20,否則就是cpu線程數3。
往下走是計時器的初始化:
// Init timers
//設定投票計時器
this.voteTimer = new RepeatedTimer("JRaft-VoteTimer", this.options.getElectionTimeoutMs()) {
@Override
protected void onTrigger() {
//處理投票逾時
handleVoteTimeout();
}
@Override
protected int adjustTimeout(final int timeoutMs) {
//在一定範圍内傳回一個随機的時間戳
return randomTimeout(timeoutMs);
}
};
//設定預投票計時器
//當leader在規定的一段時間内沒有與 Follower 艦船進行通信時,
// Follower 就可以認為leader已經不能正常擔任旗艦的職責,則 Follower 可以去嘗試接替leader的角色。
// 這段通信逾時被稱為 Election Timeout
//候選者在發起投票之前,先發起預投票
this.electionTimer = new RepeatedTimer("JRaft-ElectionTimer", this.options.getElectionTimeoutMs()) {
@Override
protected void onTrigger() {
handleElectionTimeout();
}
@Override
protected int adjustTimeout(final int timeoutMs) {
//在一定範圍内傳回一個随機的時間戳
//為了避免同時發起選舉而導緻失敗
return randomTimeout(timeoutMs);
}
};
//leader下台的計時器
//定時檢查是否需要重新選舉leader
this.stepDownTimer = new RepeatedTimer("JRaft-StepDownTimer", this.options.getElectionTimeoutMs() >> 1) {
@Override
protected void onTrigger() {
handleStepDownTimeout();
}
};
//快照計時器
this.snapshotTimer = new RepeatedTimer("JRaft-SnapshotTimer", this.options.getSnapshotIntervalSecs() * 1000) {
@Override
protected void onTrigger() {
handleSnapshotTimeout();
}
};
voteTimer是用來控制選舉的,如果選舉逾時,目前的節點又是候選者角色,那麼就會發起選舉。
electionTimer是預投票計時器。候選者在發起投票之前,先發起預投票,如果沒有得到半數以上節點的回報,則候選者就會識趣的放棄參選。
stepDownTimer定時檢查是否需要重新選舉leader。目前的leader可能出現它的Follower可能并沒有整個叢集的1/2卻還沒有下台的情況,那麼這個時候會定期的檢檢視leader的Follower是否有那麼多,沒有那麼多的話會強制讓leader下台。
snapshotTimer快照計時器。這個計時器會每隔1小時觸發一次生成一個快照。
這些計時器的具體實作現在暫時不表,等到要講具體功能的時候再進行梳理。
這些計時器有一個共同的特點就是會根據不同的計時器傳回一個在一定範圍内随機的時間。傳回一個随機的時間可以防止多個節點在同一時間内同時發起投票選舉進而降低選舉失敗的機率。
繼續往下看:
this.configManager = new ConfigurationManager();
//初始化一個disruptor,采用多生産者模式
this.applyDisruptor = DisruptorBuilder.<LogEntryAndClosure>newInstance() //
//設定disruptor大小,預設16384
.setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) //
.setEventFactory(new LogEntryAndClosureFactory()) //
.setThreadFactory(new NamedThreadFactory("JRaft-NodeImpl-Disruptor-", true)) //
.setProducerType(ProducerType.MULTI) //
.setWaitStrategy(new BlockingWaitStrategy()) //
.build();
//設定事件處理器
this.applyDisruptor.handleEventsWith(new LogEntryAndClosureHandler());
//設定異常處理器
this.applyDisruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
// 啟動disruptor的線程
this.applyQueue = this.applyDisruptor.start();
//如果開啟了metrics統計
if (this.metrics.getMetricRegistry() != null) {
this.metrics.getMetricRegistry().register("jraft-node-impl-disruptor",
new DisruptorMetricSet(this.applyQueue));
}
這裡初始化了一個Disruptor作為消費隊列,不清楚Disruptor的朋友可以去看我上一篇文章:Disruptor—核心概念及體驗。然後還校驗了metrics是否開啟,預設是不開啟的。
//fsmCaller封裝對業務 StateMachine 的狀态轉換的調用以及日志的寫入等
this.fsmCaller = new FSMCallerImpl();
//初始化日志存儲功能
if (!initLogStorage()) {
LOG.error("Node {} initLogStorage failed.", getNodeId());
return false;
}
//初始化中繼資料存儲功能
if (!initMetaStorage()) {
LOG.error("Node {} initMetaStorage failed.", getNodeId());
return false;
}
//對FSMCaller初始化
if (!initFSMCaller(new LogId(0, 0))) {
LOG.error("Node {} initFSMCaller failed.", getNodeId());
return false;
}
//執行個體化投票箱
this.ballotBox = new BallotBox();
final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions();
ballotBoxOpts.setWaiter(this.fsmCaller);
ballotBoxOpts.setClosureQueue(this.closureQueue);
//初始化ballotBox的屬性
if (!this.ballotBox.init(ballotBoxOpts)) {
LOG.error("Node {} init ballotBox failed.", getNodeId());
return false;
}
//初始化快照存儲功能
if (!initSnapshotStorage()) {
LOG.error("Node {} initSnapshotStorage failed.", getNodeId());
return false;
}
//校驗日志檔案索引的一緻性
final Status st = this.logManager.checkConsistency();
if (!st.isOk()) {
LOG.error("Node {} is initialized with inconsistent log, status={}.", getNodeId(), st);
return false;
}
//配置管理raft group中的資訊
this.conf = new ConfigurationEntry();
this.conf.setId(new LogId());
// if have log using conf in log, else using conf in options
if (this.logManager.getLastLogIndex() > 0) {
this.conf = this.logManager.checkAndSetConfiguration(this.conf);
} else {
this.conf.setConf(this.options.getInitialConf());
}
這段代碼主要是對快照、日志、中繼資料等功能初始化。
this.replicatorGroup = new ReplicatorGroupImpl();
//收其他節點或者用戶端發過來的請求,轉交給對應服務處理
this.rpcService = new BoltRaftClientService(this.replicatorGroup);
final ReplicatorGroupOptions rgOpts = new ReplicatorGroupOptions();
rgOpts.setHeartbeatTimeoutMs(heartbeatTimeout(this.options.getElectionTimeoutMs()));
rgOpts.setElectionTimeoutMs(this.options.getElectionTimeoutMs());
rgOpts.setLogManager(this.logManager);
rgOpts.setBallotBox(this.ballotBox);
rgOpts.setNode(this);
rgOpts.setRaftRpcClientService(this.rpcService);
rgOpts.setSnapshotStorage(this.snapshotExecutor != null ? this.snapshotExecutor.getSnapshotStorage() : null);
rgOpts.setRaftOptions(this.raftOptions);
rgOpts.setTimerManager(this.timerManager);
// Adds metric registry to RPC service.
this.options.setMetricRegistry(this.metrics.getMetricRegistry());
//初始化rpc服務
if (!this.rpcService.init(this.options)) {
LOG.error("Fail to init rpc service.");
return false;
}
this.replicatorGroup.init(new NodeId(this.groupId, this.serverId), rgOpts);
this.readOnlyService = new ReadOnlyServiceImpl();
final ReadOnlyServiceOptions rosOpts = new ReadOnlyServiceOptions();
rosOpts.setFsmCaller(this.fsmCaller);
rosOpts.setNode(this);
rosOpts.setRaftOptions(this.raftOptions);
//隻讀服務初始化
if (!this.readOnlyService.init(rosOpts)) {
LOG.error("Fail to init readOnlyService.");
return false;
}
這段代碼主要是初始化replicatorGroup、rpcService以及readOnlyService。
接下來是最後一段的代碼:
// set state to follower
this.state = State.STATE_FOLLOWER;
if (LOG.isInfoEnabled()) {
LOG.info("Node {} init, term={}, lastLogId={}, conf={}, oldConf={}.", getNodeId(), this.currTerm,
this.logManager.getLastLogId(false), this.conf.getConf(), this.conf.getOldConf());
}
//如果快照執行器不為空,并且生成快照的時間間隔大于0,那麼就定時生成快照
if (this.snapshotExecutor != null && this.options.getSnapshotIntervalSecs() > 0) {
LOG.debug("Node {} start snapshot timer, term={}.", getNodeId(), this.currTerm);
this.snapshotTimer.start();
}
if (!this.conf.isEmpty()) {
//新啟動的node需要重新選舉
stepDown(this.currTerm, false, new Status());
}
if (!NodeManager.getInstance().add(this)) {
LOG.error("NodeManager add {} failed.", getNodeId());
return false;
}
// Now the raft node is started , have to acquire the writeLock to avoid race
// conditions
this.writeLock.lock();
//這個分支表示目前的jraft叢集裡隻有一個節點,那麼個節點必定是leader直接進行選舉就好了
if (this.conf.isStable() && this.conf.getConf().size() == 1 && this.conf.getConf().contains(this.serverId)) {
// The group contains only this server which must be the LEADER, trigger
// the timer immediately.
electSelf();
} else {
this.writeLock.unlock();
}
return true;
這段代碼裡會将目前的狀态設定為Follower,然後啟動快照定時器定時生成快照。
如果目前的叢集不是單節點叢集需要做一下stepDown,表示新生成的Node節點需要重新進行選舉。
最下面有一個if分支,如果目前的jraft叢集裡隻有一個節點,那麼個節點必定是leader直接進行選舉就好了,是以會直接調用electSelf進行選舉。
選舉的代碼我們就暫時略過,要不然後面就沒得講了。
到這裡整個NodeImpl執行個體的init方法就分析完了,這個方法很長,但是還是做了很多事情的。
好了,今天也不早了,各位晚安~