天天看点

Eureka客户端源码解析 注册/心跳/本地刷新/下线

Eureka服务端与客户端交互是通过发送http请求完成的. 使用JerseyClient进行服务间通讯, Jersey是一个RESTFUL请求服务JAVA框架, 与常规的JAVA编程使用的struts框架类似, 它主要用于处理业务逻辑层.

入口

在Client包中找到启动类 重点EurekaClientAutoConfiguration

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration

org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
           

org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration#eurekaInstanceConfigBean

@Bean
@ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
		ManagementMetadataProvider managementMetadataProvider) {
	//	从配置文件中加载一系列配置
	...
}
           

初始化client

org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration.RefreshableEurekaClientConfiguration#eurekaClient
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager,
		EurekaClientConfig config, EurekaInstanceConfig instance,
		@Autowired(required = false) HealthCheckHandler healthCheckHandler) {
	...
	// 初始化client
	CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
			config, this.optionalArgs, this.context);
	cloudEurekaClient.registerHealthCheck(healthCheckHandler);
	return cloudEurekaClient;
}

org.springframework.cloud.netflix.eureka.CloudEurekaClient#CloudEurekaClient
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
		EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
		ApplicationEventPublisher publisher) {
	super(applicationInfoManager, config, args); // 调用父类构造方法
	this.applicationInfoManager = applicationInfoManager;
	this.publisher = publisher;
	this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
			"eurekaTransport");
	ReflectionUtils.makeAccessible(this.eurekaTransportField);
}
           

同步 注册 开启刷新/心跳包

com.netflix.discovery.DiscoveryClient#DiscoveryClient
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
				Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
	// 一系列赋值
	...
	try {
		// 创建可定时执行的线程池 
		scheduler = Executors.newScheduledThreadPool(2,
				new ThreadFactoryBuilder()
						.setNameFormat("DiscoveryClient-%d")
						.setDaemon(true)
						.build());

		// 创建用于发送心跳包的线程池
		heartbeatExecutor = new ThreadPoolExecutor(
				1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
				new SynchronousQueue<Runnable>(),
				new ThreadFactoryBuilder()
						.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
						.setDaemon(true)
						.build()
		);  // use direct handoff

		// 创建用于刷新本地应用列表的线程池
		cacheRefreshExecutor = new ThreadPoolExecutor(
				1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
				new SynchronousQueue<Runnable>(),
				new ThreadFactoryBuilder()
						.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
						.setDaemon(true)
						.build()
		);  // use direct handoff
		...
	} catch (Throwable e) {
		throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
	}

	// 从注册中心获取服务列表
	if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
		fetchRegistryFromBackup();
	}
	...
	try {
		if (!register() ) { // 注册服务
			throw new IllegalStateException("Registration error at startup. Invalid server response.");
		}
	} catch (Throwable th) {
		logger.error("Registration error at startup: {}", th.getMessage());
		throw new IllegalStateException(th);
	}
	...
	// 开启执行线程池 发送心跳包 刷新本地应用列表
	initScheduledTasks();
	...
}
           

从服务端同步信息

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
	Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

	try {
		Applications applications = getApplications();

		if (clientConfig.shouldDisableDelta()
				|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
				|| forceFullRegistryFetch
				|| (applications == null)
				|| (applications.getRegisteredApplications().size() == 0)
				|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
		{
			...
			getAndStoreFullRegistry();
		}
		...
	} catch (Throwable e) {
		logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
		return false;
	} finally {
		if (tracer != null) {
			tracer.stop();
		}
	}

	// Notify about cache refresh before updating the instance remote status
	onCacheRefreshed();

	// Update remote status based on refreshed data held in the cache
	updateInstanceRemoteStatus();

	// registry was fetched successfully, so return true
	return true;
}

private void getAndStoreFullRegistry() throws Throwable {
	long currentUpdateGeneration = fetchRegistryGeneration.get();
	Applications apps = null;
	EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
			? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) // 默认执行此方法
			: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
	...
	if (apps == null) {
		logger.error("The application is null for some reason. Not storing this information");
	} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
		// 过滤可用的服务 并打乱存储 AtomicReference<Applications> 底层使用ConcurrentHashMap存储
		localRegionApps.set(this.filterAndShuffle(apps));
		logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
	} else {
		logger.warn("Not updating applications as another thread is updating it already");
	}
}

com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#getApplications
public EurekaHttpResponse<Applications> getApplications(String... regions) {
	return getApplicationsInternal("apps/", regions);
}

// 使用jerseyClient发送GET请求
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
	ClientResponse response = null;
	String regionsParamValue = null;
	try {
		WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
		if (regions != null && regions.length > 0) {
			regionsParamValue = StringUtil.join(regions);
			webResource = webResource.queryParam("regions", regionsParamValue);
		}
		Builder requestBuilder = webResource.getRequestBuilder();
		addExtraHeaders(requestBuilder);
		response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);

		Applications applications = null;
		if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
			applications = response.getEntity(Applications.class);
		}
		return anEurekaHttpResponse(response.getStatus(), Applications.class)
				.headers(headersOf(response))
				.entity(applications)
				.build();
	} finally {
		if (logger.isDebugEnabled()) {
			logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
					serviceUrl, urlPath,
					regionsParamValue == null ? "" : "regions=" + regionsParamValue,
					response == null ? "N/A" : response.getStatus()
			);
		}
		if (response != null) {
			response.close();
		}
	}
}
           

注册服务

com.netflix.discovery.DiscoveryClient#register
boolean register() throws Throwable {
	...
	httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
	...
}

// POST请求注册服务
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#register
public EurekaHttpResponse<Void> register(InstanceInfo info) {
	String urlPath = "apps/" + info.getAppName();
	ClientResponse response = null;
	try {
		Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
		addExtraHeaders(resourceBuilder);
		response = resourceBuilder
				.header("Accept-Encoding", "gzip")
				.type(MediaType.APPLICATION_JSON_TYPE)
				.accept(MediaType.APPLICATION_JSON)
				.post(ClientResponse.class, info);
		return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
	} finally {
		if (logger.isDebugEnabled()) {
			logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
					response == null ? "N/A" : response.getStatus());
		}
		if (response != null) {
			response.close();
		}
	}
}
           

开启线程池

com.netflix.discovery.DiscoveryClient#initScheduledTasks
new TimedSupervisorTask(
		"cacheRefresh",
		scheduler,
		cacheRefreshExecutor,
		registryFetchIntervalSeconds,
		TimeUnit.SECONDS,
		expBackOffBound,
		new CacheRefreshThread() // 本地服务刷新线程
)

new TimedSupervisorTask(
	"heartbeat",
	scheduler,
	heartbeatExecutor,
	renewalIntervalInSecs,
	TimeUnit.SECONDS,
	expBackOffBound,
	new HeartbeatThread() // 心跳线程
)
           

刷新本地服务列表

com.netflix.discovery.DiscoveryClient.CacheRefreshThread
class CacheRefreshThread implements Runnable {
	public void run() {
		refreshRegistry();
	}
}

void refreshRegistry() {
	try {
		...
		boolean success = fetchRegistry(remoteRegionsModified);
		...
	} catch (Throwable e) {
		logger.error("Cannot fetch registry from server", e);
	}
}
           

心跳线程

com.netflix.discovery.DiscoveryClient.HeartbeatThread

private class HeartbeatThread implements Runnable {
	public void run() {
		if (renew()) {
			lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
		}
	}
}

boolean renew() {
	...
	httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
	...
}

// jerseyClient发送PUT心跳请求
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#sendHeartBeat
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
	String urlPath = "apps/" + appName + '/' + id;
	ClientResponse response = null;
	try {
		WebResource webResource = jerseyClient.resource(serviceUrl)
				.path(urlPath)
				.queryParam("status", info.getStatus().toString())
				.queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
		if (overriddenStatus != null) {
			webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
		}
		Builder requestBuilder = webResource.getRequestBuilder();
		addExtraHeaders(requestBuilder);
		response = requestBuilder.put(ClientResponse.class);
		EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
		if (response.hasEntity()) {
			eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
		}
		return eurekaResponseBuilder.build();
	} finally {
		if (logger.isDebugEnabled()) {
			logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
		}
		if (response != null) {
			response.close();
		}
	}
}
           

取消服务

PreDestroy 在容器关闭前调用

com.netflix.discovery.DiscoveryClient#shutdown
@PreDestroy 
public synchronized void shutdown() {
	...
	cancelScheduledTasks(); // shutdownNow各线程池
	if (applicationInfoManager != null
			&& clientConfig.shouldRegisterWithEureka()
			&& clientConfig.shouldUnregisterOnShutdown()) {
		applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
		// 发送取消服务请求
		unregister(); 
	}
	...
}

void unregister() {
	// It can be null if shouldRegisterWithEureka == false
	if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
		try {
			logger.info("Unregistering ...");
			EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
			logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
		} catch (Exception e) {
			logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
		}
	}
}

// jerseyClient发送DELETE请求
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#cancel
public EurekaHttpResponse<Void> cancel(String appName, String id) {
	String urlPath = "apps/" + appName + '/' + id;
	ClientResponse response = null;
	try {
		Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
		addExtraHeaders(resourceBuilder);
		response = resourceBuilder.delete(ClientResponse.class);
		return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
	} finally {
		if (logger.isDebugEnabled()) {
			logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
		}
		if (response != null) {
			response.close();
		}
	}
}
           

点此跳转 Eureka服务端源码解析 入口/注册/刷新/下线