天天看點

十一.SpringCloud源碼剖析-Eureka Server服務系統資料庫拉取

系列文章目錄

一.SpringCloud源碼剖析-Eureka核心API

二.SpringCloud源碼剖析-Eureka Client 初始化過程

三.SpringCloud源碼剖析-Eureka服務注冊

四.SpringCloud源碼剖析-Eureka服務發現

五.SpringCloud源碼剖析-Eureka Client服務續約

六.SpringCloud源碼剖析-Eureka Client取消注冊

七.SpringCloud源碼剖析-Eureka Server的自動配置

八.SpringCloud源碼剖析-Eureka Server初始化流程

九.SpringCloud源碼剖析-Eureka Server服務注冊流程

十.SpringCloud源碼剖析-Eureka Server服務續約

十一.SpringCloud源碼剖析-Eureka Server服務系統資料庫拉取

十二.SpringCloud源碼剖析-Eureka Server服務剔除

十三.SpringCloud源碼剖析-Eureka Server服務下線

前言

這一章我們來分析一下Eureka Server 服務系統資料庫的拉取流程,請結合《Eureka Client服務發現》

在《Eureka Client服務發現》我們分析了,用戶端會通過兩種方式從服務端拉取系統資料庫,在用戶端系統啟動的時候會進行全量拉取,随後預設30s/次會進行差異更新,那麼在Eureka Server 服務端是如何處理服務系統資料庫全量拉取和差異更新的呢?

全量拉取

Eureka Client向Eureka Server發請求,拉取服務系統資料庫,Server端還是通過ServeltContainer接待請求,最終交給com.netflix.eureka.resources.ApplicationsResource#getContainers處理

/**
	傳回所有的應用
     * Get information about all {@link com.netflix.discovery.shared.Applications}.
*/
 @GET
    public Response getContainers(@PathParam("version") String version,
                                  @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                                  @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                                  @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                                  @Context UriInfo uriInfo,
                                  @Nullable @QueryParam("regions") String regionsStr) {

        boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
        String[] regions = null;
        if (!isRemoteRegionRequested) {
       		 //系統資料庫全量拉取統計計數增加
            EurekaMonitors.GET_ALL.increment();
        } else {
            regions = regionsStr.toLowerCase().split(",");
            Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
            EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
        }
		
        // Check if the server allows the access to the registry. The server can
        // restrict access if it is not
        // ready to serve traffic depending on various reasons.
        //檢查服務端時候準備好可以被通路
        if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
            return Response.status(Status.FORBIDDEN).build();
        }
        CurrentRequestVersion.set(Version.toEnum(version));
        //處理傳回的資料類型預設JSON
        KeyType keyType = Key.KeyType.JSON;
        String returnMediaType = MediaType.APPLICATION_JSON;
        if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
        	//請求頭麼有指定格式,傳回XML格式
            keyType = Key.KeyType.XML;
            returnMediaType = MediaType.APPLICATION_XML;
        }
		//建立緩存key
        Key cacheKey = new Key(Key.EntityType.Application,
                ResponseCacheImpl.ALL_APPS,	//通過ALL_APPS建構key 
                keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
        );

        Response response;
 		//這裡判斷是否是GZIP格式,傳回結果的編碼類型不一樣,擷取方式是一緻的
        if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        	//如果格式是gzip,調用responseCache.getGZIP(cacheKey)擷取
        	//底層會從一個ConcurrentMap<Key, Value> readOnlyCacheMap 隻讀緩存中去擷取全量系統資料庫
            response = Response.ok(responseCache.getGZIP(cacheKey))
                    .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                    .header(HEADER_CONTENT_TYPE, returnMediaType)
                    .build();
        } else {
        	//普通擷取responseCache.get(cacheKey)
        	//底層會從一個ConcurrentMap<Key, Value> readOnlyCacheMap 隻讀緩存中去擷取全量系統資料庫
            response = Response.ok(responseCache.get(cacheKey))
                    .build();
        }
        return response;
    }

           

responseCache.getGZIP(cacheKey)最終會調用 com.netflix.eureka.registry.ResponseCacheImpl#getValue

/**
     * Get the payload in both compressed and uncompressed form.
     */
    @VisibleForTesting
    Value getValue(final Key key, boolean useReadOnlyCache) {
        Value payload = null;
        try {
            if (useReadOnlyCache) {
            	//從隻讀緩存中擷取
                final Value currentPayload = readOnlyCacheMap.get(key);
                if (currentPayload != null) {
                    payload = currentPayload;
                } else {
                	//如果隻讀緩存中擷取不到,從讀寫緩存中擷取
                    payload = readWriteCacheMap.get(key);
                    readOnlyCacheMap.put(key, payload);
                }
            } else {
                payload = readWriteCacheMap.get(key);
            }
        } catch (Throwable t) {
            logger.error("Cannot get value for key : {}", key, t);
        }
        return payload;
    }
           

差異更新

差異更新也在ApplicationsResource中:com.netflix.eureka.resources.ApplicationsResource#getContainerDifferential,源碼如下

/**
  		擷取Applications服務系統資料庫中有改變的服務,注冊,取消,狀态更改和過期都會造成服務的改變
     * Get information about all delta changes in {@link com.netflix.discovery.shared.Applications}.
     *
     * <p>
     * The delta changes represent the registry information change for a period
     * as configured by
     * {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}. The
     * changes that can happen in a registry include
     * <em>Registrations,Cancels,Status Changes and Expirations</em>. Normally
     * the changes to the registry are infrequent and hence getting just the
     * delta will be much more efficient than getting the complete registry.
     * </p>
     *
     * <p>
     * Since the delta information is cached over a period of time, the requests
     * may return the same data multiple times within the window configured by
     * {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}.The clients
     * are expected to handle this duplicate information.
     * <p>
     *
     * @param version the version of the request.
     * @param acceptHeader the accept header to indicate whether to serve  JSON or XML data.
     * @param acceptEncoding the accept header to indicate whether to serve compressed or uncompressed data.
     * @param eurekaAccept an eureka accept extension, see {@link com.netflix.appinfo.EurekaAccept}
     * @param uriInfo  the {@link java.net.URI} information of the request made.
     * @return response containing the delta information of the
     *         {@link AbstractInstanceRegistry}.
     */
    @Path("delta")
    @GET
    public Response getContainerDifferential(
            @PathParam("version") String version,
            @HeaderParam(HEADER_ACCEPT) String acceptHeader,
            @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
            @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
            @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {

        boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();

        // If the delta flag is disabled in discovery or if the lease expiration
        // has been disabled, redirect clients to get all instances
        //如果禁用了Delta系統資料庫差異化拉取,或者服務不可通路,傳回拒絕
        if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
            return Response.status(Status.FORBIDDEN).build();
        }

        String[] regions = null;
        if (!isRemoteRegionRequested) {
            EurekaMonitors.GET_ALL_DELTA.increment();
        } else {
            regions = regionsStr.toLowerCase().split(",");
            Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
            EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
        }

        CurrentRequestVersion.set(Version.toEnum(version));
        //處理反會的資料格式JSON預設
        KeyType keyType = Key.KeyType.JSON;
        String returnMediaType = MediaType.APPLICATION_JSON;
        if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
            keyType = Key.KeyType.XML;
            returnMediaType = MediaType.APPLICATION_XML;
        }
		//建構緩存key
        Key cacheKey = new Key(Key.EntityType.Application,
                ResponseCacheImpl.ALL_APPS_DELTA,	//通過ALL_APPS_DELTA建構key 
                keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
        );

        if (acceptEncoding != null
                && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
                //從responseCache擷取内容
            return Response.ok(responseCache.getGZIP(cacheKey))
                    .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                    .header(HEADER_CONTENT_TYPE, returnMediaType)
                    .build();
        } else {
            return Response.ok(responseCache.get(cacheKey))
                    .build();
        }
    }
           

總結

Eureka Server 拉取服務系統資料庫的邏輯還是比較簡單的,不管是全量拉取,還是差别拉取都是通過ApplicationsResource中處理,然後建構出不同的key,從ResponseCache中去擷取服務。