Eureka客户端源码-初始化


这一章节只列出客户端初始化的过程,后面章节会逐一展开

客户端的配置信息类EurekaClientConfig

//实现类是DefaultEurekaClientConfig
@ImplementedBy(DefaultEurekaClientConfig.class)
public interface EurekaClientConfig {
    /**
     * 多长时间从eureka server 获取注册信息 (单位是秒)默认:30s
     */
    int getRegistryFetchIntervalSeconds();
    /**
     * 实例更改复制到其他eureka服务器的频率 (单位是秒)默认:30s
     */
    int getInstanceInfoReplicationIntervalSeconds();
		/**
     * 初始化的时候多长时间之后验证instanceInfo是否变更(单位是秒)默认:40s
     */
    int getInitialInstanceInfoReplicationIntervalSeconds();//
    /**
     * 多长时间拉取一次EurekaServer的信息(Eureka服务增加或者删除) 单位是毫秒 默认:5分钟
     */
    int getEurekaServiceUrlPollIntervalSeconds(); 
	  /**
     * 客户端是否注册到EurekaServer中 默认:true
     */
    boolean shouldRegisterWithEureka();
    /**
     * 客户端是否应从Eureka 服务器获取eureka注册表信息 默认:true
     */
    boolean shouldFetchRegistry();
    /**
     * 初始化heatBeatExecutor 的线程池大小 默认:5
     */
    int getHeartbeatExecutorThreadPoolSize();//
    /**
     * 刷新本地注册表执行器线程池的大小 默认:5
     */
    int getCacheRefreshExecutorThreadPoolSize();
    /**
     * Eureka连接空闲时间 默认:30s
     */
    int getEurekaConnectionIdleTimeoutSeconds();
    /**
     * 从EurekaServer读取数据的超时时间 默认:8s
     */
    int getEurekaServerReadTimeoutSeconds();
    /**
     * EureakaServer连接超时时间 默认:5s
     */
    int getEurekaServerConnectTimeoutSeconds();
    /**
     * EurekaServer允许的最大客户端连接数 默认:200
     */
    int getEurekaServerTotalConnections();
    /**
     * 每个EurekaServer允许的最大客户端连接数 默认:50
     */
    int getEurekaServerTotalConnectionsPerHost();//
}

初始化DiscoveryClient 这里把DiscoveryClient分4个部分进行描述

  • 初始化执行器

    private final ScheduledExecutorService scheduler;
    private final ThreadPoolExecutor heartbeatExecutor;
    private final ThreadPoolExecutor cacheRefreshExecutor;
    try {
      //初始化调度器,2个核心线程
      scheduler = Executors.newScheduledThreadPool(2,
                                                   new ThreadFactoryBuilder().
                                                   setNameFormat("DiscoveryClient-%d").
                                                   setDaemon(true).build());
      //心跳执行器,核心线程1,最大线程可以配置,
      heartbeatExecutor = new ThreadPoolExecutor(1, 
                                                 clientConfig.getHeartbeatExecutorThreadPoolSize(), 
                                                 0, TimeUnit.SECONDS,
        																				 new SynchronousQueue(), 
        																				 new ThreadFactoryBuilder().
                                                 setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").
                                                 setDaemon(true).build()
      ); 
      //本地注册表缓存刷新执行器
      cacheRefreshExecutor = new ThreadPoolExecutor(1, 
                                                    clientConfig.getCacheRefreshExecutorThreadPoolSize(), 
                                                    0, TimeUnit.SECONDS,
        																						new SynchronousQueue(),
        																						new ThreadFactoryBuilder().
                                                    setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").
                                                    setDaemon(true).build()
      ); 
    } catch (Throwable e) {
      throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }
    
  • 获取注册信息前提配置允许从EurekaServer获取注册表,默认是true

    //是否从EurekaServer获取注册信息
    if (clientConfig.shouldFetchRegistry()) {
      try {
        //是否全量获取注册信息(传入的参数是false,表示不强制进行全量获取)
        boolean primaryFetchRegistryResult = fetchRegistry(false);
        boolean backupFetchRegistryResult = true;
        //上面获取失败的时候才会执行fetchRegistryFromBackup()
        //fetchRegistryFromBackup(从其他EuerkaServer或者本地进行获取注册表)
        if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
          backupFetchRegistryResult = false;
        }
        //上面两种方式都没有获取到,并且配置了 「强制在初始化时获取注册信息」为true,则抛出异常,默认是false
        if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
          throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
        }
      } catch (Throwable th) {
      }
    }
    
  • 服务注册

    //是否注册到Eureka 并且 在初始化的时候强制进行注册
    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
      try {
        //执行注册流程
        if (!register() ) {
          throw new IllegalStateException("Registration error at startup. Invalid server response.");
        }
      } catch (Throwable th) {
      }
    }
    
  • 初始各种task

    initScheduledTasks();
    private void initScheduledTasks() {
      //如果需要从EurekaServer获取注册信息
      if (clientConfig.shouldFetchRegistry()) {
        //配置注解参考上面的EurekaClientConfig配置注解
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        
        //本地缓存task
        cacheRefreshTask = new TimedSupervisorTask("cacheRefresh", 
                                                   scheduler, cacheRefreshExecutor, 			
                                                   registryFetchIntervalSeconds, TimeUnit.SECONDS,
                                                   expBackOffBound, new CacheRefreshThread());
        //调度本地缓存task
        scheduler.schedule(cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS);
      }
      
      //如果需要注册到注册表
      if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        // 心跳Task
        heartbeatTask = new TimedSupervisorTask("heartbeat", 
                                                scheduler, heartbeatExecutor,
          																			renewalIntervalInSecs, TimeUnit.SECONDS, 
                                                expBackOffBound, new HeartbeatThread());
        //开始调度心跳task
        scheduler.schedule(heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS);
        
        // InstanceInfo复制器,实现Runnable接口,主要是更新本地的instanceInfo到EurekaServer上
        instanceInfoReplicator = new InstanceInfoReplicator(this, instanceInfo, 
          clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize
        
        //状态变化监听器
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
          @Override
          public String getId() {
            return "statusChangeListener";
          }
          @Override
          public void notify(StatusChangeEvent statusChangeEvent) {
            if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
            }
            //状态变化之后,进行instanceInfo更新
            instanceInfoReplicator.onDemandUpdate();
          }
        };
        
        //如果设置为true,本地InstanceInfo状态边个了,会注册或者更新到EurekaServer上
        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
          applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }
        
        //启动,开始任务调度,调度的任务就是自身(instanceInfoReplicator) 主要是更新本地的InstanceInfo
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
      }
    }
    

? 总结:

  • 初始化EurekaClientConfig:主要是客户端的相关配置信息
  • 初始化DiscoveryClient 核心类主要是用于获取注册信息、续约、更新、取消等操作
  • 初始化并启动 本地注册表刷新定时任务、心跳定时任务、instance配置更新定时任务、
  • 并根据配置是否发起注册流程