
Spring Cloud Eureka Source Code Analysis --- Client Registration Flow

Eureka Client is a Java client that simplifies interaction with Eureka Server. It also ships with a built-in load balancer that uses a round-robin algorithm.

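After the application starts, it sends heartbeats to Eureka Server (every 30 seconds by default). If Eureka Server receives no heartbeat from a node for several heartbeat periods, it removes that service node from the service registry (after 90 seconds by default).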
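The relationship between the 30-second heartbeat and the 90-second eviction window can be sketched in a few lines. This is a minimal illustration, not Eureka's actual lease class: the name LeaseSketch and its methods are hypothetical, and only the two default values come from the text above.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified lease record; not Eureka's real Lease class. */
final class LeaseSketch {
    // Default quoted in the text: a node is dropped after 90s without a heartbeat
    // (heartbeats themselves are sent every 30s by default).
    static final long LEASE_DURATION_MS = TimeUnit.SECONDS.toMillis(90);

    private long lastRenewalMs;

    LeaseSketch(long registeredAtMs) {
        this.lastRenewalMs = registeredAtMs;
    }

    /** Called whenever a heartbeat arrives. */
    void renew(long nowMs) {
        lastRenewalMs = nowMs;
    }

    /** True once no heartbeat has been seen for longer than the lease duration. */
    boolean isExpired(long nowMs) {
        return nowMs - lastRenewalMs > LEASE_DURATION_MS;
    }
}
```

With these defaults a node can miss two consecutive heartbeats and still be considered alive; only after the third missed period does the lease run out.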

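Eureka Client caches the registry, so even if every Eureka Server goes down, the client can still use the cached information to call the APIs of other services. Let's look at the client-side operations.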

1. Starting from the startup class

As with our analysis of the Eureka Server source, we start from the @EnableDiscoveryClient annotation on the startup class and follow the call flow.

Opening EnableDiscoveryClient, its Javadoc tells us that the annotation exists to activate a DiscoveryClient implementation.

First, the annotation carries an @Import that brings in EnableDiscoveryClientImportSelector, whose main job is to register AutoServiceRegistrationConfiguration:

@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableDiscoveryClientImportSelector
		extends SpringFactoryImportSelector<EnableDiscoveryClient> {

	@Override
	public String[] selectImports(AnnotationMetadata metadata) {
		// Call the parent method to get the fully qualified class names it wants imported
		String[] imports = super.selectImports(metadata);
		// Read all attributes of the @EnableDiscoveryClient annotation
		AnnotationAttributes attributes = AnnotationAttributes.fromMap(
				metadata.getAnnotationAttributes(getAnnotationClass().getName(), true));
		// Read the autoRegister attribute, which defaults to true
		boolean autoRegister = attributes.getBoolean("autoRegister");
		// Decide from the annotation whether to import the auto-registration configuration below
		if (autoRegister) {
			List<String> importsList = new ArrayList<>(Arrays.asList(imports));
			importsList.add("org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
			imports = importsList.toArray(new String[0]);
		} else {
			Environment env = getEnvironment();
			if(ConfigurableEnvironment.class.isInstance(env)) {
				ConfigurableEnvironment configEnv = (ConfigurableEnvironment)env;
				LinkedHashMap<String, Object> map = new LinkedHashMap<>();
				map.put("spring.cloud.service-registry.auto-registration.enabled", false);
				MapPropertySource propertySource = new MapPropertySource(
						"springCloudDiscoveryClient", map);
				configEnv.getPropertySources().addLast(propertySource);
			}

		}

		return imports;
	}

	@Override
	protected boolean isEnabled() {
		return getEnvironment().getProperty(
				"spring.cloud.discovery.enabled", Boolean.class, Boolean.TRUE);
	}

	@Override
	protected boolean hasDefaultFactory() {
		return true;
	}

}
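The branch on autoRegister in selectImports can be sketched in plain Java, without Spring on the classpath. This is only an illustration: ImportSelectionSketch is a hypothetical name, the class name passed in by the caller is a placeholder, and the sketch leaves out the else branch, where the real selector instead adds a property source that sets spring.cloud.service-registry.auto-registration.enabled to false.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the autoRegister branch of selectImports(). */
final class ImportSelectionSketch {
    static final String AUTO_REGISTRATION_CONFIG =
            "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration";

    /**
     * Returns the class names to import: those found by the parent selector,
     * plus the auto-registration configuration when autoRegister is true.
     */
    static String[] selectImports(String[] fromParent, boolean autoRegister) {
        List<String> imports = new ArrayList<>(Arrays.asList(fromParent));
        if (autoRegister) {
            imports.add(AUTO_REGISTRATION_CONFIG);
        }
        return imports.toArray(new String[0]);
    }
}
```

So with the default autoRegister=true the import list grows by exactly one entry, and with autoRegister=false it is returned unchanged.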

           

The end goal here is to load AutoServiceRegistrationConfiguration. It is imported by AutoServiceRegistrationAutoConfiguration, so let's see what that class does:

@Configuration
@Import(AutoServiceRegistrationConfiguration.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public class AutoServiceRegistrationAutoConfiguration {

	@Autowired(required = false)
	private AutoServiceRegistration autoServiceRegistration;

	@Autowired
	private AutoServiceRegistrationProperties properties;

	@PostConstruct
	protected void init() {
		if (autoServiceRegistration == null && this.properties.isFailFast()) {
			throw new IllegalStateException("Auto Service Registration has been requested, but there is no AutoServiceRegistration bean");
		}
	}
}
           

Judging by this class, its main purpose is to wire in two beans: AutoServiceRegistration and AutoServiceRegistrationProperties. What are these two beans for? Look at where they are used:

@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@Import(DiscoveryClientOptionalArgsConfiguration.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
		CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
		"org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
		"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"})
public class EurekaClientAutoConfiguration {
    ......
    ......
    ......
    @Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
	public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager, ObjectProvider<HealthCheckHandler> healthCheckHandler) {
		return EurekaRegistration.builder(instanceConfig)
				.with(applicationInfoManager)
				.with(eurekaClient)
				.with(healthCheckHandler)
				.build();
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
	public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) {
		return new EurekaAutoServiceRegistration(context, registry, registration);
	}   
    ......
    ......
    ......
}
           

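The answer is that they serve as preconditions when the beans here are instantiated: both bean methods are guarded by @ConditionalOnBean(AutoServiceRegistrationProperties.class).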

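EurekaClientAutoConfiguration is the most important class we have met so far. Its main responsibilities are:

  1. Registers EurekaClientConfigBean, which initializes the client-side configuration;
  2. Registers EurekaInstanceConfigBean, which initializes the client instance information;
  3. Initializes EurekaRegistration, EurekaServiceRegistry and EurekaAutoServiceRegistration, which implement automatic registration with Eureka;
  4. Initializes EurekaClient and ApplicationInfoManager. The default implementation of EurekaClient is DiscoveryClient, which is the focus of the analysis that follows;
  5. Initializes EurekaHealthIndicator, which supplies Eureka-related information to the /health endpoint, mainly the current instance status (Status) and the service list (applications).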

Next, let's see where EurekaClientAutoConfiguration itself is used:

@ConditionalOnClass(ConfigServicePropertySourceLocator.class)
@ConditionalOnProperty(value = "spring.cloud.config.discovery.enabled", matchIfMissing = false)
@Configuration
@Import({ EurekaDiscoveryClientConfiguration.class, // this emulates @EnableDiscoveryClient, the import selector doesn't run before the bootstrap phase
		EurekaClientAutoConfiguration.class })
public class EurekaDiscoveryClientConfigServiceBootstrapConfiguration {
}
           

It is imported by the EurekaDiscoveryClientConfigServiceBootstrapConfiguration class.

EurekaDiscoveryClientConfigServiceBootstrapConfiguration, in turn, is referenced from a rather unusual place: a configuration file.

spring.factories

org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
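A spring.factories file uses the Java properties format, so the entry above (with its backslash line continuation) can be read with java.util.Properties. The sketch below shows how the class names under a key can be recovered. It is not the SpringFactoriesLoader implementation, and FactoriesFileSketch is a hypothetical name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper: reads the class names listed under one key of a properties-format file. */
final class FactoriesFileSketch {
    static List<String> factoryNames(String fileContent, String key) {
        Properties props = new Properties();
        try {
            // Properties.load joins lines that end with a backslash into one value
            props.load(new StringReader(fileContent));
        } catch (IOException e) {
            throw new IllegalStateException("Could not parse factories content", e);
        }
        List<String> names = new ArrayList<>();
        // Several implementations may be listed under one key, separated by commas
        for (String name : props.getProperty(key, "").split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                names.add(trimmed);
            }
        }
        return names;
    }
}
```

Called with the two lines above and the key org.springframework.cloud.bootstrap.BootstrapConfiguration, the method returns a single-element list containing the fully qualified name of EurekaDiscoveryClientConfigServiceBootstrapConfiguration.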
           

The key of this entry corresponds to an annotation type, BootstrapConfiguration:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface BootstrapConfiguration {

	/**
	 * Exclude specific auto-configuration classes such that they will never be applied.
	 */
	Class<?>[] exclude() default {};

}
           

It is used in BootstrapApplicationListener: line 164 obtains the fully qualified class names, and line 186 loads the classes.

public class BootstrapApplicationListener
		implements ApplicationListener<ApplicationEnvironmentPreparedEvent>, Ordered {
    
    ......
    @Override
	public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event) {
		ConfigurableEnvironment environment = event.getEnvironment();
		......
		ConfigurableApplicationContext context = null;
		String configName = environment
				.resolvePlaceholders("${spring.cloud.bootstrap.name:bootstrap}");
		for (ApplicationContextInitializer<?> initializer : event.getSpringApplication()
				.getInitializers()) {
			if (initializer instanceof ParentContextApplicationContextInitializer) {
				context = findBootstrapContext(
						(ParentContextApplicationContextInitializer) initializer,
						configName);
			}
		}
		if (context == null) {
            // bootstrapServiceContext is invoked here
			context = bootstrapServiceContext(environment, event.getSpringApplication(),
					configName);
		}
		apply(context, event.getSpringApplication(), environment);
	}
    
    private ConfigurableApplicationContext bootstrapServiceContext(
        ConfigurableEnvironment environment, final SpringApplication application,
        String configName) {
        StandardEnvironment bootstrapEnvironment = new StandardEnvironment();
        MutablePropertySources bootstrapProperties = bootstrapEnvironment
            .getPropertySources();
        for (PropertySource<?> source : bootstrapProperties) {
            bootstrapProperties.remove(source.getName());
        }
        String configLocation = environment
            .resolvePlaceholders("${spring.cloud.bootstrap.location:}");
        Map<String, Object> bootstrapMap = new HashMap<>();
        bootstrapMap.put("spring.config.name", configName);
        // if an app (or test) uses spring.main.web-application-type=reactive, bootstrap will fail
        // force the environment to use none, because if though it is set below in the builder
        // the environment overrides it
        bootstrapMap.put("spring.main.web-application-type", "none");
        if (StringUtils.hasText(configLocation)) {
            bootstrapMap.put("spring.config.location", configLocation);
        }
        bootstrapProperties.addFirst(
            new MapPropertySource(BOOTSTRAP_PROPERTY_SOURCE_NAME, bootstrapMap));
        for (PropertySource<?> source : environment.getPropertySources()) {
            if (source instanceof StubPropertySource) {
                continue;
            }
            bootstrapProperties.addLast(source);
        }
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        // Load the class names listed under the BootstrapConfiguration key in spring.factories
        List<String> names = new ArrayList<>(SpringFactoriesLoader
                                             .loadFactoryNames(BootstrapConfiguration.class, classLoader));
        for (String name : StringUtils.commaDelimitedListToStringArray(
            environment.getProperty("spring.cloud.bootstrap.sources", ""))) {
            names.add(name);
        }
        // TODO: is it possible or sensible to share a ResourceLoader?
        SpringApplicationBuilder builder = new SpringApplicationBuilder()
            .profiles(environment.getActiveProfiles()).bannerMode(Mode.OFF)
            .environment(bootstrapEnvironment)
            // Don't use the default properties in this builder
            .registerShutdownHook(false).logStartupInfo(false)
            .web(WebApplicationType.NONE);
        if (environment.getPropertySources().contains("refreshArgs")) {
            // If we are doing a context refresh, really we only want to refresh the
            // Environment, and there are some toxic listeners (like the
            // LoggingApplicationListener) that affect global static state, so we need a
            // way to switch those off.
            builder.application()
                .setListeners(filterListeners(builder.application().getListeners()));
        }
        List<Class<?>> sources = new ArrayList<>();
        for (String name : names) {
            Class<?> cls = ClassUtils.resolveClassName(name, null);
            try {
                cls.getDeclaredAnnotations();
            }
            catch (Exception e) {
                continue;
            }
            sources.add(cls);
        }
        AnnotationAwareOrderComparator.sort(sources);
        builder.sources(sources.toArray(new Class[sources.size()]));
        final ConfigurableApplicationContext context = builder.run();
        // gh-214 using spring.application.name=bootstrap to set the context id via
        // `ContextIdApplicationContextInitializer` prevents apps from getting the actual
        // spring.application.name
        // during the bootstrap phase.
        context.setId("bootstrap");
        // Make the bootstrap context a parent of the app context
        addAncestorInitializer(application, context);
        // It only has properties in it now that we don't want in the parent so remove
        // it (and it will be added back later)
        bootstrapProperties.remove(BOOTSTRAP_PROPERTY_SOURCE_NAME);
        mergeDefaultProperties(environment.getPropertySources(), bootstrapProperties);
        return context;
    }
    
    
    ......
    ......
      
}
           

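BootstrapApplicationListener implements ApplicationListener<ApplicationEnvironmentPreparedEvent>, so it is loaded as a listener when the project starts. Spring Boot publishes events at several points of the startup process, four of which are listed here:

  • ApplicationStartedEvent: fired when Spring Boot begins to start (this is the Spring Boot 1.x meaning; from 1.5 onward the event published at the very start of a run is ApplicationStartingEvent);
  • ApplicationEnvironmentPreparedEvent: the Environment has been prepared, but the application context has not been created yet;
  • ApplicationPreparedEvent: the application context has been created, but the beans have not been fully loaded;
  • ApplicationFailedEvent: fired when startup fails.

In other words, BootstrapApplicationListener runs after the environment has been loaded at startup and before any beans are created.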

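At this point we have walked the whole initialization chain of the @EnableDiscoveryClient annotation. In summary, the part analyzed above serves two main purposes:

  1. Initialize the configuration;
  2. Activate DiscoveryClient.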

Next we look at what DiscoveryClient does.

2. DiscoveryClient

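If you check the startup log of a client, you will see that service registration is also issued from the DiscoveryClient class, which suggests that this class does important work during registration. Let's go through the implementation.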

2.1 Service registration

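The Spring Cloud DiscoveryClient (org.springframework.cloud.client.discovery.DiscoveryClient) is an interface. Each of its implementations has a DESCRIPTION field that states what the class is for.

  1. EurekaDiscoveryClient: the main client implementation;
  2. CompositeDiscoveryClient: holds other discovery clients and queries them in order;
  3. NoopDiscoveryClient: deprecated;
  4. SimpleDiscoveryClient: reads its service instances from the SimpleDiscoveryProperties configuration.

Judging by these descriptions, EurekaDiscoveryClient is the main client implementation. Inside EurekaDiscoveryClient, service instances are looked up through EurekaClient: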

@Override
public List<ServiceInstance> getInstances(String serviceId) {
    List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId,false);
    List<ServiceInstance> instances = new ArrayList<>();
    for (InstanceInfo info : infos) {
        instances.add(new EurekaServiceInstance(info));
    }
    return instances;
}
           

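The Netflix class com.netflix.discovery.DiscoveryClient (not the Spring Cloud interface of the same name) implements EurekaClient, and it has one very important constructor: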

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,Provider<BackupRegistry> backupRegistryProvider) {
    if (args != null) {
        this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
        this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
        this.eventListeners.addAll(args.getEventListeners());
        this.preRegistrationHandler = args.preRegistrationHandler;
    } else {
        this.healthCheckCallbackProvider = null;
        this.healthCheckHandlerProvider = null;
        this.preRegistrationHandler = null;
    }

    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();

    clientConfig = config;
    staticClientConfig = clientConfig;
    transportConfig = config.getTransportConfig();
    instanceInfo = myInfo;
    if (myInfo != null) {
        appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
    } else {
        logger.warn("Setting instanceInfo to a passed in null value");
    }

    this.backupRegistryProvider = backupRegistryProvider;

    this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
    localRegionApps.set(new Applications());

    fetchRegistryGeneration = new AtomicLong(0);

    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
    // Everything above initializes fields
    // If shouldFetchRegistry is true, register the registry staleness monitor
    if (config.shouldFetchRegistry()) {
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }
    // If shouldRegisterWithEureka is true, register the heartbeat staleness monitor
    if (config.shouldRegisterWithEureka()) {
        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
    // If shouldRegisterWithEureka and shouldFetchRegistry are both false,
    // skip the rest of the initialization and return early
    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, this.getApplications().size());

        return;  // no need to setup up an network tasks and we are done
    }
    // From here on, create the thread pools for the various tasks
    try {
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        // Scheduled pool with 2 threads: one drives heartbeats, the other drives the refresh of the cached registry of other Eureka client instances
        scheduler = Executors.newScheduledThreadPool(2,
                                                     new ThreadFactoryBuilder()
                                                     .setNameFormat("DiscoveryClient-%d")
                                                     .setDaemon(true)
                                                     .build());
        // Heartbeat worker pool: core size 1, maximum size from the configuration (2 by default)
        heartbeatExecutor = new ThreadPoolExecutor(
            1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            new ThreadFactoryBuilder()
            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
            .setDaemon(true)
            .build()
        );  // use direct handoff
        // Cache refresh worker pool: core size 1, maximum size from the configuration (2 by default)
        cacheRefreshExecutor = new ThreadPoolExecutor(
            1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            new ThreadFactoryBuilder()
            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
            .setDaemon(true)
            .build()
        );  // use direct handoff

        eurekaTransport = new EurekaTransport();
        scheduleServerEndpointTask(eurekaTransport, args);
		......
        ......
        ......
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }
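    // Fetch the registry from the remote server. The argument to fetchRegistry() is false here, meaning a full fetch is not forced.
    // The registry can be fetched in full or as a delta; the delta is the default.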
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        // Fetching was requested but the remote fetch failed, so fall back to the backup registry
        fetchRegistryFromBackup();
    }

    // call and execute the pre registration handler before all background tasks (inc registration) is started
    if (this.preRegistrationHandler != null) {
        this.preRegistrationHandler.beforeRegistration();
    }
    // If the client is configured to register with Eureka Server and to enforce registration at init, register now; by default registration is not enforced at init
    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {
            if (!register() ) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {
            logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    // Initialize the scheduled tasks that send heartbeats and refresh the cached registry
    initScheduledTasks();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register timers", e);
    }

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
}
           

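The initialization mainly does two things:

  1. Creates the scheduler pool for timed tasks, the heartbeatExecutor pool for heartbeats (lease renewal) and the cacheRefreshExecutor pool for registry fetching;
  2. Calls initScheduledTasks() to start the work, adding the corresponding tasks to the pools above. It then creates an instanceInfoReplicator (a Runnable task) and calls InstanceInfoReplicator.start, which schedules that task on the replicator's own scheduled executor (service registration and update).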
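The split between a scheduling pool and a worker pool with direct handoff can be illustrated with plain java.util.concurrent. The sketch below builds a worker pool the same way the constructor does (core size 1, a SynchronousQueue, daemon threads) and runs a task on it under a timeout, which is the idea behind the supervising task seen in the source. SupervisedRunSketch and its method names are hypothetical, and the real TimedSupervisorTask does more than this.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of running a worker task under supervision with a timeout. */
final class SupervisedRunSketch {
    /** Direct-handoff pool: tasks are never queued, at most maxThreads run at once. */
    static ExecutorService newWorkerPool(int maxThreads) {
        return new ThreadPoolExecutor(1, maxThreads, 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                r -> {
                    Thread t = new Thread(r, "worker-sketch");
                    t.setDaemon(true);
                    return t;
                });
    }

    /** Runs the task on the pool; returns false if it fails or exceeds the timeout. */
    static boolean runWithTimeout(ExecutorService pool, Runnable task, long timeoutMs) {
        Future<?> future = pool.submit(task);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            future.cancel(true); // interrupt the worker so the pool thread is freed
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            return false;
        }
    }
}
```

Keeping the timer thread separate from the worker thread means a slow heartbeat or registry fetch cannot block the scheduling of the next run.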

Next, let's see what initScheduledTasks does:

private void initScheduledTasks() {
    // Fetch the service registry
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        // Registry fetch interval, 30s by default
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        // If a cache refresh times out, the delay doubles each time, up to registryFetchIntervalSeconds times this bound (10 by default)
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        // Schedule CacheRefreshThread, the task that refreshes the cached registry
        scheduler.schedule(
            new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
            ),
            registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    // Register with Eureka Server
    if (clientConfig.shouldRegisterWithEureka()) {
        // Lease renewal interval, 30s by default
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        // If a heartbeat times out, the delay doubles each time, up to renewalIntervalInSecs times this bound (10 by default)
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

        // Heartbeat timer
        // Schedule HeartbeatThread, which sends the heartbeat
        scheduler.schedule(
            new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
            ),
            renewalIntervalInSecs, TimeUnit.SECONDS);

        // Replicates the client instance info to the server
        instanceInfoReplicator = new InstanceInfoReplicator(
            this,
            instanceInfo,
            clientConfig.getInstanceInfoReplicationIntervalSeconds(),
            2); // burstSize
        // Status change listener
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                    InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }
        // Start the instance info replicator after the configured initial delay
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}
           

In summary, initScheduledTasks() does the following:

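  • If shouldFetchRegistry=true, that is, the client should fetch the service list from Eureka Server:

    it starts the registry refresh timer (worker threads named DiscoveryClient-CacheRefreshExecutor-%d), which by default runs every registryFetchIntervalSeconds=30s. The task is CacheRefreshThread, which fetches the service list from Eureka Server and refreshes the client cache.
  • If shouldRegisterWithEureka=true, that is, the client should register with Eureka Server:

    it starts the heartbeat timer (worker threads named DiscoveryClient-HeartbeatExecutor-%d), which by default renews the lease every renewalIntervalInSecs=30s. The task is HeartbeatThread, which sends the client's heartbeat to Eureka Server;

    it starts the InstanceInfo replicator timer (thread named DiscoveryClient-InstanceInfoReplicator-%d), which periodically checks the DataCenterInfo, LeaseInfo and InstanceStatus of the current instance and, if anything changed, calls discoveryClient.register() to sync the instance information to the server.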
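The backoff described in the code comments (the delay doubles after each timeout, capped at the base interval times the bound) comes down to one line of arithmetic. The sketch below uses hypothetical names. The reset to the base interval after a successful run is an assumption about how the supervising task behaves, not something shown in the excerpt.

```java
/** Hypothetical sketch of the delay arithmetic used when a scheduled task times out. */
final class BackoffSketch {
    /** Next delay after a timeout: double the current delay, capped at base * bound. */
    static long nextDelayAfterTimeout(long currentDelaySec, long baseIntervalSec, int expBackOffBound) {
        long maxDelaySec = baseIntervalSec * expBackOffBound;
        return Math.min(maxDelaySec, currentDelaySec * 2);
    }

    /** Assumption: after a successful run the delay falls back to the base interval. */
    static long nextDelayAfterSuccess(long baseIntervalSec) {
        return baseIntervalSec;
    }
}
```

With the defaults from the text (30s interval, bound 10), repeated timeouts would stretch the delay to 60s, 120s, 240s and then hold it at the 300s cap.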

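One thing to note above is InstanceInfoReplicator. It periodically refreshes the client instance's latest information: data center info, lease info and instance status. InstanceInfoReplicator implements Runnable, so the method to look at is run():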

public void run() {
    try {
        /**
         * Refresh the InstanceInfo:
         * 1. refresh DataCenterInfo
         * 2. refresh LeaseInfo (the lease information)
         * 3. get the InstanceStatus from the HealthCheckHandler and update it; a status change triggers every StatusChangeListener
         */
        discoveryClient.refreshInstanceInfo();
        // After the refresh, if the instance has changes not yet synced to the server, register again
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            // Send the registration
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
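The reason run() passes dirtyTimestamp back into unsetIsDirty is that the instance may change again while the registration call is in flight. A dirty flag paired with a timestamp handles that case. The sketch below is a simplified illustration with hypothetical names, written on the assumption that the flag is cleared only when no newer change has been recorded. It is not the actual InstanceInfo code.

```java
/** Hypothetical sketch of a dirty flag that remembers when the last change happened. */
final class DirtyFlagSketch {
    private boolean dirty;
    private long lastDirtyTimestamp;

    /** Marks the instance as changed and returns the timestamp of that change. */
    synchronized long markDirty(long nowMs) {
        dirty = true;
        lastDirtyTimestamp = nowMs;
        return nowMs;
    }

    /** Returns the timestamp of the pending change, or null when nothing is pending. */
    synchronized Long dirtyTimestampOrNull() {
        return dirty ? lastDirtyTimestamp : null;
    }

    /** Clears the flag only if no newer change arrived after the synced timestamp. */
    synchronized void clearIfNotNewer(long syncedTimestamp) {
        if (lastDirtyTimestamp <= syncedTimestamp) {
            dirty = false;
        }
    }
}
```

If a second change lands between reading the timestamp and clearing it, the flag stays set and the next replicator run registers again, so no update is lost.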
           

Here is the implementation of register():

/**
     * Register with the eureka service by making the appropriate REST call.
     * Registers with the Eureka server over HTTP.
     */
boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}
           

Following the call further down leads to the RestTemplateEurekaHttpClient class:

public class RestTemplateEurekaHttpClient implements EurekaHttpClient {

	protected final Log logger = LogFactory.getLog(getClass());

	private RestTemplate restTemplate;
	private String serviceUrl;

	public RestTemplateEurekaHttpClient(RestTemplate restTemplate, String serviceUrl) {
		this.restTemplate = restTemplate;
		this.serviceUrl = serviceUrl;
		if (!serviceUrl.endsWith("/")) {
			this.serviceUrl = this.serviceUrl+"/";
		}
	}
    
    @Override
	public EurekaHttpResponse<Void> register(InstanceInfo info) {
		String urlPath = serviceUrl + "apps/" + info.getAppName();

		HttpHeaders headers = new HttpHeaders();
		headers.add(HttpHeaders.ACCEPT_ENCODING, "gzip");
		headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);

		ResponseEntity<Void> response = restTemplate.exchange(urlPath, HttpMethod.POST,
				new HttpEntity<>(info, headers), Void.class);

		return anEurekaHttpResponse(response.getStatusCodeValue())
				.headers(headersOf(response)).build();
	}
    
    ......
    ......
    ......
        
}
           

It wraps RestTemplate, Spring's HTTP client template, and sends a POST request to the server. This is the place where the client sends its registration request to the server at startup.
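Two details of the registration call are easy to isolate: how the target URL is assembled from the service URL and the application name, and the fact that register() in DiscoveryClient treats only HTTP 204 as success. A minimal sketch with hypothetical names follows; the server address and application name used in the usage note are placeholders.

```java
/** Hypothetical sketch of the URL building and success check around the register call. */
final class RegisterRequestSketch {
    /** Builds the POST target, adding the trailing slash the client constructor also adds. */
    static String registerUrl(String serviceUrl, String appName) {
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        return base + "apps/" + appName;
    }

    /** Mirrors the check in register(): only 204 No Content counts as registered. */
    static boolean isRegistered(int httpStatus) {
        return httpStatus == 204;
    }
}
```

For a service URL of http://localhost:8761/eureka and an application named ORDER-SERVICE, registerUrl yields http://localhost:8761/eureka/apps/ORDER-SERVICE.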

2.2 Service renewal

Service renewal starts from the heartbeat timer task in the initScheduledTasks() method of DiscoveryClient:

// Heartbeat timer
// Schedule a heartbeat request every 30s
scheduler.schedule(
    new TimedSupervisorTask(
        "heartbeat",
        scheduler,
        heartbeatExecutor,
        renewalIntervalInSecs,
        TimeUnit.SECONDS,
        expBackOffBound,
        new HeartbeatThread()
    ),
    renewalIntervalInSecs, TimeUnit.SECONDS);

	/**
     * The heartbeat task that renews the lease in the given intervals.
     */
private class HeartbeatThread implements Runnable {

    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

	/**
     * Renew with the eureka service by making the appropriate REST call
     */
boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}

@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id,
                                                      InstanceInfo info, InstanceStatus overriddenStatus) {
    String urlPath = serviceUrl + "apps/" + appName + '/' + id + "?status="
        + info.getStatus().toString() + "&lastDirtyTimestamp="
        + info.getLastDirtyTimestamp().toString() + (overriddenStatus != null
                                                     ? "&overriddenstatus=" + overriddenStatus.name() : "");

    ResponseEntity<InstanceInfo> response = restTemplate.exchange(urlPath,
                                                                  HttpMethod.PUT, null, InstanceInfo.class);

    EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(
        response.getStatusCodeValue(), InstanceInfo.class)
        .headers(headersOf(response));

    if (response.hasBody())
        eurekaResponseBuilder.entity(response.getBody());

    return eurekaResponseBuilder.build();
}
           

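The code above shows the full call chain of the client heartbeat: every 30s the client sends a PUT request to the server to renew its lease, and if the server answers 404 (the instance is unknown to it), the client registers itself again.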
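The decision logic of renew() can be reduced to a small function over the HTTP status code. The sketch below replaces the transport with two functional parameters so that it runs without a server. RenewSketch and its parameter names are hypothetical.

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

/** Hypothetical sketch of the renew decision: 200 is success, 404 triggers re-registration. */
final class RenewSketch {
    /**
     * @param sendHeartbeat performs the heartbeat call and returns the HTTP status code
     * @param register      performs a registration and returns whether it succeeded
     */
    static boolean renew(IntSupplier sendHeartbeat, BooleanSupplier register) {
        try {
            int status = sendHeartbeat.getAsInt();
            if (status == 404) {
                // The server does not know this instance, so register again
                return register.getAsBoolean();
            }
            return status == 200;
        } catch (RuntimeException e) {
            // Any transport failure counts as a failed heartbeat
            return false;
        }
    }
}
```

A failed heartbeat is not retried at once; the caller just skips updating the last-success timestamp and waits for the next scheduled run.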

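2.3 Service deregistration

Deregistration is easy to follow: when the service shuts down, it cancels its local scheduled tasks and sends a request telling the server that it is going offline.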

/**
     * Shuts down Eureka Client. Also sends a deregistration request to the
     * eureka server.
     */
@PreDestroy
@Override
public synchronized void shutdown() {
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");

        if (statusChangeListener != null && applicationInfoManager != null) {
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }
        // Cancel all scheduled tasks
        cancelScheduledTasks();

        // If APPINFO was registered
        if (applicationInfoManager != null
            && clientConfig.shouldRegisterWithEureka()
            && clientConfig.shouldUnregisterOnShutdown()) {
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            // Tell the server that this instance is going offline
            unregister();
        }

        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }
        // Shut down the monitors
        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();

        logger.info("Completed shut down of DiscoveryClient");
    }
}


	/**
     * unregister w/ the eureka service.
     */
void unregister() {
    // It can be null if shouldRegisterWithEureka == false
    if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
        try {
            logger.info("Unregistering ...");
            EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
            logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
        } catch (Exception e) {
            logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
        }
    }
}



@Override
public EurekaHttpResponse<Void> cancel(String appName, String id) {
    String urlPath = serviceUrl + "apps/" + appName + '/' + id;

    ResponseEntity<Void> response = restTemplate.exchange(urlPath, HttpMethod.DELETE,
                                                          null, Void.class);

    return anEurekaHttpResponse(response.getStatusCodeValue())
        .headers(headersOf(response)).build();
}
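
The guard at the top of shutdown() is what makes a repeated call harmless. The following is a minimal sketch of that pattern only, not the Eureka implementation: the class name ShutdownOnceSketch and the recorded step strings are hypothetical and stand in for the real cleanup calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a client whose shutdown must run exactly once.
public class ShutdownOnceSketch {
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final List<String> steps = new ArrayList<>();

    public synchronized void shutdown() {
        // Only the first caller flips false -> true; later calls skip the body.
        if (isShutdown.compareAndSet(false, true)) {
            steps.add("cancel scheduled tasks");
            steps.add("mark DOWN and send DELETE apps/{appName}/{id}");
            steps.add("close transport");
        }
    }

    public List<String> steps() {
        return steps;
    }

    public static void main(String[] args) {
        ShutdownOnceSketch client = new ShutdownOnceSketch();
        client.shutdown();
        client.shutdown(); // no-op: the flag is already true
        System.out.println(client.steps().size()); // 3
    }
}
```

The order of the recorded steps mirrors the listing above: the timers are cancelled first so that no heartbeat can fire after the DELETE request has been sent.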

           
2.4 Fetching and refreshing the registry

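At startup the client pulls the full registry from the server, that is, the information for every other client instance already registered. After that, the task scheduled in

initScheduledTasks()

fetches incremental (delta) updates every 30 seconds by default.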

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
            new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
            ),
            registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    
    ......
    ......
    ......
}

	/**
     * The task that fetches the registry information at specified intervals.
     *
     */
class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}

@VisibleForTesting
void refreshRegistry() {
    try {
        boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

        boolean remoteRegionsModified = false;
        // This makes sure that a dynamic change to remote regions to fetch is honored.
        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        ......
        ......
        ......

        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }

        ......
        ......
        ......
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }        
}

	
	/**
     * Fetches the registry information.
     *
     * <p>
     * This method tries to get only deltas after the first fetch unless there
     * is an issue in reconciling eureka server and client registry information.
     * </p>
     *
     * @param forceFullRegistryFetch Forces a full registry fetch.
     *
     * @return true if the registry was fetched
     */
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // If delta fetching is disabled, or this is the first fetch, pull all applications
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
            || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
            || forceFullRegistryFetch
            || (applications == null)
            || (applications.getRegisteredApplications().size() == 0)
            || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
            logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
            logger.info("Application is null : {}", (applications == null));
            logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
            logger.info("Application version is -1: {}", (applications.getVersion() == -1));
            getAndStoreFullRegistry();
        } else {
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}
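
The registryFetchIntervalSeconds and expBackOffBound values passed to TimedSupervisorTask above suggest a delay that grows when a run fails and is capped at a multiple of the base interval. The following is a hedged sketch of that idea, assuming the delay doubles after a timed-out run, is capped at interval × bound, and resets after a successful run. BackoffDelaySketch and its method names are hypothetical, and the values 30 seconds and 10 are illustrative only.

```java
// Hypothetical model of a refresh delay with bounded exponential back-off.
public class BackoffDelaySketch {
    private final long baseDelayMs;
    private final long maxDelayMs;
    private long currentDelayMs;

    public BackoffDelaySketch(long baseDelayMs, int expBackOffBound) {
        this.baseDelayMs = baseDelayMs;
        // Assumption: the cap is the base interval multiplied by the bound.
        this.maxDelayMs = baseDelayMs * expBackOffBound;
        this.currentDelayMs = baseDelayMs;
    }

    // A successful run resets the delay to the configured interval.
    public long onSuccess() {
        currentDelayMs = baseDelayMs;
        return currentDelayMs;
    }

    // A timed-out run doubles the delay, never exceeding the cap.
    public long onTimeout() {
        currentDelayMs = Math.min(maxDelayMs, currentDelayMs * 2);
        return currentDelayMs;
    }

    public static void main(String[] args) {
        BackoffDelaySketch delay = new BackoffDelaySketch(30_000L, 10);
        for (int i = 0; i < 5; i++) {
            System.out.println(delay.onTimeout()); // 60000, 120000, 240000, 300000, 300000
        }
        System.out.println(delay.onSuccess()); // 30000
    }
}
```

Under these assumptions a struggling server is polled less often, while one successful fetch is enough to return to the normal interval.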
           

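The server keeps the instance changes that clients pull as deltas for 3 minutes. So although the Eureka client fetches incremental updates, a response may still contain the same data it received 30 seconds earlier, and the client has to handle the duplicates itself.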
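
One way to make repeated deltas harmless is to apply each change as an idempotent operation on the local cache. The following is a minimal sketch under that assumption and not the Eureka code: DeltaApplySketch, Change, and the plain Map of instance id to status are hypothetical simplifications of the real Applications and InstanceInfo types.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of applying a delta to a local registry cache.
public class DeltaApplySketch {
    enum Action { ADDED, MODIFIED, DELETED }

    static final class Change {
        final Action action;
        final String instanceId;
        final String status;

        Change(Action action, String instanceId, String status) {
            this.action = action;
            this.instanceId = instanceId;
            this.status = status;
        }
    }

    // put() and remove() are idempotent, so replaying a delta changes nothing.
    static void apply(Map<String, String> local, List<Change> delta) {
        for (Change change : delta) {
            if (change.action == Action.DELETED) {
                local.remove(change.instanceId);
            } else {
                local.put(change.instanceId, change.status);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> local = new HashMap<>();
        local.put("order-1", "UP");
        List<Change> delta = Arrays.asList(
            new Change(Action.ADDED, "pay-1", "UP"),
            new Change(Action.DELETED, "order-1", null));
        apply(local, delta);
        apply(local, delta); // the same delta arrives again 30 seconds later
        System.out.println(local); // {pay-1=UP}
    }
}
```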

Also note this line in the

fetchRegistry()

method:

applications.setAppsHashCode(applications.getReconcileHashCode());
           

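Every delta response from the server carries a hash code used for consistency checking. A delta fetch actually returns the changes that the Eureka server recorded in the last three minutes, so if the Eureka client goes more than three minutes without a delta fetch (because of a network problem, for example), its data falls out of sync with the server. Normally, after a series of delta updates, the client's service list should end up identical to the server's, but an error along the way can leave the two inconsistent. To expose this problem, the server includes the hash code in every delta response. The client computes the same hash over its local service list and compares the two values. If they differ, the delta updates have gone wrong and the client's list no longer matches the server's, so the client must do a full fetch.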
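
The comparison described above can be sketched as follows. This assumes the hash is a summary string built from the number of instances in each status, sorted by status name, which is how I understand Eureka's reconcile hash code to work. The class ReconcileHashSketch, its methods, and the Map of instance id to status are hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a status-count hash used to detect registry drift.
public class ReconcileHashSketch {

    // Builds a string such as "DOWN_1_UP_2_" from the local instance statuses.
    static String hashOf(Map<String, String> statusByInstanceId) {
        // TreeMap keeps statuses sorted, so both sides build the same string.
        TreeMap<String, Integer> counts = new TreeMap<>();
        for (String status : statusByInstanceId.values()) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder hash = new StringBuilder();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            hash.append(entry.getKey()).append('_').append(entry.getValue()).append('_');
        }
        return hash.toString();
    }

    // A mismatch means the deltas did not converge; fall back to a full fetch.
    static boolean needsFullFetch(Map<String, String> local, String serverHash) {
        return !hashOf(local).equals(serverHash);
    }

    public static void main(String[] args) {
        Map<String, String> local = new HashMap<>();
        local.put("order-1", "UP");
        local.put("order-2", "UP");
        local.put("pay-1", "DOWN");
        System.out.println(hashOf(local)); // DOWN_1_UP_2_
        System.out.println(needsFullFetch(local, "DOWN_1_UP_2_")); // false
        System.out.println(needsFullFetch(local, "UP_3_")); // true
    }
}
```

A hash of this kind is cheap to compute and to send, but it only compares counts per status, so it detects drift without saying which instance differs. That is why the fallback is a full fetch rather than a targeted repair.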

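That wraps up the analysis of the client code. This article approached it from two angles:

  1. Starting from the startup class, to see what gets initialized;
  2. Starting from the startup log, to see what the startup class does.

If you have a better angle of analysis, let's discuss it together, so that we can stand on the shoulders of giants and go further.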