Commit d26ee1d2 authored by xianyang's avatar xianyang

用户代码优化,用户权限接口

parent b9835d72
......@@ -14,6 +14,13 @@ def statement_recharge_list(data: schemas.StatementList):
@router.post("/derive/excel")
def statement_recharge_list(data: schemas.StatementList):
"""导出"""
"""充值报表导出"""
statement_list = crud.get_statements(data)
return crud.data_to_file(statement_list, "充值报表")
@router.post("/userWithdrawal/list")
def statement_recharge_list(data: schemas.StatementList):
"""用户提现列表"""
statement_list = crud.get_statements(data)
return HttpResultResponse(data=statement_list)
......@@ -40,3 +40,10 @@ def create_user(db: Session, user: schemas.UserCreate):
db.commit()
db.refresh(db_user)
return db_user
def update_user(db: Session, user_id: int, permissions: list):
db.query(models.User).filter(models.User.id == user_id).update({models.User.authority: ','.join(map(str,
permissions))})
db.commit()
from jose import jwt, JWTError
from jose import jwt
from sqlalchemy.orm import Session
from typing import Optional
from datetime import timedelta, datetime
from app.api.users.schemas import UserLoginForm, GoogleLogin
from core.config.env import env
from libs.functions import md5
from libs.img_code import session
from libs.result_format import HttpResultResponse
from models import users as users
from fastapi.security import OAuth2PasswordBearer
from fastapi import Depends
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
def get_user(db, username: str):
'''查询用户
"""
查询用户
:param db: 模拟的数据库
:param username: 用户名
:return: 返回一个用户的BaseModel(其实就是字典的BaseModel对象,二者可互相转换)
'''
"""
return db.query(users.User).filter(users.User.username == username).first()
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
'''创建tokens函数
"""
创建tokens函数
:param data: 对用JWT的Payload字段,这里是tokens的载荷,在这里就是用户的信息
:param expires_delta: 缺省参数,截止时间
:return:
'''
"""
# 深拷贝data
to_encode = data.copy()
# 如果携带了截至时间,就单独设置tokens的过期时间
......@@ -40,18 +36,17 @@ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({'exp': expire})
# 编码,至此 JWT tokens诞生
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
encoded_jwt = jwt.encode(to_encode, env.SECRET_KEY, algorithm=env.ALGORITHM)
return encoded_jwt
def authenticate_user(db: Session, form_data: UserLoginForm):
'''验证用户
"""
验证用户
:param db: 存储用户的数据库
:param username: 用户名
:param password: 密码
:param verify: 验证码
:param form_data: 用户信息
:return:
'''
"""
user_data = get_user(db=db, username=form_data.username)
# 如果获取为空,返回False
if not user_data:
......
......@@ -33,3 +33,9 @@ class GoogleLogin(BaseModel):
password: str
google_key: str
google_code: str
class PermissionCreate(BaseModel):
id: int
authority: list
import time
from typing import List
import pyotp
from typing import List
from fastapi import Depends, APIRouter
from datetime import timedelta
from sqlalchemy.orm import Session
......@@ -12,7 +10,7 @@ from app.api.users.schemas import UserLoginForm, GoogleCode, GoogleLogin
from libs.google_code import get_qrcode, google_verify_result
from libs.img_code import imageCode
from libs.result_format import HttpResultResponse, HttpMessage
from libs.token_verify import oauth2_scheme, auth_token, get_current_user
from libs.token_verify import auth_token, get_current_user
router = APIRouter()
......@@ -24,7 +22,7 @@ def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
@router.get("/imgCode")
def imgCode():
def img_code():
return HttpResultResponse(data=imageCode().getImgCode())
......@@ -37,7 +35,7 @@ def login(form_data: UserLoginForm, db: Session = Depends(get_db)):
@router.post("/goodleCode")
def goodleCode(data: GoogleCode, db: Session = Depends(get_db)):
def goodle_code(data: GoogleCode, db: Session = Depends(get_db)):
user_data = get_user(db, data.username)
google_key = pyotp.random_base32(64)
google_img = get_qrcode(username=user_data.username, gtoken=google_key).decode('utf-8')
......@@ -45,7 +43,7 @@ def goodleCode(data: GoogleCode, db: Session = Depends(get_db)):
@router.post("/googleLogin")
def googleLogin(data: GoogleLogin, db: Session = Depends(get_db)):
def google_login(data: GoogleLogin, db: Session = Depends(get_db)):
auth_info = authenticate_pwd(db=db, form_data=data)
if not auth_info.get("result"):
return HttpResultResponse(code=500, msg=auth_info.get("msg"))
......@@ -70,17 +68,15 @@ def create_user(data: schemas.UserCreate, db: Session = Depends(get_db)):
return HttpResultResponse(data=res.id)
# @router.get("/{user_id}")
# def read_user(user_id: int, db: Session = Depends(get_db)):
# """查询单个用户信息"""
# db_user = crud.get_user(db, user_id=user_id)
# if db_user is None:
# return HttpResultResponse(code=400, msg=HttpMessage.USER_NOT_EXIST)
# return HttpResultResponse(data=db_user)
@router.get("/tokenUser")
def token_user(token=Depends(auth_token), db: Session = Depends(get_db)):
"""通过token,获取登录用户信息"""
user_data = get_current_user(db=db, token=token)
return HttpResultResponse(code=200, data=user_data)
@router.post("/permission/allocation")
def read_user(data: schemas.PermissionCreate, db: Session = Depends(get_db)):
"""用户添加权限"""
crud.update_user(db, data.id, data.authority)
return HttpResultResponse()
......@@ -20,6 +20,8 @@ class Env(BaseSettings):
"pwd": "c1ea602311a369f6",
"user": "root"
}
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
class TestingEnv(Env):
......
......@@ -3,11 +3,9 @@ from sqlalchemy.orm import Session
from fastapi import Depends, APIRouter, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from pydantic import BaseModel
from typing import Union
from app import get_db
from app.api.users.login_verification import SECRET_KEY, ALGORITHM, get_user
from app.api.users import crud
from app.api.users.login_verification import get_user
from core.config.env import env
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
......@@ -15,7 +13,7 @@ oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def auth_token(token: str = Depends(oauth2_scheme)):
try:
# 解密tokens
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
payload = jwt.decode(token, env.SECRET_KEY, algorithms=[env.ALGORITHM])
# 从tokens的载荷payload中获取用户名
username: str = payload.get('username')
# 如果没有获取到,抛出异常
......@@ -30,18 +28,24 @@ def auth_token(token: str = Depends(oauth2_scheme)):
def get_current_user(db: Session, token: str = Depends(oauth2_scheme)):
'''获取当前用户信息,实际上是一个解密token的过程
"""
获取当前用户信息,实际上是一个解密token的过程
:param db: 数据模型
:param token: 携带的token
:return:
'''
"""
try:
# 解密tokens
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# 从tokens的载荷payload中获取用户名
username: str = payload.get('username')
user_data = get_user(db=db, username=username)
user_info = crud.get_user_by_name(db, user_data.username)
auth_list = user_info.authority.split(',')
json_data = {
"username": user_data.username
"username": user_data.username,
"authority": [int(i) for i in auth_list],
"type": 0 if '0' in auth_list else 1
}
return json_data
except JWTError as e:
......
......@@ -12,7 +12,7 @@ class User(Base):
hashed_password = Column(String(50), comment="密码")
config_key = Column(String(50))
uuid = Column(String(50))
authority = Column(Integer, comment="权限")
authority = Column(String(50), comment="权限")
google_key = Column(String(255), comment="谷歌动态码secret标识")
remaining_sum = Column(Float, comment="账户余额")
entry_account = Column(String(255), comment="入账类目")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment