SpringCloud学习笔记(三万字长文)
SpringCloud学习文档
转载请声明!!!
一、微服务架构
1.1 应用架构发展
集中式架构
网站流量很小,一个应用将所有功能部署
优点:系统开发速度快;维护成本低;适用于并发要求较低的系统
缺点:代码耦合高,维护困难;无法进行不同模块的针对性优化;无法水平拓展;单点容错率低,并发能力差。
垂直拆分
当访问量增大,为了应对更高的并发和业务需求,对业务进行拆分。
优点:系统拆分实现类流量分担,解决了并发,可以针对不同模块进行优化,方便水平拓展负载均衡,容错率提高。
缺点:系统相互独立,会有很多重复的开发工作,影响效率
分布式服务
当垂直应用越来越多,应用交互不可避免,将核心业务抽取出来,作为独立的服务,逐渐形成稳定的服务中心,使前端能更快速地相应多变的市场需求
优点:将基础服务抽取,系统间互相调用提高了代码的复用和开发效率
缺点:系统间耦合变高,调用关系错综复杂,难以维护。
面向服务(SOA)
SOA面向服务架构,包含多个服务服务之间通过相互依赖最终提供一系列的功能。一个服务通常以独立的形式存在与操作系统进程中。各个服务之间通过网络调用。
缺点:每个供应商提供的ESB产品有偏差,自身实现较为复杂;应用服务粒度较大,ESB集成整合所有服务和协议、数据转换使得运维、测试部署困难。所有服务都通过一个通路通信,直接降低了通信速度。
微服务
API Gateway网关是一个服务器,是系统的唯一入口。为每个客户端提供一个定制API。API网关核心是,所有的客户端和消费端都通过统一的网关接入微服务,在网关层处理所有的非业务功能。如它还可以具有其它职责,如身份验证、监控、负载均衡、缓存、请求分片与管理、静态响应处理。通常,网关提供RESTful/HTTP的方式访问服务。而服务端通过服务注册中心进行服务注册和管理。
微服务架构是使用一套小服务来开发单个应用的方式或途径,每个服务基于单一业务能力构建,运行在自己的进程中,并使用轻量级机制通信,通常是HTTP API,并能够通过自动化部署机制来独立部署。这些服务可以使用不同的编程语言实现,以及不同数据存储技术,并保持最低限度的集中式管理。
1.2 服务调用方式
RPC和HTTP
无论是微服务还是SOA,都面临着服务间的远程调用。
常见的远程调用方式有以下2种:
RPC:Remote Produce Call远程过程调用,RPC基于Socket,工作在会话层。自定义数据格式,速度快,效率高。早期的webservice,现在热门的dubbo,都是RPC的典型代表。
Http:http其实是一种网络传输协议,基于TCP,工作在应用层,规定了数据传输的格式。现在客户端浏览器与服务端通信基本都是采用Http协议,也可以用来进行远程服务调用。缺点是消息封装臃肿,优势是对服务的提供和调用方没有任何技术限定,自由灵活,更符合微服务理念。
区别:RPC的机制是根据语言的API(language API)来定义的,而不是根据基于网络的应用来定义的。如果公司全部采用Java技术栈,那么使用Dubbo作为微服务架构是一个不错的选择。相反,如果公司的技术栈多样化,而且你更青睐Spring家族,那么Spring Cloud搭建微服务是不二之选。
Http客户端工具
既然微服务选择了Http,那么我们就需要考虑自己来实现对请求和响应的处理。不过开源世界已经有很多的http客户端工具,能够帮助我们做这些事情,例如:
HttpClient
OKHttp
URLConnection
不过这些不同的客户端,API各不相同。而Spring也有对http的客户端进行封装,提供了工具类叫RestTemplate。
Spring的RestTemplate
Spring提供了一个RestTemplate模板工具类,对基于Http的客户端进行了封装,并且实现了对象与json的序列化和
反序列化,非常方便。RestTemplate并没有限定Http的客户端类型,而是进行了抽象,目前常用的3种都有支持:HttpClient、OkHttp、JDK原生的URLConnection(默认的)
1.3 微服务中的一些概念
-
服务注册和服务发现
服务注册:服务提供者将所提供服务的信息(服务器IP和端?、服务访问协议等)注册/登记到注册中?。
服务发现:服务消费者能够从注册中?获取到较为实时的服务列表,然后根究?定的策略选择?个服务访问。
-
负载均衡
负载均衡即将请求压?分配到多个服务器(应?服务器、数据库服务器等),以此来提?服务的性能、可靠性。
-
熔断
熔断即断路保护。微服务架构中,如果下游服务因访问压?过??响应变慢或失败,上游服务为了保护系统整体可?性,可以暂时切断对下游服务的调?。这种牺牲局部,保全整体的措施就叫做熔断。
-
链路追踪
微服务架构越发流?,?个项?往往拆分成很多个服务,那么?次请求就需要涉及到很多个服务。不同的微服务可能是由不同的团队开发、可能使?不同的编程语?实现、整个项?也有可能部署在了很多服务器上(甚?百台、千台)横跨多个不同的数据中?。所谓链路追踪,就是对?次请求涉及的很多个服务链路进??志记录、性能监控。
-
API网关
微服务架构下,不同的微服务往往会有不同的访问地址,客户端可能需要调?多个服务的接?才能完成?个业务需求,如果让客户端直接与各个微服务通信可能出现以下问题:
- 客户端需要调?不同的url地址,增加了维护调?难度
- 在?定的场景下,也存在跨域请求的问题(前后端分离就会碰到跨域问题,原本我们在后端采?Cors就能解决,现在利??关,那么就放在?关这层做好了)
- 每个微服务都需要进?单独的身份认证
那么,API?关就可以较好的统?处理上述问题,API请求调?统?接?API?关层,由?关转发请求。API?关更专注在安全、路由、流量等问题的处理上(微服务团队专注于处理业务逻辑即可),它的功能有:
- 统?接?——路由
- 安全防护——统?鉴权,负责?关访问身份认证验证,与“访问认证中?”通信,实际认证业务逻辑交移“访问认证中?”处理
- 黑白名单——实现通过IP地址控制禁?访问?关功能,控制访问
- 协议适配——实现通信协议校验、适配转换的功能
- 流量管控——限流
- 长短连接支持
- 容错能力(负载均衡)
二、Spring Cloud概述
2.1 什么是SpringCloud
[百度百科]Spring Cloud是?系列框架的有序集合。它利?Spring Boot的开发便利性巧妙地简化了分布式系统基础设施的开发,如服务发现注册、配置中?、消息总线、负载均衡、断路器、数据监控等,都可以? Spring Boot的开发?格做到?键启动和部署。Spring Cloud并没有重复制造轮?,它只是将?前各家公司开发的?较成熟、经得起实际考验的服务框架组合起来,通Spring Boot?格进?再封装屏蔽掉了复杂的配置和实现原理,最终给开发者留出了?套简单易懂、易部署和易维护的分布式系统开发?具包。
Spring Cloud 规范及实现意图要解决的问题其实就是微服务架构实施过程中存在的?些问题,?如微服务架构中的服务注册发现问题、?络问题(?如熔断场景)、统?认证安全授权问题、负载均衡问题、链路追踪等问题。
2.2 Spring Cloud架构
SpringCloud的核心组件按照发展可以分为第一代SpringCloud组件和第二代核心组件。
第一代(Netflix,SCN) | 第二代(Alibaba,SCA) | |
---|---|---|
注册中心 | Netflix Eureka | 阿里 Nacos |
客户端负载均衡 | Netflix Ribbon | 阿里 Dubbo LB、SpringCloud Loadbalancer |
熔断器 | Netflix Hystrix | 阿里 Sentinel |
网关 | Netflix Zuul | SpringCloud Gateway |
配置中心 | SpringCloud Config | 阿里 Nacos、携程Apollo |
服务调用 | Netflix Feign | 阿里 Dubbo RPC |
消息驱动 | SpringCloud Stream | |
链路追踪 | SpringCloud Sleuth/Zipkin | |
阿里巴巴Seata分布式事务 |
Spring Cloud中的各组件协同?作,才能够?持?个完整的微服务架构。?如:
- 注册中?负责服务的注册与发现,很好将各服务连接起来
- API?关负责转发所有外来的请求
- 断路器负责监控服务之间的调?情况,连续多次失败进?熔断保护。
- 配置中?提供了统?的配置信息管理服务,可以实时的通知各个服务获取最新的配置信息
2.3 对dubbo和SpringBoot的关系
Dubbo是阿?巴巴公司开源的?个?性能优秀的服务框架,基于RPC调?,对于?前使?率较?的Spring Cloud Netflflix来说,它是基于HTTP的,所以效率上没有Dubbo?,但问题在于Dubbo体系的组件不全,不能够提供?站式解决?案,?如服务注册与发现需要借助于Zookeeper等实现,?Spring Cloud Netflflix则是真正的提供了?站式服务化解决?案,且有Spring?家族背景。
Spring Cloud 只是利?了Spring Boot 的特点,让我们能够快速的实现微服务组件开发,否则不使?Spring Boot的话,我们在使?Spring Cloud时,每?个组件的相关Jar包都需要我们??导?配置以及需要开发?员考虑兼容性等各种情况。所以Spring Boot是我们快速把Spring Cloud微服务技术应?起来的?种?式。
三、微服务入门案例
我们在这部分模拟一个微服务之间的调用,在之后会一步一步使用SpringCloud组件对案例进行改造。
项目地址:Github下载地址
启动方式
java -Dserver.port=8080 -jar sentinel-dashboard.jar
其中
-Dserver.port=8080
用于指定 Sentinel 控制台端口为8080
。也可以加&
来后台启动。启动后就可以登录,账号密码:sentinel/sentinel
Sentinel控制台最好和客户端版本一致,我的sentinelcore版本是1.8,我是用1.7.1的控制台会造成RT降级失效,因为sentinel1.8将RT降级策略升级为慢调用比例策略。我将控制台升级为1.8后满调用比例就可以熔断了。
刚登录什么都没有,需要我们将微服务注册上去。下面我们通过autodeliver工程来实现sentinel配置。
5.2.2 Sentinel应用
为了保留之前的配置方式,所以我们按照autodelivernacos复制一份工程,然后删除hystrix的配置,并删除原有OpenFeign的降级配置。
然后我们在这个工程进行sentinel的改造,首先导入依赖
com.alibaba.cloud spring-cloud-starter-alibaba-sentinel 然后配置Sentinal注册到DashBoard:
server: port: 8073 spring: application: name: autodeliver cloud: nacos: discovery: server-addr: 192.168.22.162:8848,192.168.22.162:8849,192.168.22.162:8850 config: server-addr: 192.168.22.162:8848,192.168.22.162:8849,192.168.22.162:8850 file-extension: yml namespace: autodeliver extension-configs[0]: data-id: test.yml group: DEFAULT_GROUP refresh: true extension-configs[1]: data-id: test.properties group: DEFAULT_GROUP refresh: true sentinel: transport: dashboard: 127.0.0.1:8080 #dashboard地址 port: 8719 #此端口用于与Sentinel控制台交互,如果8719被占用会依次加一 # springboot中暴露健康检查等断点接? management: endpoints: web: exposure: include: "*" # 暴露健康接?的细节 endpoint: health: show-details: always user: ribbon: ConnectTimeout: 2000 ReadTimeout: 8000 OkToRetryOnAllOperations: true MaxAutoRetries: 0 MaxAutoRetriesNextServer: 0 NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RoundRobinRule
我们启动并通过postman发送请求,在控制台中查看:
如果启动后发现Sentinel控制台什么都没有,那么发送一次请求即可,因为Sentinel是懒加载
5.2.3 Sentinel中的概念
名称 描述 资源 它是Java应用程序中的任何内容,例如由应用程序提供的服务,或者是由应用程序调用的其它应用提供的服务,甚至可以是一段代码。(也可以简单的理解为API接口就是资源) 规则 围绕资源的实时状态设定的规则,可以包括流量控制规则,熔断降级规则以及系统保护规则。所有规则可以动态实时调整。 如上图,在簇点链路中我们可以看到现有的资源,当然在右上角切换为列表视图我们可以只看到接口,这样更直观和清晰。
5.2.4 Sentinel中的流量规则
在接口的操作中或者在流控规则中都可以新增定义流控规则,如下图:
或者流控规则中也可以:
在Sentinel的流控中有阈值类型、流控模式和流控效果我们点击新增流控规则,即可看到:
下面对这些进行一一详解。
- 资源名:请求路径
- 针对来源:Sentinel可以针对调用者进行限流,可以填写微服务名称,如果是default就不区分来源
- 阈值类型/单机阈值
- QPS(每秒钟请求数量):当调用该资源的QPS达到阈值进行限流
- 线程数:当调用该资源的线程数达到阈值进行限流(线程处理请求时,如果业务逻辑很长,当流量洪峰来临时,会消耗很多线程资源,这些线程资源会堆积,最终造成服务的不可用,甚至雪崩)
- 流控模式
- 直接:资源调用达到限流条件时,直接限流。
- 关联:关联的资源调用达到阈值时限流自己。
- 链路:只记录指定链路上的流量。
- 流控效果
- 快速失败:直接失败,抛出异常
- Warm Up:根据冷加载因子(默认3)的值,从阈值或者冷加载因子开始,经过预热时长,才能达到设置的QPS阈值。
- 排队等待:匀速排队,让请求匀速通过,阈值类型必须是QPS,否则无效
我们下面进行验证:首先验证QPS和线程数
阈值类型应用:QPS和线程数
我们对接口进行限流设置:将其他都默认,将类型设置为QPS,阈值设置为1,然后打开postman进行快速发送请求,我们会发现Sentinel会抛出失败,因为我们点击的速度超过了1QPS,所以会限流:
然后将流控设置改为:将其他都默认,将类型设置为线程数,阈值设置为1,然后将接口增加以下代码让线程休眠5s:
@GetMapping("/findOpenStatusByUid") public Integer findOpenStatusByUid(@RequestParam("uid") Integer uid){ try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } UserInfo userById = userServiceFeignClient.findUserById(uid); return userById.getOpen(); }
然后postman开两个窗口访问同一个接口,第一个窗口5s后会返回结果,第二个窗口会返回失败,因为同时访问线程达到了2超过阈值进行限流。
第二个窗口:
流控模式应用:直接、关联和链路
直接模式略,上面已经试过。
关联模式:关联的资源调用达到阈值限流自己
我们以案例来验证此功能,比如我们用户注册接口需要调用身份验证接口,如果身份验证接口请求达到阈值使用关联模式可以直接对用户注册接口进行限流。我们首先在autodeliverSentinel(在这个工程中编写是为了方便,因为这个工程已经注册到Sentinel中了)中编写这两个接口:
@RestController @RequestMapping("/user") public class UserController { @GetMapping("/register") public String register(){ System.out.println("注册成功"); return "注册成功"; } @GetMapping("/validateID") public String validateID(){ System.out.println("验证身份成功"); return "验证身份成功"; } }
然后我们进行Sentinel设置,如下图,设置完之后
/validateID
达到阈值后/register
会被限流:我们在postman中每0.5s发一次
/validateID
请求发送20次(使用RunCollection即可,使用方法见4.3.4关于postman的RunCollection介绍),在这期间我们访问/register
接口,我们发现此接口会被限流,当postman的RunCollection执行完之后,我们再访问就不会被限流了。:链路模式
/ \ / \ register1 register2 / \ / \ validateID validateID
如上图:有两条调用链路都调用了validateID这一个资源,在Sentinel中允许只根据调用入口的统计信息对资源限流。比如链路模式下设置资源为register1,这样的话表示只有从register1的调用才会记录到validateID的限流统计中,而不关心register2到来的调用,如下图:
流控效果应用:快速失败、Warm UP和排队等待
快速失败:直接失败,抛出异常,略
Warm Up预热模式
当系统长期处于空闲的情况下,当流量突然增加时,直接把系统拉升到高水位可能会瞬间把系统压垮,比如电商的秒杀模块,所以我们可以通过WarmUp模式,让流速缓慢增加,通过设置的预热时间之后,达到系统处理请求速率的设定值。
Warm Up预热模式会默认从设置的QPS阈值的1/3开始慢慢上升到QPS设置值,比如我们给register接口设置为如下规则:
当请求来临到5s内阈值都是10的1/3,5s后才会恢复到阈值10。
我们在浏览器快速发送请求,可以看到在5s内请求会出现限流,5s后就不会出现限流(除非手速快到1s10次以上请求):
排队等待
排队等待模式下,会严格控制请求通过的间隔时间,即匀速通过请求,允许部分请求排队等待,通常用于削峰等场景,需要设置具体的超时时间,如果等待时间超过超时时间请求就会被拒绝。等待时间使用1s除以阈值,比如QPS设置为5,那么就代表没200ms才能通过一个请求。下面我们进行实验:我们先给register接口打印时间:
@GetMapping("/register") public String register(){ System.out.println(new Date()+"注册成功"); return "注册成功"; }
然后在Sentinel中设置规则,我们设置为1s通过一个请求:
然后用postman的run每20ms发一个请求:
观察控制台发现虽然是每20ms发送一个请求,但是实际上是1s通过一个请求:
5.2.5 Sentinel中的降级规则
流控是对外部的大流量进行控制,而熔断降级是对内部问题进行处理,Sentinel降级会在调用链路中某个资源出现不稳定状态时(调用超时或异常比例升高),对这个资源的调用进行限制,让请求快速失败,避免影响到其他的资源而导致级联错误。当资源被降级后,在接下来的降级时间窗口内,对该资源的调用都自动熔断。
Sentinel中对降级有三种策略:
RT(平均响应时间)
选择以慢调用比例作为阈值,需要设置允许的慢调用 RT(即最大的响应时间),请求的响应时间大于该值则统计为慢调用。当单位统计时长(
statIntervalMs
)内请求数目大于设置的最小请求数目,并且慢调用的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求响应时间小于设置的慢调用 RT 则结束熔断,若大于设置的慢调用 RT 则会再次被熔断。Sentineld额默认的RT上限是4900ms超出时间都会算作4900ms,可以配置启动项
- Dcsp.sentinel.statistic.,ax.rt=xxx
下面我们进行实验:首先在findOpenStatusByUid方法中,加入休眠一秒:
@GetMapping("/findOpenStatusByUid") public Integer findOpenStatusByUid(@RequestParam("uid") Integer uid){ // 测试慢比例调用 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } // 测试异常熔断 // int i = 1/0; System.out.println("请求到这了"); UserInfo userById = userServiceFeignClient.findUserById(uid); return userById.getOpen(); }
然后配置RT为200ms,以下配置表示统计时长1000ms内超过RT(200ms)的调用且请求数大于5个则该请求成为慢调用,慢调用的比例大于配置的阈值即0.5那么就熔断5s。
然后开启jmeter设置线程数为10,启动时间1s永远循环,这时我们在用postman调用就会失败,如果我们关闭jmeter的循环请求,那么5s后将不在熔断,请求恢复正常。
异常比例
当单位统计时长(
statIntervalMs
)内请求数目大于设置的最小请求数目,并且异常的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。异常比率的阈值范围是[0.0, 1.0]
,代表 0% - 100%。我们注释掉线程休眠然后制造异常:
@GetMapping("/findOpenStatusByUid") public Integer findOpenStatusByUid(@RequestParam("uid") Integer uid){ // 测试慢比例调用 // try { // TimeUnit.SECONDS.sleep(1); // } catch (InterruptedException e) { // e.printStackTrace(); // } // 测试异常熔断 int i = 1/0; System.out.println("请求到这了"); UserInfo userById = userServiceFeignClient.findUserById(uid); return userById.getOpen(); }
然后配置sentinel降级:
打开之前的jemter,使用postman测试发现可以熔断:
异常数
当单位统计时长内的异常数目超过阈值之后会自动进行熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。
设置降级规则:
打开jmeter测试postman:发现可以降级:
在Sentinel中降级约等于Hystrix的熔断
5.2.6 自定义兜底数据
使用@SentinelResource注解自定义兜底数据,它类似于Hystrix中的@HystrixCommand注解。
@SentinelResource有两个属性需要区分一个是blockHandler用于指定不满足Sentinel规则的降级方法,fallback属性用于指定Java运行时异常方法。
案例应用:
Controller中API配置注解:
@GetMapping("/findOpenStatusByUid") @SentinelResource(value = "findOpenStatusByUid" ,blockHandler = "doException") public Integer findOpenStatusByUid(@RequestParam("uid") Integer uid){ UserInfo userById = userServiceFeignClient.findUserById(uid); return userById.getOpen(); }
兜底数据类:
public class SentinelFallback { public static Integer handleException(Integer uid, BlockException blockException) { return -100; } public static Integer handleError(Integer uid) { return -500; } }
我们可以开一个阈值为1的流控,然后postman测试handleException
然后写一个1/0的异常,再次测试:
5.2.7 基于nacos的Sentinel规则持久化
Sentinel的规则数据都是存储在内存中的,所以一旦我们停掉微服务,数据就会消失,因此我们可以将规则数据持久化到nacos上,让微服务获取nacos数据。Sentinel的持久化一共有三种模式:
- 规则存在内存中
- 拉模式,规则存在文件中
- 推模式,规则存在数据源中
官方文档介绍如下:
我们用push模式:下面我们在autodeliverSentinel中进行规则数据持久化配置。
首先我们导入依赖:
com.alibaba.csp sentinel-datasource-nacos 然后在配置文件中进行配置:
sentinel: transport: dashboard: 127.0.0.1:8080 #dashboard地址 port: 8719 #此端口用于与Sentinel控制台交互,如果8719被占用会依次加一 datasource: #flow为自定义的数据源名称 flow: nacos: server-addr: ${spring.cloud.nacos.discovery.server-addr} data-id: ${spring.application.name}-flow-rules groupId: DEFAULT_GROUP data-type: json rule-type: flow #类型来自RuleType degrade: nacos: server-addr: ${spring.cloud.nacos.discovery.server-addr} data-id: ${spring.application.name}-degrade-rules groupId: DEFAULT_GROUP data-type: json rule-type: degrade #类型来自RuleType
在nacos上的public的DEFAULT_GROUP添加两个配置文件
内容分别是:以下规则可以在源码FlowRule类和DegradeRule类中找到
autodeliver-flow-rules
[ { "resource":"/findOpenStatusByUid", "limitApp":"default", "grade":1, "count":1, "strategy":0, "controlBehavior":0, "clusterMode":false } ]
autodeliver-degrade-rules
[ { "resource":"/findOpenStatusByUid", "grade":2, "count":1, "timeWindow":5 } ]
启动工程就可以在Sentinel中看到我们配置的规则了。这种配置的特点如下:
- ?个资源可以同时有多个限流规则和降级规则,所以配置集中是?个json数组。
- 这种配置方式在Sentinel控制台中修改规则,仅是内存中?效,不会修改Nacos中的配置值,重启后恢复原来的值; Nacos控制台中修改规则,不仅内存中?效,Nacos中持久化规则也?效,重启后规则依然保持。也就是说只能在Nacos中对配置进行更改
这部分官方文档写的很不清楚,我查了很多资料按照上面的流程走了一遍,发现并不能完成配置同步,报错空指针:
我查了很多资料可能是jdk版本的问题,可以见下面的issue
https://github.com/alibaba/Sentinel/issues/1817
也有可能是Sentinel版本与springcloudalibaba版本冲突的的问题:
解决办法:我将SpringCloudAlibaba降为2.2.1;sentinel降为1.7.2即可正常使用
最终解决办法:我不想降低springCloudAlibaba的版本,因为很多东西再用,不可能说降就降,所以我在NacosDataSourceFactoryBean的第70行源码加上了断点,发现username找不到,会报空指针,所以我们配置文件上配置了username和password然后就可以使用了,真的是很坑!!!配置文件如下:
spring: application: name: autodeliver cloud: sentinel: transport: dashboard: localhost:8080 # sentinel注册地址 datasource: # 名称随意 flow: nacos: server-addr: localhost:8848 username: nacos password: nacos dataId: ${spring.application.name}-flow-rules namespace: sentinel groupId: SENTINEL_GROUP rule-type: flow # 规则类型,取值见:org.springframework.cloud.alibaba.sentinel.datasource.RuleType degrade: nacos: server-addr: localhost:8848 username: nacos password: nacos dataId: ${spring.application.name}-degrade-rules namespace: sentinel groupId: SENTINEL_GROUP rule-type: degrade
问题分析:应该是这个版本sentinel配置nacos持久化,需要账户密码去验证但之前的版本不需要,所以会报这个错。PS:官方文档真的很不友好。
所以如果想实现sentinel修改能保存规则,那么可以通过修改源代码的方式实现。见下面的5.3.5。
5.3 Sentinel源码1.8版本
5.3.1 工程搭建
下载Sentinel源码并解压,源码在Github可以下载。解压后用IDEA打开,如下图,可以通过dashboard启动类来启动工程,这时我们就可以使用我们自己的入门案例连接到这个控制台上。
项目结构:
- sentinel-core 核心模块,限流降级、系统保护等实现
- sentinel-dashboard控制台模块,可以实现可视化管理
- sentinel-transport传输模块,提供了基本的监控服务端和客户端的API接口,以及一些基于不同库的实现。
- sentinel-extension扩展模块,主要对datasource进行了部分扩展
- sentinel-adapter适配器模块,主要对以下常见框架进行了适配
- sentinel-demo样例模块,可以参考
- sentinel-benchmark基准测试模块,对核心代码的精确性提供基准测试。
如果控制台打开页面时会一直转圈,可以清除浏览器缓存,缓存会造成打开sentinel页面时一直转圈。
5.3.2 入门案例
在Github的wiki上有Sentinel简单的使用案例:
public static void main(String[] args) { initFlowRules(); while (true) { Entry entry = null; try { entry = SphU.entry("HelloWorld"); /*您的业务逻辑 - 开始*/ System.out.println("hello world"); /*您的业务逻辑 - 结束*/ } catch (BlockException e1) { /*流控逻辑处理 - 开始*/ System.out.println("block!"); /*流控逻辑处理 - 结束*/ } finally { if (entry != null) { entry.exit(); } } } }
当然上面的代码是对业务有侵入性的所以也提供了注解模式即
@SentinelResource
,我们之前已经介绍过了。我们下面使用SphU的方式写一个案例,方便我们理解源码:
我们在autodeliverSentinel工程中新建一个OrderController,然后编写如下代码:
@RestController @RequestMapping("/order") public class OrderController { @RequestMapping("/testFunc") public String testFunc(String application,long id){ initFlowRules(); ContextUtil.enter("user",application); Entry entry = null; try { //1 SphU.entry entry = SphU.entry("testFunc", EntryType.IN); /*您的业务逻辑 - 开始*/ System.out.println("hello world"); return getOrderName(id); /*您的业务逻辑 - 结束*/ } catch (BlockException e1) {//2 BlockException异常分支 /*流控逻辑处理 - 开始*/ System.out.println("block!"); throw new RuntimeException("系统繁忙"); /*流控逻辑处理 - 结束*/ } finally { if (entry != null) { entry.exit(); } } } //用此方法模拟调用其他服务的接口 public String getOrderName(long id){ Entry entry = null; try { entry = SphU.entry("getOrderName"); /*您的业务逻辑 - 开始*/ return "BuySomething"; /*您的业务逻辑 - 结束*/ } catch (BlockException e1) { /*流控逻辑处理 - 开始*/ return null; /*流控逻辑处理 - 结束*/ } finally { if (entry != null) { entry.exit(); } } } private static void initFlowRules(){ List
rules = new ArrayList<>(); FlowRule rule = new FlowRule(); rule.setResource("testFunc"); rule.setGrade(RuleConstant.FLOW_GRADE_QPS); // Set limit QPS to 20. rule.setCount(20); rules.add(rule); FlowRuleManager.loadRules(rules); } } 我们对上面的代码进行解释,部分解释来自源码,注意实际开发不会这么写,目前是为了分析源码:
Entry,这时Sentinel的重点,对于
SphU#entry
方法,作用是记录统计信息并对给定资源进行规则检查,它有很多重载,我们这里介绍上面代码中的两个参数
第一个参数是标识资源,通常就是我们的接口标识,对于数据统计、规则控制等,我们一般都是在这个粒度上进行的,根据这个字符串来唯一标识,我们跟源码进入会发现,它最后会被包装成ResourceWrapper 实例,ResourceWrapper 的hashCode和equals源码如下,可以证实资源是根据这个字符串来唯一标识
@Override public int hashCode() { return getName().hashCode(); } @Override public boolean equals(Object obj) { if (obj instanceof ResourceWrapper) { ResourceWrapper rw = (ResourceWrapper)obj; return rw.getName().equals(getName()); } return false; }
第二个参数标识资源的类型,我们的代码使用了
EntryType.IN
代表这个是入口流量,比如我们的接口对外提供服务,那么我们通常就是控制入口流量;EntryType.OUT
代表出口流量,比如上面的 getOrderName方法(没写默认就是 OUT源码,自行查看源码确认),这个流量类型主要在SystemSlot
类中实现自适应限流。BlockException:进入 BlockException 异常分支,代表该次请求被流量控制规则限制了,我们一般会让代码走入到熔断降级的逻辑里面。当然,BlockException 其实有好多个子类,如 DegradeException、FlowException 等,我们也可以 catch 具体的子类来进行处理。
5.3.3 Sentinel客户端与dashboard通信
在 Sentinel 的源码中,打开 sentinel-transport 工程,可以看到三个子工程,common 是基础包和接口定义。
如果客户端要接入 dashboard,可以使用 netty-http 或 simple-http 中的一个。我们这里使用http的方式。
我们在使用Sentinel时需要执行
SphU#entry
方法,源码如下:public static Entry entry(String name) throws BlockException { return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0); }
这里有一个Env类,这个类是用来与dashboard通信的。
public class Env { public static final Sph sph = new CtSph(); static { // If init fails, the process will exit. InitExecutor.doInit(); } }
这个类有一个doInit方法,点进去源码如下:
public static void doInit() { if (!initialized.compareAndSet(false, true)) { return; } try { ServiceLoader
loader = ServiceLoaderUtil.getServiceLoader(InitFunc.class); List initList = new ArrayList (); for (InitFunc initFunc : loader) { RecordLog.info("[InitExecutor] Found init func: " + initFunc.getClass().getCanonicalName()); insertSorted(initList, initFunc); } for (OrderWrapper w : initList) { //断点 w.func.init(); RecordLog.info(String.format("[InitExecutor] Executing %s with order %d", w.func.getClass().getCanonicalName(), w.order)); } } catch (Exception ex) { RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex); ex.printStackTrace(); } catch (Error error) { RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error); error.printStackTrace(); } } 我们在
w.func.init();
这行加入断点,这里使用 SPI 加载 InitFunc 的实现。可以发现这里加载了CommandCenterInitFunc
类和HeartbeatSenderInitFunc
类。前者是客户端启动的接口服务,提供给 dashboard 查询数据和规则设置使用的。后者用于客户端主动发送心跳信息给 dashboard。
客户端服务注册
我们先看
HeartbeatSenderInitFunc#init
方法@Override public void init() { //SPI机制,如果我们添加了http的依赖,那么 SimpleHttpHeartbeatSender 就会被加载,可以自行断点查看 HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender(); if (sender == null) { RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded"); return; } initSchedulerIfNeeded(); //设置心跳任务发送的时间间隔,默认10s long interval = retrieveInterval(sender); setIntervalIfNotExists(interval); //启动心跳任务 scheduleHeartbeatTask(sender, interval); }
我们跟进去查看sendHeartbeat()方法
@Override public boolean sendHeartbeat() throws Exception { if (TransportConfig.getRuntimePort() <= 0) { RecordLog.info("[SimpleHttpHeartbeatSender] Command server port not initialized, won't send heartbeat"); return false; } //获取Socket连接地址 Tuple2
addrInfo = getAvailableAddress(); if (addrInfo == null) { return false; } InetSocketAddress addr = new InetSocketAddress(addrInfo.r1, addrInfo.r2); //封装SimpleHttpRequest对象,发送路径/registry/machine SimpleHttpRequest request = new SimpleHttpRequest(addr, TransportConfig.getHeartbeatApiPath()); request.setParams(heartBeat.generateCurrentMessage()); try { //发送请求,ps:在这行打断点就可以看到发送路径 SimpleHttpResponse response = httpClient.post(request); if (response.getStatusCode() == OK_STATUS) { return true; } else if (clientErrorCode(response.getStatusCode()) || serverErrorCode(response.getStatusCode())) { RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addr + ", http status code: " + response.getStatusCode()); } } catch (Exception e) { RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addr, e); } return false; } 请求参数截图:
有了路径后我们可以搜索这个请求,在dashboard的MachineRegistryController中可以看见请求最终到达receiverHeartBeat方法。
@ResponseBody @RequestMapping("/machine") public Result<?> receiveHeartBeat(String app, @RequestParam(value = "app_type", required = false, defaultValue = "0") Integer appType, Long version, String v, String hostname, String ip, Integer port) { if (app == null) { app = MachineDiscovery.UNKNOWN_APP_NAME; } if (ip == null) { return Result.ofFail(-1, "ip can't be null"); } if (port == null) { return Result.ofFail(-1, "port can't be null"); } if (port == -1) { logger.info("Receive heartbeat from " + ip + " but port not set yet"); return Result.ofFail(-1, "your port not set yet"); } String sentinelVersion = StringUtil.isEmpty(v) ? "unknown" : v; version = version == null ? System.currentTimeMillis() : version; try { MachineInfo machineInfo = new MachineInfo(); machineInfo.setApp(app); machineInfo.setAppType(appType); machineInfo.setHostname(hostname); machineInfo.setIp(ip); machineInfo.setPort(port); machineInfo.setHeartbeatVersion(version); machineInfo.setLastHeartbeat(System.currentTimeMillis()); machineInfo.setVersion(sentinelVersion); //将接收到的信息添加到应用程序管理中 appManagement.addMachine(machineInfo); return Result.ofSuccessMsg("success"); } catch (Exception e) { logger.error("Receive heartbeat error", e); return Result.ofFail(-1, e.getMessage()); } }
客户端处理请求
在sentinel中数据存储先流规则都是在客户端存储的。实现类CommandCenterInitFunc完成sentinel服务端发送过来的请求相关操作。我们再看
CommandCenterInitFunc#init
方法。//命令中心初始化类 @InitOrder(-1) public class CommandCenterInitFunc implements InitFunc { @Override public void init() throws Exception { CommandCenter commandCenter = CommandCenterProvider.getCommandCenter(); if (commandCenter == null) { RecordLog.warn("[CommandCenterInitFunc] Cannot resolve CommandCenter"); return; } //注册处理器 commandCenter.beforeStart(); //启动命令中心 commandCenter.start(); RecordLog.info("[CommandCenterInit] Starting command center: " + commandCenter.getClass().getCanonicalName()); } }
我们进入到beforeStart()方法,打断点会发现handlers会注册所有的处理器,然后最后会将所有的处理器注册到handlerMap中。
@Override @SuppressWarnings("rawtypes") public void beforeStart() throws Exception { // Register handlers Map
handlers = CommandHandlerProvider.getInstance().namedHandlers(); registerCommands(handlers); } @SuppressWarnings("rawtypes") public static void registerCommands(Map handlerMap) { if (handlerMap != null) { for (Entry e : handlerMap.entrySet()) { registerCommand(e.getKey(), e.getValue()); } } } @SuppressWarnings("rawtypes") public static void registerCommand(String commandName, CommandHandler handler) { if (StringUtil.isEmpty(commandName)) { return; } if (handlerMap.containsKey(commandName)) { CommandCenterLog.warn("Register failed (duplicate command): " + commandName); return; } handlerMap.put(commandName, handler); } 以上就是注册处理器的工作,然后我们进入
commandCenter#start
方法,此方法会创建并执行一个serverInitTask线程,在这个线程中启用一个ServerThread线程监听socket请求,在ServerThread中将接收到的socket封装成HttpEventTask由业务线程去处理。@Override public void start() throws Exception { int nThreads = Runtime.getRuntime().availableProcessors(); //创建业务线程池 this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue
(10), new NamedThreadFactory("sentinel-command-center-service-executor"), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { CommandCenterLog.info("EventTask rejected"); throw new RejectedExecutionException(); } }); Runnable serverInitTask = new Runnable() { int port; { try { //获取端口号,没有就设置默认端口8719 port = Integer.parseInt(TransportConfig.getPort()); } catch (Exception e) { port = DEFAULT_PORT; } } @Override public void run() { boolean success = false; //根据端口创建一个可用的Socket连接 ServerSocket serverSocket = getServerSocketFromBasePort(port); if (serverSocket != null) { CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort()); socketReference = serverSocket; //在主线程中在启用一个ServerThread线程监听请求 executor.submit(new ServerThread(serverSocket)); success = true; port = serverSocket.getLocalPort(); } else { CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work"); } if (!success) { port = PORT_UNINITIALIZED; } TransportConfig.setRuntimePort(port); executor.shutdown(); } }; new Thread(serverInitTask).start(); } private static ServerSocket getServerSocketFromBasePort(int basePort) { int tryCount = 0; while (true) { try { //如果发现端口占用情况则默认加一,重试三次 ServerSocket server = new ServerSocket(basePort + tryCount / 3, 100); server.setReuseAddress(true); return server; } catch (IOException e) { tryCount++; try { TimeUnit.MILLISECONDS.sleep(30); } catch (InterruptedException e1) { break; } } } return null; } class ServerThread extends Thread { private ServerSocket serverSocket; ServerThread(ServerSocket s) { this.serverSocket = s; setName("sentinel-courier-server-accept-thread"); } @Override public void run() { while (true) { Socket socket = null; try { //socket监听 socket = this.serverSocket.accept(); setSocketSoTimeout(socket); //将接收到的socket封装成HttpEventTask由业务线程去处理 HttpEventTask eventTask = new HttpEventTask(socket); bizExecutor.submit(eventTask); } catch (Exception e) { CommandCenterLog.info("Server error", e); if (socket != null) { try { socket.close(); } catch (Exception e1) { CommandCenterLog.info("Error when closing an opened socket", e1); } } try { // In case of infinite log. Thread.sleep(10); } catch (InterruptedException e1) { // Indicates the task should stop. break; } } } } } 我们进入到
HttpEventTask#run
中,这里主要是处理接收到socket监听的请求后的业务逻辑。首先是读取消息内容,然后将消息封装成CommandRequest对象,然后在handlerMap中找到请求的commandName对应的处理器,然后执行该处理器的handle方法。@Override public void run() { if (socket == null) { return; } PrintWriter printWriter = null; InputStream inputStream = null; try { long start = System.currentTimeMillis(); inputStream = new BufferedInputStream(socket.getInputStream()); OutputStream outputStream = socket.getOutputStream(); printWriter = new PrintWriter( new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset()))); //读取消息内容 String firstLine = readLine(inputStream); CommandCenterLog.info("[SimpleHttpCommandCenter] Socket income: " + firstLine + ", addr: " + socket.getInetAddress()); //封装CommandRequest对象 CommandRequest request = processQueryString(firstLine); if (firstLine.length() > 4 && StringUtil.equalsIgnoreCase("POST", firstLine.substring(0, 4))) { // Deal with post method processPostRequest(inputStream, request); } // Validate the target command.验证目标命令是否合法 String commandName = HttpCommandUtils.getTarget(request); if (StringUtil.isBlank(commandName)) { writeResponse(printWriter, StatusCode.BAD_REQUEST, INVALID_COMMAND_MESSAGE); return; } // Find the matching command handler.找到匹配的命令处理程序 //在getHandler中就是通过handlerMap进行key的匹配 CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName); if (commandHandler != null) { //执行处理方法 CommandResponse<?> response = commandHandler.handle(request); handleResponse(response, printWriter); } else { // No matching command handler. writeResponse(printWriter, StatusCode.BAD_REQUEST, "Unknown command `" + commandName + '`'); } long cost = System.currentTimeMillis() - start; CommandCenterLog.info("[SimpleHttpCommandCenter] Deal a socket task: " + firstLine + ", address: " + socket.getInetAddress() + ", time cost: " + cost + " ms"); } catch (RequestException e) { writeResponse(printWriter, e.getStatusCode(), e.getMessage()); } catch (Throwable e) { CommandCenterLog.warn("[SimpleHttpCommandCenter] CommandCenter error", e); try { if (printWriter != null) { String errorMessage = SERVER_ERROR_MESSAGE; e.printStackTrace(); if (!writtenHead) { writeResponse(printWriter, StatusCode.INTERNAL_SERVER_ERROR, errorMessage); } else { printWriter.println(errorMessage); } printWriter.flush(); } } catch (Exception e1) { CommandCenterLog.warn("Failed to write error response", e1); } } finally { closeResource(inputStream); closeResource(printWriter); closeResource(socket); } }
不同的处理器的handle方法肯定是不同的,所以我们以其中一个ModifyRulesCommandHandler处理器举例,这是个流控规则的处理器。我们先来验证这个处理器:
我们先在dashboard页面新增一个限流规则,在新增前在对应的dashboard的controller打上断点。
新增流控规则如下,先不点新增按钮:
我们先去找这个对应的代码,加上断点,然后回到页面点新增按钮,断点就会停下,如下图:
apiAddFlowRule
方法代码如下,dashboard前台将规则信息传给后台,后台封装到entity保存,然后在发布规则:@PostMapping("/rule") @AuthAction(PrivilegeType.WRITE_RULE) public Result
apiAddFlowRule(@RequestBody FlowRuleEntity entity) { Result checkResult = checkEntityInternal(entity); if (checkResult != null) { return checkResult; } entity.setId(null); Date date = new Date(); entity.setGmtCreate(date); entity.setGmtModified(date); entity.setLimitApp(entity.getLimitApp().trim()); entity.setResource(entity.getResource().trim()); try { //dashboard保存流控规则信息 entity = repository.save(entity); //发布规则 publishRules(entity.getApp(), entity.getIp(), entity.getPort()).get(5000, TimeUnit.MILLISECONDS); return Result.ofSuccess(entity); } catch (Throwable t) { Throwable e = t instanceof ExecutionException ? t.getCause() : t; logger.error("Failed to add new flow rule, app={}, ip={}", entity.getApp(), entity.getIp(), e); return Result.ofFail(-1, e.getMessage()); } } //publishRules代码如下: private CompletableFuture publishRules(String app, String ip, Integer port) { //查询之前保存的规则 List rules = repository.findAllByMachine(MachineInfo.of(app, ip, port)); //发布规则:实际上是将信息通过http请求,发布到客户端,具体代码可以自己跟进去看 return sentinelApiClient.setFlowRuleOfMachineAsync(app, ip, port, rules); } apiAddFlowRule这里的这个publishRules方法,是用于发布规则到客户端的,我们可以对此进行修改,让规则发布到nacos中。
上述代码断点后查看的entity信息:
跟代码到发送http请求的地方,发现会拼接信息,然后发送POST请求,注意这里的SET_RULES_PATH是setRules
到这为止是dashboard将规则发送到sentinel客户端,我们在sentinel客户端打上断点,打断点的位置就是之前说过的
HttpEventTask#run
中,我们可以看到dashboard发送过来的请求,也可以看到SET_RULES_PATH就是setRules我们继续一步一步执行,发现setRules的处理器是ModifyRulesCommandHandler
我们查看
ModifyRulesCommandHandler#handle
的代码:@Override public CommandResponse
handle(CommandRequest request) { //强制失败fastjson过老的版本 // XXX from 1.7.2, force to fail when fastjson is older than 1.2.12 // We may need a better solution on this. if (VersionUtil.fromVersionString(JSON.VERSION) < FASTJSON_MINIMAL_VER) { // fastjson too old return CommandResponse.ofFailure(new RuntimeException("The \"fastjson-" + JSON.VERSION + "\" introduced in application is too old, you need fastjson-1.2.12 at least.")); } //获取规则类型 String type = request.getParam("type"); // rule data in get parameter //获取参数 String data = request.getParam("data"); if (StringUtil.isNotEmpty(data)) { try { data = URLDecoder.decode(data, "utf-8"); } catch (Exception e) { RecordLog.info("Decode rule data error", e); return CommandResponse.ofFailure(e, "decode rule data error"); } } RecordLog.info("Receiving rule change (type: {}): {}", type, data); String result = "success"; if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {//限流 List flowRules = JSONArray.parseArray(data, FlowRule.class); FlowRuleManager.loadRules(flowRules); if (!writeToDataSource(getFlowDataSource(), flowRules)) { result = WRITE_DS_FAILURE_MSG; } return CommandResponse.ofSuccess(result); } else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) {//授权 List rules = JSONArray.parseArray(data, AuthorityRule.class); AuthorityRuleManager.loadRules(rules); if (!writeToDataSource(getAuthorityDataSource(), rules)) { result = WRITE_DS_FAILURE_MSG; } return CommandResponse.ofSuccess(result); } else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) {//熔断 List rules = JSONArray.parseArray(data, DegradeRule.class); DegradeRuleManager.loadRules(rules); if (!writeToDataSource(getDegradeDataSource(), rules)) { result = WRITE_DS_FAILURE_MSG; } return CommandResponse.ofSuccess(result); } else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) {//系统规则 List rules = JSONArray.parseArray(data, SystemRule.class); SystemRuleManager.loadRules(rules); if (!writeToDataSource(getSystemSource(), rules)) { result = WRITE_DS_FAILURE_MSG; } return CommandResponse.ofSuccess(result); } return CommandResponse.ofFailure(new IllegalArgumentException("invalid type")); } 我们当前传递的参数是限流,所以进入到
FlowRuleManager.loadRules(flowRules);
加载规则方法:我们一直让代码一直往下走:
我们继续进入buildFlowRuleMap:
public static
Map > buildFlowRuleMap(List list, Function groupFunction, Predicate filter, boolean shouldSort) { Map > newRuleMap = new ConcurrentHashMap<>(); if (list == null || list.isEmpty()) { return newRuleMap; } Map > tmpMap = new ConcurrentHashMap<>(); //遍历规则 for (FlowRule rule : list) { if (!isValidRule(rule)) { RecordLog.warn("[FlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule); continue; } if (filter != null && !filter.test(rule)) { continue; } if (StringUtil.isBlank(rule.getLimitApp())) { rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); } //根据流量规则生成不同的控制器 TrafficShapingController rater = generateRater(rule); rule.setRater(rater); K key = groupFunction.apply(rule); if (key == null) { continue; } Set flowRules = tmpMap.get(key); if (flowRules == null) { // Use hash set here to remove duplicate rules. flowRules = new HashSet<>(); tmpMap.put(key, flowRules); } flowRules.add(rule); } Comparator comparator = new FlowRuleComparator(); for (Entry > entries : tmpMap.entrySet()) { List rules = new ArrayList<>(entries.getValue()); if (shouldSort) { // Sort the rules. Collections.sort(rules, comparator); } newRuleMap.put(entries.getKey(), rules); } return newRuleMap; } 进入到generateRater方法:
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) { //如果限流规则为QPS,则根据不同的流控规则生成不同的处理器,这个地方使用的是策略模式 if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) { switch (rule.getControlBehavior()) { case RuleConstant.CONTROL_BEHAVIOR_WARM_UP://预热策略 return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), ColdFactorProperty.coldFactor); case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER://匀速排队 return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount()); case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER: return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor); case RuleConstant.CONTROL_BEHAVIOR_DEFAULT: default: // Default mode or unknown mode: default traffic shaping controller (fast-reject). } } //默认是直接拒绝策略 return new DefaultController(rule.getCount(), rule.getGrade()); }
我们让代码返回到保存的地方,然后可以看到我们创建的直接拒绝策略的处理器
然后执行完handle方法并创建完处理器后,会走到handleResponse,在这里会返回response给dashboard断开连接。
5.3.4 Sentinel进行限流
Sentinel是通过
SphU.entry(target, EntryType.IN
)代码完成限流/熔断等操作,所以我们在SphU#entry
方法上打上断点。我们跟进去,最终会走到
CtSph#entryWithPriority
方法,这个方法是 Sentinel 的骨架,非常重要。:private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { //从ThreadLocal中获取Context实例 Context context = ContextUtil.getContext(); //如果是 NullContext,那么说明 context name 超过了 2000 个,参见 ContextUtil#trueEnter //这个时候,Sentinel 不再接受处理新的 context 配置,也就是不做这些新的接口的统计、限流熔断等 if (context instanceof NullContext) { // The {@link NullContext} indicates that the amount of context has exceeded the threshold, // so here init the entry only. No rule checking will be done. return new CtEntry(resourceWrapper, null, context); } //如果我们不显式调用 ContextUtil#enter,这里会进入到默认的 context 中 if (context == null) { // Using default context. context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME); } // Global switch is close, no rule checking will do. //Sentinel的全局开关 if (!Constants.ON) { return new CtEntry(resourceWrapper, null, context); } //这里使用了责任链模式 //下面这行代码用于构建一个责任链,入参是 resource,资源的唯一标识是 resource name ProcessorSlot
在上面的代码中,Sentinel的处理核心就在这个责任链上,链中每一个节点是一个
Slot
实例,这个链通过 BlockException 异常来告知调用入口最终的执行情况。我们进入
lookProcessChain
方法中,如果链路是空的,我们将会构建一个链路,走到newSlotChain方法:ProcessorSlot
然后我们进入到
newSlotChain
方法中,这里会将所有的Slot添加到链中这里主要是通过SPI的方式,然后构建chain并返回。public static ProcessorSlotChain newSlotChain() { if (slotChainBuilder != null) { return slotChainBuilder.build(); } // Resolve the slot chain builder SPI. //Sentinel 提供了 SPI 端点,,我们可以定制builder slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class); if (slotChainBuilder == null) { // Should not go through here. RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default"); slotChainBuilder = new DefaultSlotChainBuilder(); } else { RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: " + slotChainBuilder.getClass().getCanonicalName()); } //构建 return slotChainBuilder.build(); }
具体添加Slot在build方法中:
@Override public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); // Note: the instances of ProcessorSlot should be different, since they are not stateless. List
sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class); for (ProcessorSlot slot : sortedSlotList) { if (!(slot instanceof AbstractLinkedProcessorSlot)) { RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain"); continue; } //添加Slot chain.addLast((AbstractLinkedProcessorSlot<?>) slot); } return chain; } 然后我们回到entryWithPriority方法,继续往下走会执行,会对链路中每个进行逐一调用,一直到到最后一个Slot。可以自行打断点查看。
//开启链路调用 chain.entry(context, resourceWrapper, null, count, prioritized, args);
执行完后,我们可以在断点查看当前链路中的所有slot。
在这里,我们主要看FlowSlot,因为这个是限流的插槽。
我们进入checkFlow方法,这里会根据资源的名称去找限流规则:
public void checkFlow(Function
> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { if (ruleProvider == null || resource == null) { return; } //根据资源名称找到对应的限流规则 Collection rules = ruleProvider.apply(resource.getName()); if (rules != null) { for (FlowRule rule : rules) { //遍历规则以此判断是否通过 if (!canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException(rule.getLimitApp(), rule); } } } } 我们进入
ruleProvider#apply
方法,然后再FlowRuleManager#getFlowRuleMap
中可以看到我们的限流规则,如下图:private final Function
> ruleProvider = new Function >() { @Override public Collection apply(String resource) { // Flow rule map should not be null. Map > flowRules = FlowRuleManager.getFlowRuleMap(); return flowRules.get(resource); } }; 我们回到checkFlow方法,找完限流规则回去执行canPassCheck方法判断规则是否通过,canPassCheck代码如下:
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { String limitApp = rule.getLimitApp(); if (limitApp == null) { return true; } //判断是否是集群 if (rule.isClusterMode()) { return passClusterCheck(rule, context, node, acquireCount, prioritized); } //不是集群则本地检查 return passLocalCheck(rule, context, node, acquireCount, prioritized); }
我们跟进本地检查方法
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); if (selectedNode == null) { return true; } //根据规则处理器进行检查 return rule.getRater().canPass(selectedNode, acquireCount, prioritized); }
我们继续跟进canPass会进入到Defaultcontroller中,因为我们当时设置的就是直接拒绝策略
代码如下:
@Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { //当前已经统计的数 int curCount = avgUsedTokens(node); //如果已统计的数+请求计数>限流数量,返回false,代表限流 if (curCount + acquireCount > count) { if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) { long currentTime; long waitInMs; currentTime = TimeUtil.currentTimeMillis(); waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) { node.addWaitingRequest(currentTime + waitInMs, acquireCount); node.addOccupiedPass(acquireCount); sleep(waitInMs); // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}. throw new PriorityWaitException(waitInMs); } } return false; } return true; } private int avgUsedTokens(Node node) { if (node == null) { return DEFAULT_AVG_USED_TOKENS; } //如果当前是线程数限流,则返回当前线程数 //如果是QPS,则返回当前通过的qps数据 return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps()); }
以上就是最简单的限流的源码跟踪,建议自己跟一遍源码。
5.3.5 Sentinel持久化到Nacos
官网对于持久化的介绍
根据5.2.7的介绍,之前的配置只能修改nacos中的配置,而不能在Sentinel中进行修改,我们需要sentinel和nacos修改都能完成同步,这就需要对源码进行修改了。
我这里的版本是Sentinel DashBoard1.8,下面进行修改:
首先将源码下载下来,然后IDEA打开Sentinel DashBoard1.8源码工程,在源码工程的test中有阿里为我们写好的流控改造的案例,如下图:
然后我们将这四个文件分别拷贝到如下图的位置(放哪都行,只是这样更符合包的逻辑):
然后我们稍作修改NacosConfig和NacosConfigUtil:
NacosConfig.java
:这个文件主要是把nacos上的地址命名空间等配置好即可。@Configuration public class NacosConfig { //可以都写在配置文件里,这里是为了方便 @Value("${nacos.addr}") private String nacosAddr; @Bean public Converter
, String> flowRuleEntityEncoder() { return JSON::toJSONString; } @Bean public Converter
> flowRuleEntityDecoder() { return s -> JSON.parseArray(s, FlowRuleEntity.class); } @Bean public ConfigService nacosConfigService() throws Exception { Properties properties = new Properties(); properties.put(PropertyKeyConst.SERVER_ADDR,nacosAddr); properties.put(PropertyKeyConst.NAMESPACE,"sentinel"); properties.put(PropertyKeyConst.USERNAME,"nacos"); properties.put(PropertyKeyConst.PASSWORD,"nacos"); return ConfigFactory.createConfigService(properties); } }
NacosConfigUtil.java
:这个文件主要是nacos上配置的后缀和GroupIDpublic final class NacosConfigUtil { public static final String GROUP_ID = "SENTINEL_GROUP"; public static final String FLOW_DATA_ID_POSTFIX = "-flow-rules"; public static final String DEGRADE_DATA_ID_POSTFIX = "-degrade-rules"; public static final String PARAM_FLOW_DATA_ID_POSTFIX = "-param-rules"; public static final String SYS_DATA_ID_POSTFIX = "-system-rules"; public static final String AUTH_DATA_ID_POSTFIX = "-auth-rules"; /* public static final String GATEWAY_FLOW_DATA_ID_POSTFIX = "-gateway-flow"; public static final String GATEWAY_API_DATA_ID_POSTFIX = "-gateway-api"; public static final String CLUSTER_MAP_DATA_ID_POSTFIX = "-cluster-map";*/ /** * cc for `cluster-client` */ public static final String CLIENT_CONFIG_DATA_ID_POSTFIX = "-cc-config"; /** * cs for `cluster-server` */ public static final String SERVER_TRANSPORT_CONFIG_DATA_ID_POSTFIX = "-cs-transport-config"; public static final String SERVER_FLOW_CONFIG_DATA_ID_POSTFIX = "-cs-flow-config"; public static final String SERVER_NAMESPACE_SET_DATA_ID_POSTFIX = "-cs-namespace-set"; private NacosConfigUtil() {} }
FlowRuleNacosPublisher和FlowRuleNacosProvider不需要修改,阿里已经写好了。
然后我们把FlowControllerV1的publishRules方法给改了,并且在FlowControllerV1中注入FlowRuleNacosPublisher和FlowRuleNacosProvider。
//注入依赖 @Autowired @Qualifier("flowRuleNacosProvider") private DynamicRuleProvider
> ruleProvider; @Autowired @Qualifier("flowRuleNacosPublisher") private DynamicRulePublisher
> rulePublisher; //修改方法 // private CompletableFuture
publishRules(String app, String ip, Integer port) { // List rules = repository.findAllByMachine(MachineInfo.of(app, ip, port)); // return sentinelApiClient.setFlowRuleOfMachineAsync(app, ip, port, rules); // } private void publishRules(/*@NonNull*/ String app) throws Exception { List rules = repository.findAllByApp(app); rulePublisher.publish(app, rules); } 到此流控模块就可以实现nacos和sentinel互相同步了。
其他模块如降级模块与流控模块同理,虽然阿里只提供了流控模块的代码但是大概写法都是一样的,自己照着改一下就可以了。
我们下面来测试,打开autoDeliverSentinel和我们改造好的控制台,发送一个请求,然后我们可以看到从nacos中拉取的配置规则:
我们在Sentinel中进行修改比如将阈值修改成1或者新增一个流控规则,然后去nacos上看,发现nacos已经完成修改,然后再nacos中在改一个数,回来看Sentinel中也完成了修改。结果如下图:
六、SpringCloud高级组件
未完结