Flask-RESTful 接口开发之序列化规则


1.自定义序列化

# Module-level response envelope; every call stores its payload under 'data'.
Result = {}


# Serialization
def serializer(self, obj_list):
    """Convert objects into plain dicts and wrap them in ``Result``.

    Args:
        self: Not used by the body; kept because the snippet is written as
            a method.
        obj_list: Iterable of objects that expose ``__dict__``.

    Returns:
        The shared module-level ``Result`` dict, with ``Result['data']`` set
        to a list holding one dict of attributes per object.
    """
    # Build a filtered copy of each __dict__ instead of popping from it:
    # popping '_sa_instance_state' (SQLAlchemy's per-instance state) from the
    # live __dict__ would alter the object itself, and a plain pop() raises
    # KeyError for objects that never had the key.
    result = [
        {
            key: value
            for key, value in vars(obj).items()
            if key != '_sa_instance_state'
        }
        for obj in obj_list
    ]
    Result['data'] = result
    return Result

以上自定义序列化函数只适用于简单的、没有嵌套的数据结构。

2.使用flask_restful的 @marshal_with 装饰器进行序列化

详细介绍:官方文档

相关博客:Flask_Restful

第一步:编写序列化规则

from flask_restful import fields
import random

# Custom field
class RandomNumber(fields.Raw):
    """Output field whose value is a freshly drawn random float."""

    def output(self, key, obj):
        """Return ``random.random()``; ``key`` and ``obj`` are not consulted."""
        value = random.random()
        return value


# Custom serialization rules for a single user record.
# NOTE(review): the 'password' entry emits the object's `password` attribute
# in every marshalled response -- confirm that exposing it is intended.
user_fields = dict(
    id=fields.Integer,
    # Rename on output: read `username` from the object, emit it as 'name'.
    name=fields.String(attribute='username'),
    password=fields.String,
    greeting=fields.FormattedString('Hello {username}'),
    # Use the custom field class.
    random=RandomNumber,
)

# Rules for the whole response envelope; each user is nested via user_fields.
resource_fields = dict(
    status=fields.Integer,
    # Nested structure: a list whose items are marshalled with user_fields.
    userList=fields.List(fields.Nested(user_fields)),
    userCount=fields.Integer,
    msg=fields.String,
    # fields.Url yields a relative URI by default; absolute=True must be
    # passed at declaration time to get an absolute one.
    url=fields.Url(endpoint='userInfo', absolute=True),
)

def serializer(user_list):
    """Wrap ``user_list`` in a success envelope.

    Args:
        user_list: Sized collection of users; it is stored as-is under
            ``'userList'`` and its length under ``'userCount'``.

    Returns:
        A dict with the keys ``status`` (200), ``userList``, ``userCount``
        and ``msg`` (``'success'``).
    """
    total = len(user_list)
    envelope = dict(
        status=200,
        userList=user_list,
        userCount=total,
        msg='success',
    )
    return envelope

第二步:使用序列化规则

from common.models.model import Account
from flask import jsonify, request
from flask_restful import Resource, reqparse
from flask_restful import marshal_with, abort
from flask_jwt_extended import create_access_token
from flask_jwt_extended import jwt_required
from werkzeug.security import check_password_hash as check_password
from werkzeug.security import generate_password_hash as generate_password
from web.serializer.Account import resource_fields, serializer
import re

# Create the request parser shared by the views below.
parser = reqparse.RequestParser()

# Declare the arguments to parse: both are required strings, each with its
# own help message.
for _arg_name, _arg_help in (
    ('username', 'please input username'),
    ('password', 'please input password'),
):
    parser.add_argument(_arg_name, type=str, required=True, help=_arg_help)

# parser.add_argument('is_activate', choices=(0, 1), type=int, default=0)

