grpc双向流究竟是什么情况?2段代码告诉你


摘要:为什么需要grpc双向流?

本文分享自华为云社区《grpc双向流究竟是什么情况?2段代码告诉你》,作者:breakDawn。

为什么需要grpc双向流?

有时候请求调用和返回过程,并不是简单的一问一答形式,可能会涉及一次发送,多次分批返回,或者两边随意互相发送。

因此简单的restful模型无法满足上述常见,grpc双向流应运而生,通过一个tpc链接实现了双向的异步IO通信。

grpc双向流

一个双向流式RPC是双方使用读写流去发送一个消息序列。

两个流独立操作,因此客户端和服务器可以以任意喜欢的顺序读写:比如,服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替的读取和写入消息,或者其他读写的组合。

  • 可以理解为常见IO模型里的异步IO的使用

每个流中的消息顺序被预留。你可以通过在请求和响应前加stream关键字去制定方法的类型。

//AcceptsastreamofRouteNotessentwhilearouteisbeingtraversed,
//whilereceivingotherRouteNotes(e.g.fromotherusers).
rpcRouteChat(streamRouteNote)returns(streamRouteNote){}

客户端的双向流调用

  1. 定义一个reponseOberserver,即响应观察者,用于定义如何处理服务端返回的消息。一般都是把消息放到一个某个阻塞队列或者单容量队列SettableFuture中。
  2. 调用stub.sendMessage(reponseOberserver),即告诉grpc框架,我要用这个reponseOberserver去处理sendMessage消息的响应。
    注意,这个sendMesage方法名,取决于我们的proto中怎么定义的。
  3. 然后stub.sendMessage()方法回返回给我们一个requestObserver,让我们用这个观察者.onNext()去发送请求,可以任意发多次,都是立刻返回的。
  4. 当不需要再发送时,可以调用onCompleted告知对方可以结束了
    下面是官网摘抄的代码示例:
publicvoidrouteChat()throwsException{
info("***RoutChat");
finalSettableFuturefinishFuture=SettableFuture.create();
//定义了如何处理收到的返回消息观察者
    StreamObserverreponseObserver=newStreamObserver(){
@Override
publicvoidonNext(RouteNotenote){
info("Gotmessage\"{0}\"at{1},{2}",note.getMessage(),note.getLocation()
.getLatitude(),note.getLocation().getLongitude());
}

@Override
publicvoidonError(Throwablet){
finishFuture.setException(t);
}

@Override
publicvoidonCompleted(){
            //往finishFuture设置空时,说明完成了消息流关闭了
finishFuture.set(null);
}
};

//框架返回给我一个请求流观察者,让我用这个观察者.onNext(message)去发请求,返回结果和我传给他的responseServer绑定了。
StreamObserverrequestObserver=
asyncStub.routeChat();

try{
RouteNote[]requests=
{newNote("Firstmessage",0,0),newNote("Secondmessage",0,1),
newNote("Thirdmessage",1,0),newNote("Fourthmessage",1,1)};

for(RouteNoterequest:requests){
info("Sendingmessage\"{0}\"at{1},{2}",request.getMessage(),request.getLocation()
.getLatitude(),request.getLocation().getLongitude());
requestObserver.onNext(request);
}
requestObserver.onCompleted();

finishFuture.get();
info("FinishedRouteChat");
}catch(Exceptiont){
requestObserver.onError(t);
logger.log(Level.WARNING,"RouteChatFailed",t);
throwt;
}
}

服务端的处理方式:

  1. 我们建立服务端的时候,需要调用nettyServer,建立netty服务,并绑定一个xxxServiceImpl抽象类。这个xxxServiceImpl就是我们在proto中定义的server结构,支持处理我们定义的消息。
  2. xxxServiceImpl中,有很多需要覆写的方法,需要你定义如何处理收到的请求,以及如何给客户端发送响应。发送响应的动作就是参数里的requestObserver.onNext(响应消息)
  3. 返回的xxxService类,会在第一步提供给netty以及grpc框架,收到消息时,会通过他的异步机制,分隔网络线程和业务线程,走到这边执行的地方。
    下面是官网摘抄的代码示例:
class    xxxServiceextendxxxServiceImpl{
@Override
publicvoidlistFeatures(Rectanglerequest,StreamObserverresponseObserver){
intleft=min(request.getLo().getLongitude(),request.getHi().getLongitude());
intright=max(request.getLo().getLongitude(),request.getHi().getLongitude());
inttop=max(request.getLo().getLatitude(),request.getHi().getLatitude());
intbottom=min(request.getLo().getLatitude(),request.getHi().getLatitude());

for(Featurefeature:features){
if(!RouteGuideUtil.exists(feature)){
continue;
}

intlat=feature.getLocation().getLatitude();
intlon=feature.getLocation().getLongitude();
if(lon>=left&&lon<=right&&lat>=bottom&&lat<=top){
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}
}

 

点击关注,第一时间了解华为云新鲜技术~