我們在看流程前看下下面的一個類圖,其中client的流程基本都在DiscoveryClient的構造方法中,而eureka-server的邏輯大部分在PeerAwareInstanceRegistryImpl和PeerEurekaNodes中。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNvwVZ2x2bzNXak9CX90TQNNkRrFlQKBTSvwFbslmZvwFMwQzLcVmepNHdu9mZvwFVywUNMZTY18CX052bm9CX6lkaONTUU10MNRkTmJ1MjxmSXFGb4dVYshnMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2LcRHelR3LcJzLctmch1mclRXY39DM2YDM1YTM4EzNygDM4EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
這裡再介紹下InstanceInfo和EurekClient的差別, 主要用于标示這個應用的執行個體資訊,如應用名稱、consumer(provider)等,而EurekClient就是一個用戶端的資訊,如位址、注冊資訊等
那麼eureka-client啟動之前,我們先理一下它應該需要做那些事情。
1、告訴eureka-server(注冊)。
2、拉取eureka-server上面的注冊資訊緩存到本地,實作高可用。
3、同步client的狀态給eureka-server(心跳等)。
一、首先介紹下client向eureka-server注冊的實作:
那麼我們先看eureka-client的注冊是怎麼實作的,并不是直接注冊的,而是通過背景線程是實作的,看下代碼:
//DiscoveryClient.java
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
//代碼省略...
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
//代碼省略...
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
initScheduledTasks();
//代碼省略...
}
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
//省略代碼...
// InstanceInfo replicator建立 應用執行個體資訊複制器
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
//省略代碼...
//開啟應用執行個體資訊複制器
instanceInfoReplicator.start(clientConfig.
getInitialInstanceInfoReplicationIntervalSeconds());
}
在看下instanceInfoReplicator是如何實作注冊的,繼續看它的start方法:
public void start(int initialDelayMs) {
//
if (started.compareAndSet(false, true)) {
instanceInfo.setIsDirty(); // for initial register
//定時任務
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
//onDemandUpdate方法使用
scheduledPeriodicRef.set(next);
}
}
看它的run方法:
public void run() {
try {
//重新整理執行個體資訊
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
//是否已經注冊過,
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
//schedule
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
到這裡終于看到register方法了,不過在register之前有個refreshInstanceInfo方法,是用于重新整理應用執行個體資訊,跟進代碼後我們發現了這些應用執行個體資訊的
hostName
、
ipAddr等的
屬性的變化。因為IP變了後服務是無法知曉的,隻能這個定時任務是随時變化。
下面我們看register方法,代碼如下:
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
無非就是調用
AbstractJerseyEurekaHttpClient#register(...)
方法,請求 Eureka-Server 的
apps/${APP_NAME}
接口,參數為 InstanceInfo ,實作注冊執行個體資訊的注冊。
補上一點,當refreshInstanceInfo方法刷到status變化時會被喚醒notify(),然後調用onDemandUpdate()告訴server修改對應的client資訊。
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
//狀态變化,通過server
instanceInfoReplicator.onDemandUpdate();
}
};
public boolean onDemandUpdate() {
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
if (!scheduler.isShutdown()) {
scheduler.submit(new Runnable() {
@Override
public void run() {
Future latestPeriodic = scheduledPeriodicRef.get();
if (latestPeriodic != null && !latestPeriodic.isDone()) {
//取消任務
latestPeriodic.cancel(false);
}
//再次拉起
InstanceInfoReplicator.this.run();
}
});
return true;
//代碼省略。。。
}
二、下面我們看下server端是如何接受這個注冊請求的,請求的路徑是:"apps/" + info.getAppName(),也就是apps/${APP_NAME}。
是在com.netflix.eureka.resources.ApplicationResource中
ApplicationResource#addInstance()
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
//省略驗證的代碼。。。
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
//注冊isReplication用來區分是eureka-server節點同步還是直接注冊的
super.register(info, leaseDuration, isReplication);
//同步到對應的節點
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
最終進入了AbstractInstanceRegistry中的registry的Map中,
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<>();
不過其中使用了Lease(契約),非常nice(有興趣可以自行了解下~),心跳、下線後續會補充上來。
有什麼不對的地方還請幫忙指正!