InstanceRegistry#register
PeerAwareInstanceRegistryImpl#register
這裡取得微服務過期時間 90s,服務之間心跳請求 30s 一次,如果 90s 還沒發生,就說明挂了
這就是責任鍊模式!!!
AbstractInstanceRegistry#register 真正的注冊
-
系統資料庫
K:微服務叢集 V:微服務叢集内的各個節點
當發生注冊資訊沖突時,咋辦?
根據最後活躍時間,确定覆寫哪個
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
// 并發讀鎖
read.lock();
// 先獲得微服務名,然後獲得執行個體
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
// 第一次注冊時還不存在,是以 new 一個
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
// 沒有時才 put,有就不更新! 因為registry還是可能被寫的!畢竟他不在讀鎖範圍内!
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// 已存在的注冊節點
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
// 拿到已存在節點的注冊時間
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
// 目前正在注冊的節點的注冊時間
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
// 注冊ing 時,有更加新的節點注冊了!
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
// 那麼就用已存在的替換目前注冊節點,因為考慮到盡量保證可用性,因為既然還在活躍說明老的還能用
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1
// for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
// 新的心跳續約對象,包括注冊資訊,最後操作時間,注冊事件,過期時間,剔除時間
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// 将新對象放入系統資料庫
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
// 加入注冊隊列
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
服務續約
通知Eureka Server Service Provider還活着,避免服務被剔除。
和register實作思路基本一緻
- 更新自身狀态
- 再同步到其它Peer
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
// 拿到具體的注冊節點
leaseToRenew = gMap.get(id);
}
if (leaseToRenew == null) {
// 不存在,則重新注冊續約
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
//存在
// 先得到節點資訊
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
// 看是否處于當機狀态
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId());
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
renewsLastMin.increment();
// 正常則續約
leaseToRenew.renew();
return true;
}
}
public enum InstanceStatus {
UP, // Ready to receive traffic
DOWN, // Do not send traffic- healthcheck callback failed
STARTING, // Just about starting- initializations to be done - do not
// send traffic
OUT_OF_SERVICE, // Intentionally shutdown for traffic
UNKNOWN;
public static InstanceStatus toEnum(String s) {
if (s != null) {
try {
return InstanceStatus.valueOf(s.toUpperCase());
} catch (IllegalArgumentException e) {
// ignore and fall through to unknown
logger.debug("illegal argument supplied to InstanceStatus.valueOf: {}, defaulting to {}", s, UNKNOWN);
}
}
return UNKNOWN;
}
}
續約,利用更新的持續時間,如果它是由相關聯的指定T注冊期間,否則預設持續時間是DEFAULT_DURATION_IN_SECS,即 90s