如何设计一个RPC框架
如何设计一个RPC框架?
今天,就来聊聊“如何设计一个RPC框架
”,那么首先明确一个问题什么是 RPC 呢?
RPC
是Remote Procedure Call
的缩写,即远程过程调用。
RPC 是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而开发人员无需额外地为这个交互编程。
值得注意是,两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样。接下来我们便来分析一下一次 RPC 调用发生了些什么?
RPC 调用的基本流程
现在业界内比较流行的一些 RPC 框架,例如 Dubbo 提供的是基于接口的远程方法调用,即客户端只需要知道接口的定义即可调用远程服务。在 Java 中接口并不能直接调用实例方法,必须通过其实现类对象来完成此操作,这意味着客户端必须为这些接口生成代理对象,对此 Java 提供了 Proxy
、InvocationHandler
生成动态代理的支持;生成了代理对象,那么每个具体的发方法是怎么调用的呢?JDK 动态代理生成的代理对象调用指定方法时实际会执行InvocationHandler
中定义的 #invoke
方法,在该方法中完成远程方法调用并获取结果。
抛开客户端,回过头来看 RPC 是两台计算机间的调用,实质上是两台主机间的网络通信,涉及到网络通信又必然会有序列化、反序列化,编解码等一些必须要考虑的问题;同时实际上现在大多系统都是集群部署的,多台主机/容器对外提供相同的服务,如果集群的节点数量很大的话,那么管理服务地址也将是一件十分繁琐的事情,常见的做法是各个服务节点将自己的地址和提供的服务列表注册到一个注册中心,由注册中心来统一管理服务列表;这样的做法解决了一些问题同时为客户端增加了一项新的工作——那就是服务发现,通俗来说就是从注册中心中找到远程方法对应的服务列表并通过某种策略从中选取一个服务地址来完成网络通信。
聊了客户端和注册中心,另外一个重要的角色自然是服务端,服务端最重要的任务便是提供服务接口的真正实现并在某个端口上监听网络请求,监听到请求后从网络请求中获取到对应的参数(比如服务接口、方法、请求参数等),再根据这些参数通过反射的方式调用接口的真正实现获取结果并将其写入对应的响应流中。
综上所述,一次基本的 RPC 调用流程大致如下:

基本实现
服务端(生产者)
服务接口:
在 RPC 中,生产者和消费者有一个共同的服务接口 API。如下,定义一个 HelloService 接口。
/**
* @Descrption 服务接口
***/
public interface HelloService {
String sayHello(String somebody);
}
服务实现:
生产者要提供服务接口的实现,创建 HelloServiceImpl 实现类。
/**
* @Descrption 服务实现
***/
public class HelloServiceImpl implements HelloService {
@Override
public String sayHello(String somebody) {
return "hello " + somebody + "!";
}
}
服务注册:
本例使用 Spring 来管理 bean,采用自定义 XML 和解析器的方式来将服务实现类载入容器(当然也可以采用自定义注解的方式,此处不过多论述)并将服务接口信息注册到注册中心。
首先自定义 XSD:
:element name="service">
:complexType>
:complexContent>
:extension base="beans:identifiedType">
:attribute name="interface" type="xsd:string" use="required"/>
:attribute name="timeout" type="xsd:int" use="required"/>
:attribute name="serverPort" type="xsd:int" use="required"/>
:attribute name="ref" type="xsd:string" use="required"/>
:attribute name="weight" type="xsd:int" use="optional"/>
:attribute name="workerThreads" type="xsd:int" use="optional"/>
:attribute name="appKey" type="xsd:string" use="required"/>
:attribute name="groupName" type="xsd:string" use="optional"/>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
</xsd:element>
分别指定 Schema 和 XSD,Schema 和对应 Handler 的映射。
Schema:
http\://www.storm.com/schema/storm-service.xsd=META-INF/storm-service.xsd
http\://www.storm.com/schema/storm-reference.xsd=META-INF/storm-reference.xsd
Handler:
http\://www.storm.com/schema/storm-service=com.hsunfkqm.storm.framework.spring.StormServiceNamespaceHandler
http\://www.storm.com/schema/storm-reference=com.hsunfkqm.storm.framework.spring.StormRemoteReferenceNamespaceHandler
将编写好的文件放入 Classpath 下的 META-INF 目录下:

