01 | 說明
接着上一篇文章,上一篇主要講解了Eureka的搭建過程,以及自動配置類的一些相關配置,本篇重點說明以下内容
1.1 服務端是如何接收用戶端請求的?
02 | 服務端如何接收用戶端請求
2.1 建構Jersey過濾器
1. 該過濾器的作用類似于Spring-MVC,用于攔截rest請求
2. 與Spring-MVC不同的是,Spring-MVC使用攔截器進行請求攔截,而Jersey使用過濾器攔截。
2.2 建構過程
@Bean
//jersey過濾器
public FilterRegistrationBean jerseyFilterRegistration(
javax.ws.rs.core.Application eurekaJerseyApp) {
FilterRegistrationBean bean = new FilterRegistrationBean();
bean.setFilter(new ServletContainer(eurekaJerseyApp));
bean.setOrder(Ordered.LOWEST_PRECEDENCE);
bean.setUrlPatterns(
Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
return bean;
}
@Bean
// 構造一個jersey應用,相當于Spring-mvc,建構rest請求
public javax.ws.rs.core.Application jerseyApplication(Environment environment,
ResourceLoader resourceLoader) {
ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
false, environment);
// Filter to include only classes that have a particular annotation.
// 過濾Path.class以及Provider.class注解修飾的類
provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
// Find classes in Eureka packages (or subpackages)
// 通過指定的包名查找類
Set<Class<?>> classes = new HashSet<>();
for (String basePackage : EUREKA_PACKAGES) {
Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
for (BeanDefinition bd : beans) {
Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
resourceLoader.getClassLoader());
classes.add(cls);
}
}
// Construct the Jersey ResourceConfig
Map<String, Object> propsAndFeatures = new HashMap<>();
propsAndFeatures.put(
// Skip static content used by the webapp
ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");
DefaultResourceConfig rc = new DefaultResourceConfig(classes);
rc.setPropertiesAndFeatures(propsAndFeatures);
return rc;
}
2.3 處理執行個體的注冊過程
1. 通過上面建構Jersey應用可知,在建構過程中,會對指定的包名進行掃描,通過斷點可知,最終掃描的類主要有九個,其中我們最關心的是ApplicationResource這個類。
用戶端的注冊請求,主要就是在這個ApplicationResource進行攔截及處理。處理的過程如下:
2. 通過請求進入該類的addInstance方法,該方法主要用于判斷請求執行個體的一些基本資訊,包括執行個體ID是否為空,hostName是否為空等
2.4. 啟動執行個體節點注冊器
1. 建構執行個體節點注冊器,用于注冊節點
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
ServerCodecs serverCodecs) {
this.eurekaClient.getApplications();
// force initialization
return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
serverCodecs, this.eurekaClient,
this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
2.5 服務端context初始化
2.5.1 代碼
@Bean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
registry, peerEurekaNodes, this.applicationInfoManager);
}
2.5.1 EurekaContext 初始化的過程
1. 通過DefaultEurekaServerContext的initialize()方法進行初始化
2. 通過peerEurekaNodes.start()方法,進行定時的重新整理節點資訊(關于定時重新整理節點資訊,後續補充源碼分析,這裡隻講述有這麼個過程)
3. 通過注冊器的實作類,進行注冊器的初始化 registry.init(peerEurekaNodes) 實作類為:PeerAwareInstanceRegistryImpl
2.6 建構Eureka節點資訊
2.6.1 代碼
@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
ServerCodecs serverCodecs) {
return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
}
2.7 啟動EurekaServerBootstrap
2.7.1 代碼
@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
EurekaServerContext serverContext) {
return new EurekaServerBootstrap(this.applicationInfoManager,
this.eurekaClientConfig, this.eurekaServerConfig, registry,
serverContext);
}
2.7.2 初始化
1. 初始化Eureka的環境資訊 initEurekaEnvironment();
2. 初始化Eureka的Context資訊 initEurekaServerContext();
2.8 同步節點資訊的過程分析
2.8.1 同步所有節點資訊
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
// 擷取所有的節點資訊
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
// 判斷是否可以注冊
if (isRegisterable(instance)) {
// 進行注冊
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
2.8.2 注冊過程說明
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
// 上隻讀鎖
read.lock();
// 從本地MAP裡面擷取目前執行個體的資訊。
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
// 增加注冊次數到監控資訊裡面去。
REGISTER.increment(isReplication);
if (gMap == null) {
// 如果第一次進來,那麼gMap為空,則建立一個ConcurrentHashMap放入到registry裡面去
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
// putIfAbsent方法主要是在向ConcurrentHashMap中添加鍵—值對的時候,它會先判斷該鍵值對是否已經存在。
// 如果不存在(新的entry),那麼會向map中添加該鍵值對,并傳回null。
// 如果已經存在,那麼不會覆寫已有的值,直接傳回已經存在的值。
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
// 表明map中确實不存在,則設定gMap為最新建立的那個
gMap = gNewMap;
}
}
// 從MAP中查詢已經存在的Lease資訊 (比如第二次來)
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// 當Lease的對象不為空時。
if (existingLease != null && (existingLease.getHolder() != null)) {
// 當instance已經存在是,和用戶端的instance的資訊做比較,時間最新的那個,為有效instance資訊
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); // server
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); // client
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// 這裡隻有當existinglease不存在時,才會進來。 像那種恢複心跳,資訊過期的,都不會進入這裡。
// Eureka-Server的自我保護機制做的操作,為每分鐘最大續約數+2 ,同時重新計算每分鐘最小續約數
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1
// for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
// 建構一個最新的Lease資訊
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
// 當原來存在Lease的資訊時,設定他的serviceUpTimestamp, 保證服務開啟的時間一直是第一次的那個
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// 放入本地Map中
gMap.put(registrant.getId(), lease);
// 添加到最近的注冊隊列裡面去,以時間戳作為Key, 名稱作為value,主要是為了運維界面的統計資料。
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
// 分析instanceStatus
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
// 得到instanceStatus,判斷是否是UP狀态,
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
// 設定注冊類型為添加
registrant.setActionType(ActionType.ADDED);
// 租約變更記錄隊列,記錄了執行個體的每次變化, 用于注冊資訊的增量擷取、
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
// 清理緩存 ,傳入的參數為key
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
(1) 加鎖,避免多執行個體同時注冊并發問題
(2)從本地MAP裡面擷取目前執行個體資訊,如果是第一次進入,則Map為空,則建立一個ConcurrentMap,用于存儲對應的執行個體資訊
(3)建構執行個體相關資訊,加入Map中
(4)分析執行個體的狀态
(5)設定注冊類型為添加
(6)清理緩存