天天看點

《Loy解說Eureka服務端源碼(二)》

01 | 說明

    接着上一篇文章,上一篇主要講解了Eureka的搭建過程,以及自動配置類的一些相關配置,本篇重點說明以下内容

    1.1  服務端是如何接收用戶端請求的?

02 | 服務端如何接收用戶端請求

    2.1 建構Jersey過濾器

           1. 該過濾器的作用類似于Spring-MVC,用于攔截rest請求

           2. 與Spring-MVC不同的是,Spring-MVC使用攔截器進行請求攔截,而Jersey使用過濾器攔截。

      2.2 建構過程

@Bean
 //jersey過濾器
  public FilterRegistrationBean jerseyFilterRegistration(
           javax.ws.rs.core.Application eurekaJerseyApp) {
       FilterRegistrationBean bean = new FilterRegistrationBean();
       bean.setFilter(new ServletContainer(eurekaJerseyApp));
       bean.setOrder(Ordered.LOWEST_PRECEDENCE);
       bean.setUrlPatterns(
                Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));

        return bean;
    }


@Bean
// 構造一個jersey應用,相當于Spring-mvc,建構rest請求
public javax.ws.rs.core.Application jerseyApplication(Environment environment,
			ResourceLoader resourceLoader) {

		ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
				false, environment);

		// Filter to include only classes that have a particular annotation.
		// 過濾Path.class以及Provider.class注解修飾的類
		provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
		provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));

		// Find classes in Eureka packages (or subpackages)
		// 通過指定的包名查找類
		Set<Class<?>> classes = new HashSet<>();
		for (String basePackage : EUREKA_PACKAGES) {
			Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
			for (BeanDefinition bd : beans) {
				Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
						resourceLoader.getClassLoader());
				classes.add(cls);
			}
		}

		// Construct the Jersey ResourceConfig
		Map<String, Object> propsAndFeatures = new HashMap<>();
		propsAndFeatures.put(
				// Skip static content used by the webapp
				ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
				EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");

		DefaultResourceConfig rc = new DefaultResourceConfig(classes);
		rc.setPropertiesAndFeatures(propsAndFeatures);

		return rc;
	}
           

    2.3  處理執行個體的注冊過程

        1.   通過上面建構Jersey應用可知,在建構過程中,會對指定的包名進行掃描,通過斷點可知,最終掃描的類主要有九個,其中我們最關心的是ApplicationResource這個類。

《Loy解說Eureka服務端源碼(二)》

        用戶端的注冊請求,主要就是在這個ApplicationResource進行攔截及處理。處理的過程如下:

        2. 通過請求進入該類的addInstance方法,該方法主要用于判斷請求執行個體的一些基本資訊,包括執行個體ID是否為空,hostName是否為空等  

《Loy解說Eureka服務端源碼(二)》

    2.4. 啟動執行個體節點注冊器

         1. 建構執行個體節點注冊器,用于注冊節點

@Bean
	public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
			ServerCodecs serverCodecs) {
		this.eurekaClient.getApplications(); 
         // force initialization
		return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
				serverCodecs, this.eurekaClient,
				this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
				this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
	}
           

  2.5  服務端context初始化

      2.5.1  代碼

@Bean
	public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
			PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
		return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
				registry, peerEurekaNodes, this.applicationInfoManager);
	}
           

      2.5.1 EurekaContext 初始化的過程

            1. 通過DefaultEurekaServerContext的initialize()方法進行初始化

            2. 通過peerEurekaNodes.start()方法,進行定時的重新整理節點資訊(關于定時重新整理節點資訊,後續補充源碼分析,這裡隻講述有這麼個過程)

            3. 通過注冊器的實作類,進行注冊器的初始化 registry.init(peerEurekaNodes) 實作類為:PeerAwareInstanceRegistryImpl

2.6 建構Eureka節點資訊

     2.6.1 代碼

@Bean
	@ConditionalOnMissingBean
	public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
			ServerCodecs serverCodecs) {
		return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
				this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
	}
           

2.7 啟動EurekaServerBootstrap

          2.7.1 代碼

@Bean
	public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
			EurekaServerContext serverContext) {
		return new EurekaServerBootstrap(this.applicationInfoManager,
				this.eurekaClientConfig, this.eurekaServerConfig, registry,
				serverContext);
	}
           

          2.7.2 初始化

                 1.  初始化Eureka的環境資訊 initEurekaEnvironment();

                 2.  初始化Eureka的Context資訊 initEurekaServerContext();

2.8 同步節點資訊的過程分析

        2.8.1 同步所有節點資訊

@Override
    public int syncUp() {
        // Copy entire entry from neighboring DS node
        int count = 0;

        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            
            // 擷取所有的節點資訊
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                         // 判斷是否可以注冊
                        if (isRegisterable(instance)) {
                            // 進行注冊
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }
           

          2.8.2  注冊過程說明

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        // 上隻讀鎖
        read.lock();
        // 從本地MAP裡面擷取目前執行個體的資訊。
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        // 增加注冊次數到監控資訊裡面去。
        REGISTER.increment(isReplication);
        if (gMap == null) {
            // 如果第一次進來,那麼gMap為空,則建立一個ConcurrentHashMap放入到registry裡面去
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            // putIfAbsent方法主要是在向ConcurrentHashMap中添加鍵—值對的時候,它會先判斷該鍵值對是否已經存在。
            // 如果不存在(新的entry),那麼會向map中添加該鍵值對,并傳回null。
            // 如果已經存在,那麼不會覆寫已有的值,直接傳回已經存在的值。
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                // 表明map中确實不存在,則設定gMap為最新建立的那個
                gMap = gNewMap;
            }
        }
        // 從MAP中查詢已經存在的Lease資訊 (比如第二次來)
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // 當Lease的對象不為空時。
        if (existingLease != null && (existingLease.getHolder() != null)) {
            // 當instance已經存在是,和用戶端的instance的資訊做比較,時間最新的那個,為有效instance資訊
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); // server
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();   // client
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // 這裡隻有當existinglease不存在時,才會進來。 像那種恢複心跳,資訊過期的,都不會進入這裡。
            //  Eureka-Server的自我保護機制做的操作,為每分鐘最大續約數+2 ,同時重新計算每分鐘最小續約數
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // Since the client wants to cancel it, reduce the threshold
                    // (1
                    // for 30 seconds, 2 for a minute)
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        // 建構一個最新的Lease資訊
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            // 當原來存在Lease的資訊時,設定他的serviceUpTimestamp, 保證服務開啟的時間一直是第一次的那個
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // 放入本地Map中
        gMap.put(registrant.getId(), lease);
        // 添加到最近的注冊隊列裡面去,以時間戳作為Key, 名稱作為value,主要是為了運維界面的統計資料。
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        // 分析instanceStatus
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                            + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        // 得到instanceStatus,判斷是否是UP狀态,
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        // 設定注冊類型為添加
        registrant.setActionType(ActionType.ADDED);
        // 租約變更記錄隊列,記錄了執行個體的每次變化, 用于注冊資訊的增量擷取、
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        // 清理緩存 ,傳入的參數為key
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}
           

      (1) 加鎖,避免多執行個體同時注冊并發問題

      (2)從本地MAP裡面擷取目前執行個體資訊,如果是第一次進入,則Map為空,則建立一個ConcurrentMap,用于存儲對應的執行個體資訊

      (3)建構執行個體相關資訊,加入Map中

      (4)分析執行個體的狀态

      (5)設定注冊類型為添加

      (6)清理緩存

繼續閱讀