这是euraka官网的架构图
从上面图中可以看到eureka的功能
- 服务注册
- 服务续约
- 服务同步
- 服务下线
- 远程调用
一、服务注册
这个服务提供者需需要把自己的实例注册到注册中心中(就相当于相亲时把自己的信息放到婚恋中心),这个时候必须考虑几个因素
- 服务提供者怎么和注册中心通信
- 注册中心如何保存信息
1.1 服务器供者和注册中心通信
eureka利用了一个和springMVC差不多的框架
jersey
,这是一个基于webservice框架,所以他们的通信走的不是http而是webservice。
- 1.X的版本是sun公司提供的独立的jar包,
- 在2.X版本中,已经将jersey融合到JavaSE中,在javax.ws.rs.*包中
我这次看的源码还用的是1.X的版本。
在类
EurekaServerAutoConfiguration
中有jersey的初始化,先分析类
EurekaServerAutoConfiguration
,这个是典型的springboot项目的自动配置类
Jersey主要是@Bean的配置,详细的配置我就不分析了,反正今后也用不上
@Bean
public javax.ws.rs.core.Application jerseyApplication(Environment environment,
ResourceLoader resourceLoader) {
.......
}
jersey构建好后,类似springmvc有controller,它有Resource,他的功能类似controller.
其中
addInstance
就是服务提供者就是调用该方法进行注册。至此一个可调用的服务就已经构建好了,合适会触发注册接口呢?
eureka中关于服务提供者启动后调用注册接口的这块代码很乱,而且命名也不规范,我第一次读的时候感觉特别困惑,下面我简单梳理一下(详细代码不贴了,主要感觉代码没有spring优雅)
-
中有个利用EurekaClientAutoConfiguration
配置了@Bean
EurekaAutoServiceRegistration
-
因为实现了特殊的接口EurekaAutoServiceRegistration
该接口的目的就是在 applicationContext刷新或者shutdown的时候启动某些东西SmartLifecycle
- 在spring的IOC最后一步
(熟悉spring的应该知道这步是spring 创建context调用方法finishRefresh()
最后一步了),中会调动refresh()
方法,然后步入正轨,start()
方法调用start()
这里感觉还是正常的this.serviceRegistry.register(this.registration)
- 在
中会调用register
,我一开始以为这是个平平无奇的步骤,然后后来发现这里面竟然是注册的入口reg.getEurekaClient()
- 在创建
(这个是具体的而实现类)会调用父类CloudEurekaClient
的构造方法,在构造方法中调用DiscoveryClient
方法,这个方法非常重要重要重要。就是这里面完成注册的。initScheduledTasks
- 他会创建一个异步任务
,然后在InstanceInfoReplicator
方法中调用run
这个时候才完成真正的调用discoveryClient.register();
上面就是调用的整个过程。我就再次分析一下
- 整个过程太过冗长了
- 在整个过程中有2次出现
同名方法,就会让人会晕,不像spring,一层套一层,但是每层都是循环渐进的任务来完成register()
- 最头疼的还是
,整个核心的入口是这个,所以的东西隐藏在构造函数中,这风格如果看静态代码就很不友好reg.getEurekaClient()
1.2 注册中心如何保存(注册表的结构)
上面已经描述了eureka如何构建通信和客户端何时调用注册方法,现在开始分析 eureka的注册方法
整个注册方法主要做了2件事情
- 信息保存到某个节点的注册表
- 复制注册信息到其他注册表
1.2.1 保存信息到某个节点注册表
注册方法整体分析(只保留了核心代码)
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();//读写锁提供更高的并发性
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
if (gMap == null) {
//注册表本身就是一个ConcurrentHashMap
final c<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
//
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
}
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
gMap.put(registrant.getId(), lease);
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
} finally {
read.unlock();
}
}
整个过程利用了读写锁,在注册的过程中加上读锁,允许线程读,不允许其他线程去写,提供了并发性
注册表利用的是ConcurrentHashMap,结构也非常简单
eureka节点是p2p的,他不保证强一致性,只保证最终一致性,在服务同步的时候也是调用该接口,如果从其他节点同步来的信息没有自己本地的信息新,那么就会放弃这个旧消息
invalidateCache
这句话非常重要,这牵扯到eureka的另外一个特性:二层缓存结构(当然有人也称之为三级缓存,第三级就是注册表,我就不称之为缓存了)
- 第一层:readOnlyCacheMap(本质是guava cache)
- 第二层:readWriteCacheMap(本质是一个ConcurrentHashMap) 取数据一般先去
取数据,如果没有再去
readOnlyCacheMap
取数据,如果
readWriteCacheMap
也没有就向注册表中获取数据
readWriteCacheMap
public void invalidate(Key... keys) {
for (Key key : keys) {
logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
readWriteCacheMap.invalidate(key);
........
}
}
在跟踪
invalidateCache
方法,他只把二级缓存
readWriteCacheMap
失效了,因为是增加服务信息,所以以及缓存也没有,不用失效以及缓存
readOnlyCacheMap
1.2.2 赋值注册信息到其他节点
这个也是服务同步的概念,当你每次注册服务到注册中心的时候,其实只是负载均衡的选择了一个节点,但是注册中心是一个集群的概念,所以必须做好服务同步
- 获取集群中的其他节点信息
- 调用其他节点的注册方法
该过程比较简单,我就不深入分析了
二、服务续约
这个功能相对比较简答,就是心跳,服务的提供一个接口,客户端在初始化的时候搞一个定时器去调用该接口即可
2.1 服务端接口
主要是调用
PeerAwareInstanceRegistryImpl
中的
renew
方法
public boolean renew(final String appName, final String id, final boolean isReplication) {
if (super.renew(appName, id, isReplication)) {
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
其中
super.renew()
就是调用父类
AbstractInstanceRegistry
中的
renew
方法。该方法的核心极其简单,就是更新
lastUpdateTimestamp
字段的值
public void renew() {
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
2.2 服务同步
方法
replicateToPeers
已经分析过了。
三、服务同步
服务同步在很多地方都需要,比如上面的注册接口,续约接口等等。这些情况都是因为客户端做了某些改动进行的同步。
其实还有一个比较重要的情况,比如新加入一个节点或者某个节点下线后又重新上线。显然这属于服务端的同步,所以这个时候需要分析
EurekaServerAutoConfiguration
@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
.................
}
这次分析的在
@Import
上,重点类是
EurekaServerInitializerConfiguration
该类实现了SmartLifecycle接口,那么在
finishRefreshContext
的时候回调用他的
start
方法
最终会调用下面2个方法
int registryCount = this.registry.syncUp();//服务同步
this.registry.openForTraffic(this.applicationInfoManager, registryCount);//服务剔除的入口
重点分析syncUp()
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
这个方法其实也很简答,就是循环调用前面分析的
register
方法。
四、服务下线
当一个服务停止了,就不会发送心跳给服务端,那么这个时候服务端就需要一个定时器定时检测,如果超过一定时间没有收到续约那么就必须执行下线操作
在分析服务同步的时候有一段代码是
int registryCount = this.registry.syncUp();//服务同步
this.registry.openForTraffic(this.applicationInfoManager, registryCount);//服务剔除的入口
重点是
openForTraffic
,我把核心代码贴出来
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),//这里就是定时任务
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
下面开始真正的Eviction分析
- 首先获取所有的服务,判断是否过期,如果过期就放入一个集合中
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
//判断过期的方式也很简单
public boolean isExpired(long additionalLeaseMs) {
return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
- 上面已经过认为过期的都准备剔除,但是eureka做了功能叫自我保护机制,因为不续约也有可能是网络暂时不同导致,所以kafka有一个参数叫剔除的阈值(默认是15%),如果剔除的个数大于15%,那么最大也只剔除15%。
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;//计算剔除的最大阈值
int toEvict = Math.min(expiredLeases.size(), evictionLimit);//这句话就是如果剔除的大于阈值,那么就去最大阈值作为要剔除的个数
if (toEvict > 0) { //说明要剔除的总数不小于0,要执行剔除
//这个还是随机剔除的。
int next = i + random.nextInt(expiredLeases.size() - i);
.......................
}
}
- 开始真正的剔除动作
protected boolean internalCancel(String appName, String id, boolean isReplication) {
try {
read.lock();
CANCEL.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
leaseToCancel = gMap.remove(id);
}
invalidateCache(appName, vip, svip);
} finally {
read.unlock();
}
}
其实也很简单,就是把服务从注册表中移除,然后把二级缓存对应的失效。 为什么不把一级缓存给清楚呢, 如果不清除一级缓存,那么在一级缓存没向二级缓存同步之前,其他客户端调用的时候还是会调用已经不存在的服务。这合理吗?