Commit 856a6109 authored by Administrator's avatar Administrator

用户登录、验证码获取、谷歌二维码、谷歌验证(修改第三次)————余晋熹

parent 226350cd
import pyotp
from jose import jwt
from sqlalchemy.orm import Session
from typing import Optional
from datetime import timedelta, datetime
from app.api.users.schemas import UserLoginForm
from app.api.users.schemas import UserLoginForm, GoogleLogin
from libs.functions import md5
from libs.img_code import session
from models import users as users
......@@ -61,19 +61,29 @@ def authenticate_user(db: Session, form_data: UserLoginForm):
if form_data.verify.lower() != session.headers.get("verify").lower():
return {"result": False, "msg": "验证码错误"}
if user_data.google_key:
return {"result": True, "msg": "验证通过", "google_key": 1}
return {"result": True, "msg": "验证通过", "google_key": "1"}
else:
return {"result": True, "msg": "验证通过", "google_key": 0}
return {"result": True, "msg": "验证通过", "google_key": "0"}
def add_google_key(db: Session, user_id: int):
def add_google_key(db: Session, username: str, google_key: str):
updata = {
"google_key": pyotp.random_base32(64),
"google_key": google_key,
"update_time": datetime.now()
}
try:
db.query(users.User).filter(users.User.id == user_id).update(updata)
db.query(users.User).filter(users.User.username == username).update(updata)
db.commit()
return {"result": True, "google_key": updata.get("google_key")}
except Exception as e:
return {"result": False}
\ No newline at end of file
return {"result": False}
def authenticate_pwd(db: Session, form_data: GoogleLogin):
"""只验证密码"""
user_data = get_user(db=db, username=form_data.username)
# 如果密码不正确,也是返回False
md5_password = md5(form_data.password)
if md5_password != user_data.hashed_password:
return {"result": False, "msg": "密码错误"}
return {"result": True, "msg": "验证通过"}
......@@ -30,4 +30,6 @@ class GoogleCode(BaseModel):
class GoogleLogin(BaseModel):
username: str
password: str
google_key: str
google_code: str
import time
from typing import List
import pyotp
from fastapi import Depends, APIRouter
from datetime import timedelta
from sqlalchemy.orm import Session
from app import get_db
from app.api.users import crud, schemas
from app.api.users.login_verification import authenticate_user, create_access_token, get_user, add_google_key
from app.api.users.login_verification import authenticate_user, create_access_token, get_user, authenticate_pwd
from app.api.users.schemas import UserLoginForm, GoogleCode, GoogleLogin
from libs.google_code import get_qrcode, google_verify_result
from libs.img_code import imageCode
......@@ -25,7 +28,7 @@ def imgCode():
@router.post("/login")
async def login(form_data: UserLoginForm, db: Session = Depends(get_db)):
def login(form_data: UserLoginForm, db: Session = Depends(get_db)):
user_info = authenticate_user(db=db, form_data=form_data)
if not user_info.get("result"):
return HttpResultResponse(code=500, msg=user_info.get("msg"), data={})
......@@ -33,25 +36,23 @@ async def login(form_data: UserLoginForm, db: Session = Depends(get_db)):
@router.post("/goodleCode")
async def goodleCode(data: GoogleCode, db: Session = Depends(get_db)):
def goodleCode(data: GoogleCode, db: Session = Depends(get_db)):
user_data = get_user(db, data.username)
update_info = add_google_key(db=db, user_id=user_data.id)
if not update_info.get("result"):
return HttpResultResponse(code=500, msg="谷歌二维码生成失败")
return get_qrcode(username=user_data.username, gtoken=update_info.get("google_key"))
google_key = pyotp.random_base32(64)
google_img = get_qrcode(username=user_data.username, gtoken=google_key).decode('utf-8')
return HttpResultResponse(data={"google_img": google_img, "google_key": google_key})
@router.post("/googleLogin")
async def googleLogin(data: GoogleLogin, db: Session = Depends(get_db)):
user_data = get_user(db, data.username)
verify = google_verify_result(secret_key=user_data.google_key, google_code=data.google_code)
def googleLogin(data: GoogleLogin, db: Session = Depends(get_db)):
auth_info = authenticate_pwd(db=db, form_data=data)
if not auth_info.get("result"):
return HttpResultResponse(code=500, msg=auth_info.get("msg"))
verify = google_verify_result(db=db, data=data)
if verify:
# 定义tokens过期时间
access_token_expires = timedelta(hours=12)
token_data = {
"username": user_data.username,
"google_key": user_data.google_key
}
token_data = {"username": data.username, "password": data.password}
access_token = create_access_token(data=token_data, expires_delta=access_token_expires)
return HttpResultResponse(msg=HttpMessage.HFDU, data={"access_token": access_token, "token_type": "bearer"})
else:
......
import base64
from sqlalchemy.orm import Session
import pyotp
import os
import traceback
from qrcode import QRCode, constants
from six import BytesIO
from app.api.users.login_verification import get_user, add_google_key
from app.api.users.schemas import GoogleLogin
def get_qrcode(username: str, gtoken: str):
# gtoken = pyotp.random_base32(64)
......@@ -34,9 +37,16 @@ def get_qrcode(username: str, gtoken: str):
return False
def google_verify_result(secret_key, google_code):
def google_verify_result(db: Session, data: GoogleLogin):
"""谷歌动态码效验"""
if data.google_key == "1":
user_data = get_user(db, data.username)
secret_key = user_data.google_key
else:
secret_key = data.google_key
t = pyotp.TOTP(secret_key)
result = t.verify(google_code) # 对输入验证码进行校验,正确返回True
result = t.verify(data.google_code) # 对输入验证码进行校验,正确返回True
msg = result if result is True else False
if msg and data.google_key != "1":
update_info = add_google_key(db=db, username=data.username, google_key=data.google_key)
return msg
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment