protobuf和grpc进阶


protobuf和grpc进阶

1、基本类型、默认值

  • 当值不被传递时,使用默认值

  • 数组的实现

    message HelloReply {
        string message = 1;
        repeated int32 id = 2;
    }
    
  • Python数组类型,不能先定义后赋值,可以使用数据拼接或append添加

2、option go_package的作用

|-proto
|--common
|---stream
|----proto
//option go_package = ".;proto";
//option go_package = "common/stream/proto/v1";
option go_package = "../../common/stream/proto/v1";

3、当proto文件不同步的时候容易出现的问题

  • 数据序列化用序列号,不是用变量名

4、proto文件中引入其他的proto文件

// base.proto
syntax = "proto3";

//message Empty{}

message Pong{
  string id = 1;
}
// helloworld.proto
syntax = "proto3";
import "base.proto";

import "google/protobuf/empty.proto";

option go_package = ".;proto";

service Greeter {
    rpc SayHello (HelloRequest) returns (HelloReply);
    rpc Ping(google.protobuf.Empty) returns (Pong);
}
//将 sessionid放入 放入cookie中 http协议
message HelloRequest {
    string name = 1;
}

message HelloReply {
    string message = 1;
    repeated int32 id = 2;
}
from google.protobuf.empty_pb2 import Empty

5、嵌套的message对象

syntax = "proto3";

message Result{
  string name = 1;
  string url = 2;
}

message HelloReply{
  string message = 1;
  Result info = 2;
}
# 使用
result=HelloReply.Result()
package client

import (
	"fmt"
	"self_python_go/demo/proto_nested/proto"
)

// main builds a HelloReply whose Info field points at a Result message and
// prints the whole value.
func main() {
	info := &proto.Result{
		Name: "xin",
		Url:  "www.baidu.com",
	}
	message := proto.HelloReply{
		Message: "测试信息",
		Info:    info,
	}
	fmt.Println(message)
}

6、protobuf中的enum枚举类型

enum Gender{
  MALE = 0;
  FEMALE = 1;
}

message HelloReply {
  string message = 1;
  Gender sex = 2;

  message Result {
    string name = 1;
    string url = 2;
  }
}
Gender_MALE

7、map类型

message HelloRequest {
    string name = 1;
    // A map field must spell out its key and value types.
    map<string, string> mp = 2;
}
map[string]string {
  "name": "xin",
  "company": "哈哈",
}

8、使用protobuf内置的timestamp类型

syntax = "proto3";
import "google/protobuf/timestamp.proto";
option go_package = ".;proto";

message HelloRequest {
    string name = 1;
    // A map field must spell out its key and value types.
    map<string, string> mp = 2;
    // Uses the well-known Timestamp type imported from google/protobuf/timestamp.proto.
    google.protobuf.Timestamp addTime = 3;
}
import (
	timestamppb "google.golang.org/protobuf/types/known/timestamppb"
)

{
  AddTime: timestamppb.New(time.Now())
}

9、grpc配合asyncio使用

需要python3.7+

  • 需要额外安装 grpclib:pipenv install grpclib
syntax = "proto3";

service Greeter {
    rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
    string name = 1;
}

message HelloReply {
    string message = 1;
}
  • python -m grpc_tools.protoc --python_out=. --grpclib_python_out=. -I. *.proto 命令有变化
import asyncio

from grpclib.utils import graceful_exit
from grpclib.server import Server

# generated by protoc
from .hello_pb2 import HelloReply
from .hello_grpc import GreeterBase


class Greeter(GreeterBase):
    """grpclib implementation of the Greeter service."""

    async def SayHello(self, stream):
        """Receive one request from the stream and send back a greeting."""
        req = await stream.recv_message()
        reply = HelloReply(message=f'Hello, {req.name}!')
        await stream.send_message(reply)


async def main(*, host='127.0.0.1', port=50051):
    """Start the Greeter server on host:port and wait until it is closed."""
    grpc_server = Server([Greeter()])
    # graceful_exit([grpc_server]) could wrap the calls below, but it is not
    # supported on Windows, so it is left disabled here.
    await grpc_server.start(host, port)
    print('Serving on {}:{}'.format(host, port))
    await grpc_server.wait_closed()


if __name__ == '__main__':
    asyncio.run(main())
import asyncio

from grpclib.client import Channel

# generated by protoc
from .helloworld_pb2 import HelloRequest, HelloReply
from .helloworld_grpc import GreeterStub

async def main():
    """Call Greeter.SayHello once over a local channel and print the reply."""
    async with Channel('127.0.0.1', 50051) as channel:
        stub = GreeterStub(channel)
        request = HelloRequest(name='Dr. Strange')
        reply = await stub.SayHello(request)
        print(reply.message)

if __name__ == '__main__':
    asyncio.run(main())

10 、grpc的metadata机制-go

 // 第一种方式
md := metadata.New(map[string]string{"key1":"val1","key2":"val2","key3":"val3"})
// 第二种方式,key不区分大小写,统一转换为小写
md := metadata.Pairs(
	"key1","val1",
	"key2","val2",
	"key3","val3"
	)
//新建一个有metadata的context
ctx := metadata.NewOutgoingContext(context.Background(), md)
//单向RPC
response, err := client.SomeRPC(ctx, someRequest)xxxxxxxxxx // 第一种方式md := metadata.New(map[string]string{"key1":"val1","key2":"val2","key3":"val3"})// 第二种方式,key不区分大小写,统一转换为小写md := metadata.Pairs(    "key1","val1",    "key2","val2",    "key3","val3"    )

md, ok = metadata.FormIncomingContext(ctx)
//server.go
package main

import (
	"context"
	"fmt"
	"go_xin/grpc_go/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
	"net"
)

type Server struct{}

// SayHello prints the metadata attached to the incoming call and replies
// with a greeting built from req.Name.
func (s *Server) SayHello(ctx context.Context, req *proto.HelloRequest) (*proto.HelloReply, error) {
	md, ok := metadata.FromIncomingContext(ctx)

	// ok is false when the context carries no metadata; that is the case to report.
	if !ok {
		fmt.Println("get Error")
	}
	// Ranging over and indexing a nil MD is safe, so processing continues either way.
	for key, val := range md {
		fmt.Println(key, val)
	}

	if name, ok := md["name"]; ok {
		fmt.Println(name)
	}

	return &proto.HelloReply{
		Message: "Hello " + req.Name,
	}, nil
}

// main starts a gRPC server that hosts the Greeter service on 0.0.0.0:1234.
func main() {
	// Create the gRPC server and register the service implementation.
	srv := grpc.NewServer()
	proto.RegisterGreeterServer(srv, &Server{})

	// Listen and serve; a failure in either step aborts the program.
	listener, err := net.Listen("tcp", "0.0.0.0:1234")
	if err != nil {
		panic("failed to listen" + err.Error())
	}
	if err := srv.Serve(listener); err != nil {
		panic("failed to start grpc:" + err.Error())
	}
}
//client.go
package main

import (
	"context"
	"fmt"
	"go_xin/grpc_go/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

// main dials the Greeter server, attaches name/password metadata to the
// outgoing context, performs one SayHello call and prints the reply message.
func main() {
	conn, err := grpc.Dial("127.0.0.1:1234", grpc.WithInsecure()) // dial the server
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Alternative: metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
	headers := map[string]string{
		"name":     "xin",
		"password": "12345678",
	}
	ctx := metadata.NewOutgoingContext(context.Background(), metadata.New(headers))

	client := proto.NewGreeterClient(conn)
	reply, err := client.SayHello(ctx, &proto.HelloRequest{Name: "xin"})
	if err != nil {
		panic(err)
	}
	fmt.Println(reply.Message)
}

11、python操作metadata

syntax = "proto3";
option go_package=".;proto";

// The greeting service definition.
service Greeter {
    //   Sends a greeting
    rpc SayHello (HelloRequest) returns (HelloReply) {
    }
}

// The request message containing the user's name.
message HelloRequest {
    string name = 1;
}

// The response message containing the greetings
message HelloReply {
    string message = 1;
}
# server.py
from __future__ import print_function
from concurrent import futures
import logging

import grpc

import helloworld_pb2
import helloworld_pb2_grpc


class Greeter(helloworld_pb2_grpc.GreeterServicer):
    """Greeter servicer that prints the metadata of every call."""

    def SayHello(self, request, context):
        """Print each metadata pair of the call, then return the greeting."""
        for pair in context.invocation_metadata():
            key, value = pair
            print('Received initial metadata: key=%s value=%s' % (key, value))

        greeting = 'Hello, %s!' % request.name
        return helloworld_pb2.HelloReply(message=greeting)

def serve():
    """Start the Greeter server on port 50051 and block until termination."""
    executor = futures.ThreadPoolExecutor(max_workers=10)
    server = grpc.server(executor)
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

if __name__ == '__main__':
    logging.basicConfig()
    serve()
from __future__ import print_function
import logging

import grpc

import helloworld_pb2
import helloworld_pb2_grpc


def run():
    """Call Greeter.SayHello once with custom metadata and print the reply."""
    # NOTE(gRPC Python Team): .close() is possible on a channel and should be
    # used in circumstances in which the with statement does not fit the needs
    # of the code.
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = helloworld_pb2_grpc.GreeterStub(channel)
        # with_call yields the response together with the call object.
        response, call = stub.SayHello.with_call(
            helloworld_pb2.HelloRequest(name='you'),
            metadata=(
                ('name', 'bobby'),
                ('password', 'imooc'),
            ),
        )

    print("Greeter client received: " + response.message)

if __name__ == '__main__':
    run()

12、grpc拦截器 - go

package main

import (
	"context"
	"fmt"
	"go_xin/grpc_go/proto"
	"google.golang.org/grpc"
	"net"
)

type Server struct{}

// SayHello replies with "Hello " followed by the name carried in the request.
func (s *Server) SayHello(ctx context.Context, req *proto.HelloRequest) (*proto.HelloReply, error) {
	reply := &proto.HelloReply{Message: "Hello " + req.Name}
	return reply, nil
}

// main starts the Greeter server on 0.0.0.0:1234 with a unary interceptor
// that prints a line before and after every handled request.
func main() {
	// Build the interceptor.
	logCalls := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
		fmt.Println("接收到新的请求")
		resp, err = handler(ctx, req)
		fmt.Println("请求已经完成")
		return resp, err
	}

	// Create the server with the interceptor installed and register the service.
	srv := grpc.NewServer(grpc.UnaryInterceptor(logCalls))
	proto.RegisterGreeterServer(srv, &Server{})

	// Listen and serve; a failure in either step aborts the program.
	listener, err := net.Listen("tcp", "0.0.0.0:1234")
	if err != nil {
		panic("failed to listen" + err.Error())
	}
	if err := srv.Serve(listener); err != nil {
		panic("failed to start grpc:" + err.Error())
	}
}
package main

import (
	"context"
	"fmt"
	"go_xin/grpc_go/proto"
	"google.golang.org/grpc"
	"time"
)

// main dials the server with a unary client interceptor that prints the
// duration of every call, then performs one SayHello request.
func main() {
	timeCall := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		begin := time.Now()
		callErr := invoker(ctx, method, req, reply, cc, opts...)
		fmt.Printf("耗时:%s\n", time.Since(begin))
		return callErr
	}

	// Dial with the timing interceptor installed.
	conn, err := grpc.Dial("127.0.0.1:1234", grpc.WithInsecure(), grpc.WithUnaryInterceptor(timeCall))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	client := proto.NewGreeterClient(conn)
	reply, err := client.SayHello(context.Background(), &proto.HelloRequest{Name: "xin"})
	if err != nil {
		panic(err)
	}
	fmt.Println(reply.Message)
}

很多拦截器的使用场景

13、python实现grpc的拦截器

from concurrent import futures

import grpc

from grpc_interceptor.proto import hello_pb2, hello_pb2_grpc


class Greeter(hello_pb2_grpc.GreeterServicer):
    """Greeter servicer that answers every call with a greeting."""

    def SayHello(self, request, context):
        """Return a greeting for request.name."""
        # Interpolate the name field; formatting the whole request would embed
        # the message's text representation in the greeting.
        return hello_pb2.HelloReply(message=f"您好,{request.name}")


class LogInterceptor(grpc.ServerInterceptor):
    """Server interceptor that prints markers around the continuation call."""

    def intercept_service(self, continuation, handler_call_details):
        """Print a start marker, delegate to continuation, print an end marker."""
        print("请求开始")
        details_type = type(handler_call_details)
        print(details_type)
        handler = continuation(handler_call_details)
        print("请求结束")
        return handler


if __name__ == '__main__':
    # 1. Create the server with a thread pool and the logging interceptor.
    pool = futures.ThreadPoolExecutor(max_workers=10)
    log_interceptor = LogInterceptor()
    server = grpc.server(pool, interceptors=(log_interceptor,))
    # 2. Register the service implementation with the server.
    hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    # 3. Bind the port and start serving.
    server.add_insecure_port('0.0.0.0:1234')
    server.start()
    # Keep the main program alive instead of exiting immediately.
    server.wait_for_termination()
import grpc

from base.proto import hello_pb2, hello_pb2_grpc


class DefaultIntercept(grpc.UnaryUnaryClientInterceptor):
    """Client interceptor that prints how long each unary-unary call takes."""

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """Invoke the call, print the elapsed milliseconds, return the result."""
        from datetime import datetime
        start = datetime.now()
        rsp = continuation(client_call_details, request)
        # total_seconds() covers the whole interval; .microseconds is only the
        # sub-second component and would drop any whole seconds.
        print((datetime.now() - start).total_seconds() * 1000)
        return rsp


if __name__ == '__main__':
    timing_interceptor = DefaultIntercept()
    with grpc.insecure_channel("127.0.0.1:1234") as channel:  # open the connection
        # Wrap the channel so every call passes through the interceptor.
        wrapped = grpc.intercept_channel(channel, timing_interceptor)
        stub = hello_pb2_grpc.GreeterStub(wrapped)
        request = hello_pb2.HelloRequest(name="xin")
        rsp: hello_pb2.HelloReply = stub.SayHello(request)
        print(rsp.message)

14、通过拦截器和metadata实现grpc的auth认证

package main

import (
	"context"
	"fmt"
	"go_xin/grpc_go/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
	"net"
)

type Server struct{}

// SayHello replies with "Hello " followed by the name carried in the request.
func (s *Server) SayHello(ctx context.Context, req *proto.HelloRequest) (*proto.HelloReply, error) {
	reply := &proto.HelloReply{Message: "Hello " + req.Name}
	return reply, nil
}

// main starts the Greeter server on 0.0.0.0:1234 behind a unary interceptor
// that authenticates each call through the appid/appkey metadata pair.
func main() {
	// Build the interceptor.
	interceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
		fmt.Println("接收到新的请求")
		md, ok := metadata.FromIncomingContext(ctx)
		fmt.Println(md)
		if !ok {
			// Log before returning so the message is actually printed.
			fmt.Println("get metadata error")
			return nil, status.Error(codes.Unauthenticated, "无token认证信息")
		}

		var appid, appkey string
		// Guard the length so a key with an empty value list cannot panic.
		if vals, ok := md["appid"]; ok && len(vals) > 0 {
			appid = vals[0]
		}
		if vals, ok := md["appkey"]; ok && len(vals) > 0 {
			appkey = vals[0]
		}
		if appid != "xin" || appkey != "10011" {
			return nil, status.Error(codes.Unauthenticated, "无token认证信息")
		}

		resp, err = handler(ctx, req)
		fmt.Println("请求已经完成")
		return resp, err
	}
	opt := grpc.UnaryInterceptor(interceptor)
	// Create the gRPC server with the interceptor installed.
	g := grpc.NewServer(opt)
	// Register the service implementation.
	proto.RegisterGreeterServer(g, &Server{})
	// Listen and serve.
	lis, err := net.Listen("tcp", "0.0.0.0:1234")
	if err != nil {
		panic("failed to listen" + err.Error())
	}
	err = g.Serve(lis)
	if err != nil {
		panic("failed to start grpc:" + err.Error())
	}
}
package main

import (
	"context"
	"fmt"
	"go_xin/grpc_go/proto"
	"google.golang.org/grpc"
)


//方式2
// customCredential supplies a fixed appid/appkey pair for every RPC.
type customCredential struct{}

// GetRequestMetadata returns the static credential pairs; ctx and uri are
// not consulted.
func (c customCredential) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	creds := make(map[string]string, 2)
	creds["appid"] = "xin"
	creds["appkey"] = "10011"
	return creds, nil
}

// RequireTransportSecurity always reports false.
func (c customCredential) RequireTransportSecurity() bool {
	return false
}


// main dials the server with per-RPC credentials supplied by
// customCredential, performs one SayHello call and prints the reply message.
func main() {
	//// Option 1: inject the metadata from a client interceptor.
	//interceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	//	start := time.Now()
	//	md := metadata.New(map[string]string{
	//		"appid":  "xin",
	//		"appkey": "10011",
	//	})
	//	ctx = metadata.NewOutgoingContext(context.Background(), md) // ctx is replaced
	//	err := invoker(ctx, method, req, reply, cc, opts...)
	//	fmt.Printf("耗时:%s\n", time.Since(start))
	//	return err
	//}
	//	interceptor := grpc.WithPerRPCCredentials()

	// Option 2: let gRPC request the metadata from the credential object.
	creds := grpc.WithPerRPCCredentials(customCredential{})
	conn, err := grpc.Dial("127.0.0.1:1234", grpc.WithInsecure(), creds) // dial the server
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	client := proto.NewGreeterClient(conn)
	reply, err := client.SayHello(context.Background(), &proto.HelloRequest{Name: "xin"})
	if err != nil {
		panic(err)
	}
	fmt.Println(reply.Message)
}

15、grpc的验证器

使用的库

  • 使用和引用这个文件【https://github.com/envoyproxy/protoc-gen-validate/blob/main/validate/validate.proto】

  • 在unix上

    # fetches this repo into $GOPATH
    go get -d github.com/envoyproxy/protoc-gen-validate
    
    # installs PGV into $GOPATH/bin
    make build
    
  • 在win下

    使用 protoc-gen-validate.zip
    C:\Go\bin
    
  • 生成go源码

    protoc -I .  --go_out=plugins=grpc:. --validate_out="lang=go:." helloworld.proto
    
    syntax = "proto3";

    import "validate.proto";
    option go_package = ".;proto";

    service Greeter {
        rpc SayHello (Person) returns (Person);
    }

    message Person {
        // id must be greater than 999.
        uint64 id = 1 [(validate.rules).uint64.gt = 999];
        // email must be a valid e-mail address.
        string email = 2 [(validate.rules).string.email = true];
        // name must match the pattern below.
        string name = 3 [(validate.rules).string = {
            pattern: "^[3456789]\\d{9}$"}];
    }
    
    //server
    package main
    import (
    	"context"
    	"google.golang.org/grpc/codes"
    	"google.golang.org/grpc/status"
    	"net"
    	"google.golang.org/grpc"
    	"start/pgv_test/proto"
    )
    type Server struct{}
    
    // SayHello does not read the request and always replies with a Person whose Id is 32.
    func (s *Server) SayHello(ctx context.Context, request *proto.Person) (*proto.Person, error) {
    	reply := &proto.Person{Id: 32}
    	return reply, nil
    }
    
    // Validator is satisfied by any value that exposes a Validate() error
    // method; main's interceptor type-asserts incoming requests against it.
    type Validator interface {
    	Validate() error
    }
    
    // main starts the Greeter server on 0.0.0.0:50051 behind an interceptor
    // that rejects requests whose Validate method reports an error.
    func main() {
    	validate := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    		// Only requests that implement Validator are checked.
    		if v, ok := req.(Validator); ok {
    			if err := v.Validate(); err != nil {
    				return nil, status.Error(codes.InvalidArgument, err.Error())
    			}
    		}
    		// Continue with the real handler.
    		return handler(ctx, req)
    	}

    	srv := grpc.NewServer(grpc.UnaryInterceptor(validate))
    	proto.RegisterGreeterServer(srv, &Server{})

    	listener, err := net.Listen("tcp", "0.0.0.0:50051")
    	if err != nil {
    		panic("failed to listen:" + err.Error())
    	}
    	if err := srv.Serve(listener); err != nil {
    		panic("failed to start grpc:" + err.Error())
    	}
    }
    
    ```go
    //client.go
    package main

    import (
    	"context"
    	"fmt"
    	"google.golang.org/grpc"
    	"start/pgv_test/proto"
    )

    type customCredential struct{}

    // main dials the validating server and sends a Person carrying only an
    // Email value.
    func main() {
    	var opts []grpc.DialOption

    	//opts = append(opts, grpc.WithUnaryInterceptor(interceptor))
    	opts = append(opts, grpc.WithInsecure())

    	conn, err := grpc.Dial("localhost:50051", opts...)
    	if err != nil {
    		panic(err)
    	}
    	defer conn.Close()

    	c := proto.NewGreeterClient(conn)
    	//rsp, _ := c.Search(context.Background(), &empty.Empty{})
    	rsp, err := c.SayHello(context.Background(), &proto.Person{
    		Email: "bobby",
    	})
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println(rsp.Id)
    }
    ```

