系列文章目錄
一.SpringCloud源碼剖析-Eureka核心API
二.SpringCloud源碼剖析-Eureka Client 初始化過程
三.SpringCloud源碼剖析-Eureka服務注冊
四.SpringCloud源碼剖析-Eureka服務發現
五.SpringCloud源碼剖析-Eureka Client服務續約
六.SpringCloud源碼剖析-Eureka Client取消注冊
七.SpringCloud源碼剖析-Eureka Server的自動配置
八.SpringCloud源碼剖析-Eureka Server初始化流程
九.SpringCloud源碼剖析-Eureka Server服務注冊流程
十.SpringCloud源碼剖析-Eureka Server服務續約
十一.SpringCloud源碼剖析-Eureka Server服務系統資料庫拉取
十二.SpringCloud源碼剖析-Eureka Server服務剔除
十三.SpringCloud源碼剖析-Eureka Server服務下線
前言
這一章我們來分析一下Eureka Server 服務系統資料庫的拉取流程,請結合《Eureka Client服務發現》
在《Eureka Client服務發現》我們分析了,用戶端會通過兩種方式從服務端拉取系統資料庫,在用戶端系統啟動的時候會進行全量拉取,随後預設30s/次會進行差異更新,那麼在Eureka Server 服務端是如何處理服務系統資料庫全量拉取和差異更新的呢?
全量拉取
Eureka Client向Eureka Server發請求,拉取服務系統資料庫,Server端還是通過ServeltContainer接待請求,最終交給com.netflix.eureka.resources.ApplicationsResource#getContainers處理
/**
傳回所有的應用
* Get information about all {@link com.netflix.discovery.shared.Applications}.
*/
@GET
public Response getContainers(@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo,
@Nullable @QueryParam("regions") String regionsStr) {
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
String[] regions = null;
if (!isRemoteRegionRequested) {
//系統資料庫全量拉取統計計數增加
EurekaMonitors.GET_ALL.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
}
// Check if the server allows the access to the registry. The server can
// restrict access if it is not
// ready to serve traffic depending on various reasons.
//檢查服務端時候準備好可以被通路
if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
return Response.status(Status.FORBIDDEN).build();
}
CurrentRequestVersion.set(Version.toEnum(version));
//處理傳回的資料類型預設JSON
KeyType keyType = Key.KeyType.JSON;
String returnMediaType = MediaType.APPLICATION_JSON;
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
//請求頭麼有指定格式,傳回XML格式
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}
//建立緩存key
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS, //通過ALL_APPS建構key
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
Response response;
//這裡判斷是否是GZIP格式,傳回結果的編碼類型不一樣,擷取方式是一緻的
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
//如果格式是gzip,調用responseCache.getGZIP(cacheKey)擷取
//底層會從一個ConcurrentMap<Key, Value> readOnlyCacheMap 隻讀緩存中去擷取全量系統資料庫
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
//普通擷取responseCache.get(cacheKey)
//底層會從一個ConcurrentMap<Key, Value> readOnlyCacheMap 隻讀緩存中去擷取全量系統資料庫
response = Response.ok(responseCache.get(cacheKey))
.build();
}
return response;
}
responseCache.getGZIP(cacheKey)最終會調用 com.netflix.eureka.registry.ResponseCacheImpl#getValue
/**
* Get the payload in both compressed and uncompressed form.
*/
@VisibleForTesting
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
//從隻讀緩存中擷取
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
//如果隻讀緩存中擷取不到,從讀寫緩存中擷取
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
return payload;
}
差異更新
差異更新也在ApplicationsResource中:com.netflix.eureka.resources.ApplicationsResource#getContainerDifferential,源碼如下
/**
擷取Applications服務系統資料庫中有改變的服務,注冊,取消,狀态更改和過期都會造成服務的改變
* Get information about all delta changes in {@link com.netflix.discovery.shared.Applications}.
*
* <p>
* The delta changes represent the registry information change for a period
* as configured by
* {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}. The
* changes that can happen in a registry include
* <em>Registrations,Cancels,Status Changes and Expirations</em>. Normally
* the changes to the registry are infrequent and hence getting just the
* delta will be much more efficient than getting the complete registry.
* </p>
*
* <p>
* Since the delta information is cached over a period of time, the requests
* may return the same data multiple times within the window configured by
* {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}.The clients
* are expected to handle this duplicate information.
* <p>
*
* @param version the version of the request.
* @param acceptHeader the accept header to indicate whether to serve JSON or XML data.
* @param acceptEncoding the accept header to indicate whether to serve compressed or uncompressed data.
* @param eurekaAccept an eureka accept extension, see {@link com.netflix.appinfo.EurekaAccept}
* @param uriInfo the {@link java.net.URI} information of the request made.
* @return response containing the delta information of the
* {@link AbstractInstanceRegistry}.
*/
@Path("delta")
@GET
public Response getContainerDifferential(
@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
// If the delta flag is disabled in discovery or if the lease expiration
// has been disabled, redirect clients to get all instances
//如果禁用了Delta系統資料庫差異化拉取,或者服務不可通路,傳回拒絕
if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
return Response.status(Status.FORBIDDEN).build();
}
String[] regions = null;
if (!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL_DELTA.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
}
CurrentRequestVersion.set(Version.toEnum(version));
//處理反會的資料格式JSON預設
KeyType keyType = Key.KeyType.JSON;
String returnMediaType = MediaType.APPLICATION_JSON;
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}
//建構緩存key
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS_DELTA, //通過ALL_APPS_DELTA建構key
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
if (acceptEncoding != null
&& acceptEncoding.contains(HEADER_GZIP_VALUE)) {
//從responseCache擷取内容
return Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
return Response.ok(responseCache.get(cacheKey))
.build();
}
}
總結
Eureka Server 拉取服務系統資料庫的邏輯還是比較簡單的,不管是全量拉取,還是差别拉取都是通過ApplicationsResource中處理,然後建構出不同的key,從ResponseCache中去擷取服務。