Nacos源码深度解析3-服务监听(客户端)
一.监听服务
naming.subscribe("nacos.test.3", new EventListener() {
    /**
     * Prints the service name and the instance list carried by the event.
     */
    @Override
    public void onEvent(Event event) {
        // The event is cast to NamingEvent to reach the service name and instances.
        NamingEvent namingEvent = (NamingEvent) event;
        System.out.println(namingEvent.getServiceName());
        System.out.println(namingEvent.getInstances());
    }
});
二.subscribe
@Override public void subscribe(String serviceName, String groupName, List三.getServiceInfoclusters, EventListener listener) throws NacosException { eventDispatcher.addListener(hostReactor .getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")), StringUtils.join(clusters, ","), listener); }
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); String key = ServiceInfo.getKey(serviceName, clusters); //是否开启容灾 if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } //从内存serviceInfoMap中获取serviceInfo ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); if (null == serviceObj) { serviceObj = new ServiceInfo(serviceName, clusters); serviceInfoMap.put(serviceObj.getKey(), serviceObj); //设置为正在更新 updatingMap.put(serviceName, new Object()); //从服务器中查找服务 updateServiceNow(serviceName, clusters); //从正在更新中移除 updatingMap.remove(serviceName); } else if (updatingMap.containsKey(serviceName)) { //正在更新就wait 5秒钟 if (UPDATE_HOLD_INTERVAL > 0) { // hold a moment waiting for update finish synchronized (serviceObj) { try { serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) { NAMING_LOGGER .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e); } } } } //添加定时更新的任务 scheduleUpdateIfAbsent(serviceName, clusters); return serviceInfoMap.get(serviceObj.getKey()); }四.添加任务到队列
/**
 * Registers an {@code UpdateTask} for the given service unless {@code futureMap}
 * already holds one under the same key.
 *
 * @param serviceName service name
 * @param clusters    clusters
 */
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    final String taskKey = ServiceInfo.getKey(serviceName, clusters);

    // First check without the lock; most calls find an existing task and stop here.
    if (futureMap.get(taskKey) == null) {
        synchronized (futureMap) {
            // Check again under the lock: another thread may have added the task meanwhile.
            if (futureMap.get(taskKey) == null) {
                futureMap.put(taskKey, addTask(new UpdateTask(serviceName, clusters)));
            }
        }
    }
}
五.updateTask run()
@Override public void run() { //1000 long delayTime = DEFAULT_DELAY; try { ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); //内存中为空,就从服务器更新 if (serviceObj == null) { updateService(serviceName, clusters); return; } //最后刷新时间 lastRefTime = Long.MAX_VALUE; if (serviceObj.getLastRefTime() <= lastRefTime) { updateService(serviceName, clusters); serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); } else { // if serviceName already updated by push, we should not override it // since the push data may be different from pull through force push refreshOnly(serviceName, clusters); } lastRefTime = serviceObj.getLastRefTime(); if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap .containsKey(ServiceInfo.getKey(serviceName, clusters))) { // abort the update task NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters); return; } if (CollectionUtils.isEmpty(serviceObj.getHosts())) { incFailCount(); return; } delayTime = serviceObj.getCacheMillis(); resetFailCount(); } catch (Throwable e) { incFailCount(); NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e); } finally { executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS); } }六.添加监听器
/**
 * Add listener.
 *
 * <p>Stores the listener in {@code observerMap} under the key built from the service
 * name and clusters, then calls {@code serviceChanged} with the given service info.
 *
 * @param serviceInfo service info
 * @param clusters    clusters
 * @param listener    listener
 */
public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {

    NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map");
    // Candidate list that already contains the listener, used if the key has no list yet.
    List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());
    observers.add(listener);

    // NOTE(review): presumably observerMap is a ConcurrentMap, so putIfAbsent returns the
    // list already stored under the key, or null when the candidate was inserted - confirm.
    observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);
    if (observers != null) {
        // A list already existed for this key: append the listener to it.
        observers.add(listener);
    }

    serviceChanged(serviceInfo);
}