Commit b9835d72 authored by Administrator's avatar Administrator

token验证、token获取用户信息————余晋熹

parent 3e9d54ab
from jose import jwt from jose import jwt, JWTError
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from typing import Optional from typing import Optional
from datetime import timedelta, datetime from datetime import timedelta, datetime
from app.api.users.schemas import UserLoginForm, GoogleLogin from app.api.users.schemas import UserLoginForm, GoogleLogin
from libs.functions import md5 from libs.functions import md5
from libs.img_code import session from libs.img_code import session
from libs.result_format import HttpResultResponse
from models import users as users from models import users as users
from fastapi.security import OAuth2PasswordBearer
from fastapi import Depends
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256" ALGORITHM = "HS256"
...@@ -86,4 +89,4 @@ def authenticate_pwd(db: Session, form_data: GoogleLogin): ...@@ -86,4 +89,4 @@ def authenticate_pwd(db: Session, form_data: GoogleLogin):
md5_password = md5(form_data.password) md5_password = md5(form_data.password)
if md5_password != user_data.hashed_password: if md5_password != user_data.hashed_password:
return {"result": False, "msg": "密码错误"} return {"result": False, "msg": "密码错误"}
return {"result": True, "msg": "验证通过"} return {"result": True, "msg": "验证通过"}
\ No newline at end of file
...@@ -12,6 +12,7 @@ from app.api.users.schemas import UserLoginForm, GoogleCode, GoogleLogin ...@@ -12,6 +12,7 @@ from app.api.users.schemas import UserLoginForm, GoogleCode, GoogleLogin
from libs.google_code import get_qrcode, google_verify_result from libs.google_code import get_qrcode, google_verify_result
from libs.img_code import imageCode from libs.img_code import imageCode
from libs.result_format import HttpResultResponse, HttpMessage from libs.result_format import HttpResultResponse, HttpMessage
from libs.token_verify import oauth2_scheme, auth_token, get_current_user
router = APIRouter() router = APIRouter()
...@@ -69,10 +70,17 @@ def create_user(data: schemas.UserCreate, db: Session = Depends(get_db)): ...@@ -69,10 +70,17 @@ def create_user(data: schemas.UserCreate, db: Session = Depends(get_db)):
return HttpResultResponse(data=res.id) return HttpResultResponse(data=res.id)
@router.get("/{user_id}") # @router.get("/{user_id}")
def read_user(user_id: int, db: Session = Depends(get_db)): # def read_user(user_id: int, db: Session = Depends(get_db)):
"""查询单个用户信息""" # """查询单个用户信息"""
db_user = crud.get_user(db, user_id=user_id) # db_user = crud.get_user(db, user_id=user_id)
if db_user is None: # if db_user is None:
return HttpResultResponse(code=400, msg=HttpMessage.USER_NOT_EXIST) # return HttpResultResponse(code=400, msg=HttpMessage.USER_NOT_EXIST)
return HttpResultResponse(data=db_user) # return HttpResultResponse(data=db_user)
@router.get("/tokenUser")
def token_user(token=Depends(auth_token), db: Session = Depends(get_db)):
"""通过token,获取登录用户信息"""
user_data = get_current_user(db=db, token=token)
return HttpResultResponse(code=200, data=user_data)
\ No newline at end of file
import datetime
from sqlalchemy.orm import Session
from fastapi import Depends, APIRouter, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from pydantic import BaseModel
from typing import Union
from app import get_db
from app.api.users.login_verification import SECRET_KEY, ALGORITHM, get_user
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def auth_token(token: str = Depends(oauth2_scheme)):
try:
# 解密tokens
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# 从tokens的载荷payload中获取用户名
username: str = payload.get('username')
# 如果没有获取到,抛出异常
if username is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='用户不存在')
now_time = int(datetime.datetime.now().timestamp())
if payload.get("exp") <= now_time:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='token过期')
return token
except JWTError as e:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=f'{e}')
def get_current_user(db: Session, token: str = Depends(oauth2_scheme)):
'''获取当前用户信息,实际上是一个解密token的过程
:param token: 携带的token
:return:
'''
try:
# 解密tokens
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# 从tokens的载荷payload中获取用户名
username: str = payload.get('username')
user_data = get_user(db=db, username=username)
json_data = {
"username": user_data.username
}
return json_data
except JWTError as e:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=f'{e}')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment