Commit b0310bf0 authored by xianyang's avatar xianyang

用户提现,公会提现接口

parent 32ab311b
......@@ -39,4 +39,4 @@ def delete_user(role_id: int, db: Session = Depends(get_db)):
def role_list(data: schemas.RoleList, db: Session = Depends(get_db)):
"""用户列表"""
result = crud.get_roles(db, data)
return HttpResultResponse(count=len(result), data=result)
return HttpResultResponse(total=len(result), data=result)
import math
import threading
import pandas as pd
from sqlalchemy import and_, or_, func
from app.api.statement import schemas
from core.config.env import env
from libs.db_link import LinkMysql
from starlette.responses import StreamingResponse
from sqlalchemy.orm import Session
from models.recharge import Recharge, UserWC, GuildWC
locka = threading.Lock()
# 写入文件
......@@ -23,128 +26,187 @@ class RechargeStatement(object):
def __init__(self):
self.derive_list = []
def check_out(self, ch_sql):
once_res = LinkMysql(env.MysqlDB).query_mysql(ch_sql)
self.derive_list += once_res
def derive_query_data(self, condition_list, old_sql):
if not condition_list:
der_sql = f"select count(*) as total from orders o INNER JOIN v2_user v on o.userid = v.user_id order by o.id desc"
elif len(condition_list) == 1:
der_sql = f"select count(*) as total from orders o INNER JOIN v2_user v on o.userid = v.user_id where {condition_list[0]} order by o.id desc"
else:
der_sql = f"select count(*) as total from orders o INNER JOIN v2_user v on o.userid = v.user_id where {' and '.join(condition_list)} order by o.id desc"
total = LinkMysql(env.MysqlDB).query_mysql(der_sql)
if total:
to = total[0].get('total')
num = math.ceil(to / 10000)
# 多线程
ths = []
# 创建线程
for x in range(num):
query_sql = old_sql + f" LIMIT {x*10000},{10000}"
ths.append(threading.Thread(target=self.check_out, args=[(query_sql)]))
# 启动线程
for y in range(num):
ths[y].start()
# 等待子进程结束
for z in range(num):
ths[z].join()
def check_out(self, db, num):
info_list = []
locka.acquire() # 防止序号错误
once_res = db.query(Recharge).filter().offset(num * 10).limit(10).all()
locka.release()
for i in once_res:
info_dict = i.to_dict()
info_list.append(info_dict)
info_list.reverse()
self.derive_list += info_list
def query_all_data(self, db):
msg_count = db.query(func.count(Recharge.id)).scalar()
num = math.ceil(msg_count / 10)
# 创建线程
ths = []
# 创建线程
for x in range(num):
ths.append(threading.Thread(target=self.check_out, args=[db, x]))
# 启动线程
for y in range(num):
ths[y].start()
# 等待子进程结束
for z in range(num):
ths[z].join()
return self.derive_list
def get_statements(self, param, sp=None):
query_list = []
if not sp:
param: schemas.StatementList
def get_statements(self, db: Session, param, sp=None):
"""列表"""
not_null_filters = []
if param.order_number:
query_list.append(f"o.order_number LIKE '%{param.order_number}%'")
not_null_filters.append(Recharge.order_number == param.order_number)
if param.user_id:
query_list.append(f"o.userid={int(param.user_id)}")
not_null_filters.append(Recharge.user_id == param.user_id)
if param.sid:
query_list.append(f"o.sid LIKE '%{param.sid}%'")
not_null_filters.append(Recharge.sid.like(param.sid))
if param.paychannel:
query_list.append(f"o.paychannel LIKE '%{param.paychannel}%'")
not_null_filters.append(Recharge.pay_channel.like(param.pay_channel))
if param.start_time:
query_list.append(f"o.current>='{param.start_time}'")
not_null_filters.append(Recharge.current >= param.start_time)
if param.end_time:
query_list.append(f"o.current <= '{param.end_time}'")
fields = ["order_number", "userid", "user_number", "nick_name", "cast(money as char) as money", "paychannel",
"sid", "lastupdate", "cast(current as char) as current", "payment_time"]
if not query_list:
sql = f"select {','.join(fields)} from orders o INNER JOIN v2_user v on o.userid = v.user_id order by o.id desc"
elif len(query_list) == 1:
sql = f"select {','.join(fields)} from orders o INNER JOIN v2_user v on o.userid = v.user_id where {query_list[0]} order by o.id desc"
else:
sql = f"select {','.join(fields)} from orders o INNER JOIN v2_user v on o.userid = v.user_id where {' and '.join(query_list)} order by o.id desc"
not_null_filters.append(Recharge.current <= param.end_time)
# 判断有无条件
try:
if len(not_null_filters) > 0:
get_user_orm_sql = db.query(Recharge).filter(and_(*not_null_filters))
condition_data = db.execute(get_user_orm_sql).fetchall()
serializer_info = [i[0].to_dict() for i in condition_data]
serializer_info.reverse()
else:
serializer_info = self.query_all_data(db)
except Exception as e:
print(e)
return [], 0, 0 if sp else []
# 判断是列表还是导出接口
if sp:
sql += f" LIMIT {(int(param.page) - 1) * param.size},{param.page * param.size}"
query_res = LinkMysql(env.MysqlDB).query_mysql(sql)
else:
query_res = self.derive_query_data(query_list, sql)
if query_res:
df = pd.DataFrame(query_res)
print(df)
count = df.groupby("money").sum()
# amount_list = list(count["money"].values)
if serializer_info:
total = len(serializer_info)
df = pd.DataFrame(serializer_info)
count = df['money'].apply(lambda x: x).sum()
return serializer_info[(int(param.page) - 1) * param.size:param.size * param.page], total, count
return [], 0, 0
else:
count = 0
return query_res, count
return serializer_info
class WithdrawStatement(object):
"""提现报表"""
@staticmethod
def get_user_withdraw_cash(param, ty=None):
"""用户提现"""
def __init__(self):
self.derive_user_list = []
self.derive_guild_list = []
def thread_to_data(self, db, num):
user_list = []
locka.acquire()
once_res = db.query(UserWC).filter().offset(num * 10).limit(10).all()
locka.release()
for i in once_res:
info_dict = i.to_dict()
user_list.append(info_dict)
self.derive_user_list += user_list
def query_user_all_data(self, db):
msg_count = db.query(func.count(UserWC.id)).scalar()
num = math.ceil(msg_count / 10)
# 创建线程
ths = []
for x in range(num):
ths.append(threading.Thread(target=self.thread_to_data, args=[db, x]))
for y in range(num):
ths[y].start()
for z in range(num):
ths[z].join()
return self.derive_user_list
def get_user_withdraw_cash(self, db: Session, param):
"""用户提现"""
is_filters = []
if param.user_id:
user_list.append(f"o.userid={param.user_id}")
if param.status:
user_list.append(f"o.status={param.status}")
is_filters.append(UserWC.user_id == param.user_id)
if param.status or param.status == 0:
is_filters.append(UserWC.status == param.status)
if param.start_time:
user_list.append(f"o.current>='{param.start_time}'")
is_filters.append(UserWC.current >= param.start_time)
if param.end_time:
user_list.append(f"o.current <= '{param.end_time}'")
fields = ["id", "bank_code", "usernumber", "nickname", "truename", "idcard", "account", "status", "money",
"platformServiceFee", "thirdServiceFee", "final_money", "cast(current as char) as current"]
if not user_list:
sql = f"select {','.join(fields)} from tixian_order order by id desc"
elif len(user_list) == 1:
sql = f"select {','.join(fields)} from tixian_order where {user_list[0]} order by id desc"
else:
sql = f"select {','.join(fields)} from tixian_order where {' and '.join(user_list)} order by id desc"
if ty:
sql += f" LIMIT {(int(param.page)-1) * param.size},{param.page * param.size}"
query_res = LinkMysql(env.MysqlDB).query_mysql(sql)
return query_res
@staticmethod
def get_guild_withdraw_cash(cond, co):
is_filters.append(UserWC.current <= param.end_time)
# 判断有无条件
try:
if len(is_filters) > 0:
get_user_orm_sql = db.query(UserWC).filter(and_(*is_filters))
condition_data = db.execute(get_user_orm_sql).fetchall()
user_info = [i[0].to_dict() for i in condition_data]
else:
user_info = self.query_user_all_data(db)
except Exception as e:
print(e)
return [], 0, 0, 0
# 判断是列表还是导出接口
user_info.reverse()
if user_info:
total = len(user_info)
df = pd.DataFrame(user_info)
count = df['money'].apply(lambda x: x).sum()
final_count = df['final_money'].apply(lambda x: float(x)).sum()
return user_info[(int(param.page) - 1) * param.size:param.size * param.page], total, count, final_count
return [], 0, 0, 0
def dispose_guild_to_data(self, db, num):
user_list = []
locka.acquire()
once_res = db.query(GuildWC).filter().offset(num * 10).limit(10).all()
locka.release()
for i in once_res:
info_dict = i.to_dict()
user_list.append(info_dict)
self.derive_user_list += user_list
def query_guild_all_data(self, db):
msg_count = db.query(func.count(UserWC.id)).scalar()
num = math.ceil(msg_count / 10)
# 创建线程
ths = []
for x in range(num):
ths.append(threading.Thread(target=self.dispose_guild_to_data, args=[db, x]))
for y in range(num):
ths[y].start()
for z in range(num):
ths[z].join()
return self.derive_user_list
def get_guild_withdraw_cash(self, db: Session, cond):
"""公会提现"""
guild_list = []
is_guild_filters = []
if cond.guild_id:
guild_list.append(f"o.guild_id={cond.guild_id}")
if cond.status:
guild_list.append(f"o.status LIKE '%{cond.status}%'")
is_guild_filters.append(GuildWC.guild_id == cond.guild_id)
if cond.status or cond.status == 0:
is_guild_filters.append(GuildWC.status == cond.status)
if cond.start_time:
guild_list.append(f"o.current>='{cond.start_time}'")
is_guild_filters.append(GuildWC.update_time >= cond.start_time)
if cond.end_time:
guild_list.append(f"o.current <= '{cond.end_time}'")
gui_fields = ["account", "finalMoney", "guild_name", "x.id", "money", "parentid", "taxRate", "thirdServiceFee",
"x.type", "x.update_time"]
if not guild_list:
sql = f"select {','.join(gui_fields)} from guild_ti_xian x INNER JOIN guild g on x.guild_id = g.id order by o.id desc"
elif len(guild_list) == 1:
sql = f"select {','.join(gui_fields)} from guild_ti_xian x INNER JOIN guild g on x.guild_id = g.id where {guild_list[0]} order by o.id desc"
else:
sql = f"select {','.join(gui_fields)} from guild_ti_xian x INNER JOIN guild g on x.guild_id = g.id where {' and '.join(guild_list)} order by o.id desc"
if co:
sql += f" LIMIT {(int(cond.page) - 1) * cond.size},{cond.page * cond.size}"
query_res = LinkMysql(env.MysqlDB).query_mysql(sql)
return query_res
is_guild_filters.append(GuildWC.update_time <= cond.end_time)
# 判断有无条件
try:
if len(is_guild_filters) > 0:
get_user_orm_sql = db.query(GuildWC).filter(and_(*is_guild_filters))
condition_data = db.execute(get_user_orm_sql).fetchall()
guild_info = [i[0].to_dict() for i in condition_data]
else:
guild_info = self.query_guild_all_data(db)
except Exception as e:
print(e)
return [], 0, 0, 0
# 判断是列表还是导出接口
guild_info.reverse()
if guild_info:
total = len(guild_info)
df = pd.DataFrame(guild_info)
count = df['money'].apply(lambda x: x).sum()
final_count = df['final_money'].apply(lambda x: float(x)).sum()
return guild_info[(int(cond.page) - 1) * cond.size:cond.size * cond.page], total, count, final_count
return [], 0, 0, 0
......@@ -41,3 +41,12 @@ class UserWithdrawalList(BaseModel):
end_time: Optional[str] = ""
status: Optional[int] = None
user_id: Optional[int] = None
class GuildWithdrawalList(BaseModel):
page: Optional[int] = None
size: Optional[int] = None
start_time: Optional[str] = ""
end_time: Optional[str] = ""
status: Optional[int] = None
guild_id: Optional[int] = None
from app.api.statement import crud, schemas
from fastapi import APIRouter
from app import get_db
from fastapi import Depends, APIRouter
from sqlalchemy.orm import Session
from app.api.statement.crud import RechargeStatement, WithdrawStatement
from libs.result_format import HttpResultResponse
......@@ -8,28 +9,29 @@ router = APIRouter()
@router.post("/recharge/list")
def statement_recharge_list(data: schemas.StatementList):
def statement_recharge_list(data: schemas.StatementList, db: Session = Depends(get_db)):
"""充值报表列表"""
statement_list, money = RechargeStatement().get_statements(data, 1)
return HttpResultResponse(count=len(statement_list), total=money, data=statement_list)
statement_list, total, money = RechargeStatement().get_statements(db, data, 1)
return HttpResultResponse(total=total, count=float(money), data=statement_list)
@router.post("/derive/excel")
def statement_derive_excel(data: schemas.StatementList):
def statement_derive_excel(data: schemas.StatementList, db: Session = Depends(get_db)):
"""充值报表导出"""
statement_list = RechargeStatement().get_statements(data)
statement_list = RechargeStatement().get_statements(db, data)
return crud.data_to_file(statement_list, "充值报表")
@router.post("/userWithdrawal/list")
def user_withdrawal_list(data: schemas.UserWithdrawalList):
def user_withdrawal_list(data: schemas.UserWithdrawalList, db: Session = Depends(get_db)):
"""用户提现列表"""
statement_list = WithdrawStatement().get_user_withdraw_cash(data, 1)
return HttpResultResponse(data=statement_list)
statement_list, total, money, final_money = WithdrawStatement().get_user_withdraw_cash(db, data)
return HttpResultResponse(total=total, count=float(money), actual_count=final_money, data=statement_list)
@router.post("/guildWithdrawal/list")
def guild_withdrawal_list(data: schemas.GuildWithdrawalList, db: Session = Depends(get_db)):
"""公会提现列表"""
guild_list, total, money, final_money = WithdrawStatement().get_guild_withdraw_cash(db, data)
return HttpResultResponse(total=total, count=float(money), actual_count=final_money, data=guild_list)
@router.post("/userWithdrawal/excel")
def user_withdrawal_excel(data: schemas.UserWithdrawalList):
"""用户提现导出"""
user_list = WithdrawStatement().get_user_withdraw_cash(data)
return crud.data_to_file(user_list, "用户提现")
......@@ -82,7 +82,7 @@ def read_user(data: schemas.PermissionCreate, db: Session = Depends(get_db)):
def user_list(data: schemas.UserList, db: Session = Depends(get_db)):
"""用户列表"""
result = crud.get_users(db, data)
return HttpResultResponse(count=len(result), data=result)
return HttpResultResponse(total=len(result), data=result)
@router.delete("/delete/{user_id}")
......
from core.storage.db import engine
from models import users, roles
from models import users, roles, recharge
# 映射模型表
users.Base.metadata.create_all(bind=engine)
roles.Base.metadata.create_all(bind=engine)
recharge.Base.metadata.create_all(bind=engine)
from sqlalchemy import Column, Integer, String, Float, DateTime
from core.storage.db import Base
from sqlalchemy_serializer import SerializerMixin
class Recharge(Base, SerializerMixin):
"""充值表"""
__tablename__ = "recharge"
id = Column(Integer, primary_key=True, index=True)
order_number = Column(String(255), comment="订单编号")
user_id = Column(Integer, comment="用户ID")
user_number = Column(Integer, comment="朱贝号")
nick_name = Column(String(255), comment="昵称")
money = Column(Float, comment="金额")
pay_channel = Column(Integer, comment="充值渠道(-1:公会后台 0:后台添加,1:支付宝,2:网银,4:神州行充值卡,5:联通充值,3:骏网充值,"
"6第三方充值,7利用公会充值账号充值)")
sid = Column(String(255), comment="订单序列号(在后台充值记录的是添加的备注,通过网银等充值记录的是充值成功第三方返回的订单id)")
last_update = Column(Integer, comment="最后更新时间")
current = Column(DateTime, comment="回调成功时间")
payment_time = Column(Integer, comment="实际第三方订单支付时间")
class UserWC(Base, SerializerMixin):
"""用户提现"""
__tablename__ = "user_withdraw_cash"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, comment="用户ID")
bank_code = Column(String(255), comment="银行编号")
user_number = Column(Integer, comment="用户的视频号(朱贝号)")
nick_name = Column(String(255), comment="昵称")
true_name = Column(String(255), comment="用户真实姓名")
id_card = Column(String(255), comment="用户的身份证号码")
account = Column(String(255), comment="提现账号")
status = Column(Integer, comment="申请的进度,0发起申请,1处理中,3成功,4失败,5已到账,6未到账")
money = Column(Float, comment="金额")
platform_service_fee = Column(String(50), comment="平台服务费")
third_service_fee = Column(String(50), comment="第三方费用")
final_money = Column(String(50), comment="实得收益")
current = Column(DateTime, comment="回调成功时间")
class GuildWC(Base, SerializerMixin):
"""公会提现"""
__tablename__ = "guild_withdraw_cash"
id = Column(Integer, primary_key=True, index=True)
guild_id = Column(Integer, comment="公会ID")
merchants_id = Column(Integer, comment="招商ID")
account = Column(String(255), comment="账号")
guild_name = Column(String(255), comment="公会名称")
money = Column(Float, comment="提现金额")
status = Column(Integer, comment="提现状态,1为处理中,2为成功,3为拒绝")
platform_service_fee = Column(String(50), comment="平台服务费")
third_service_fee = Column(String(50), comment="第三方费用")
final_money = Column(String(50), comment="实得收益")
create_time = Column(DateTime, comment="创建时间")
update_time = Column(DateTime, comment="修改时间")
class Settlement(Base, SerializerMixin):
"""公会结算"""
pass
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment