// Log why this node is starting an election: each StartElectionReason maps to
// a distinct human-readable message. This runs before the election itself.
switch (reason) {
case TopologyCoordinator::StartElectionReason::kElectionTimeout:
log() << "Starting an election, since we've seen no PRIMARY in the past "
<< _rsConfig.getElectionTimeoutPeriod();
break;
case TopologyCoordinator::StartElectionReason::kPriorityTakeover:
log() << "Starting an election for a priority takeover";
break;
// Intentional fall-through: both step-up variants share the same log line.
case TopologyCoordinator::StartElectionReason::kStepUpRequest:
case TopologyCoordinator::StartElectionReason::kStepUpRequestSkipDryRun:
log() << "Starting an election due to step up request";
break;
case TopologyCoordinator::StartElectionReason::kCatchupTakeover:
log() << "Starting an election for a catchup takeover";
break;
case TopologyCoordinator::StartElectionReason::kSingleNodePromptElection:
log() << "Starting an election due to single node replica set prompt election";
break;
}
複制
_startElectSelfV1_inlock()
首先要擷取目前副本集的任期(
term
),對于除了
kStepUpRequestSkipDryRun
之外的所有選舉原因,都需要進行一次預選舉(dry run),然後走
_processDryRunResult(term)
的邏輯。
主要邏輯如下:
// Fetch the current term from the topology coordinator.
long long term = _topCoord->getTerm();
// If the dry run should be skipped, bump the term and go straight to the real
// election; lossGuard is dismissed so the election is not treated as lost.
if (reason == TopologyCoordinator::StartElectionReason::kStepUpRequestSkipDryRun) {
long long newTerm = term + 1;
log() << "skipping dry run and running for election in term " << newTerm;
_startRealElection_inlock(newTerm);
lossGuard.dismiss();
return;
}
// All other reasons must first conduct a dry-run (pre-vote) election.
log() << "conducting a dry run election to see if we could be elected. current term: " << term;
// Set up the vote requester used to solicit dry-run votes.
// NOTE(review): original comment said this works "via heartbeats" — confirm
// against the vote requester implementation before relying on that.
_voteRequester.reset(new VoteRequester);
// Evaluate the dry-run outcome for the current term.
_processDryRunResult(term);
複制
接下來是預選舉階段
_processDryRunResult(term)
接收目前的
term
值作為參數,通過
_voteRequester
擷取結果,判斷自身是否滿足發起真正選舉的條件,如果滿足的話,再自增term并調用
_startRealElection_inlock(newTerm)
;否則列印預選舉失敗原因并退出。
// Dry run succeeded: advance to a fresh term and start the real election.
long long newTerm = originalTerm + 1;
log() << "dry election run succeeded, running for election in term " << newTerm;
_startRealElection_inlock(newTerm);
// Decide what follow-up action the member-state change requires.
PostMemberStateUpdateAction result;
if (_memberState.primary() || newState.removed() || newState.rollback()) {
// Wake up any threads blocked in awaitReplication, close connections, etc.
_replicationWaiterList.signalAll_inlock();
// Wake up the optime waiter that is waiting for primary catch-up to finish.
_opTimeWaiterList.signalAll_inlock();
// _canAcceptNonLocalWrites being false means non-local writes are rejected.
invariant(!_canAcceptNonLocalWrites);
serverGlobalParams.validateFeaturesAsMaster.store(false);
// Returned when this node was primary (or is being removed / rolling back).
result = kActionCloseAllConnections;
} else {
// Returned for follower (secondary) state transitions.
result = kActionFollowerModeStateChange;
}
{
// We have to do this check even if our current and target state are the same as we might
// have just failed a stepdown attempt and thus are staying in PRIMARY state but restoring
// our ability to accept writes.
bool canAcceptWrites = _topCoord->canAcceptWrites();
if (canAcceptWrites != _canAcceptNonLocalWrites) {
// We must be holding the global X lock to change _canAcceptNonLocalWrites.
invariant(opCtx);
invariant(opCtx->lockState()->isW());
}
_canAcceptNonLocalWrites = canAcceptWrites;
}
// We've caught up: our last applied optime has reached the target optime
// learned from heartbeats, so the post-election catch-up phase can end.
if (*targetOpTime <= _repl->_getMyLastAppliedOpTime_inlock()) {
log() << "Caught up to the latest optime known via heartbeats after becoming primary.";
// Presumably abort_inlock() tears down the catch-up state machine here —
// TODO(review): confirm against the CatchupState implementation.
abort_inlock();
return;
}