项目结构
- rpc-provider,服务提供者。负责发布 RPC 服务,接收和处理 RPC 请求。
- rpc-consumer,服务消费者。使用动态代理发起 RPC 远程调用,帮助使用者来屏蔽底层网络通信的细节。
- rpc-registry,注册中心模块。提供服务注册、服务发现、负载均衡的基本功能。
- rpc-protocol,网络通信模块。包含 RPC 协议的编解码器、序列化和反序列化工具等。
- rpc-core,基础类库。提供通用的工具类以及模型定义,例如 RPC 请求和响应类、RPC 服务元数据类等。
- rpc-facade,RPC 服务接口。包含服务提供者需要对外暴露的接口,本模块主要用于模拟真实 RPC 调用的测试。
如何使用
举个例子:
// rpc-facade # HelloFacade
public interface HelloFacade {
String hello(String name);
}
// rpc-provider # HelloFacadeImpl
@RpcService(serviceInterface = HelloFacade.class, serviceVersion = "1.0.0")
public class HelloFacadeImpl implements HelloFacade {
@Override
public String hello(String name) {
return "hello" + name;
}
}
// rpc-consumer # HelloController
@RestController
public class HelloController {
@RpcReference(serviceVersion = "1.0.0", timeout = 3000)
private HelloFacade helloFacade;
@RequestMapping(value = "/hello", method = RequestMethod.GET)
public String sayHello() {
return helloFacade.hello("mini rpc");
}
}
rpc-provider 和 rpc-consumer 两个模块独立启动,模拟服务端和客户端。rpc-provider 通过 @RpcService 注解暴露 RPC 服务 HelloFacade,rpc-consumer 通过 @RpcReference 注解引用 HelloFacade 服务并发起调用,基本与我们常用的 RPC 框架使用方式保持一致。
服务发布与订阅
服务提供者发布服务
服务提供者 rpc-provider 需要完成哪些事情呢?主要分为四个核心流程:
- 服务提供者启动服务,并暴露服务端口;
- 启动时扫描需要对外发布的服务,并将服务元数据信息发布到注册中心;
- 接收 RPC 请求,解码后得到请求消息;
- 提交请求至自定义线程池进行处理,并将处理结果写回客户端。
服务提供者启动
服务提供者启动的配置方式基本是固定模式,也是从引导器 Bootstrap 开始入手。我们首先看下服务提供者的启动实现,代码如下所示:
private void startRpcServer() throws Exception {
this.serverAddress = InetAddress.getLocalHost().getHostAddress();
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture channelFuture = bootstrap.bind(this.serverAddress, this.serverPort).sync();
log.info("server addr {} started on port {}", this.serverAddress, this.serverPort);
channelFuture.channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
服务提供者采用的是主从 Reactor 线程模型,启动过程包括配置线程池、Channel 初始化、端口绑定三个步骤,我们暂时先不关注 Channel 初始化中自定义的业务处理器 Handler 是如何设计和实现的。
参数配置
服务提供者启动需要配置一些参数,我们不应该把这些参数固定在代码里,而是以命令行参数或者配置文件的方式进行输入。我们可以使用 Spring Boot 的 @ConfigurationProperties 注解很轻松地实现配置项的加载,并且可以把相同前缀类型的配置项自动封装成实体类。接下来我们为服务提供者提供参数映射的对象:
@Data
@ConfigurationProperties(prefix = "rpc")
public class RpcProperties {
/**
* 服务暴露的端口
*/
private int servicePort;
/**
* 注册中心的地址
*/
private String registryAddr;
/**
* 注册中心的类型
*/
private String registryType;
}
我们一共提取了三个参数,分别为服务暴露的端口 servicePort、注册中心的地址 registryAddr 和注册中心的类型 registryType。@ConfigurationProperties 注解最经典的使用方式就是通过 prefix 属性指定配置参数的前缀,默认会与全局配置文件 application.properties 或者 application.yml 中的参数进行一一绑定。如果你想自定义一个配置文件,可以通过 @PropertySource 注解指定配置文件的位置。下面我们在 rpc-provider 模块的 resources 目录下创建全局配置文件 application.properties,并配置以上三个参数:
rpc.servicePort=2781
rpc.registryType=ZOOKEEPER
rpc.registryAddr=127.0.0.1:2181
application.properties 配置文件中的属性必须和实体类的成员变量是一一对应的,可以采用以下常用的命名规则,例如驼峰命名 rpc.servicePort=2781;或者虚线 - 分割的方式 rpc.service-port=2781;以及大写加下划线的形式 RPC_Service_Port,建议在环境变量中使用。@ConfigurationProperties 注解还可以支持更多复杂结构的配置,并且可以 Validation 功能进行参数校验,如果有兴趣可以自行研究。
有了 RpcProperties 实体类,我们接下来应该如何使用呢?如果只配置 @ConfigurationProperties 注解,Spring 容器并不能获取配置文件的内容并映射为对象,这时 @EnableConfigurationProperties 注解就登场了。@EnableConfigurationProperties 注解的作用就是将声明 @ConfigurationProperties 注解的类注入为 Spring 容器中的 Bean。具体用法如下:
@Configuration
@EnableConfigurationProperties(RpcProperties.class)
public class RpcProviderAutoConfiguration {
@Resource
private RpcProperties rpcProperties;
@Bean
public RpcProvider init() throws Exception {
RegistryType type = RegistryType.valueOf(rpcProperties.getRegistryType());
//构建单例注册服务
RegistryService serviceRegistry = RegistryFactory.getInstance(rpcProperties.getRegistryAddr(), type);
return new RpcProvider(rpcProperties.getServicePort(), serviceRegistry);
}
}
我们通过 @EnableConfigurationProperties 注解使得 RpcProperties 生效,并通过 @Configuration 和 @Bean 注解自定义了 RpcProvider 的生成方式。@Configuration 主要用于定义配置类,配置类内部可以包含多个 @Bean 注解的方法,可以替换传统 XML 的定义方式。被 @Bean 注解的方法会返回一个自定义的对象,@Bean 注解会将这个对象注册为 Bean 并装配到 Spring 容器中,@Bean 比 @Component 注解的自定义功能更强。
至此,我们服务提供者的准备工作就完成了,下面添加 Spring Boot 的 main 方法,如下所示,然后尝试启动下 rpc-provider 模块。
@EnableConfigurationProperties
@SpringBootApplication
public class RpcProviderApplication {
public static void main(String[] args) {
SpringApplication.run(RpcProviderApplication.class, args);
}
}
发布服务
在服务提供者启动时,我们需要思考一个核心问题,服务提供者需要将服务发布到注册中心,怎么知道哪些服务需要发布呢?服务提供者需要定义发布服务类型、服务版本等属性,主流的 RPC 框架都采用 XML 文件或者注解的方式进行定义。以注解的方式暴露服务现在最为常用,省去了很多烦琐的 XML 配置过程。例如 Dubbo 框架中使用 @Service 注解替代 dubbo:service 的定义方式,服务消费者则使用 @Reference 注解替代 dubbo:reference。接下来我们看看作为服务提供者,如何通过注解暴露服务,首先给出我们自定义的 @RpcService 注解定义:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Component
public @interface RpcService {
/**
* 服务类型
* @return
*/
Class<?> serviceInterface() default Object.class;
/**
* 服务版本
* @return
*/
String serviceVersion() default "1.0";
}
@RpcService 提供了两个必不可少的属性:服务类型 serviceInterface 和服务版本 serviceVersion,服务消费者必须指定完全一样的属性才能正确调用。有了 @RpcService 注解之后,我们就可以在服务实现类上使用它,@RpcService 注解本质上就是 @Component,可以将服务实现类注册成 Spring 容器所管理的 Bean,那么 serviceInterface、serviceVersion 的属性值怎么才能和 Bean 关联起来呢?这就需要我们就 Bean 的生命周期以及 Bean 的可扩展点有所了解。
Spring 的 BeanPostProcessor 接口给提供了对 Bean 进行再加工的扩展点,BeanPostProcessor 常用于处理自定义注解。自定义的 Bean 可以通过实现 BeanPostProcessor 接口,在 Bean 实例化的前后加入自定义的逻辑处理。如下所示,我们通过 RpcProvider 实现 BeanPostProcessor 接口,来实现对 声明 @RpcService 注解服务的自定义处理。
/**
* Spring 的 BeanPostProcessor 接口给提供了对 Bean 进行再加工的扩展点,BeanPostProcessor 常用于处理自定义注解。
* 自定义的 Bean 可以通过实现 BeanPostProcessor 接口,在 Bean 实例化的前后加入自定义的逻辑处理。
*/
@Slf4j
public class RpcProvider implements InitializingBean, BeanPostProcessor {
private String serverAddress;
private final int serverPort;
private final RegistryService serviceRegistry;
private final Map rpcServiceMap = new HashMap<>();
public RpcProvider(int serverPort, RegistryService serviceRegistry) {
this.serverPort = serverPort;
this.serviceRegistry = serviceRegistry;
}
@Override
public void afterPropertiesSet() {
new Thread(() -> {
try {
startRpcServer();
} catch (Exception e) {
log.error("start rpc server error.", e);
}
}).start();
}
private void startRpcServer() throws Exception {
this.serverAddress = InetAddress.getLocalHost().getHostAddress();
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new MiniRpcEncoder())
.addLast(new MiniRpcDecoder())
.addLast(new RpcRequestHandler(rpcServiceMap));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture channelFuture = bootstrap.bind(this.serverAddress, this.serverPort).sync();
log.info("server addr {} started on port {}", this.serverAddress, this.serverPort);
channelFuture.channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
/**
* 对所有初始化完成后的 Bean 进行扫描。
* 如果 Bean 包含 @RpcService 注解,那么通过注解读取服务的元数据信息并构造出 ServiceMeta 对象,接下来准备将服务的元数据信息发布至注册中心。
* 此外,RpcProvider 还维护了一个 rpcServiceMap,存放服务初始化后所对应的 Bean,rpcServiceMap 起到了缓存的角色,
* 在处理 RPC 请求时可以直接通过 rpcServiceMap 拿到对应的服务进行调用。
* @param bean
* @param beanName
* @return
* @throws BeansException
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
if (rpcService != null) {
String serviceName = rpcService.serviceInterface().getName();
String serviceVersion = rpcService.serviceVersion();
try {
ServiceMeta serviceMeta = new ServiceMeta();
serviceMeta.setServiceAddr(serverAddress);
serviceMeta.setServicePort(serverPort);
serviceMeta.setServiceName(serviceName);
serviceMeta.setServiceVersion(serviceVersion);
serviceRegistry.register(serviceMeta);
rpcServiceMap.put(RpcServiceHelper.buildServiceKey(serviceMeta.getServiceName(), serviceMeta.getServiceVersion()), bean);
} catch (Exception e) {
log.error("failed to register service {}#{}", serviceName, serviceVersion, e);
}
}
return bean;
}
}
RpcProvider 重写了 BeanPostProcessor 接口的 postProcessAfterInitialization 方法,对所有初始化完成后的 Bean 进行扫描。如果 Bean 包含 @RpcService 注解,那么通过注解读取服务的元数据信息并构造出 ServiceMeta 对象,接下来准备将服务的元数据信息发布至注册中心,注册中心的实现后续再细说。此外,RpcProvider 还维护了一个 rpcServiceMap,存放服务初始化后所对应的 Bean,rpcServiceMap 起到了缓存的角色,在处理 RPC 请求时可以直接通过 rpcServiceMap 拿到对应的服务进行调用。
服务消费者订阅服务
与服务提供者不同的是,服务消费者并不是一个常驻的服务,每次发起 RPC 调用时它才会去选择向哪个远端服务发送数据。所以服务消费者的实现要复杂一些,对于声明 @RpcReference 注解的成员变量,我们需要构造出一个可以真正进行 RPC 调用的 Bean,然后将它注册到 Spring 的容器中。
首先我们看下 @RpcReference 注解的定义,代码如下所示:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Autowired
public @interface RpcReference {
String serviceVersion() default "1.0";
String registryType() default "ZOOKEEPER";
String registryAddress() default "127.0.0.1:2181";
long timeout() default 5000;
}
@RpcReference 注解提供了服务版本 serviceVersion、注册中心类型 registryType、注册中心地址 registryAddress 和超时时间 timeout 四个属性,接下来我们需要使用这些属性构造出一个自定义的 Bean,并对该 Bean 执行的所有方法进行拦截。
Spring 的 FactoryBean 接口可以帮助我们实现自定义的 Bean,FactoryBean 是一种特种的工厂 Bean,通过 getObject() 方法返回对象,而并不是 FactoryBean 本身。
public class RpcReferenceBean implements FactoryBean