Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
F
financial-system
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
xianyang
financial-system
Commits
0af1e7cb
Commit
0af1e7cb
authored
Feb 10, 2023
by
Administrator
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
用户登录、验证码获取、谷歌二维码、谷歌验证(修改第二次)————余晋熹
parent
44328aeb
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
40 additions
and
18 deletions
+40
-18
crud.py
app/api/users/crud.py
+1
-1
login_verification.py
app/api/users/login_verification.py
+19
-5
views.py
app/api/users/views.py
+18
-11
google_code.py
libs/google_code.py
+2
-1
No files found.
app/api/users/crud.py
View file @
0af1e7cb
...
...
@@ -23,7 +23,7 @@ def get_users(db: Session, skip: int = 0, limit: int = 100):
def
create_user
(
db
:
Session
,
user
:
schemas
.
UserCreate
):
db_user
=
models
.
User
(
username
=
user
.
username
,
description
=
user
.
description
,
unique
=
user
.
unique
,
config_key
=
user
.
config_key
,
uuid
=
uuid
(),
hashed_password
=
md5
(
"123456"
),
google_key
=
pyotp
.
random_base32
(
64
),
create_time
=
datetime
.
now
())
create_time
=
datetime
.
now
())
db
.
add
(
db_user
)
db
.
commit
()
db
.
refresh
(
db_user
)
...
...
app/api/users/login_verification.py
View file @
0af1e7cb
import
pyotp
from
jose
import
jwt
from
sqlalchemy.orm
import
Session
from
typing
import
Optional
...
...
@@ -52,14 +52,28 @@ def authenticate_user(db: Session, form_data: UserLoginForm):
user_data
=
get_user
(
db
=
db
,
username
=
form_data
.
username
)
# 如果获取为空,返回False
if
not
user_data
:
return
False
,
"用户不存在"
return
{
"result"
:
False
,
"msg"
:
"用户不存在"
}
# 如果密码不正确,也是返回False
md5_password
=
md5
(
form_data
.
password
)
if
md5_password
!=
user_data
.
hashed_password
:
return
False
,
"密码错误"
return
{
"result"
:
False
,
"msg"
:
"密码错误"
}
# 验证码检查
if
form_data
.
verify
.
lower
()
!=
session
.
headers
.
get
(
"verify"
)
.
lower
():
return
False
,
"验证码错误"
return
True
,
False
return
{
"result"
:
False
,
"msg"
:
"验证码错误"
}
if
user_data
.
google_key
:
return
{
"result"
:
True
,
"msg"
:
"验证通过"
,
"google_key"
:
1
}
else
:
return
{
"result"
:
True
,
"msg"
:
"验证通过"
,
"google_key"
:
0
}
def
add_google_key
(
db
:
Session
,
user_id
:
int
):
updata
=
{
"google_key"
:
pyotp
.
random_base32
(
64
),
"update_time"
:
datetime
.
now
()
}
try
:
db
.
query
(
users
.
User
)
.
filter
(
users
.
User
.
id
==
user_id
)
.
update
(
updata
)
db
.
commit
()
return
{
"result"
:
True
,
"google_key"
:
updata
.
get
(
"google_key"
)}
except
Exception
as
e
:
return
{
"result"
:
False
}
\ No newline at end of file
app/api/users/views.py
View file @
0af1e7cb
...
...
@@ -4,7 +4,7 @@ from datetime import timedelta
from
sqlalchemy.orm
import
Session
from
app
import
get_db
from
app.api.users
import
crud
,
schemas
from
app.api.users.login_verification
import
authenticate_user
,
create_access_token
,
get_user
from
app.api.users.login_verification
import
authenticate_user
,
create_access_token
,
get_user
,
add_google_key
from
app.api.users.schemas
import
UserLoginForm
,
GoogleCode
,
GoogleLogin
from
libs.google_code
import
get_qrcode
,
google_verify_result
from
libs.img_code
import
imageCode
...
...
@@ -26,19 +26,19 @@ def imgCode():
@
router
.
post
(
"/login"
)
async
def
login
(
form_data
:
UserLoginForm
,
db
:
Session
=
Depends
(
get_db
)):
user
,
msg
=
authenticate_user
(
db
=
db
,
form_data
=
form_data
)
if
not
user
:
return
HttpResultResponse
(
code
=
500
,
msg
=
msg
,
data
=
{})
# 定义tokens过期时间
access_token_expires
=
timedelta
(
hours
=
12
)
access_token
=
create_access_token
(
data
=
form_data
.
dict
(),
expires_delta
=
access_token_expires
)
return
HttpResultResponse
(
msg
=
HttpMessage
.
HFDU
,
data
=
{
"access_token"
:
access_token
,
"token_type"
:
"bearer"
})
user_info
=
authenticate_user
(
db
=
db
,
form_data
=
form_data
)
if
not
user_info
.
get
(
"result"
):
return
HttpResultResponse
(
code
=
500
,
msg
=
user_info
.
get
(
"msg"
),
data
=
{})
return
HttpResultResponse
(
msg
=
HttpMessage
.
HFDU
,
data
=
{
"google_key"
:
user_info
.
get
(
"google_key"
)})
@
router
.
post
(
"/goodleCode"
)
async
def
goodleCode
(
data
:
GoogleCode
,
db
:
Session
=
Depends
(
get_db
)):
user_data
=
get_user
(
db
,
data
.
username
)
return
get_qrcode
(
username
=
user_data
.
username
,
gtoken
=
user_data
.
google_key
)
update_info
=
add_google_key
(
db
=
db
,
user_id
=
user_data
.
id
)
if
not
update_info
.
get
(
"result"
):
return
HttpResultResponse
(
code
=
500
,
msg
=
"谷歌二维码生成失败"
)
return
get_qrcode
(
username
=
user_data
.
username
,
gtoken
=
update_info
.
get
(
"google_key"
))
@
router
.
post
(
"/googleLogin"
)
...
...
@@ -46,9 +46,16 @@ async def googleLogin(data: GoogleLogin, db: Session = Depends(get_db)):
user_data
=
get_user
(
db
,
data
.
username
)
verify
=
google_verify_result
(
secret_key
=
user_data
.
google_key
,
google_code
=
data
.
google_code
)
if
verify
:
return
HttpResultResponse
(
msg
=
HttpMessage
.
HFDU
)
# 定义tokens过期时间
access_token_expires
=
timedelta
(
hours
=
12
)
token_data
=
{
"username"
:
user_data
.
username
,
"google_key"
:
user_data
.
google_key
}
access_token
=
create_access_token
(
data
=
token_data
,
expires_delta
=
access_token_expires
)
return
HttpResultResponse
(
msg
=
HttpMessage
.
HFDU
,
data
=
{
"access_token"
:
access_token
,
"token_type"
:
"bearer"
})
else
:
return
HttpResultResponse
(
msg
=
"登录失败,谷歌动态码错误"
)
return
HttpResultResponse
(
code
=
500
,
msg
=
"登录失败,谷歌动态码错误"
,
data
=
{}
)
@
router
.
post
(
"/create"
)
...
...
libs/google_code.py
View file @
0af1e7cb
...
...
@@ -9,7 +9,6 @@ from six import BytesIO
def
get_qrcode
(
username
:
str
,
gtoken
:
str
):
# gtoken = pyotp.random_base32(64)
# dirpath = os.path.join(os.getcwd())
data
=
pyotp
.
totp
.
TOTP
(
gtoken
)
.
provisioning_uri
(
username
,
issuer_name
=
"IAM MFA Code"
)
qr
=
QRCode
(
version
=
1
,
...
...
@@ -20,6 +19,8 @@ def get_qrcode(username: str, gtoken: str):
qr
.
add_data
(
data
)
qr
.
make
(
fit
=
True
)
img
=
qr
.
make_image
()
# 图片保存
# dirpath = os.path.join(os.getcwd())
# filepath = dirpath + os.sep + username + '.png'
# img.save(filepath) # 保存条形码图片
# 图片以二进制形式写入
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment