FastAPI(59)- 详解使用 OAuth2PasswordBearer + JWT 认证
FastAPI(59)- 详解使用 OAuth2PasswordBearer + JWT 认证
- JSON Web Tokens
- 它是一个将 JSON 对象编码为密集且没有空格的长字符串的标准
- 使用 JWT token 和安全密码 hash 使应用程序真正安全
JWT 小栗子
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
- 它还没有加密,因此任何人都可以从该字符串中恢复信息
- 但是已经加签了,因此,当收到发出的 token 时,可以验证是否实际发出了它
- 创建一个有效期为 1 周的 token,然后当用户第二天带着 token 回来时,知道该用户仍然登录到系统中
- 一周后,令牌将过期,用户将无法获得授权,必须重新登录以获取新的 token
- 如果用户(或第三方)试图修改 token 以更改过期时间,将能够发现它,因为签名不匹配
前提
pip install python-jose
pip install cryptography
JWT 流程
前提
pip install passlib
pip install bcrypt
包含的功能
生成用于签名 JWT token 的随机密钥
> openssl rand -hex 32
dc393487a84ddf9da61fe0180ef295cf0642ecbc5d678a1589ef2e26b35fce9c
常量池
# 常量池
# 通过 openssl rand -hex 32 生成的随机密钥
SECRET_KEY = "dc393487a84ddf9da61fe0180ef295cf0642ecbc5d678a1589ef2e26b35fce9c"
# 加密算法
ALGORITHM = "HS256"
# 过期时间,分钟
ACCESS_TOKEN_EXPIRE_MINUTES = 30
创建生成 JWT token 需要用的 Pydantic Model
# 返回给客户端的 Token Model
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
生成 JWT token
# 导入 JWT 相关库
from jose import JWTError, jwt
# 用户名、密码验证成功后,生成 token
def create_access_token(
data: dict,
expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
# 加密
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
修改 get_current_user
# 导入 JWT 相关库
from jose import JWTError, jwt
# 根据当前用户的 token 获取用户,token 已失效则返回错误码
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# 1、解码收到的 token
payload = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM)
# 2、拿到 username
username: str = payload.get("sub")
if not username:
# 3、若 token 失效,则返回错误码
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
# 4、获取用户
user = get_user(fake_users_db, username=token_data.username)
if not user:
raise credentials_exception
# 5、返回用户
return user
修改获取 token 的路径操作函数
# OAuth2 获取 token 的请求路径
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
# 1、获取客户端传过来的用户名、密码
username = form_data.username
password = form_data.password
# 2、验证用户
user = authenticate_user(fake_users_db, username, password)
if not user:
# 3、验证失败,返回错误码
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
# 4、生成 token
access_token = create_access_token(
data={"sub": user.username},
expires_delta=access_token_expires
)
# 5、返回 JSON 响应
return {"access_token": access_token, "token_type": "bearer"}
sub 的是什么?
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
- 它还没有加密,因此任何人都可以从该字符串中恢复信息
- 但是已经加签了,因此,当收到发出的 token 时,可以验证是否实际发出了它
- 创建一个有效期为 1 周的 token,然后当用户第二天带着 token 回来时,知道该用户仍然登录到系统中
- 一周后,令牌将过期,用户将无法获得授权,必须重新登录以获取新的 token
- 如果用户(或第三方)试图修改 token 以更改过期时间,将能够发现它,因为签名不匹配
前提
pip install python-jose
pip install cryptography
JWT 流程
前提
pip install passlib
pip install bcrypt
包含的功能
pip install passlib
pip install bcrypt
包含的功能
生成用于签名 JWT token 的随机密钥
> openssl rand -hex 32
dc393487a84ddf9da61fe0180ef295cf0642ecbc5d678a1589ef2e26b35fce9c
常量池
# 常量池
# 通过 openssl rand -hex 32 生成的随机密钥
SECRET_KEY = "dc393487a84ddf9da61fe0180ef295cf0642ecbc5d678a1589ef2e26b35fce9c"
# 加密算法
ALGORITHM = "HS256"
# 过期时间,分钟
ACCESS_TOKEN_EXPIRE_MINUTES = 30
创建生成 JWT token 需要用的 Pydantic Model
# 返回给客户端的 Token Model
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
生成 JWT token
# 导入 JWT 相关库
from jose import JWTError, jwt
# 用户名、密码验证成功后,生成 token
def create_access_token(
data: dict,
expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
# 加密
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
修改 get_current_user
# 导入 JWT 相关库
from jose import JWTError, jwt
# 根据当前用户的 token 获取用户,token 已失效则返回错误码
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# 1、解码收到的 token
payload = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM)
# 2、拿到 username
username: str = payload.get("sub")
if not username:
# 3、若 token 失效,则返回错误码
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
# 4、获取用户
user = get_user(fake_users_db, username=token_data.username)
if not user:
raise credentials_exception
# 5、返回用户
return user
修改获取 token 的路径操作函数
# OAuth2 获取 token 的请求路径
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
# 1、获取客户端传过来的用户名、密码
username = form_data.username
password = form_data.password
# 2、验证用户
user = authenticate_user(fake_users_db, username, password)
if not user:
# 3、验证失败,返回错误码
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
# 4、生成 token
access_token = create_access_token(
data={"sub": user.username},
expires_delta=access_token_expires
)
# 5、返回 JSON 响应
return {"access_token": access_token, "token_type": "bearer"}
sub 的是什么?
# 常量池
# 通过 openssl rand -hex 32 生成的随机密钥
SECRET_KEY = "dc393487a84ddf9da61fe0180ef295cf0642ecbc5d678a1589ef2e26b35fce9c"
# 加密算法
ALGORITHM = "HS256"
# 过期时间,分钟
ACCESS_TOKEN_EXPIRE_MINUTES = 30
创建生成 JWT token 需要用的 Pydantic Model
# 返回给客户端的 Token Model
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
生成 JWT token
# 导入 JWT 相关库
from jose import JWTError, jwt
# 用户名、密码验证成功后,生成 token
def create_access_token(
data: dict,
expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
# 加密
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
修改 get_current_user
# 导入 JWT 相关库
from jose import JWTError, jwt
# 根据当前用户的 token 获取用户,token 已失效则返回错误码
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# 1、解码收到的 token
payload = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM)
# 2、拿到 username
username: str = payload.get("sub")
if not username:
# 3、若 token 失效,则返回错误码
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
# 4、获取用户
user = get_user(fake_users_db, username=token_data.username)
if not user:
raise credentials_exception
# 5、返回用户
return user
修改获取 token 的路径操作函数
# OAuth2 获取 token 的请求路径
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
# 1、获取客户端传过来的用户名、密码
username = form_data.username
password = form_data.password
# 2、验证用户
user = authenticate_user(fake_users_db, username, password)
if not user:
# 3、验证失败,返回错误码
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
# 4、生成 token
access_token = create_access_token(
data={"sub": user.username},
expires_delta=access_token_expires
)
# 5、返回 JSON 响应
return {"access_token": access_token, "token_type": "bearer"}
sub 的是什么?
# 导入 JWT 相关库
from jose import JWTError, jwt
# 用户名、密码验证成功后,生成 token
def create_access_token(
data: dict,
expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
# 加密
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
修改 get_current_user
# 导入 JWT 相关库
from jose import JWTError, jwt
# 根据当前用户的 token 获取用户,token 已失效则返回错误码
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# 1、解码收到的 token
payload = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM)
# 2、拿到 username
username: str = payload.get("sub")
if not username:
# 3、若 token 失效,则返回错误码
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
# 4、获取用户
user = get_user(fake_users_db, username=token_data.username)
if not user:
raise credentials_exception
# 5、返回用户
return user
修改获取 token 的路径操作函数
# OAuth2 获取 token 的请求路径
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
# 1、获取客户端传过来的用户名、密码
username = form_data.username
password = form_data.password
# 2、验证用户
user = authenticate_user(fake_users_db, username, password)
if not user:
# 3、验证失败,返回错误码
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
# 4、生成 token
access_token = create_access_token(
data={"sub": user.username},
expires_delta=access_token_expires
)
# 5、返回 JSON 响应
return {"access_token": access_token, "token_type": "bearer"}
sub 的是什么?
# OAuth2 获取 token 的请求路径
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
# 1、获取客户端传过来的用户名、密码
username = form_data.username
password = form_data.password
# 2、验证用户
user = authenticate_user(fake_users_db, username, password)
if not user:
# 3、验证失败,返回错误码
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
# 4、生成 token
access_token = create_access_token(
data={"sub": user.username},
expires_delta=access_token_expires
)
# 5、返回 JSON 响应
return {"access_token": access_token, "token_type": "bearer"}
sub 的是什么?
- 本文作者: 小菠萝测试笔记
- 本文链接: