Eureka客户端源码-初始化
这一章节只列出客户端初始化的过程,后面章节会逐一展开
客户端的配置信息类EurekaClientConfig
//实现类是DefaultEurekaClientConfig
@ImplementedBy(DefaultEurekaClientConfig.class)
public interface EurekaClientConfig {
/**
* 多长时间从eureka server 获取注册信息 (单位是秒)默认:30s
*/
int getRegistryFetchIntervalSeconds();
/**
* 实例更改复制到其他eureka服务器的频率 (单位是秒)默认:30s
*/
int getInstanceInfoReplicationIntervalSeconds();
/**
* 初始化的时候多长时间之后验证instanceInfo是否变更(单位是秒)默认:40s
*/
int getInitialInstanceInfoReplicationIntervalSeconds();//
/**
* 多长时间拉取一次EurekaServer的信息(Eureka服务增加或者删除) 单位是毫秒 默认:5分钟
*/
int getEurekaServiceUrlPollIntervalSeconds();
/**
* 客户端是否注册到EurekaServer中 默认:true
*/
boolean shouldRegisterWithEureka();
/**
* 客户端是否应从Eureka 服务器获取eureka注册表信息 默认:true
*/
boolean shouldFetchRegistry();
/**
* 初始化heatBeatExecutor 的线程池大小 默认:5
*/
int getHeartbeatExecutorThreadPoolSize();//
/**
* 刷新本地注册表执行器线程池的大小 默认:5
*/
int getCacheRefreshExecutorThreadPoolSize();
/**
* Eureka连接空闲时间 默认:30s
*/
int getEurekaConnectionIdleTimeoutSeconds();
/**
* 从EurekaServer读取数据的超时时间 默认:8s
*/
int getEurekaServerReadTimeoutSeconds();
/**
* EureakaServer连接超时时间 默认:5s
*/
int getEurekaServerConnectTimeoutSeconds();
/**
* EurekaServer允许的最大客户端连接数 默认:200
*/
int getEurekaServerTotalConnections();
/**
* 每个EurekaServer允许的最大客户端连接数 默认:50
*/
int getEurekaServerTotalConnectionsPerHost();//
}
初始化DiscoveryClient 这里把DiscoveryClient分4个部分进行描述
-
初始化执行器
private final ScheduledExecutorService scheduler; private final ThreadPoolExecutor heartbeatExecutor; private final ThreadPoolExecutor cacheRefreshExecutor; try { //初始化调度器,2个核心线程 scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder(). setNameFormat("DiscoveryClient-%d"). setDaemon(true).build()); //心跳执行器,核心线程1,最大线程可以配置, heartbeatExecutor = new ThreadPoolExecutor(1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue
(), new ThreadFactoryBuilder(). setNameFormat("DiscoveryClient-HeartbeatExecutor-%d"). setDaemon(true).build() ); //本地注册表缓存刷新执行器 cacheRefreshExecutor = new ThreadPoolExecutor(1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue (), new ThreadFactoryBuilder(). setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d"). setDaemon(true).build() ); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } -
获取注册信息前提配置允许从EurekaServer获取注册表,默认是true
//是否从EurekaServer获取注册信息 if (clientConfig.shouldFetchRegistry()) { try { //是否全量获取注册信息(传入的参数是false,表示不强制进行全量获取) boolean primaryFetchRegistryResult = fetchRegistry(false); boolean backupFetchRegistryResult = true; //上面获取失败的时候才会执行fetchRegistryFromBackup() //fetchRegistryFromBackup(从其他EuerkaServer或者本地进行获取注册表) if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) { backupFetchRegistryResult = false; } //上面两种方式都没有获取到,并且配置了 「强制在初始化时获取注册信息」为true,则抛出异常,默认是false if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) { throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed."); } } catch (Throwable th) { } }
-
服务注册
//是否注册到Eureka 并且 在初始化的时候强制进行注册 if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { //执行注册流程 if (!register() ) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable th) { } }
-
初始各种task
initScheduledTasks(); private void initScheduledTasks() { //如果需要从EurekaServer获取注册信息 if (clientConfig.shouldFetchRegistry()) { //配置注解参考上面的EurekaClientConfig配置注解 int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); //本地缓存task cacheRefreshTask = new TimedSupervisorTask("cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread()); //调度本地缓存task scheduler.schedule(cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS); } //如果需要注册到注册表 if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); // 心跳Task heartbeatTask = new TimedSupervisorTask("heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread()); //开始调度心跳task scheduler.schedule(heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo复制器,实现Runnable接口,主要是更新本地的instanceInfo到EurekaServer上 instanceInfoReplicator = new InstanceInfoReplicator(this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize //状态变化监听器 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { } //状态变化之后,进行instanceInfo更新 instanceInfoReplicator.onDemandUpdate(); } }; //如果设置为true,本地InstanceInfo状态边个了,会注册或者更新到EurekaServer上 if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } //启动,开始任务调度,调度的任务就是自身(instanceInfoReplicator) 主要是更新本地的InstanceInfo instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } }
? 总结:
- 初始化EurekaClientConfig:主要是客户端的相关配置信息
- 初始化DiscoveryClient 核心类主要是用于获取注册信息、续约、更新、取消等操作
- 初始化并启动 本地注册表刷新定时任务、心跳定时任务、instance配置更新定时任务、
- 并根据配置是否发起注册流程