在 Spring 配置文件中配置服务类:
<!-- 发布远程服务 -->
="helloService" class="com.hsunfkqm.storm.framework.test.HelloServiceImpl"/>
:service id="helloServiceRegister"
interface="com.hsunfkqm.storm.framework.test.HelloService"
ref="helloService"
groupName="default"
weight="2"
appKey="ares"
workerThreads="100"
serverPort="8081"
timeout="600"/>
编写对应的 Handler 和 Parser:
StormServiceNamespaceHandler:
import org.springframework.beans.factory.xml.NamespaceHandlerSupport;
/**
* @author 孙浩
* @Descrption 服务发布自定义标签
***/
public class StormServiceNamespaceHandler extends NamespaceHandlerSupport {
@Override
public void init() {
registerBeanDefinitionParser("service", new ProviderFactoryBeanDefinitionParser());
}
}
ProviderFactoryBeanDefinitionParser:
protected Class getBeanClass(Element element) {
return ProviderFactoryBean.class;
}
protected void doParse(Element element, BeanDefinitionBuilder bean) {
try {
String serviceItf = element.getAttribute("interface");
String serverPort = element.getAttribute("serverPort");
String ref = element.getAttribute("ref");
// ....
bean.addPropertyValue("serverPort", Integer.parseInt(serverPort));
bean.addPropertyValue("serviceItf", Class.forName(serviceItf));
bean.addPropertyReference("serviceObject", ref);
//...
if (NumberUtils.isNumber(weight)) {
bean.addPropertyValue("weight", Integer.parseInt(weight));
}
//...
} catch (Exception e) {
// ...
}
}
ProviderFactoryBean:
/**
* @Descrption 服务发布
***/
public class ProviderFactoryBean implements FactoryBean, InitializingBean {
//服务接口
private Class<?> serviceItf;
//服务实现
private Object serviceObject;
//服务端口
private String serverPort;
//服务超时时间
private long timeout;
//服务代理对象,暂时没有用到
private Object serviceProxyObject;
//服务提供者唯一标识
private String appKey;
//服务分组组名
private String groupName = "default";
//服务提供者权重,默认为 1 , 范围为 [1-100]
private int weight = 1;
//服务端线程数,默认 10 个线程
private int workerThreads = 10;
@Override
public Object getObject() throws Exception {
return serviceProxyObject;
}
@Override
public Class<?> getObjectType() {
return serviceItf;
}
@Override
public void afterPropertiesSet() throws Exception {
//启动 Netty 服务端
NettyServer.singleton().start(Integer.parseInt(serverPort));
//注册到 zk, 元数据注册中心
List> providerServiceList = buildProviderServiceInfos();
IRegisterCenter4Provider registerCenter4Provider = RegisterCenter.singleton();
registerCenter4Provider.registerProvider(providerServiceList);
}
}
//================RegisterCenter#registerProvider======================
@Override
public void registerProvider(final List> serviceMetaData) {
if (CollectionUtils.isEmpty(serviceMetaData)) {
return;
}
//连接 zk, 注册服务
synchronized (RegisterCenter.class) {
for (ProviderService provider : serviceMetaData) {
String serviceItfKey = provider.getServiceItf().getName();
List> providers = providerServiceMap.get(serviceItfKey);
if (providers == null) {
providers = Lists.newArrayList();
}
providers.add(provider);
providerServiceMap.put(serviceItfKey, providers);
}
if (zkClient == null) {
zkClient = new ZkClient(ZK_SERVICE, ZK_SESSION_TIME_OUT, ZK_CONNECTION_TIME_OUT, new SerializableSerializer());
}
//创建 ZK 命名空间/当前部署应用 APP 命名空间/
String APP_KEY = serviceMetaData.get(0).getAppKey();
String ZK_PATH = ROOT_PATH + "/" + APP_KEY;
boolean exist = zkClient.exists(ZK_PATH);
if (!exist) {
zkClient.createPersistent(ZK_PATH, true);
}
for (Map