After three years of developing Spring Cloud microservices, I finally learned what the @EnableEurekaServer annotation actually does (Part 2)

InstanceRegistry#register

PeerAwareInstanceRegistryImpl#register

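This is where the lease expiration time is obtained: 90s. Instances send a heartbeat every 30s, so if no heartbeat has arrived within 90s, the instance is considered down.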
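To make the two numbers concrete, here is a minimal sketch of the timeout rule. `HeartbeatTimeoutSketch` and its methods are hypothetical names made up for illustration and are not part of Eureka; only the 30s and 90s values come from the text.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not Eureka code: applies the 30s/90s rule described above.
class HeartbeatTimeoutSketch {
    static final Duration HEARTBEAT_INTERVAL = Duration.ofSeconds(30);
    static final Duration LEASE_EXPIRATION = Duration.ofSeconds(90);

    // An instance counts as down once more than 90s have passed without a heartbeat.
    static boolean isConsideredDown(Instant lastHeartbeat, Instant now) {
        return Duration.between(lastHeartbeat, now).compareTo(LEASE_EXPIRATION) > 0;
    }

    // How many heartbeat intervals fit into one lease.
    static long heartbeatsPerLease() {
        return LEASE_EXPIRATION.getSeconds() / HEARTBEAT_INTERVAL.getSeconds();
    }

    public static void main(String[] args) {
        Instant last = Instant.parse("2024-01-01T00:00:00Z");
        System.out.println(isConsideredDown(last, last.plusSeconds(60))); // false
        System.out.println(isConsideredDown(last, last.plusSeconds(91))); // true
        System.out.println(heartbeatsPerLease());                        // 3
    }
}
```

In other words, an instance can miss roughly three consecutive heartbeats before it is treated as dead.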

This is the chain-of-responsibility pattern!
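The way the three register methods hand off to one another can be sketched roughly as follows. The class names `OuterRegistry`, `PeerAwareRegistry` and `BaseRegistry` are hypothetical stand-ins for InstanceRegistry, PeerAwareInstanceRegistryImpl and AbstractInstanceRegistry, and the work each layer does is reduced to a log entry. The event-publishing step in the outer layer is an assumption about what that layer contributes.

```java
import java.util.ArrayList;
import java.util.List;

class RegisterChainSketch {
    public static void main(String[] args) {
        List<String> steps = new ArrayList<>();
        new OuterRegistry(steps).register("order-service:8080", false);
        steps.forEach(System.out::println);
    }
}

// Innermost layer (stand-in for AbstractInstanceRegistry): stores the lease.
class BaseRegistry {
    protected final List<String> steps;

    BaseRegistry(List<String> steps) {
        this.steps = steps;
    }

    void register(String instanceId, int leaseDurationSecs, boolean isReplication) {
        steps.add("store lease for " + instanceId + " (" + leaseDurationSecs + "s)");
    }
}

// Middle layer (stand-in for PeerAwareInstanceRegistryImpl):
// resolves the lease duration, delegates, then replicates to peers.
class PeerAwareRegistry extends BaseRegistry {
    static final int DEFAULT_LEASE_DURATION_SECS = 90;

    PeerAwareRegistry(List<String> steps) {
        super(steps);
    }

    void register(String instanceId, boolean isReplication) {
        steps.add("resolve lease duration");
        super.register(instanceId, DEFAULT_LEASE_DURATION_SECS, isReplication);
        if (!isReplication) {
            // A registration that is itself a replication is not forwarded again.
            steps.add("replicate to peers");
        }
    }
}

// Outer layer (stand-in for InstanceRegistry): assumed to publish an event, then delegate.
class OuterRegistry extends PeerAwareRegistry {
    OuterRegistry(List<String> steps) {
        super(steps);
    }

    @Override
    void register(String instanceId, boolean isReplication) {
        steps.add("publish registration event");
        super.register(instanceId, isReplication);
    }
}
```

Each layer does its own piece of work and passes the request on via `super.register`, which is what gives the call its chain-like shape.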

AbstractInstanceRegistry#register: the actual registration

  • The registry

    K: the microservice cluster (application name) V: the individual instances within that cluster

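A minimal sketch of that two-level map, assuming a plain `String` in place of `Lease<InstanceInfo>`. `RegistryMapSketch` is a hypothetical name; the get / putIfAbsent sequence mirrors the one used in the register method.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RegistryMapSketch {
    // application name -> (instance id -> lease); String stands in for Lease<InstanceInfo>
    static final ConcurrentHashMap<String, Map<String, String>> registry = new ConcurrentHashMap<>();

    static void register(String appName, String instanceId, String lease) {
        Map<String, String> instances = registry.get(appName);
        if (instances == null) {
            Map<String, String> fresh = new ConcurrentHashMap<>();
            // putIfAbsent returns the map another thread stored first,
            // or null if ours was the one that got stored.
            instances = registry.putIfAbsent(appName, fresh);
            if (instances == null) {
                instances = fresh;
            }
        }
        instances.put(instanceId, lease);
    }

    public static void main(String[] args) {
        register("ORDER-SERVICE", "host-a:8080", "lease-a");
        register("ORDER-SERVICE", "host-b:8080", "lease-b");
        register("USER-SERVICE", "host-c:9090", "lease-c");
        System.out.println(registry.get("ORDER-SERVICE").size()); // 2
        System.out.println(registry.get("USER-SERVICE").size());  // 1
    }
}
```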

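What happens when registration information conflicts?

The last dirty timestamp (lastDirtyTimestamp) of each copy decides which one wins: the more recently modified copy is kept.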
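The rule can be isolated into a few lines. This is a sketch only: `Instance` is a hypothetical stand-in for InstanceInfo that carries just an id and a lastDirtyTimestamp, and `chooseRegistrant` is a made-up helper name.

```java
class DirtyTimestampSketch {
    // Hypothetical stand-in for InstanceInfo.
    static final class Instance {
        final String id;
        final long lastDirtyTimestamp;

        Instance(String id, long lastDirtyTimestamp) {
            this.id = id;
            this.lastDirtyTimestamp = lastDirtyTimestamp;
        }
    }

    // Keep the stored copy only if it is strictly newer; on a tie the incoming copy wins.
    static Instance chooseRegistrant(Instance existing, Instance incoming) {
        if (existing != null && existing.lastDirtyTimestamp > incoming.lastDirtyTimestamp) {
            return existing;
        }
        return incoming;
    }

    public static void main(String[] args) {
        Instance stored = new Instance("host-a:8080", 2_000L);
        Instance stale = new Instance("host-a:8080", 1_000L);
        Instance same = new Instance("host-a:8080", 2_000L);
        System.out.println(chooseRegistrant(stored, stale) == stored); // true
        System.out.println(chooseRegistrant(stored, same) == same);    // true
        System.out.println(chooseRegistrant(null, stale) == stale);    // true
    }
}
```

The strict `>` matters: when the timestamps are equal, the copy sent by the client replaces the server's local copy.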

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            // Take the shared read lock; several registrations can hold it at the same time
            read.lock();
            // Look up the instance map for this application name
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            // First registration for this application: no map exists yet, so create one
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                // Store the map only if none is present. The read lock is shared,
                // so another thread may have inserted one for this app in the meantime
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            // Lease already registered under this instance id, if any
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                // lastDirtyTimestamp of the instance already in the registry
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                // lastDirtyTimestamp of the instance being registered now
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                // The copy already in the registry is newer than the one being registered
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    // So keep the existing copy as the registrant. This favors availability:
                    // the existing instance is still active, so it is still usable
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        // Since the client wants to cancel it, reduce the threshold
                        // (1
                        // for 30 seconds, 2 for a minute)
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            // New lease object wrapping the instance info; it tracks the last update time,
            // registration time, lease duration and eviction time
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // Put the new lease into the registry
            gMap.put(registrant.getId(), lease);
            synchronized (recentRegisteredQueue) {
                // Record the registration in the recently-registered queue
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }      
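The threshold bookkeeping inside the `synchronized (lock)` block is easier to follow with numbers. A new instance is expected to send one heartbeat per 30s, i.e. 2 per minute, which is where the `+ 2` comes from. Below is a small sketch of that arithmetic; `RenewalThresholdSketch` and its methods are hypothetical, and the value 0.85 for the renewal percent threshold is an assumption about the configured value, not something taken from the listing.

```java
class RenewalThresholdSketch {
    // Assumed value of serverConfig.getRenewalPercentThreshold().
    static final double RENEWAL_PERCENT_THRESHOLD = 0.85;

    // Mirrors the listing: only bump the count when it is already positive.
    static int expectedAfterNewRegistration(int expectedNumberOfRenewsPerMin) {
        return expectedNumberOfRenewsPerMin > 0
                ? expectedNumberOfRenewsPerMin + 2
                : expectedNumberOfRenewsPerMin;
    }

    // The cast truncates toward zero, exactly as in the listing.
    static int threshold(int expectedNumberOfRenewsPerMin, double percent) {
        return (int) (expectedNumberOfRenewsPerMin * percent);
    }

    public static void main(String[] args) {
        int expected = expectedAfterNewRegistration(10);
        System.out.println(expected);                                       // 12
        System.out.println(threshold(expected, RENEWAL_PERCENT_THRESHOLD)); // 10
    }
}
```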

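Lease renewal

Renewal tells the Eureka Server that the service provider is still alive, so that the instance is not evicted.

The implementation follows essentially the same approach as register:

  • Update the local state
  • Then replicate to the other peers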
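The two steps can be sketched like this. Everything here is a simplified, hypothetical model: `RenewFlowSketch` keeps a map of last-renewal times instead of real leases, and "replicating" just appends the instance id to a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RenewFlowSketch {
    // instance id -> time of last renewal (stand-in for the lease)
    final Map<String, Long> lastRenewalByInstance = new ConcurrentHashMap<>();
    // heartbeats that would be forwarded to peer servers
    final List<String> replicated = new ArrayList<>();

    // Step 1: update local state. Returns false if no lease exists for this id.
    boolean renewLocally(String instanceId, long nowMs) {
        return lastRenewalByInstance.computeIfPresent(instanceId, (id, old) -> nowMs) != null;
    }

    // Step 2: only after a successful local renewal, forward to peers,
    // unless this heartbeat is itself a replication from a peer.
    boolean renew(String instanceId, long nowMs, boolean isReplication) {
        if (!renewLocally(instanceId, nowMs)) {
            return false; // unknown lease: the client has to register again
        }
        if (!isReplication) {
            replicated.add(instanceId);
        }
        return true;
    }

    public static void main(String[] args) {
        RenewFlowSketch server = new RenewFlowSketch();
        server.lastRenewalByInstance.put("host-a:8080", 0L);
        System.out.println(server.renew("host-a:8080", 30_000L, false)); // true
        System.out.println(server.renew("host-b:8080", 30_000L, false)); // false
        System.out.println(server.replicated);                           // [host-a:8080]
    }
}
```

The listing that follows is the local-state half of this flow.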
public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            // Look up the lease for this specific instance
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            // No lease found: return false so that the client registers again
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            // The lease exists: fetch the instance info first
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                // UNKNOWN means the override was probably deleted; the client must re-register
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                    instanceInfo.getOverriddenStatus().name(),
                                    instanceInfo.getId());
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

                }
            }
            renewsLastMin.increment();
            // Everything checks out: renew the lease
            leaseToRenew.renew();
            return true;
        }
    }      
public enum InstanceStatus {
        UP, // Ready to receive traffic
        DOWN, // Do not send traffic- healthcheck callback failed
        STARTING, // Just about starting- initializations to be done - do not
        // send traffic
        OUT_OF_SERVICE, // Intentionally shutdown for traffic
        UNKNOWN;

        public static InstanceStatus toEnum(String s) {
            if (s != null) {
                try {
                    return InstanceStatus.valueOf(s.toUpperCase());
                } catch (IllegalArgumentException e) {
                    // ignore and fall through to unknown
                    logger.debug("illegal argument supplied to InstanceStatus.valueOf: {}, defaulting to {}", s, UNKNOWN);
                }
            }
            return UNKNOWN;
        }
    }      

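leaseToRenew.renew() renews the lease. It uses the renewal duration if one was specified by the associated T during registration; otherwise the default duration, DEFAULT_DURATION_IN_SECS (90s), applies.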
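A stripped-down sketch of a lease with that fallback is shown below. `LeaseSketch` is hypothetical and deliberately simpler than Eureka's Lease class: it takes the current time as a parameter, and its expiry rule is an assumption chosen for readability. Only the DEFAULT_DURATION_IN_SECS name and the 90s value come from the text.

```java
class LeaseSketch {
    static final int DEFAULT_DURATION_IN_SECS = 90;

    private final long durationMs;
    private long lastRenewalMs;

    // No duration given at registration: fall back to the default.
    LeaseSketch(long nowMs) {
        this(nowMs, DEFAULT_DURATION_IN_SECS);
    }

    // Duration specified at registration.
    LeaseSketch(long nowMs, int durationInSecs) {
        this.durationMs = durationInSecs * 1000L;
        this.lastRenewalMs = nowMs;
    }

    // A heartbeat restarts the countdown.
    void renew(long nowMs) {
        lastRenewalMs = nowMs;
    }

    // Simplified rule: expired once a full duration has passed since the last renewal.
    boolean isExpired(long nowMs) {
        return nowMs > lastRenewalMs + durationMs;
    }

    public static void main(String[] args) {
        LeaseSketch lease = new LeaseSketch(0L);
        System.out.println(lease.isExpired(90_000L));  // false
        System.out.println(lease.isExpired(90_001L));  // true
        lease.renew(60_000L);
        System.out.println(lease.isExpired(120_000L)); // false
    }
}
```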
