from fastapi import FastAPI, Request, Response, Header, Body, Form, Cookie
from fastapi.responses import JSONResponse, ORJSONResponse, FileResponse, HTMLResponse, RedirectResponse, StreamingResponse, PlainTextResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
import uvicorn
class Vane(BaseModel):
name: str
price: float
tax: float | None = None
description: str | None = None
app = FastAPI(default_response_class = ORJSONResponse)
# Path Parameters & Query Parameters
@app.get('/users/{user_id}/items/{item_id}')
async def read_user_item(user_id: int, item_id: str, q: str | None = None, short: bool = False):
item = {'item_id': item_id, 'user_id': user_id}
if q:
item.update({'q': q})
if not short:
item.update(description = 'short is False')
return item
# Headers
@app.get('/headers/')
async def read_items(strange_header: str | None = Header(None, convert_underscores = False),
x_token: list[str] | None = Header(None),
user_agent: str = Header(None)
):
return {'strange_header': strange_header, 'X-Token': x_token, 'User-Agent': user_agent}
# Cookie
@app.get('/cookie')
async def read_cookie(sessionid: str | None = Cookie(None), csrftoken: str | None = Cookie(None)):
return {'sessionid': sessionid, 'csrftoken': csrftoken}
# Form
@app.get('/form')
async def read_form(username: str | None = Form(None), password: str | None = Form(None)):
return {'username': username, 'password': password}
# application/json (pydantic)
@app.post('/vane/{vane_id}')
async def read_vane(vane_id: int, q: str | None = None, vane: Vane = None):
result = {'vane_id': vane_id, **vane.dict()}
if vane.tax:
result.update(price_with_tax = vane.price * vane.tax)
if q:
result.update(q = q)
return result
# application/xml
@app.get('/xml')
async def xml():
data = """<?xml version="1.0"?>
You'll have to use soap here.
"""
return Response(content = data, media_type = "application/xml")
# plain text Response
@app.get('/plain')
async def plain(request: Request):
return PlainTextResponse(str(request.headers), status_code = 207)
# RedirectResponse
@app.get('/redirect', response_class = RedirectResponse)
async def redirect():
"""
async def redirect():
return RedirectResponse('http://x.com')
"""
return 'http://x.com'
# StreamingResponse
@app.get('/streaming')
async def streaming():
async def fake_video_streamer():
for i in range(5):
yield b'some fake video bytes'
return StreamingResponse(fake_video_streamer())
@app.get('/video')
async def video():
def itervideo():
video = 'fred.mp4'
with open(video, mode = 'r+b') as f:
yield from f
return StreamingResponse(itervideo(), media_type = 'video/mp4')
@app.get('/file', response_class = FileResponse)
async def file():
"""
return FileResponse(path)
:return:
"""
path = 'ASCII.jpg'
return FileResponse(path, filename = 'custom_name.jpg') # 有filename参数时, 浏览器会下载
# Body & JSONResponse
@app.api_route('/valor', methods = ['get', 'post'])
async def valor(request: Request, body = Body(None)):
print(f'body = {body}')
print(request.url)
return JSONResponse(content = {'request': request.client}, status_code = 202, headers = {'custom-header': 'vandalism'}, media_type = 'application/json')
# HTMLResponse
@app.get('/html', response_class = HTMLResponse)
async def generate_html_response():
html_content = """
Some HTML <span style="color: rgba(0, 0, 255, 1)">in</span> here
Look ma! HTML!
"""
return HTMLResponse(content = html_content, status_code = 206, media_type = 'text/html')
if __name__ == '__main__':
uvicorn.run(app)