Eureka客户端源码-实例配置更新
客户端配置更新
在初始化DiscoveryClient的时候,会调用方法initScheduledTasks() 在方法中有以下代码
// InstanceInfo replicator 创建InstanceInfo复制器
// instanceInfoReplicator实现了Runnable接口
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
//爆点
2); // burstSize
//状态变化监听器
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
//是否按需更新,默认为true
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
//注册监听器
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
//启动
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
InstanceInfoReplicator#start()方法详解
需要注意的是:调度的时候是一次调度的,在调度完成之后的finally方法中会在重新进行调度
public void start(int initialDelayMs) {
if (started.compareAndSet(false, true)) {
instanceInfo.setIsDirty(); // for initial register
//开始调度,调度的是自身 也就是运行InstanceInfoReplicator中的run方法
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
public void run() {
try {
//刷新实例信息
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
//如果客户端本地的缓存是脏的,则发起注册,并改为非脏的
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
} finally {
//开始调度
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
执行刷新本地实例信息
//刷新本地缓存的instanceInfo
void refreshInstanceInfo() {
//下面两种刷新信息的方式都是更新本地的instanceInfo信息,并设置为脏的并且更新最后脏的时间戳
//验证hostname是否变化,如果变化则在下一次心跳的时候会把DataCenterInfo传到EurekaServer
applicationInfoManager.refreshDataCenterInfoIfRequired();
//刷新租约信息,客户端可以修改心跳间隔时间、实例驱逐时间(多长时间没有收到心跳,服务端移除实例)
applicationInfoManager.refreshLeaseInfoIfRequired();
InstanceStatus status;
try {
status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
} catch (Exception e) {
logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
status = InstanceStatus.DOWN;
}
if (null != status) {
//状态变更处理
applicationInfoManager.setInstanceStatus(status);
}
}
当状态变更的时候,之前的监听器可以监听到状态的变更
//限流器
private final RateLimiter rateLimiter;
//爆点 固定值为2
private final int burstSize;
//允许每分钟修改instance的的频率
private final int allowedRatePerMinute;
InstanceInfoReplicator(。。。。){
//replicationIntervalSeconds 根据传入的值(可以进行配置的) 默认是30s
//算出结果也就是 每分钟允许更改 4 次
this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
}
public boolean onDemandUpdate() {
//如果没有超过频率限制
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
//当前调度任务没有关闭
if (!scheduler.isShutdown()) {
//提交新的任务task
scheduler.submit(new Runnable() {
@Override
public void run() {
logger.debug("Executing on-demand update of local InstanceInfo");
//这个在前文调度的时候有进行设置scheduledPeriodicRef(设置的是当前执行的调度任务)
Future latestPeriodic = scheduledPeriodicRef.get();
//如果不为空,并且任务还没有完整,则进行取消
if (latestPeriodic != null && !latestPeriodic.isDone()) {
logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
latestPeriodic.cancel(false);
}
//调用run方法,进行新一轮的调度
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
return false;
}
} else {
return false;
}
}
总结
- 客户端Instance配置更新了,InstanceInfo复制器会定时执行把客户端的实例更改信息同步到EurekaServer
- 更频频率是有限制的用到了RateLimiter
- 实例更改和状态变更用到的task都是InstanceInfoReplicator,调度细节可以了解一下