Commit dd27929f authored by Administrator's avatar Administrator

用户登录、验证码获取、谷歌二维码、谷歌验证————余晋熹

parent e97183af
from datetime import datetime from datetime import datetime
import pyotp
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.api.users import schemas from app.api.users import schemas
...@@ -22,7 +23,7 @@ def get_users(db: Session, skip: int = 0, limit: int = 100): ...@@ -22,7 +23,7 @@ def get_users(db: Session, skip: int = 0, limit: int = 100):
def create_user(db: Session, user: schemas.UserCreate): def create_user(db: Session, user: schemas.UserCreate):
db_user = models.User(username=user.username, description=user.description, unique=user.unique, db_user = models.User(username=user.username, description=user.description, unique=user.unique,
config_key=user.config_key, uuid=uuid(), hashed_password=md5("123456"), config_key=user.config_key, uuid=uuid(), hashed_password=md5("123456"),
create_time=datetime.now()) google_key=pyotp.random_base32(64), create_time=datetime.now())
db.add(db_user) db.add(db_user)
db.commit() db.commit()
db.refresh(db_user) db.refresh(db_user)
......
from jose import jwt
from sqlalchemy.orm import Session
from typing import Optional
from datetime import timedelta, datetime
from app.api.users.schemas import UserLoginForm
from libs.functions import md5
from libs.img_code import session
from models import users as users
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
def get_user(db, username: str):
'''查询用户
:param db: 模拟的数据库
:param username: 用户名
:return: 返回一个用户的BaseModel(其实就是字典的BaseModel对象,二者可互相转换)
'''
return db.query(users.User).filter(users.User.username == username).first()
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
'''创建tokens函数
:param data: 对用JWT的Payload字段,这里是tokens的载荷,在这里就是用户的信息
:param expires_delta: 缺省参数,截止时间
:return:
'''
# 深拷贝data
to_encode = data.copy()
# 如果携带了截至时间,就单独设置tokens的过期时间
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
# 否则的话,就默认用15分钟
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({'exp': expire})
# 编码,至此 JWT tokens诞生
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def authenticate_user(db: Session, form_data: UserLoginForm):
'''验证用户
:param db: 存储用户的数据库
:param username: 用户名
:param password: 密码
:param verify: 验证码
:return:
'''
user_data = get_user(db=db, username=form_data.username)
# 如果获取为空,返回False
if not user_data:
return False, "用户不存在"
# 如果密码不正确,也是返回False
md5_password = md5(form_data.password)
if md5_password != user_data.hashed_password:
return False, "密码错误"
# 验证码检查
if form_data.verify.lower() != session.headers.get("verify").lower():
return False, "验证码错误"
return True, False
...@@ -16,3 +16,18 @@ class User(UserBase): ...@@ -16,3 +16,18 @@ class User(UserBase):
class Config: class Config:
orm_mode = True orm_mode = True
class UserLoginForm(BaseModel):
username: str
password: str
verify: str
class GoogleCode(BaseModel):
username: str
class GoogleLogin(BaseModel):
username: str
google_code: str
from app import get_db
from typing import List from typing import List
from fastapi import Depends, APIRouter
from datetime import datetime, timedelta
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.api.users import crud, schemas from app.api.users import crud, schemas
from fastapi import Depends, APIRouter, HTTPException from app.api.users.login_verification import authenticate_user, create_access_token, get_user
from app.api.users.schemas import UserLoginForm, GoogleCode, GoogleLogin
from core.storage.db import SessionLocal
from libs.google_code import get_qrcode, google_verify_result
from libs.img_code import imageCode
from libs.result_format import HttpResultResponse, HttpMessage from libs.result_format import HttpResultResponse, HttpMessage
router = APIRouter() router = APIRouter()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# @router.post("/", response_model=schemas.User)
# def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
# db_user = crud.get_user_by_email(db, email=user.email)
# if db_user:
# raise HTTPException(status_code=400, detail="Email already registered")
# return crud.create_user(db=db, user=user)
@router.get("/", response_model=List[schemas.User]) @router.get("/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit) users = crud.get_users(db, skip=skip, limit=limit)
return users return users
@router.get("/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)): @router.get("/imgCode")
db_user = crud.get_user(db, user_id=user_id) def imgCode():
if db_user is None: return imageCode().getImgCode()
raise HTTPException(status_code=404, detail="User not found")
return db_user
@router.post("/login")
async def login(form_data: UserLoginForm, db: Session = Depends(get_db)):
@router.post("/login", tags=["POST"]) user, msg = authenticate_user(db=db, form_data=form_data)
def login(): if not user:
""" return HttpResultResponse(code=500, msg=msg, data={})
OAuth2 compatible token login, get an access token for future requests. # 定义tokens过期时间
""" access_token_expires = timedelta(hours=12)
# bucket = get_default_bucket() access_token = create_access_token(data=form_data.dict(), expires_delta=access_token_expires)
# user = crud.user.authenticate( return HttpResultResponse(msg=HttpMessage.HFDU, data={"access_token": access_token, "token_type": "bearer"})
# bucket, username=form_data.username, password=form_data.password
# )
@router.post("/goodleCode")
# if not user: async def goodleCode(data: GoogleCode, db: Session = Depends(get_db)):
# raise HTTPException(status_code=400, detail="Incorrect email or password") user_data = get_user(db, data.username)
# elif not crud.user.is_active(user): return get_qrcode(username=user_data.username, gtoken=user_data.google_key)
# raise HTTPException(status_code=400, detail="Inactive user")
# access_token_expires = timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
return { @router.post("/googleLogin")
async def googleLogin(data: GoogleLogin, db: Session = Depends(get_db)):
"token_type": "bearer" user_data = get_user(db, data.username)
} verify = google_verify_result(secret_key=user_data.google_key, google_code=data.google_code)
if verify:
return HttpResultResponse(msg=HttpMessage.HFDU)
else:
return HttpResultResponse(msg="登录失败,谷歌动态码错误")
@router.post("/create") @router.post("/create")
......
import base64
import pyotp
import os
import traceback
from qrcode import QRCode, constants
from six import BytesIO
def get_qrcode(username: str, gtoken: str):
# gtoken = pyotp.random_base32(64)
# dirpath = os.path.join(os.getcwd())
data = pyotp.totp.TOTP(gtoken).provisioning_uri(username, issuer_name="IAM MFA Code")
qr = QRCode(
version=1,
error_correction=constants.ERROR_CORRECT_L,
box_size=6,
border=4, )
try:
qr.add_data(data)
qr.make(fit=True)
img = qr.make_image()
# filepath = dirpath + os.sep + username + '.png'
# img.save(filepath) # 保存条形码图片
# 图片以二进制形式写入
buf = BytesIO()
img.save(buf, 'jpeg')
buf_str = buf.getvalue()
img_b = b"data:image/png;base64," + base64.b64encode(buf_str)
return img_b
except Exception as e:
traceback.print_exc()
return False
def google_verify_result(secret_key, google_code):
"""谷歌动态码效验"""
t = pyotp.TOTP(secret_key)
result = t.verify(google_code) # 对输入验证码进行校验,正确返回True
msg = result if result is True else False
return msg
\ No newline at end of file
import base64
import os.path
import random
import string
import requests
from PIL import Image, ImageFont, ImageDraw
from six import BytesIO
session = requests.session()
class imageCode():
'''
验证码处理
'''
def rndColor(self):
'''随机颜色'''
return (random.randint(32, 127), random.randint(32, 127), random.randint(32, 127))
def geneText(self):
'''生成4位验证码'''
return ''.join(random.sample(string.ascii_letters + string.digits, 4)) # ascii_letters是生成所有字母 digits是生成所有数字0-9
def drawLines(self, draw, num, width, height):
'''划线'''
for num in range(num):
x1 = random.randint(0, width / 2)
y1 = random.randint(0, height / 2)
x2 = random.randint(0, width)
y2 = random.randint(height / 2, height)
draw.line(((x1, y1), (x2, y2)), fill='black', width=1)
def getVerifyCode(self):
'''生成验证码图形'''
code = self.geneText()
# 图片大小120×50
width, height = 120, 50
# 新图片对象
im = Image.new('RGB', (width, height), 'white')
# 字体
font_file = os.path.join(os.getcwd(), "libs", "DejaVuSans-BoldOblique.ttf")
font = ImageFont.truetype(font_file, 40)
# draw对象
draw = ImageDraw.Draw(im)
# 绘制字符串
for item in range(4):
draw.text((5 + random.randint(-3, 3) + 23 * item, 5 + random.randint(-3, 3)),
text=code[item], fill=self.rndColor(), font=font)
# 划线
self.drawLines(draw, 2, width, height)
return im, code
def getImgCode(self):
image, code = self.getVerifyCode()
# 图片以二进制形式写入
buf = BytesIO()
image.save(buf, 'jpeg')
buf_str = buf.getvalue()
img = b"data:image/png;base64," + base64.b64encode(buf_str)
session.headers['verify'] = code
print(code)
return img
...@@ -6,6 +6,7 @@ from fastapi.responses import Response ...@@ -6,6 +6,7 @@ from fastapi.responses import Response
class HttpMessage(object): class HttpMessage(object):
SUCCESS = "操作成功" SUCCESS = "操作成功"
USER_EXIST = "用户已存在" USER_EXIST = "用户已存在"
HFDU = "登录成功"
class HttpResultResponse(Response): class HttpResultResponse(Response):
......
...@@ -6,24 +6,25 @@ class User(Base): ...@@ -6,24 +6,25 @@ class User(Base):
__tablename__ = "users" __tablename__ = "users"
id = Column(Integer, primary_key=True, index=True) id = Column(Integer, primary_key=True, index=True)
username = Column(String(100), unique=True, index=True) username = Column(String(100), unique=True, index=True, comment="用户名")
description = Column(String(255)) description = Column(String(255), comment="描述")
unique = Column(String(50), unique=True) unique = Column(String(50), unique=True, comment="唯一标识")
hashed_password = Column(String(50)) hashed_password = Column(String(50), comment="密码")
config_key = Column(String(50)) config_key = Column(String(50))
uuid = Column(String(50)) uuid = Column(String(50))
authority = Column(Integer) authority = Column(Integer, comment="权限")
remaining_sum = Column(Float) google_key = Column(String(255), comment="谷歌动态码secret标识")
entry_account = Column(String(255)) remaining_sum = Column(Float, comment="账户余额")
out_account = Column(String(255)) entry_account = Column(String(255), comment="入账类目")
create_time = Column(DateTime) out_account = Column(String(255), comment="出账类目")
update_time = Column(DateTime) create_time = Column(DateTime, comment="创建时间")
update_time = Column(DateTime, comment="修改时间")
class Authority(Base): class Authority(Base):
__tablename__ = "authority" __tablename__ = "authority"
id = Column(Integer, primary_key=True, index=True) id = Column(Integer, primary_key=True, index=True)
name = Column(String(50), index=True) name = Column(String(50), index=True, comment="权限名称")
up_one_level = Column(Integer, index=True) up_one_level = Column(Integer, index=True, comment="上级权限id")
create_time = Column(DateTime) create_time = Column(DateTime, comment="创建时间")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment