Commit 4cdfb57a authored by xupeng's avatar xupeng

代码修改

parent 69fc622c
......@@ -5,7 +5,7 @@ from typing import Optional
class RoleCreate(BaseModel):
role_name: str
authority: list
remark: str
remark: Optional[int] = None
class RoleUpdate(RoleCreate):
......
......@@ -33,7 +33,7 @@ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
expire = datetime.utcnow() + expires_delta
else:
# 否则的话,就默认用15分钟
expire = datetime.utcnow() + timedelta(minutes=15)
expire = datetime.utcnow() + timedelta(minutes=60)
to_encode.update({'exp': expire})
# 编码,至此 JWT tokens诞生
encoded_jwt = jwt.encode(to_encode, env.SECRET_KEY, algorithm=env.ALGORITHM)
......
......@@ -7,7 +7,7 @@ class UserBase(BaseModel):
class UserCreate(UserBase):
remark: str
remark: Optional[str] = None
role_id: int
......
......@@ -10,7 +10,7 @@ from app.api.users.schemas import UserLoginForm, GoogleCode, GoogleLogin
from libs.google_code import get_qrcode, google_verify_result
from libs.img_code import imageCode
from libs.result_format import HttpResultResponse, HttpMessage
from libs.token_verify import auth_token, get_current_user
from libs.token_verify import auth_token, get_current_user, login_required
router = APIRouter()
......@@ -60,8 +60,9 @@ def token_user(token=Depends(auth_token), db: Session = Depends(get_db)):
@router.post("/create")
def create_user(data: schemas.UserCreate, db: Session = Depends(get_db)):
def create_user(data: schemas.UserCreate,token=Depends(login_required), db: Session = Depends(get_db)):
"""添加用户"""
print(data)
db_user = crud.get_user_by_name(db, username=data.username)
if db_user:
return HttpResultResponse(code=400, msg=HttpMessage.USER_EXIST)
......
File deleted
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
import sys
region ='ap-guangzhou'
token = None
scheme ='https'
Bucket="fj-dc-test-1256890024" #测试桶
secret_id='AKIDra5Ur292g4FCzYrwmMhAOQFsHSP9wb3S'
secret_key='JRKyzpSr1wc5OXXUFsGPKtKfsvqcEcqw'
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme)
client = CosS3Client(config)
file_name = 'a.txt'
with open('./a.txt', 'rb') as fp:
response = client.put_object(
Bucket=Bucket, # Bucket 由 BucketName-APPID 组成
Body=fp,
Key=file_name,
StorageClass='STANDARD',
ContentType='text/html; charset=utf-8'
)
print(response['ETag'])
\ No newline at end of file
import datetime
from libs.functions import md5
from jose.exceptions import JWEError
from sqlalchemy.orm import Session
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from jose import jwt, JWTError, ExpiredSignatureError
from app import get_db
from app.api.role.crud import get_id_to_authority
from app.api.users import crud
from app.api.users.login_verification import get_user
from core.config.env import env
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def auth_token(token: str = Depends(oauth2_scheme)):
try:
# 解密tokens
payload = jwt.decode(token, env.SECRET_KEY, algorithms=[env.ALGORITHM])
payload = jwt.decode(token,env.SECRET_KEY, algorithms=[env.ALGORITHM])
# 从tokens的载荷payload中获取用户名
username: str = payload.get('username')
# 如果没有获取到,抛出异常
......@@ -54,3 +56,46 @@ def get_current_user(db: Session, token: str = Depends(oauth2_scheme)):
return json_data
except JWTError as e:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=f'{e}')
def judgeToken(token):
"""
判断token
:param token: token串
:return: boolen
"""
try:
db: Session =next(get_db())
payload = jwt.decode(token, env.SECRET_KEY, algorithms=[env.ALGORITHM])
user_data = get_user(db,username= payload.get("username"))
old_password=md5(payload["password"])
if old_password == user_data.hashed_password:
print(md5(payload["password"]))
print(user_data.hashed_password)
return True
else:
print("token 身份错误")
return False
except ExpiredSignatureError as e:
print("token 过期了,{}".format(str(e)))
return False
except JWEError as e:
print("token 验证失败,{}".format(str(e)))
return False
def login_required(token=Depends(oauth2_scheme)):
"""
登录认证token
:param token:
:return:boolen
"""
credentials_exception = HTTPException(
status_code=status.HTTP_411_LENGTH_REQUIRED,
detail="Authenticate fail!",
headers={"WWW-Authenticate":"Bearer"}
)
if judgeToken(token):
return True
else:
raise credentials_exception
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment