sentinel-initFunc&控制台


sentinel-initFunc

 Sentinel提供了一套完整的流控配置UI界面,用于配置流控的各个参数,阈值。其实现的大体逻辑是引入一个web服务,这个web服务可以接收界面上的请求参数,来初始化阈值,流控规则等。并且与控制台服务sentinel-dashboard保持心跳。

那么这个web服务又是如何启动监听,初始化业务handler,以及保存流控规则的呢?将会在下面的源码中讲解。

在sentinel-core的Env中 执行一个静态方法快

public class Env {

    public static final Sph sph = new CtSph();


    static {
        // If init fails, the process will exit.
        InitExecutor.doInit();
    }

}

还是通过SPI的方式获取所有的InitFunc的实现,执行其init方法,接下来看一下这个实现类 com.alibaba.csp.sentinel.metric.extension.MetricCallbackInit

public class MetricCallbackInit implements InitFunc {
    @Override
    public void init() throws Exception {
        StatisticSlotCallbackRegistry.addEntryCallback(MetricEntryCallback.class.getCanonicalName(),
            new MetricEntryCallback());
        StatisticSlotCallbackRegistry.addExitCallback(MetricExitCallback.class.getCanonicalName(),
            new MetricExitCallback());
    }
}

注册了一个entry的回调和exit的回调,内部还是通过SPI的方式获取到MetricExtension的实现,来实现一些扩展计量器的addPass方法等

public class MetricEntryCallback implements ProcessorSlotEntryCallback {

    @Override
    public void onPass(Context context, ResourceWrapper rw, DefaultNode param, int count, Object... args)
        throws Exception {
        for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) {
            if (m instanceof AdvancedMetricExtension) {
                ((AdvancedMetricExtension) m).onPass(rw, count, args);
            } else {
                m.increaseThreadNum(rw.getName(), args);
                m.addPass(rw.getName(), count, args);
            }
        }
    }

    @Override
    public void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, DefaultNode param,
                          int count, Object... args) {
        for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) {
            if (m instanceof AdvancedMetricExtension) {
                ((AdvancedMetricExtension) m).onBlocked(resourceWrapper, count, context.getOrigin(), ex, args);
            } else {
                m.addBlock(resourceWrapper.getName(), count, context.getOrigin(), ex, args);
            }
        }
    }
}

当然,在sentinel-core中没有注册默认的MetricExtension。

控制台server的实现

依赖这部分的初始化方法,在自己的项目中引入sentinel-transport。在sentinel-transport-common的resources中InitFunc定义了两个初始化实现

com.alibaba.csp.sentinel.transport.init.CommandCenterInitFunc

com.alibaba.csp.sentinel.transport.init.HeartbeatSenderInitFunc

CommandCenterInitFunc

@InitOrder(-1)
public class CommandCenterInitFunc implements InitFunc {

    @Override
    public void init() throws Exception {
        CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();

        if (commandCenter == null) {
            RecordLog.warn("[CommandCenterInitFunc] Cannot resolve CommandCenter");
            return;
        }

        // 添加handler处理器
        commandCenter.beforeStart();
        // 启动服务端
        commandCenter.start();
        RecordLog.info("[CommandCenterInit] Starting command center: "
                + commandCenter.getClass().getCanonicalName());
    }
}

在上面的SPI的处理中获CommandCenter的实现,假如引入的是sentinel-transport-simple-http,在其resource中com.alibaba.csp.sentinel.transport.command.SimpleHttpCommandCenter

然后处理beforeStart和start方法,做的事初始化请求处理的各个handler以及启动server监听,很简单这边不做过多阐述,读者可以自行查阅代码。

HeartbeatSenderInitFunc

还是假如引入的是sentinel-transport-simple-http,在其resource中com.alibaba.csp.sentinel.transport.heartbeat.SimpleHttpHeartbeatSender

每隔10会发送一次心跳

 @Override
    public boolean sendHeartbeat() throws Exception {
        if (TransportConfig.getRuntimePort() <= 0) {
            RecordLog.info("[SimpleHttpHeartbeatSender] Command server port not initialized, won't send heartbeat");
            return false;
        }
        Endpoint addrInfo = getAvailableAddress();
        if (addrInfo == null) {
            return false;
        }

        SimpleHttpRequest request = new SimpleHttpRequest(addrInfo, TransportConfig.getHeartbeatApiPath());
        request.setParams(heartBeat.generateCurrentMessage());
        try {
            SimpleHttpResponse response = httpClient.post(request);
            if (response.getStatusCode() == OK_STATUS) {
                return true;
            } else if (clientErrorCode(response.getStatusCode()) || serverErrorCode(response.getStatusCode())) {
                RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addrInfo
                    + ", http status code: " + response.getStatusCode());
            }
        } catch (Exception e) {
            RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addrInfo, e);
        }
        return false;
    }