class RegisterView(Resource):
    """Registration endpoint: create an ``Account`` from a username/password."""

    def post(self):
        """Validate the submitted credentials and store a new account.

        Returns:
            A ``jsonify`` response whose body carries ``status`` and ``msg``:
            ``status=400`` for a malformed username, a malformed password or
            a username that is already taken, ``status=200`` on success.
        """
        data = parser.parse_args()
        username = data.get('username')
        password = data.get('password')

        # fullmatch instead of match + '$': '$' also matches just before a
        # trailing newline, so a value such as 'name\n' used to be accepted.
        if not re.fullmatch(r'[a-zA-Z]\w*', username):
            return jsonify(status=400, msg='用户名格式错误')

        if not re.fullmatch(r'[0-9a-zA-Z]{6,32}', password):
            return jsonify(status=400, msg='密码格式错误')

        if Account.query.filter_by(username=username).first():
            return jsonify(status=400, msg='用户名已存在')

        user = Account()
        user.username = username
        # Only the hash produced by generate_password is stored.
        user.password = generate_password(password)
        user.save()
        return jsonify(status=200, msg='Register Success!')

class LoginView(Resource):
    """Login endpoint: exchange a username/password for an access token."""

    def post(self):
        """Check the submitted credentials and issue a token.

        Returns:
            A ``jsonify`` response: ``status=400`` with an error ``msg`` when
            the account is missing or the password check fails, otherwise
            ``status=200`` with the generated ``token``.
        """
        args = parser.parse_args()
        username, password = args.get('username'), args.get('password')

        account = Account.query.filter_by(username=username).first()

        # An unknown user and a wrong password produce the same reply.
        if not account or not check_password(account.password, password):
            return jsonify(status=400, msg='用户名或密码错误')

        # Generate the token; the identity carries the account id and name.
        identity = {'id': account.id, 'username': account.username}
        token = create_access_token(identity=identity)
        return jsonify(status=200, token=token, msg='Login Success!')

class UserInfoView(Resource):
    """Endpoints to query, update and delete ``Account`` records.

    Every handler is wrapped with ``jwt_required()`` via ``method_decorators``.
    """

    # NOTE(review): no handler compares the token identity with the target
    # account, so any authenticated caller appears able to update or delete
    # any account -- confirm whether an ownership check is needed.
    method_decorators = [jwt_required()]

    @marshal_with(resource_fields)
    def get(self, id=None):
        """Return one account by ``id``, or all accounts matching the query string.

        Args:
            id: Primary key of the account; when ``None`` the request's query
                string is used as ``filter_by`` criteria instead.

        Returns:
            The envelope built by ``serializer``, marshalled with
            ``resource_fields``.

        Aborts with 404 when nothing matches and with 400 when the query
        itself fails.
        """
        # Compare against None so that an explicit id of 0 is looked up (and
        # reported as 404) instead of falling through to the list query.
        if id is not None:
            user = Account.query.get(id)
            if not user:
                abort(404, status=404, msg="查无此人")
            return serializer([user])

        try:
            user_list = Account.query.filter_by(**request.args).all()
        except Exception:
            # Presumably raised for a query-string key that is not an Account
            # attribute. A bare `except:` would also have swallowed
            # SystemExit and KeyboardInterrupt.
            abort(400, status=400, msg='查询条件错误')

        if not user_list:
            abort(404, status=404, msg="查无此人")
        return serializer(user_list)

    def put(self, id):
        """Replace the username and password of the account ``id``.

        Returns:
            A ``jsonify`` response: ``status=400`` for malformed input or a
            username held by another account, ``status=200`` on success.

        Aborts with 404 when the account does not exist.
        """
        data = parser.parse_args()
        username = data.get('username')
        password = data.get('password')

        # fullmatch instead of match + '$': '$' also matches just before a
        # trailing newline, so a value such as 'name\n' used to be accepted.
        if not re.fullmatch(r'[a-zA-Z]\w*', username):
            return jsonify(status=400, msg='用户名格式错误')

        if not re.fullmatch(r'[0-9a-zA-Z]{6,32}', password):
            return jsonify(status=400, msg='密码格式错误')

        user = Account.query.get(id)
        if not user:
            abort(404, status=404, msg="查无此人")

        # Only a *different* account holding the name is a conflict; without
        # this exclusion a user could never keep their own username while
        # changing the password.
        existing = Account.query.filter_by(username=username).first()
        if existing and existing.id != user.id:
            return jsonify(status=400, msg='用户名已存在')

        user.password = generate_password(password)
        user.username = username
        user.update()
        return jsonify(status=200, msg='Update Success!')

    def delete(self, id):
        """Delete the account ``id``; aborts with 404 when it does not exist."""
        user = Account.query.get(id)
        if not user:
            abort(404, status=404, msg="查无此人")
        user.delete()
        return jsonify(status=200, msg='Delete Success!')