Eureka客户端源码-实例配置更新


客户端配置更新

在初始化DiscoveryClient的时候,会调用方法initScheduledTasks() 在方法中有以下代码

// InstanceInfo replicator 创建InstanceInfo复制器 
// instanceInfoReplicator实现了Runnable接口
instanceInfoReplicator = new InstanceInfoReplicator(
  this,
  instanceInfo,
  clientConfig.getInstanceInfoReplicationIntervalSeconds(),
  //爆点
  2); // burstSize
//状态变化监听器
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
  @Override
  public String getId() {
    return "statusChangeListener";
  }
  @Override
  public void notify(StatusChangeEvent statusChangeEvent) {
    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
      // log at warn level if DOWN was involved
      logger.warn("Saw local status change event {}", statusChangeEvent);
    } else {
      logger.info("Saw local status change event {}", statusChangeEvent);
    }
    instanceInfoReplicator.onDemandUpdate();
  }
};
//是否按需更新,默认为true
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
  //注册监听器
  applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
//启动
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());

InstanceInfoReplicator#start()方法详解

需要注意的是:调度的时候是一次调度的,在调度完成之后的finally方法中会在重新进行调度

public void start(int initialDelayMs) {
  if (started.compareAndSet(false, true)) {
    instanceInfo.setIsDirty();  // for initial register
    //开始调度,调度的是自身 也就是运行InstanceInfoReplicator中的run方法
    Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
    scheduledPeriodicRef.set(next);
  }
}
public void run() {
  try {
    //刷新实例信息
    discoveryClient.refreshInstanceInfo();
    Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
    //如果客户端本地的缓存是脏的,则发起注册,并改为非脏的
    if (dirtyTimestamp != null) {
      discoveryClient.register();
      instanceInfo.unsetIsDirty(dirtyTimestamp);
    }
  } catch (Throwable t) {
  } finally {
    //开始调度
    Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
    scheduledPeriodicRef.set(next);
  }
}

执行刷新本地实例信息

//刷新本地缓存的instanceInfo
void refreshInstanceInfo() {
  //下面两种刷新信息的方式都是更新本地的instanceInfo信息,并设置为脏的并且更新最后脏的时间戳
  //验证hostname是否变化,如果变化则在下一次心跳的时候会把DataCenterInfo传到EurekaServer
  applicationInfoManager.refreshDataCenterInfoIfRequired();
  //刷新租约信息,客户端可以修改心跳间隔时间、实例驱逐时间(多长时间没有收到心跳,服务端移除实例)
  applicationInfoManager.refreshLeaseInfoIfRequired();
  InstanceStatus status;
  try {
    status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
  } catch (Exception e) {
    logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
    status = InstanceStatus.DOWN;
  }
  if (null != status) {
    //状态变更处理
    applicationInfoManager.setInstanceStatus(status);
  }
}

当状态变更的时候,之前的监听器可以监听到状态的变更

//限流器
private final RateLimiter rateLimiter;
//爆点 固定值为2
private final int burstSize;
//允许每分钟修改instance的的频率
private final int allowedRatePerMinute;

InstanceInfoReplicator(。。。。){
  //replicationIntervalSeconds 根据传入的值(可以进行配置的) 默认是30s
  //算出结果也就是 每分钟允许更改 4 次
  this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
}

public boolean onDemandUpdate() {
  //如果没有超过频率限制
  if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
    //当前调度任务没有关闭
    if (!scheduler.isShutdown()) {
      //提交新的任务task
      scheduler.submit(new Runnable() {
        @Override
        public void run() {
          logger.debug("Executing on-demand update of local InstanceInfo");
					//这个在前文调度的时候有进行设置scheduledPeriodicRef(设置的是当前执行的调度任务)
          Future latestPeriodic = scheduledPeriodicRef.get();
          //如果不为空,并且任务还没有完整,则进行取消
          if (latestPeriodic != null && !latestPeriodic.isDone()) {
            logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
            latestPeriodic.cancel(false);
          }
					//调用run方法,进行新一轮的调度
          InstanceInfoReplicator.this.run();
        }
      });
      return true;
    } else {
      return false;
    }
  } else {
    return false;
  }
}

总结

  1. 客户端Instance配置更新了,InstanceInfo复制器会定时执行把客户端的实例更改信息同步到EurekaServer
  2. 更频频率是有限制的用到了RateLimiter
  3. 实例更改和状态变更用到的task都是InstanceInfoReplicator,调度细节可以了解一下