## grpc中的错误处理

[错误码](https://github.com/grpc/grpc/blob/master/doc/statuscodes.md)

### python 中

```python
context.set_code(grpc.StatusCode.NOT_FOUND)
context.set_details("记录不存在")
```

```python
from concurrent import futures

import grpc

from base.proto import hello_pb2, hello_pb2_grpc


class Greeter(hello_pb2_grpc.GreeterServicer):
  def SayHello(self, request, context):
      context.set_code(grpc.StatusCode.NOT_FOUND)  # set the status code
      context.set_details("记录不存在")
      return hello_pb2.HelloReply(message=f"您好,{request}")


if __name__ == '__main__':
  # 1. Create the server with a thread pool.
  server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
  # 2. Register the service implementation with the server.
  hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
  # 3. Start the server.
  server.add_insecure_port('0.0.0.0:1234')
  server.start()
  server.wait_for_termination()  # keep the main program from exiting immediately
```

```python
import grpc

from base.proto import hello_pb2, hello_pb2_grpc


class DefaultIntercept(grpc.UnaryUnaryClientInterceptor):
  def intercept_unary_unary(self, continuation, client_call_details, request):
      from datetime import datetime
      start = datetime.now()
      rsp = continuation(client_call_details, request)
      # total_seconds() covers the whole interval; .microseconds is only the
      # sub-second component and would drop any whole seconds.
      print((datetime.now() - start).total_seconds() * 1000)
      return rsp


if __name__ == '__main__':
  default_intercept = DefaultIntercept()
  with grpc.insecure_channel("127.0.0.1:1234") as channel:  # open the connection
      intercept_channel = grpc.intercept_channel(channel, default_intercept)
      stub = hello_pb2_grpc.GreeterStub(intercept_channel)
      try:
          rsp: hello_pb2.HelloReply = stub.SayHello(hello_pb2.HelloRequest(name="xin"))
      except grpc.RpcError as e:
          print(e)
          d = e.details()
          status_code = e.code()
          print(status_code.name)  # NOT_FOUND
          print(status_code.value)  # (5, 'not found')
          print(f"code: {status_code.name}, detail: {d}") # code: NOT_FOUND, detail: 记录不存在
      else:
          # rsp is only bound when the call succeeded.
          print(rsp.message)
```

16、go 中

// GetStream sends eleven timestamp messages on the stream and then ends the
// call with an InvalidArgument status. A failed send stops the stream and its
// error is returned instead.
func (s *server) GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer) error {
	for i := 1; i <= 11; i++ {
		msg := &proto.StreamResData{
			Data: fmt.Sprintf("%v", time.Now().Unix()),
		}
		// Surface a failed send rather than discarding it and sending on.
		if err := res.Send(msg); err != nil {
			return err
		}
		//time.Sleep(time.Second)
	}
	return status.Error(codes.InvalidArgument, "错误")
}
	allstr, err := c.AllStream(context.Background())
	if err != nil {
		// status.FromError extracts the gRPC status from err; ok is false
		// when err does not carry one.
		st, ok := status.FromError(err)
		if !ok {
			panic("解析错误失败")
		}
		st.Message()
		st.Code()
	}

17、grpc的超时机制

rsp: hello_pb2.HelloReply = stub.SayHello(hello_pb2.HelloRequest(name="xin"), timeout=3)
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*3)
// Release the resources held by the timeout context once the call is done.
defer cancel()