Nacos源码深度解析3-服务监听(客户端)


一.监听服务

// Subscribe to "nacos.test.3"; the listener prints the service name and the
// instance list carried by each event it receives.
naming.subscribe("nacos.test.3", new EventListener() {
    @Override
    public void onEvent(Event event) {
        // Cast once up front; an event that is not a NamingEvent fails here.
        NamingEvent namingEvent = (NamingEvent) event;
        System.out.println(namingEvent.getServiceName());
        System.out.println(namingEvent.getInstances());
    }
});
  二.subscribe
/**
 * Registers {@code listener} for the given service, group and clusters.
 *
 * <p>The service name and group are combined through {@code NamingUtils.getGroupedName},
 * the clusters are joined with commas, the service info is fetched from {@code hostReactor},
 * and the listener is then handed to {@code eventDispatcher} together with that service info.
 *
 * @param serviceName service name
 * @param groupName   group name
 * @param clusters    clusters to subscribe to; joined with "," before use
 * @param listener    listener to register
 * @throws NacosException declared by the overridden method
 */
@Override
public void subscribe(String serviceName, String groupName, List clusters, EventListener listener)
        throws NacosException {
    String groupedName = NamingUtils.getGroupedName(serviceName, groupName);
    String clusterKey = StringUtils.join(clusters, ",");
    eventDispatcher.addListener(hostReactor.getServiceInfo(groupedName, clusterKey), clusterKey, listener);
}
三.getServiceInfo

/**
 * Returns the service info for a service, loading it from the server on first access.
 *
 * <p>If the failover switch is on, the result comes from {@code failoverReactor} and nothing else
 * is done. Otherwise the in-memory entry is looked up. When there is none, a new entry is put into
 * {@code serviceInfoMap}, the service is marked as updating, and {@code updateServiceNow} is called
 * before the marker is removed again. When an entry exists and the service is marked as updating,
 * the call waits on that entry for at most {@code UPDATE_HOLD_INTERVAL} milliseconds. In both
 * non-failover paths a periodic update task is scheduled if absent.
 *
 * @param serviceName service name
 * @param clusters    clusters string; combined with {@code serviceName} to build the lookup key
 * @return the value currently held in {@code serviceInfoMap} for this service and clusters, or
 *         the failover reactor's answer when failover is switched on
 */
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    // Failover switched on: answer from the failover reactor only.
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }

    // Look the service up in the in-memory serviceInfoMap.
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

    if (null == serviceObj) {
        serviceObj = new ServiceInfo(serviceName, clusters);

        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        // Mark the service as being updated.
        updatingMap.put(serviceName, new Object());
        try {
            // Fetch the service from the server.
            updateServiceNow(serviceName, clusters);
        } finally {
            // Always clear the marker, even if the update throws; a stale marker would send
            // every later caller into the wait branch below.
            updatingMap.remove(serviceName);
        }

    } else if (updatingMap.containsKey(serviceName)) {
        // An update is in flight: wait up to UPDATE_HOLD_INTERVAL ms for it.
        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER
                            .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    // Restore the interrupt status so callers can still observe the interruption.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
    // Register the periodic update task if there is none yet.
    scheduleUpdateIfAbsent(serviceName, clusters);

    return serviceInfoMap.get(serviceObj.getKey());
}
四.添加任务到队列

/**
 * Registers an {@code UpdateTask} for the given service and clusters unless {@code futureMap}
 * already holds a future for them.
 *
 * @param serviceName service name
 * @param clusters    clusters
 */
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    final String taskKey = ServiceInfo.getKey(serviceName, clusters);

    // Unlocked probe first; only take the lock when no task appears to be registered.
    if (futureMap.get(taskKey) == null) {
        synchronized (futureMap) {
            // Re-check under the lock: another thread may have registered the task meanwhile.
            if (futureMap.get(taskKey) == null) {
                futureMap.put(taskKey, addTask(new UpdateTask(serviceName, clusters)));
            }
        }
    }
}
五.UpdateTask.run()
/**
 * Runs one refresh cycle for this task's service and, unless the task has been stopped,
 * schedules the next cycle.
 *
 * <p>The next delay is {@code delayTime << failCount}, capped at {@code DEFAULT_DELAY * 60}
 * milliseconds. {@code delayTime} starts at {@code DEFAULT_DELAY} and is replaced by the
 * service's cache interval after a successful cycle with a non-empty host list.
 *
 * <p>When the service is no longer subscribed and has no entry in {@code futureMap}, the task
 * logs that it is stopped and does not reschedule itself.
 */
@Override
public void run() {
    // Original note says DEFAULT_DELAY is 1000; the constant is not visible here.
    long delayTime = DEFAULT_DELAY;
    // Set when the task decides to abort, so the finally block does not reschedule it.
    boolean stopped = false;

    try {
        ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
        // Nothing cached yet: fetch from the server and retry on the next cycle.
        if (serviceObj == null) {
            updateService(serviceName, clusters);
            return;
        }
        // The cached entry has not been refreshed since this task last looked: pull from the server.
        if (serviceObj.getLastRefTime() <= lastRefTime) {
            updateService(serviceName, clusters);
            serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
        } else {
            // if serviceName already updated by push, we should not override it
            // since the push data may be different from pull through force push
            refreshOnly(serviceName, clusters);
        }

        lastRefTime = serviceObj.getLastRefTime();

        if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap
                .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
            // abort the update task
            NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
            stopped = true;
            return;
        }
        // An empty host list counts as a failure, which lengthens the next delay.
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            incFailCount();
            return;
        }
        delayTime = serviceObj.getCacheMillis();
        resetFailCount();
    } catch (Throwable e) {
        incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
    } finally {
        // Reschedule on every path except the explicit abort above.
        if (!stopped) {
            executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
        }
    }
}
  六.添加监听器

/**
 * Registers a listener under the key built from the service name and clusters, then calls
 * {@code serviceChanged} with the given service info.
 *
 * <p>A new synchronized list containing the listener is offered to {@code observerMap} with
 * {@code putIfAbsent}; if a list was already registered for the key, the listener is appended
 * to that existing list instead.
 *
 * @param serviceInfo service info
 * @param clusters    clusters
 * @param listener    listener
 */
public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {

    NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map");

    // Candidate list, used only when the key has no listeners yet.
    List fresh = Collections.synchronizedList(new ArrayList());
    fresh.add(listener);

    String observerKey = ServiceInfo.getKey(serviceInfo.getName(), clusters);
    List existing = observerMap.putIfAbsent(observerKey, fresh);
    if (existing != null) {
        // The key already had listeners: append to the registered list.
        existing.add(listener);
    }

    serviceChanged(serviceInfo);
}