Commit 52818038 authored by xianyang's avatar xianyang

角色相关接口

parent d26ee1d2
from fastapi import APIRouter
from app.api.users import views as u_view
from app.api.statement import views as s_view
from app.api.role import views as r_view
api_router = APIRouter()
# api_router.include_router(login.router, tags=["login"])
# api_router.include_router(items.router, prefix="/items", tags=["items"])
api_router.include_router(u_view.router, prefix="/users", tags=["users"])
api_router.include_router(s_view.router, prefix="/statement", tags=["statement"])
api_router.include_router(r_view.router, prefix="/role", tags=["role"])
from datetime import datetime
from sqlalchemy.orm import Session
from app.api.role import schemas
from models import roles as models
def get_role_by_name(db: Session, role_name: str):
return db.query(models.Role).filter(models.Role.role_name == role_name).first()
def get_roles(db: Session, param):
page = param.page if param.page else 1
size = param.size if param.size else 10
name = param.role_name if param.role_name else ""
if name:
return db.query(models.Role).filter(models.Role.role_name.like(f'%{name}%')).offset((int(page) - 1) * size).\
limit(page * size).all()
else:
return db.query(models.Role).offset((int(page) - 1) * size).limit(page * size).all()
def create_role(db: Session, user: schemas.RoleCreate):
try:
db_user = models.Role(role_name=user.role_name, authority=",".join(map(str, user.authority)),
remark=user.remark, create_time=datetime.now())
db.add(db_user)
db.commit()
db.refresh(db_user)
except Exception as e:
print(e)
return {}
return db_user
def update_role(db: Session, role_info):
if role_info.role_name:
db.query(models.Role).filter(models.Role.id == role_info.id).update(
{models.Role.role_name: role_info.role_name})
if role_info.remark:
db.query(models.Role).filter(models.Role.id == role_info.id).update(
{models.Role.remark: role_info.remark})
if role_info.authority:
db.query(models.Role).filter(models.Role.id == role_info.id).update(
{models.Role.authority: ','.join(map(str, role_info.authority))})
db.commit()
def delete_role(db: Session, role_id: int):
db.query(models.Role).filter(models.Role.id == role_id).delete()
db.commit()
from pydantic import BaseModel
from typing import Optional
from sqlalchemy.orm import Session
from models import roles as models
class RoleCreate(BaseModel):
role_name: str
authority: list
remark: str
class RoleUpdate(RoleCreate):
id: int
class RoleList(BaseModel):
page: Optional[int] = None
size: Optional[int] = None
role_name: Optional[str] = ""
from fastapi import Depends, APIRouter
from sqlalchemy.orm import Session
from app import get_db
from app.api.role import crud, schemas
from libs.result_format import HttpResultResponse, HttpMessage
router = APIRouter()
@router.post("/create")
def create_user(data: schemas.RoleCreate, db: Session = Depends(get_db)):
"""添加角色"""
db_role = crud.get_role_by_name(db, role_name=data.role_name)
if db_role:
return HttpResultResponse(code=400, msg=HttpMessage.ROLE_EXIST)
res = crud.create_role(db=db, user=data)
if not res:
return HttpResultResponse(code=500, msg=res)
return HttpResultResponse(data=res.id)
@router.post("/update")
def read_user(data: schemas.RoleUpdate, db: Session = Depends(get_db)):
"""角色权限分配"""
crud.update_role(db, data)
return HttpResultResponse()
@router.delete("/delete/{role_id}")
def delete_user(role_id: int, db: Session = Depends(get_db)):
"""角色删除"""
crud.delete_role(db, role_id)
return HttpResultResponse()
@router.post("/list")
def role_list(data: schemas.RoleList, db: Session = Depends(get_db)):
"""用户列表"""
result = crud.get_roles(db, data)
return {"code": 200, "msg": "操作成功", "count": len(result), "data": result}
import pandas as pd
from sqlalchemy.orm import Session
from core.config.env import env
from libs.db_link import LinkMysql
from models import statement as models
from starlette.responses import StreamingResponse
def get_statement(db: Session, user_id: int):
return db.query(models.Statement).filter(models.Statement.id == user_id)
# def get_statement(db: Session, user_id: int):
# return db.query(models.Statement).filter(models.Statement.id == user_id)
def get_statement_by_name(db: Session, username: str):
return db.query(models.Statement).filter(models.Statement.username == username).first()
# def get_statement_by_name(db: Session, username: str):
# return db.query(models.Statement).filter(models.Statement.username == username).first()
def get_statements(param):
......
from typing import Optional
from pydantic import BaseModel
......
......@@ -5,40 +5,32 @@ from libs.functions import md5, uuid
from models import users as models
def get_user(db: Session, user_id: int):
user = db.query(models.User).filter(models.User.id == user_id).first()
user_dict = {
"authority": user.authority,
"config_key": user.config_key,
"create_time": str(user.create_time),
"description": user.description,
"google_key": user.google_key,
"hashed_password": user.hashed_password,
"id": user.id,
"remaining_sum": user.remaining_sum,
"unique": user.unique,
"update_time": str(user.update_time),
"username": user.username,
"uuid": user.uuid,
}
return user_dict
def get_user_by_name(db: Session, username: str):
return db.query(models.User).filter(models.User.username == username).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.User).offset(skip).limit(limit).all()
def get_users(db: Session, param):
page = param.page if param.page else 1
size = param.size if param.size else 10
username = param.username if param.username else ""
if username:
return db.query(models.User).filter(models.User.username.like(f'%{username}%')).offset((int(page) - 1) * size).\
limit(page * size).all()
else:
return db.query(models.User).offset((int(page) - 1) * size).limit(page * size).all()
def create_user(db: Session, user: schemas.UserCreate):
db_user = models.User(username=user.username, description=user.description, unique=user.unique,
config_key=user.config_key, uuid=uuid(), hashed_password=md5("123456"),
create_time=datetime.now())
db.add(db_user)
db.commit()
db.refresh(db_user)
try:
db_user = models.User(username=user.username, description=user.description, unique=user.unique,
config_key=user.config_key, uuid=uuid(), hashed_password=md5("123456"),
create_time=datetime.now())
db.add(db_user)
db.commit()
db.refresh(db_user)
except Exception as e:
print(e)
return {}
return db_user
......
from pydantic import BaseModel
from typing import Optional
class UserBase(BaseModel):
......@@ -39,3 +40,9 @@ class PermissionCreate(BaseModel):
id: int
authority: list
class UserList(BaseModel):
page: Optional[int] = None
size: Optional[int] = None
username: Optional[str] = ""
......@@ -15,12 +15,6 @@ from libs.token_verify import auth_token, get_current_user
router = APIRouter()
@router.get("/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@router.get("/imgCode")
def img_code():
return HttpResultResponse(data=imageCode().getImgCode())
......@@ -31,7 +25,7 @@ def login(form_data: UserLoginForm, db: Session = Depends(get_db)):
user_info = authenticate_user(db=db, form_data=form_data)
if not user_info.get("result"):
return HttpResultResponse(code=500, msg=user_info.get("msg"), data={})
return HttpResultResponse(msg=HttpMessage.HFDU, data={"google_key": user_info.get("google_key")})
return HttpResultResponse(msg=HttpMessage.LOGIN_SUCCESS, data={"google_key": user_info.get("google_key")})
@router.post("/goodleCode")
......@@ -53,11 +47,18 @@ def google_login(data: GoogleLogin, db: Session = Depends(get_db)):
access_token_expires = timedelta(hours=12)
token_data = {"username": data.username, "password": data.password}
access_token = create_access_token(data=token_data, expires_delta=access_token_expires)
return HttpResultResponse(msg=HttpMessage.HFDU, data={"access_token": access_token, "token_type": "bearer"})
return HttpResultResponse(msg=HttpMessage.LOGIN_SUCCESS, data={"access_token": access_token, "token_type": "bearer"})
else:
return HttpResultResponse(code=500, msg="登录失败,谷歌动态码错误", data={})
@router.get("/tokenUser")
def token_user(token=Depends(auth_token), db: Session = Depends(get_db)):
"""通过token,获取登录用户信息"""
user_data = get_current_user(db=db, token=token)
return HttpResultResponse(code=200, data=user_data)
@router.post("/create")
def create_user(data: schemas.UserCreate, db: Session = Depends(get_db)):
"""添加用户"""
......@@ -65,18 +66,20 @@ def create_user(data: schemas.UserCreate, db: Session = Depends(get_db)):
if db_user:
return HttpResultResponse(code=400, msg=HttpMessage.USER_EXIST)
res = crud.create_user(db=db, user=data)
if not res:
return HttpResultResponse(code=500, msg=res)
return HttpResultResponse(data=res.id)
@router.get("/tokenUser")
def token_user(token=Depends(auth_token), db: Session = Depends(get_db)):
"""通过token,获取登录用户信息"""
user_data = get_current_user(db=db, token=token)
return HttpResultResponse(code=200, data=user_data)
@router.post("/permission/allocation")
def read_user(data: schemas.PermissionCreate, db: Session = Depends(get_db)):
"""用户添加权限"""
crud.update_user(db, data.id, data.authority)
return HttpResultResponse()
@router.post("/list")
def user_list(data: schemas.UserList, db: Session = Depends(get_db)):
"""用户列表"""
result = crud.get_users(db, data)
return {"code": 200, "msg": "操作成功", "count": len(result), "data": result}
......@@ -7,7 +7,9 @@ class HttpMessage(object):
SUCCESS = "操作成功"
USER_EXIST = "用户已存在"
USER_NOT_EXIST = "用户不存在"
HFDU = "登录成功"
LOGIN_SUCCESS = "登录成功"
ROLE_EXIST = "角色已存在"
class HttpResultResponse(Response):
......
......@@ -36,7 +36,7 @@ def get_current_user(db: Session, token: str = Depends(oauth2_scheme)):
"""
try:
# 解密tokens
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
payload = jwt.decode(token, env.SECRET_KEY, algorithms=[env.ALGORITHM])
# 从tokens的载荷payload中获取用户名
username: str = payload.get('username')
user_data = get_user(db=db, username=username)
......
from core.storage.db import engine
from models import users, statement
from models import users, roles
# 映射模型表
users.Base.metadata.create_all(bind=engine)
statement.Base.metadata.create_all(bind=engine)
roles.Base.metadata.create_all(bind=engine)
from sqlalchemy import Column, Integer, String, DateTime, Text
from core.storage.db import Base
class Role(Base):
__tablename__ = "role"
id = Column(Integer, primary_key=True, index=True)
role_name = Column(String(100), unique=True, index=True, comment="角色名")
authority = Column(String(225), comment="权限")
remark = Column(Text, comment="备注")
create_time = Column(DateTime, comment="创建时间")
from sqlalchemy import Column, Integer, String, DateTime, Float
from core.storage.db import Base
from datetime import datetime
class Statement(Base):
__tablename__ = "statement"
id = Column(Integer, primary_key=True, index=True)
order_number = Column(String(50), unique=True, index=True, comment='订单号')
user_id = Column(Integer, index=True, comment='用户ID')
zb_number = Column(String(50), comment='朱贝号')
nickname = Column(String(50), comment='昵称')
recharge_amount = Column(Float, comment='充值金额')
recharge_channel = Column(String(50), index=True, comment='充值渠道')
flowing_water_number = Column(String(50), unique=True, index=True, comment='交易流水号')
pay_time = Column(DateTime, comment='支付时间')
callback_time = Column(DateTime, comment='回调成功时间')
actual_pay_time = Column(DateTime, comment='实际支付时间')
create_time = Column(DateTime, default=datetime.now(), comment='创建时间')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment