python grpc
grpc 远程过程调用
优势:采用protobuf二进制消息,效率高;序列化后消息体积小;节省网络流量;序列化和反序列化直接对应程序中数据类;
劣势:protobuf二进制可读性差
使用protobuf数据传输,protobuf是一种数据交换格式,由三部分构成
proto 文件 :使用proto语法的文本文件,定义数据格式
protoc:protobuf编译器,将proto文件编译成不同语言的实现,用以数据交互
protobuf运行时:protobuf运行时所需要的库,和protoc编译生成的代码进行交互
使用protobuf过程
编写proto文件->使用protoc编译->添加protobuf运行时->项目中集成
更新:
修改proto文件->使用protoc重新编译->项目中修改集成的地方
python 1.安装grpc模块 pip install grpcio --------------------------------------------------------------------------------------------- 2.安装编译器 pip install grpcio-tools ----------------------------------------------------------------------------------------------- 3.编写proto文件 //创建文件test.proto // [python quickstart](https://grpc.io/docs/quickstart/python.html#run-a-grpc-application) // python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. test.proto // test.proto //proto语法有proto2和proto3,后者简洁明了 syntax = "proto3"; //定义服务 //有两个函数,参数为CParams和GParams,输出为Data service Test { rpc TestCreate(CParams) returns (Data) {} rpc TestGet(GParams) returns (Data) {} } message CParams { string name = 1; } message GParams { string id = 1; } message Data { string message = 1; string data =2; } ------------------------------------------------------------------------------------------- 4.编译proto文件 python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. test.proto python -m grpc_tools.protoc: python 下的 protoc 编译器通过 python 模块(module) 实现, 所以说这一步非常省心 --python_out=. : 编译生成处理 protobuf 相关的代码的路径, 这里生成到当前目录 --grpc_python_out=. : 编译生成处理 grpc 相关的代码的路径, 这里生成到当前目录 -I. test.proto : proto 文件的路径, 这里的 proto 文件在当前目录 --------------------------------------------------------------------------------------------- 5.生成文件 test_pb2.py和test_pb2_grpc.py test_pb2.py # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! 
# source: test.proto """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() DESCRIPTOR = _descriptor.FileDescriptor( name='test.proto', package='', syntax='proto3', serialized_options=None, create_key=_descriptor._internal_create_key, serialized_pb=b'\n\ntest.proto\"\x17\n\x07\x43Params\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x15\n\x07GParams\x12\n\n\x02id\x18\x01 \x01(\t\"%\n\x04\x44\x61ta\x12\x0f\n\x07message\x18\x01 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\t2E\n\x04Test\x12\x1f\n\nTestCreate\x12\x08.CParams\x1a\x05.Data\"\x00\x12\x1c\n\x07TestGet\x12\x08.GParams\x1a\x05.Data\"\x00\x62\x06proto3' ) _CPARAMS = _descriptor.Descriptor( name='CParams', full_name='CParams', filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name='name', full_name='CParams.name', index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], nested_types=[], enum_types=[ ], serialized_options=None, is_extendable=False, syntax='proto3', extension_ranges=[], oneofs=[ ], serialized_start=14, serialized_end=37, ) _GPARAMS = _descriptor.Descriptor( name='GParams', full_name='GParams', filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name='id', full_name='GParams.id', index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, 
enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], nested_types=[], enum_types=[ ], serialized_options=None, is_extendable=False, syntax='proto3', extension_ranges=[], oneofs=[ ], serialized_start=39, serialized_end=60, ) _DATA = _descriptor.Descriptor( name='Data', full_name='Data', filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name='message', full_name='Data.message', index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='data', full_name='Data.data', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], nested_types=[], enum_types=[ ], serialized_options=None, is_extendable=False, syntax='proto3', extension_ranges=[], oneofs=[ ], serialized_start=62, serialized_end=99, ) DESCRIPTOR.message_types_by_name['CParams'] = _CPARAMS DESCRIPTOR.message_types_by_name['GParams'] = _GPARAMS DESCRIPTOR.message_types_by_name['Data'] = _DATA _sym_db.RegisterFileDescriptor(DESCRIPTOR) CParams = _reflection.GeneratedProtocolMessageType('CParams', (_message.Message,), { 'DESCRIPTOR' : _CPARAMS, '__module__' : 'test_pb2' # @@protoc_insertion_point(class_scope:CParams) }) _sym_db.RegisterMessage(CParams) GParams = _reflection.GeneratedProtocolMessageType('GParams', (_message.Message,), { 'DESCRIPTOR' : _GPARAMS, '__module__' : 
'test_pb2' # @@protoc_insertion_point(class_scope:GParams) }) _sym_db.RegisterMessage(GParams) Data = _reflection.GeneratedProtocolMessageType('Data', (_message.Message,), { 'DESCRIPTOR' : _DATA, '__module__' : 'test_pb2' # @@protoc_insertion_point(class_scope:Data) }) _sym_db.RegisterMessage(Data) _TEST = _descriptor.ServiceDescriptor( name='Test', full_name='Test', file=DESCRIPTOR, index=0, serialized_options=None, create_key=_descriptor._internal_create_key, serialized_start=101, serialized_end=170, methods=[ _descriptor.MethodDescriptor( name='TestCreate', full_name='Test.TestCreate', index=0, containing_service=None, input_type=_CPARAMS, output_type=_DATA, serialized_options=None, create_key=_descriptor._internal_create_key, ), _descriptor.MethodDescriptor( name='TestGet', full_name='Test.TestGet', index=1, containing_service=None, input_type=_GPARAMS, output_type=_DATA, serialized_options=None, create_key=_descriptor._internal_create_key, ), ]) _sym_db.RegisterServiceDescriptor(_TEST) DESCRIPTOR.services_by_name['Test'] = _TEST # @@protoc_insertion_point(module_scope) test_pub2_grpc.py # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! """Client and server classes corresponding to protobuf-defined services.""" import grpc import test_pb2 as test__pb2 class TestStub(object): """Missing associated documentation comment in .proto file.""" def __init__(self, channel): """Constructor. Args: channel: A grpc.Channel. 
""" self.TestCreate = channel.unary_unary( '/Test/TestCreate', request_serializer=test__pb2.CParams.SerializeToString, response_deserializer=test__pb2.Data.FromString, ) self.TestGet = channel.unary_unary( '/Test/TestGet', request_serializer=test__pb2.GParams.SerializeToString, response_deserializer=test__pb2.Data.FromString, ) class TestServicer(object): """Missing associated documentation comment in .proto file.""" def TestCreate(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def TestGet(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def add_TestServicer_to_server(servicer, server): rpc_method_handlers = { 'TestCreate': grpc.unary_unary_rpc_method_handler( servicer.TestCreate, request_deserializer=test__pb2.CParams.FromString, response_serializer=test__pb2.Data.SerializeToString, ), 'TestGet': grpc.unary_unary_rpc_method_handler( servicer.TestGet, request_deserializer=test__pb2.GParams.FromString, response_serializer=test__pb2.Data.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler( 'Test', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) # This class is part of an EXPERIMENTAL API. 
class Test(object): """Missing associated documentation comment in .proto file.""" @staticmethod def TestCreate(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/Test/TestCreate', test__pb2.CParams.SerializeToString, test__pb2.Data.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @staticmethod def TestGet(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_unary(request, target, '/Test/TestGet', test__pb2.GParams.SerializeToString, test__pb2.Data.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) ----------------------------------------------------------------------------------------------- 6.编写server.py from concurrent import futures import time import json import grpc import test_pb2 import test_pb2_grpc # 实现 proto 文件中定义的 TestServicer class Test(test_pb2_grpc.TestServicer): # 继承并重写 # 实现 proto 文件中定义的 rpc 调用 # 通过request接收参数 def TestCreate(self, request, context): data={ '加速度':0.01 } return test_pb2.Data(message = f'hello {request.name}',data=json.dumps(data)) def TestGet(self, request, context): data = { '速度': 0.1 } return test_pb2.Data(message=f'hello {request.id}', data=json.dumps(data)) def serve(): # 启动 rpc 服务 server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) test_pb2_grpc.add_TestServicer_to_server(Test(), server) server.add_insecure_port('[::]:50051') server.start() try: while True: time.sleep(60*60*24) # one day in seconds except KeyboardInterrupt: server.stop(0) if __name__ == '__main__': serve() -------------------------------------------------------------------------------------------- 7.编写client.py 
import json

import grpc

import test_pb2
import test_pb2_grpc


def run():
    """Call both RPCs of the Test service on localhost:50051 and print the results."""
    # Connect to the RPC server; the context manager closes the channel on exit.
    with grpc.insecure_channel('localhost:50051') as channel:
        # The stub exposes the service's RPCs as ordinary method calls.
        stub = test_pb2_grpc.TestStub(channel)

        # Call a method, passing the request message as the argument.
        response = stub.TestCreate(test_pb2.CParams(name='hh'))
        # The response has two string fields, printed roughly as:
        #   message: "hello hh"
        #   data: "{\"\\u52a0\\u901f\\u5ea6\": 0.01}"
        print(response)
        # Prints: hello hh <class 'str'>
        print(response.message, type(response.data))

        # Nested dict payloads travel as JSON text: the server side uses
        # json.dumps, and the client restores the dict with json.loads.
        response = stub.TestGet(test_pb2.GParams(id='go'))
        data = json.loads(response.data)
        print(data['速度'])


if __name__ == '__main__':
    run()
#运行:先启动服务端 python server.py,再另开一个终端运行客户端 python client.py