Commit 3e717ec8 authored by xianyang's avatar xianyang

第二版提交

parent 233a2c9b
import threading
import time
from concurrent.futures.thread import ThreadPoolExecutor
from datetime import datetime
import pandas as pd
from sqlalchemy import and_, func
from sqlalchemy.orm import Session
from app.api.account import schemas
from libs.functions import wrapper_out
from core.config.env import env
from libs.db_link import LinkMysql
from libs.functions import wrapper_out, get_now_timestamp, uuid, get_before_timestamp, time_str_to_timestamp, \
get_yesterday_timestamp
from libs.orm import QueryAllData
from models import account as models
from models.account import AccountFinance, AccountFinanceDetails, AccountType
def get_account(db: Session, name: str):
"""查询单个"""
return db.query(models.Account).filter(models.Account.name == name).first()
def get_account(name):
"""查询单个账户"""
sql = f"select id from fi_account where name='{name}'"
return LinkMysql(env.DB_3YV2).query_mysql(sql)
def get_id_to_authority(db: Session, role_id: int):
return db.query(models.Account).filter(models.Account.id == role_id).first()
def get_account_list(db: Session, param):
"""列表"""
result_list = []
class HDUd():
def __init__(self):
self.result_list = []
if param.name:
count = db.query(func.count(models.Account.id)).filter(models.Account.name.like(f'%{param.name}%')).scalar()
query_res = db.query(models.Account).filter(models.Account.name.like(f'%{param.name}%')).order_by(models.Account.id.desc()).\
offset((int(param.page) - 1) * param.size).limit(param.size).all()
else:
count = db.query(func.count(models.Account.id)).scalar()
query_res = db.query(models.Account).order_by(models.Account.id.desc()).offset((int(param.page) - 1) * param.size).limit(param.size).all()
if not query_res:
return []
for i in query_res:
serializer_info = i.to_dict()
if i.income:
income_list = serializer_info.get('income').split(',')
serializer_info['income'] = [int(i) for i in income_list]
def thead_task(self, i):
if i.get("income"):
income_list = i.get('income').split(',')
i['income'] = [int(i) for i in income_list]
else:
i['income'] = []
if i.get("output"):
output_list = i.get('output').split(',')
i['output'] = [int(i) for i in output_list]
else:
serializer_info['income'] = []
if i.output:
output_list = serializer_info.get('output').split(',')
serializer_info['output'] = [int(i) for i in output_list]
i['output'] = []
i["remark"] = i.get("description")
if i['unique_tag']:
start, end = get_yesterday_timestamp()
if i['unique_tag'] == 'guild_account':
sql = f"select sum(initial_money) as number from v3_guild_account_statistics where create_time>={start} and create_time<{end}"
elif i['unique_tag'] == 'anchor_account':
sql = f"select sum(initial_money) as number from v3_user_account_statistics where date>={start} and date<{end}"
elif i['unique_tag'] == 'user_account':
sql = f"select sum(initial_money) as number from finance_data_calculation_sum where type=1 and calculation_time>={start} and calculation_time<{end}"
elif i['unique_tag'] == 'knapsack_account':
sql = f"select sum(initial_money) as number from finance_data_calculation_sum where type=4 and calculation_time>={start} and calculation_time<{end}"
elif i['unique_tag'] == 'pledgeDeduction':
sql = f"select sum(initial_money) as number from finance_data_calculation_sum where type=5 and calculation_time>={start} and calculation_time<{end}"
else:
sql = f"select sum(initial_money) as number from v2_system_account_statistics where create_time>={start} and create_time<{end}"
money_res = LinkMysql(env.DB_3YV2).query_mysql(sql)
if money_res and money_res[0]['number']:
i['consumable'] = money_res[0]['number']
else:
i['consumable'] = 0
else:
i['consumable'] = 0
self.result_list.append(i)
def get_account_list(self, name, page, size):
"""账户列表,查询"""
if name:
count_sql = f"select count(id) as num from fi_account where name like '%{name}%'"
number = LinkMysql(env.DB_3YV2).query_mysql(count_sql)
if number:
count = number[0].get("num")
else:
count = 0
data_sql = f"select id,name,unique_tag,uuid,beneficiary,description,create_time, income, output from fi_account where name like '%{name}%' ORDER BY id DESC LIMIT {int(page) - 1},{size}"
query_res = LinkMysql(env.DB_3YV2).query_mysql(data_sql)
else:
serializer_info['output'] = []
result_list.append(serializer_info)
return result_list, count
count_sql = f"select count(id) as num from fi_account"
number = LinkMysql(env.DB_3YV2).query_mysql(count_sql)
if number:
count = number[0].get("num")
else:
count = 0
data_sql = f"select id,name,unique_tag,uuid,beneficiary,description,create_time, income, output from fi_account ORDER BY id DESC LIMIT {int(page) - 1},{size}"
query_res = LinkMysql(env.DB_3YV2).query_mysql(data_sql)
if not query_res:
return []
# 多线程
ths = []
# 创建线程
for x in range(len(query_res)):
ths.append(threading.Thread(target=self.thead_task, args=[query_res[x]]))
# 启动线程
for y in range(len(query_res)):
ths[y].start()
# 等待全部结束,再结束
for z in range(len(query_res)):
ths[z].join()
self.result_list.sort(key=lambda q: q['id'], reverse=True)
return self.result_list, count
def get_gift_type(db: Session):
def get_gift_type():
"""礼物类型"""
out_list = []
income_list = []
output = db.query(models.AccountType).all()
for i in output:
try:
zer_info = i.to_dict(only=('id', 'key_name', 'key_value', 'type'))
except:
zer_info = i
if zer_info.get("type") == 1:
income_list.append(zer_info)
gift_sql = "select id,keyName,keyValue,type from fi_account_type"
output = LinkMysql(env.DB_3YV2).query_mysql(gift_sql)
for info in output:
if info.get("type") == 1:
income_list.append(info)
else:
out_list.append(zer_info)
out_list.append(info)
return {"income": income_list, "output": out_list}
def create_account(db: Session, param: schemas.AccountCreate):
"""创建"""
def create_account(param):
"""创建账户"""
try:
db_account = models.Account(name=param.name, unique_tag=param.unique_tag, config_key=param.config_key,
remark=param.remark, income=','.join(map(str, param.income)),
output=','.join(map(str, param.output)), create_time=datetime.now())
db.add(db_account)
db.commit()
db.refresh(db_account)
income = ','.join(map(str, param.income))
output = ','.join(map(str, param.output))
sql = f"insert into fi_account(name, unique_tag, config_key, description, uuid, income, output, create_time) " \
f"values('{param.name}', '{param.unique_tag}', '{param.config_key}', '{param.remark}', '{uuid()}', '{income}', '{output}', {get_now_timestamp()});"
account = LinkMysql(env.DB_3YV2).perform_mysql(sql)
except Exception as e:
print(e)
return {}
return db_account
return ''
return account
def update_account_info(db: Session, old_data):
"""修改"""
db.query(models.Account).filter(models.Account.id == old_data.id).update(
{models.Account.name: old_data.name,
models.Account.remark: old_data.remark,
models.Account.income: ','.join(map(str, old_data.income)),
models.Account.output: ','.join(map(str, old_data.output))})
db.commit()
def update_account_info(old_data):
"""修改账户"""
income = ','.join(map(str, old_data.income))
output = ','.join(map(str, old_data.output))
try:
sql = f"update fi_account set name='{old_data.name}',income='{income}', output='{output}', " \
f"description='{old_data.remark}' where id = {old_data.id}"
LinkMysql(env.DB_3YV2).perform_mysql(sql)
except Exception as e:
print(e)
def get_finance_info(db, data, is_list=None):
def get_finance_info(unique_tag, page, size, start_time, end_time, is_list=None):
"""账户财务信息"""
finance_condition = []
finance_condition.append(AccountFinance.account_id == data.aid)
if data.start_time:
finance_condition.append(AccountFinance.create_time >= data.start_time)
if data.end_time:
finance_condition.append(AccountFinance.create_time <= data.end_time)
try:
get_finance_orm = db.query(AccountFinance).filter(and_(*finance_condition))
condition_data = db.execute(get_finance_orm).fetchall()
serializer_info = [i[0].to_dict() for i in condition_data]
serializer_info.reverse()
except Exception as e:
print(e)
return [], 0 if is_list else []
if start_time:
finance_condition.append(f"create_time >= {time_str_to_timestamp(start_time + ' 00:00:00')} ")
if end_time:
finance_condition.append(f"create_time < {time_str_to_timestamp(end_time + ' 23:59:59')} ")
if unique_tag == 'guild_account':
if finance_condition:
count_sql = f"select count(id) as num from v3_guild_account_statistics_copy where {' and '.join(finance_condition)}"
data_sql = f"select id,initial_money as balance,income,outcome,create_time from v3_guild_account_statistics_copy where {' and '.join(finance_condition)} order by id DESC limit {(int(page) - 1) * size},{size}"
else:
count_sql = f"select count(id) as num from v3_guild_account_statistics_copy"
data_sql = f"select id,initial_money as balance,income,outcome,create_time from v3_guild_account_statistics_copy order by id DESC limit {(int(page) - 1) * size},{size}"
elif unique_tag == 'anchor_account':
if finance_condition:
condition = [i.replace('create_time', 'date') for i in finance_condition]
count_sql = f"select count(id) as num from v3_user_account_statistics where type=2 and {' and '.join(condition)}"
data_sql = f"select id,initial_money as balance, income,pay as outcome,date as create_time from v3_user_account_statistics where type=2 and {' and '.join(condition)} order by id DESC limit {(int(page) - 1) * size},{size}"
else:
count_sql = f"select count(id) as num from v3_user_account_statistics where type=2"
data_sql = f"select id,initial_money as balance, income,pay as outcome,date as create_time from v3_user_account_statistics where type=2 order by id DESC limit {(int(page) - 1) * size},{size}"
elif unique_tag == 'user_account':
if finance_condition:
condition = [i.replace('create_time', 'calculation_time') for i in finance_condition]
count_sql = f"select count(id) as num from finance_data_calculation_sum where type=1 and {' and '.join(condition)}"
data_sql = f"select id,initial_money as balance,income,outcome,calculation_time as create_time from finance_data_calculation_sum where type=1 and {' and '.join(condition)} order by id DESC limit {(int(page) - 1) * size},{size}"
else:
count_sql = f"select count(id) as num from finance_data_calculation_sum where type=1"
data_sql = f"select id,initial_money as balance,income,outcome,calculation_time as create_time from finance_data_calculation_sum where type=1 order by id DESC limit {(int(page) - 1) * size},{size}"
elif unique_tag == 'knapsack_account':
if finance_condition:
condition = [i.replace('create_time', 'calculation_time') for i in finance_condition]
count_sql = f"select count(id) as num from finance_data_calculation_sum where type=4 and {' and '.join(condition)}"
data_sql = f"select id,initial_money as balance,income,outcome,calculation_time as create_time from finance_data_calculation_sum where type=4 and {' and '.join(condition)} order by id DESC limit {(int(page) - 1) * size},{size}"
else:
count_sql = f"select count(id) as num from finance_data_calculation_sum where type=4"
data_sql = f"select id,initial_money as balance,income,outcome,calculation_time as create_time from finance_data_calculation_sum where type=4 order by id DESC limit {(int(page) - 1) * size},{size}"
elif unique_tag == 'pledgeDeduction':
if finance_condition:
condition = [i.replace('create_time', 'calculation_time') for i in finance_condition]
count_sql = f"select count(id) as num from finance_data_calculation_sum where type=5 and {' and '.join(condition)}"
data_sql = f"select id,initial_money as balance,income,outcome,calculation_time as create_time from finance_data_calculation_sum where type=5 and {' and '.join(condition)} order by id DESC limit {(int(page) - 1) * size},{size}"
else:
count_sql = f"select count(id) as num from finance_data_calculation_sum where type=5"
data_sql = f"select id,initial_money as balance,income,outcome,calculation_time as create_time from finance_data_calculation_sum where type=5 order by id DESC limit {(int(page) - 1) * size},{size}"
else:
if finance_condition:
count_sql = f"select count(id) as num from v2_system_account_statistics where {' and '.join(finance_condition)}"
data_sql = f"select id,initial_money as balance,income,outcome,create_time from v2_system_account_statistics where {' and '.join(finance_condition)} order by id DESC limit {(int(page) - 1) * size},{size}"
else:
count_sql = f"select count(id) as num from v2_system_account_statistics"
data_sql = f"select id,initial_money as balance,income,outcome,create_time from v2_system_account_statistics order by id DESC limit {(int(page) - 1) * size},{size}"
with ThreadPoolExecutor(max_workers=2) as pool:
future1 = pool.submit(LinkMysql(env.DB_3YV2).query_mysql, count_sql)
future2 = pool.submit(LinkMysql(env.DB_3YV2).query_mysql, data_sql)
count = future1.result()
res = future2.result()
# 判断是列表还是导出接口
if is_list:
return serializer_info[(int(data.page) - 1) * data.size:data.size * data.page], len(serializer_info)
return res, count[0]
else:
return serializer_info
return res
def get_finance_details(db, data, is_list=None):
def get_finance_details(page, size, uuid, start_time, end_time, type, gift_type, is_list=None):
"""账户财务明细"""
finance_condition = []
if data.type or data.type == 0:
finance_condition.append(AccountFinanceDetails.type == data.type)
if data.gift_type:
finance_condition.append(AccountFinanceDetails.gift_type == data.gift_type)
if data.start_time:
finance_condition.append(AccountFinanceDetails.create_time >= data.start_time + " 00:00:00")
if data.end_time:
finance_condition.append(AccountFinanceDetails.create_time <= data.end_time + " 23:59:59")
try:
get_details_orm = db.query(AccountFinanceDetails).filter(and_(*finance_condition))
condition_data = db.execute(get_details_orm).fetchall()
serial_info = [i[0].to_dict() for i in condition_data]
serial_info.reverse()
except Exception as e:
print(e)
return [], 0, 0 if is_list else []
details_condition = [f" uuid='{uuid}' "]
if type or type == 0:
details_condition.append(f" type={type}")
if gift_type:
details_condition.append(f" reference_type like '%{gift_type}%'")
if start_time:
details_condition.append(f" create_time >= {time_str_to_timestamp(start_time + ' 00:00:00')} ")
if end_time:
details_condition.append(f" create_time <= {time_str_to_timestamp(end_time + ' 23:59:59')} ")
year_month = datetime.now().strftime('%Y%m')
if details_condition:
count_sql = f"select count(id) as num from assets_log_{year_month} where {' and '.join(details_condition)}"
data_sql = f"select id,order_number,reference_type,amount/1000 as amount,create_time from assets_log_{year_month} where {' and '.join(details_condition)} order by id DESC limit {(int(page) - 1) * size},{size}"
amount_sql = f"select sum(cast(amount as decimal(20,6)))/1000 as total_amount from assets_log_{year_month} where {' and '.join(details_condition)}"
else:
count_sql = f"select count(id) as num from assets_log_{year_month}"
data_sql = f"select id,order_number,reference_type,amount/1000 as amount,create_time from assets_log_{year_month} order by id DESC limit {(int(page) - 1) * size},{size}"
amount_sql = f"select sum(cast(amount as decimal(20,6)))/1000 as total_amount from assets_log_{year_month}"
with ThreadPoolExecutor(max_workers=3) as pool:
future1 = pool.submit(LinkMysql(env.DB_HISTORY).query_mysql, count_sql)
future2 = pool.submit(LinkMysql(env.DB_HISTORY).query_mysql, data_sql)
future3 = pool.submit(LinkMysql(env.DB_HISTORY).query_mysql, amount_sql)
total = future1.result()
res = future2.result()
amount_res = future3.result()
# 判断是列表还是导出接口
if is_list:
if not serial_info:
if not res:
return [], 0, 0
df = pd.DataFrame(serial_info)
count = df['amount'].apply(lambda x: x).sum()
return serial_info[(int(data.page) - 1) * data.size:data.size * data.page], len(serial_info), count
return res, total[0]['num'], amount_res[0]['total_amount']
else:
return serial_info
return res
def get_account_type(db: Session, data):
def get_account_type(db: Session, **data):
"""礼物类型配置列表"""
finance_filters = []
if data.get("key_name"):
......@@ -151,10 +254,6 @@ def get_account_type(db: Session, data):
finance_filters.append(AccountType.key_value == data.get("key_value"))
if data.get("type") or data.get("type") == 0:
finance_filters.append(AccountType.type == data.get("type"))
if data.get("start_time"):
finance_filters.append(AccountType.create_time >= data.get("start_time") + " 00:00:00")
if data.get("end_time"):
finance_filters.append(AccountType.create_time <= data.get("end_time") + " 23:59:59")
querydata, count = QueryAllData(db, AccountType, data, finance_filters).query_data()
data = [QueryAllData.serialization(item) for item in querydata]
return data, count
......
from typing import Optional
from pydantic import BaseModel
from fastapi import HTTPException
from pydantic import BaseModel, validator
class PublicModel(BaseModel):
......@@ -33,17 +35,18 @@ class AccountUpdate(BaseModel):
class FinanceInfo(PublicModel):
aid: int
unique_tag: str
class FinanceDetails(PublicModel):
type: Optional[int] = None
gift_type: Optional[str] = None
gift_type: Optional[str] = ""
uuid: str
class FixTable(BaseModel):
type: int
account_id: int
unique_tag: str
money: float
amount_type: str = ""
remark: str
......@@ -75,10 +78,16 @@ class RecoveryTable(BaseModel):
uuid: str
type: int
out_money: int
entry_money: float
entry_money: int
from_time: str
cont: str
@validator('cont')
def cont_must_contain_space(cls, c):
if len(c) > 200:
raise HTTPException(status_code=500, detail="备注太长,限制200个字符")
return c
class RecoveryupdateTable(RecoveryTable):
id: int
from fastapi import Depends, APIRouter, Request
from typing import Optional
from fastapi import Depends, APIRouter, Request, Query
from sqlalchemy.orm import Session
from app import get_db
from app.api.account import schemas, crud
......@@ -10,45 +12,51 @@ router = APIRouter()
@router.post("/create")
def create_account(data: schemas.AccountCreate, token=Depends(login_required), db: Session = Depends(get_db)):
def create_account(data: schemas.AccountCreate, token=Depends(login_required)):
"""添加账户"""
db_info = crud.get_account(db, name=data.name)
db_info = crud.get_account(data.name)
if db_info:
return HttpResultResponse(code=400, msg=HttpMessage.ACCOUNT_EXIST)
res = crud.create_account(db=db, param=data)
res = crud.create_account(data)
if not res:
return HttpResultResponse(code=500, msg=res)
return HttpResultResponse(data=res.id)
return HttpResultResponse(data=res)
@router.post("/list")
def user_list(data: schemas.AccountList, token=Depends(login_required), db: Session = Depends(get_db)):
@router.get("/list")
def user_list(page: int, size: int, name: Optional[str] = None, token=Depends(login_required)):
"""账户列表"""
result, num = crud.get_account_list(db, data)
result, num = crud.HDUd().get_account_list(name, page, size)
return HttpResultResponse(total=num, data=result)
@router.get("/gift/type")
def gift_type_list(db: Session = Depends(get_db), token=Depends(login_required)):
def gift_type_list(token=Depends(login_required)):
"""礼物类型配置返回"""
result = crud.get_gift_type(db)
result = crud.get_gift_type()
return HttpResultResponse(data=result)
@router.post("/update")
def read_account(data: schemas.AccountUpdate, token=Depends(login_required), db: Session = Depends(get_db)):
def read_account(data: schemas.AccountUpdate, token=Depends(login_required)):
"""账户修改"""
if not data.name:
return HttpResultResponse(code=500, msg="账户名不能为空")
crud.update_account_info(db, data)
crud.update_account_info(data)
return HttpResultResponse()
@router.post("/finance/info")
def finance_information(data: schemas.FinanceInfo, token=Depends(login_required), db: Session = Depends(get_db)):
@router.get("/finance/info")
def finance_information(unique_tag: str,
page: int,
size: int,
start_time: Optional[str] = "",
end_time: Optional[str] = "",
token=Depends(login_required)):
# unique_tag:Optional[str] = Query(None, min_length=3, max_length=50, regex="^xiao\d+$") 参数验证
"""账户财务信息"""
res, total = crud.get_finance_info(db, data, 1)
return HttpResultResponse(total=total, data=res[int(data.page-1)*data.size:data.page*data.size])
res, total = crud.get_finance_info(unique_tag, page, size, start_time, end_time, 1)
return HttpResultResponse(total=total.get("num"), data=res)
@router.post("/finance/info/excel")
......@@ -56,30 +64,42 @@ def finance_info_excel(data: schemas.FinanceInfo, request: Request,
token=Depends(login_required), db: Session = Depends(get_db)):
"""账户财务信息导出"""
headers = request.get("headers")
statement_list = crud.get_finance_info(db, data)
statement_list = crud.get_finance_info(data)
return statement_crud.data_to_file(db, statement_list, "财务信息", headers)
@router.post("/finance/details")
def finance_details(data: schemas.FinanceDetails, token=Depends(login_required), db: Session = Depends(get_db)):
"""账户财务明细"""
res, total, count = crud.get_finance_details(db, data, 1)
@router.get("/finance/details")
def finance_details(page: int,
size: int,
uuid: str,
start_time: Optional[str] = "",
end_time: Optional[str] = "",
type: Optional[int] = None,
gift_type: Optional[str] = "",
token=Depends(login_required)):
"""账户财务详情"""
res, total, count = crud.get_finance_details(page, size, uuid, start_time, end_time, type, gift_type, is_list=1)
return HttpResultResponse(total=total, data=res, count=count)
@router.post("/finance/details/excel")
def finance_info_excel(data: schemas.FinanceDetails, request: Request,
token=Depends(login_required), db: Session = Depends(get_db)):
"""账户财务明细导出"""
"""账户财务详情导出"""
headers = request.get("headers")
statement_list = crud.get_finance_details(db, data)
return statement_crud.data_to_file(db, statement_list, "财务明细", headers)
@router.post("/type")
def finance_fix(data: schemas.AccountTypeList, token=Depends(login_required), db: Session = Depends(get_db)):
@router.get("/type")
def finance_fix(page: int,
size: int,
key_name: str = "",
key_value: str = "",
type: int = None,
token=Depends(login_required), db: Session = Depends(get_db)):
"""出入账目配置列表"""
res, num = crud.get_account_type(db, data.dict(exclude_none=True))
res, num = crud.get_account_type(db, key_name=key_name, key_value=key_value, type=type, page=page, size=size)
return HttpResultResponse(total=num, data=res)
......
......@@ -11,15 +11,16 @@ import pandas as pd
from starlette.responses import StreamingResponse
def get_export_list(db: Session, param):
def get_export_list(db: Session, source, start_time, end_time, page, size):
"""导出列表"""
export_filters = []
if param.get("source"):
export_filters.append(ExportFile.source == param.get("source"))
if param.get("start_time"):
export_filters.append(ExportFile.create_time >= param.get("start_time") + " 00:00:00")
if param.get("end_time"):
export_filters.append(ExportFile.create_time < param.get("end_time") + " 24:00:00")
if source:
export_filters.append(ExportFile.source == source)
if start_time:
export_filters.append(ExportFile.create_time >= start_time + " 00:00:00")
if end_time:
export_filters.append(ExportFile.create_time < end_time + " 24:00:00")
param = {"source": source, "start_time": start_time, "end_time": end_time, "page": page, "size": size}
querydata, count = QueryAllData(db, ExportFile, param, export_filters).query_data()
data = [QueryAllData.serialization(item) for item in querydata]
return data, count
......@@ -74,8 +75,6 @@ class CalculationMonthlyBill(object):
def month_statistics_task(self, date, key_type, name, page, size):
"""主函数"""
db = env.MysqlDB
db["database"] = env.DB_HISTORY
if key_type and not name:
sql = f"SELECT reference_type, type, SUM(cast(amount as decimal(20,6)))/1000 as money FROM {date} where reference_type='{key_type}' GROUP BY reference_type, type ORDER BY reference_type"
if name and not key_type:
......@@ -89,7 +88,7 @@ class CalculationMonthlyBill(object):
if not name and not key_type:
sql = f"SELECT reference_type, type, SUM(cast(amount as decimal(20,6)))/1000 as money FROM {date} GROUP BY reference_type, type ORDER BY reference_type"
try:
res_data = LinkMysql(db).query_mysql(sql)
res_data = LinkMysql(env.DB_HISTORY).query_mysql(sql)
except Exception as e:
return [], 0
for res in res_data:
......@@ -131,23 +130,20 @@ class CalculationMonthlyDetails(object):
result = LinkMysql(db).query_mysql(sql)
return len(result)
def statement_income_expenditure(self, param):
def statement_income_expenditure(self, **param):
database = env.MysqlDB
database["database"] = env.DB_HISTORY
with ThreadPoolExecutor(max_workers=2) as pool:
future1 = pool.submit(self.data_query, database, 'assets_log_' + param.date, param.key, param.is_income, param.page, param.size)
future2 = pool.submit(self.acquired_total, database, 'assets_log_' + param.date, param.key, param.is_income)
future1 = pool.submit(self.data_query, env.DB_HISTORY, 'assets_log_' + param.get("date"), param.get("key"), param.get("is_income"), param.get("page"), param.get("size"))
future2 = pool.submit(self.acquired_total, env.DB_HISTORY, 'assets_log_' + param.get("date"), param.get("key"), param.get("is_income"))
data = future1.result()
num = future2.result()
return data, num
@staticmethod
def query_error_data(date, reference_type):
database = env.MysqlDB
database["database"] = env.DB_HISTORY
group_sql = f"SELECT order_id, COUNT(order_id) as num FROM {date} where reference_type='{reference_type}' GROUP BY order_id"
result = LinkMysql(database).query_mysql(group_sql)
result = LinkMysql(env.DB_HISTORY).query_mysql(group_sql)
error_list = [str(i.get("order_id")) for i in result if i.get("num") != 2]
if len(error_list) == 1:
sql = f"SELECT uuid,reference_type,order_number,order_id,type,cast(amount as decimal(20,6))/1000 as money,amount_type,create_time FROM {date} where order_id={error_list[0]}"
......@@ -155,7 +151,7 @@ class CalculationMonthlyDetails(object):
sql = f"SELECT uuid,reference_type,order_number,order_id,type,cast(amount as decimal(20,6))/1000 as money,amount_type,create_time FROM {date} where order_id in({','.join(error_list)})"
if len(error_list) == 0:
return []
result = LinkMysql(database).query_mysql(sql)
result = LinkMysql(env.DB_HISTORY).query_mysql(sql)
return result
......@@ -176,14 +172,12 @@ class MonthDataDerive(object):
return StreamingResponse(file, media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
def derive_data(self, date, key_type):
db = env.MysqlDB
db["database"] = env.DB_HISTORY
if key_type:
sql = f"SELECT reference_type, type, SUM(cast(amount as decimal(20,6)))/1000 as money FROM {date} where reference_type='{key_type}' GROUP BY reference_type, type ORDER BY reference_type"
else:
sql = f"SELECT reference_type, type, SUM(cast(amount as decimal(20,6)))/1000 as money FROM {date} GROUP BY reference_type, type ORDER BY reference_type"
try:
res_data = LinkMysql(db).query_mysql(sql)
res_data = LinkMysql(env.DB_HISTORY).query_mysql(sql)
except Exception as e:
return [], 0
for res in res_data:
......
from datetime import datetime
from typing import Optional
from dateutil.relativedelta import relativedelta
from fastapi import Depends, APIRouter, Request
from sqlalchemy.orm import Session
......@@ -11,10 +13,15 @@ from libs.token_verify import login_required
router = APIRouter()
@router.post("/list")
def export_list(data: schemas.ExportList, token=Depends(login_required), db: Session = Depends(get_db)):
@router.get("/list")
def export_list(page: int,
size: int,
source: Optional[str] = "",
start_time: Optional[str] = "",
end_time: Optional[str] = "",
token=Depends(login_required), db: Session = Depends(get_db)):
"""导出记录列表"""
result, total = crud.get_export_list(db, data.dict(exclude_none=True))
result, total = crud.get_export_list(db, source, start_time, end_time, page, size)
return HttpResultResponse(total=total, data=result)
......@@ -43,23 +50,33 @@ def export_source_query(db: Session = Depends(get_db), token=Depends(login_requi
return HttpResultResponse(data=result)
@router.post("/month/total")
def month_query_total_export(param: schemas.MonthStatistics, token=Depends(login_required)):
@router.get("/month/total")
def month_query_total_export(page: int,
size: int,
date: str = "",
type: str = "",
name: str = "",
token=Depends(login_required)):
"""月度表计算"""
if not param.date:
if not date:
month_date = datetime.now().date() - relativedelta(months=1)
param.date = month_date.strftime("%Y%m")
result, num = crud.CalculationMonthlyBill().month_statistics_task('assets_log_' + param.date, param.type, param.name,
param.page, param.size)
date = month_date.strftime("%Y%m")
result, num = crud.CalculationMonthlyBill().month_statistics_task('assets_log_' + date, type, name,
page, size)
return HttpResultResponse(total=num, data=result)
@router.post("/month/details")
def month_query_total_export(param: schemas.MonthDetails, token=Depends(login_required)):
@router.get("/month/details")
def month_query_total_export(key: str,
is_income: int,
page: int,
size: int,
date: str = "",
token=Depends(login_required)):
"""月度计算,详情"""
if not param.date:
if not date:
return HttpResultResponse(code=500, msg='查询月份不能为空')
result, num = crud.CalculationMonthlyDetails().statement_income_expenditure(param)
result, num = crud.CalculationMonthlyDetails().statement_income_expenditure(key=key, is_income=is_income, page=page, size=size, date=date)
return HttpResultResponse(total=num, data=result)
......
from sqlalchemy.orm import Session
import json
import socket
from concurrent.futures.thread import ThreadPoolExecutor
from libs.functions import get_now_datetime
from libs.orm import QueryAllData
from models.margin import GuildMargin
from models.guild import Guild, GuildPledgeRecord
from models.margin import GuildMargin
from app.api.statement.guild import query_token
from core.config.env import env
from libs.db_link import LinkMysql
from libs.functions import time_str_to_timestamp, get_now_timestamp, get_order, search, get_now_datetime
from libs.token_verify import get_current_user
def get_margin(db: Session, param):
def get_margin(guild_id, status, page, size, start_time, end_time):
"""保证金列表,查询"""
guild_filters = []
if param.get("guild_id"):
guild_filters.append(GuildMargin.guild_id == param.get("guild_id"))
if param.get("status"):
guild_filters.append(GuildMargin.status == param.get("status"))
if param.get("start_time"):
guild_filters.append(GuildMargin.create_time >= param.get("start_time") + " 00:00:00")
if param.get("end_time"):
guild_filters.append(GuildMargin.create_time <= param.get("end_time") + " 23:59:59")
querydata, count = QueryAllData(db, GuildMargin, param, guild_filters).query_data()
data = [QueryAllData.serialization(item) for item in querydata]
return data, count
def margin_dispose(db: Session, param):
# 查询公会
guid_info = db.query(Guild).filter(Guild.id == param.get("guild_id")).first()
if not guid_info:
return '公会不存在'
# 拒绝
if param.get("examine_status") == -1:
db.query(GuildMargin).filter(GuildMargin.id == param.get("id")).update({GuildMargin.status: 5})
db.commit()
return '拒绝成功'
try:
# 修改公会保证金记录 状态
db.query(GuildMargin).filter(GuildMargin.id == param.get("id")).update({GuildMargin.status: 4})
db.commit()
except Exception as e:
print(e)
# 给公会添加保证金
db.query(Guild).filter(Guild.id == param.get("guild_id")).update({Guild.margin: guid_info.margin + param.get("amount")})
# 添加保证金添加记录
try:
gpr = GuildPledgeRecord(guild_id=param.get("guild_id"), before_pears=guid_info.margin, after_pears=guid_info.margin + param.get("amount"),
create_time=get_now_datetime(), update_time=get_now_datetime(), tixian_id=0,
is_handle=1, pledge_pearl=0, margin=param.get("amount"))
db.add(gpr)
db.commit()
db.refresh(gpr)
except Exception as e:
print(e)
if guild_id:
guild_filters.append(f" m.guild_id={guild_id}")
if status:
guild_filters.append(f" m.status={status}")
if start_time:
guild_filters.append(f" m.addtime >= {time_str_to_timestamp(start_time + ' 00:00:00')} ")
if end_time:
guild_filters.append(f" m.addtime <= {time_str_to_timestamp(end_time + ' 23:59:59')} ")
if guild_filters:
count_sql = f"select count(id) as num from guild_margin_history as m where {' and '.join(guild_filters)}"
data_sql = f"select m.id,m.guild_id,g.guild_name,amount,m.status,m.addtime as create_time from guild_margin_history m inner join guild g on m.guild_id=g.id where {' and '.join(guild_filters)} order by id DESC limit {(int(page) - 1) * size},{size}"
else:
count_sql = f"select count(id) as num from guild_margin_history"
data_sql = f"select m.id,m.guild_id,g.guild_name,amount,m.status,m.addtime as create_time from guild_margin_history m inner join guild g on m.guild_id=g.id order by id DESC limit {(int(page) - 1) * size},{size}"
with ThreadPoolExecutor(max_workers=2) as pool:
future1 = pool.submit(LinkMysql(env.DB_3YV2).query_mysql, count_sql)
future2 = pool.submit(LinkMysql(env.DB_3YV2).query_mysql, data_sql)
total = future1.result()
res = future2.result()
return res, total[0]
class GuildMargin(object):
def __init__(self, db, header):
self.db = db
user = query_token(self.db, header)
self.user_id = user.get("user_id")
def margin_dispose(self, param):
# 查询公会
guild_sql = f"select id,margin,ice_uuid,guild_level_id,withdrawal_time,pay_status,is_lucky_gift,is_binding,bind_authority,pledge_rate from guild where id={param.guild_id} limit 0,1"
guid_list = LinkMysql(env.DB_3YV2).query_mysql(guild_sql)
if not guid_list:
return '公会不存在'
guid_info = guid_list[0]
# 拒绝
if param.examine_status == -1:
refuse_sql = f"update guild_margin_history set status=5 where id={param.id}"
LinkMysql(env.DB_3YV2).perform_mysql(refuse_sql)
return '拒绝成功'
try:
# 修改公会保证金记录 状态
agree_sql = f"update guild_margin_history set status=4 where id={param.id}"
LinkMysql(env.DB_3YV2).perform_mysql(agree_sql)
except Exception as e:
print(e)
return '操作失败'
# 给公会添加保证金
agree_sql = f"update guild set margin={float(guid_info.get('margin')) + param.amount} where id={param.guild_id}"
LinkMysql(env.DB_3YV2).perform_mysql(agree_sql)
# 添加保证金添加记录
try:
add_sql = f"insert into guild_pledge_record(guild_id, before_pears, after_pears, create_time, update_time, tixian_id, is_handle, pledge_pearl,margin) " \
f"values({param.guild_id}, {guid_info.get('margin')}, {float(guid_info.get('margin')) + param.amount}, {get_now_timestamp()}, {get_now_timestamp()}, 0, 1, 0,{param.amount});"
LinkMysql(env.DB_3YV2).perform_mysql(add_sql)
except Exception as e:
print(e)
return '操作失败'
# 对接财务 给用户充值
self.recharge_user(guid_info.get('ice_uuid'), param.amount, param.guild_id)
# 调整公会权限
self.guild_authority(param.guild_id, param.amount, guid_info)
return '操作成功'
def recharge_user(self, ice_uuid, money, guild_id, reference_type='marginRecharge', timestamp=None):
timestamp = timestamp if timestamp else get_now_timestamp()
reference_number = get_order()
ip = socket.gethostbyname(socket.gethostname())
request_data = {
'uuid': ice_uuid,
'timestamp': timestamp,
'ip': ip,
'amount': money,
'amount_type': 'withdrawable',
'reference_number': reference_number,
'reference_type': 'reference_type',
'reference_info': '保证金充值',
}
res = search(request_data, 'Server.UserExecute.Recharge')
insert_sql = f"insert into all_record_table(user_id, type, status, uuid, reference_number, money, amount_type, money_data, is_add, create_time, errmsg) " \
f"values({guild_id}, '{reference_type}', 2, '{ice_uuid}', {reference_number}, {money * 10}, 1, '保证金充值',1,{timestamp}, '{json.dumps(res)}');"
if res['status'] == 9:
insert_sql = f"insert into all_record_table(user_id, type, status, uuid, reference_number, money, amount_type, money_data, is_add, create_time, errmsg) " \
f"values({guild_id}, '{reference_type}', 9, '{ice_uuid}', {reference_number}, {money * 10}, 1, '保证金充值',1,{timestamp}, '{json.dumps(res)}');"
if res['status'] == False:
insert_sql = f"insert into all_record_table(user_id, type, status, uuid, reference_number, money, amount_type, money_data, is_add, create_time, errmsg) " \
f"values({guild_id}, '{reference_type}', 3, '{ice_uuid}', {reference_number}, {money * 10}, 1, '保证金充值',1,{timestamp}, '{json.dumps(res)}');"
print(f'保证金错误,订单号:{reference_number}')
reference = LinkMysql(env.DB_3YV2).perform_mysql(insert_sql)
if not reference:
print('充值失败')
def guild_authority(self, guild_id, amount, guild_info):
if not guild_id:
return False
global_sql = "select v from global_config where k='guild_level' limit 0,1"
res = LinkMysql(env.DB_3YV2).query_mysql(global_sql)
if not res:
return False
con = res[0]['v']
config = json.loads(con)
if (float(guild_info.get('margin')) - amount) < float(config['margin']) and float(guild_info.get('margin')) >= float(config['margin']):
content_dict = {
'withdrawal_time': {'name': '提现周期', 'type': 'value'},
'pay_status': {'name': '充值权限', 'type': 'radio'},
'is_lucky_gift': {'name': '幸运奖励权限', 'type': 'radio'},
'is_binding': {'name': '红包绑定权限', 'type': 'radio'},
'bind_authority': {'name': '分享者绑定权限', 'type': 'radio'},
'pledge_rate': {'name': '质押金比例', 'type': 'value'},
}
# 获取配置的key
temp_key = guild_info.get('guild_level_id') - 1
update = {}
content = ''
for k, v in content_dict.items():
if guild_info[k]:
if v['type'] == 'radio':
while config['high'][k][temp_key]:
if config['high'][k][temp_key] == '0':
if guild_info[k] != 0:
update[k] = 0
content += f"{v['name']}由开启修改为关闭,"
break
if config['high'][k][temp_key] == '1':
break
if config['high'][k][temp_key] == '2':
if guild_info[k] != 1:
content += f"{v['name']}由关闭自动修改为开启,"
break
else:
if guild_info[k] != config['high'][k][temp_key]:
update[k] = config['high'][k][temp_key]
content += f"{v['name']}由{guild_info[k]}修改为{config['high'][k][temp_key]},"
if update:
update_list = [f"{k}={float(v)}" for k, v in update.items()]
update_sql = f"update guild set {','.join(update_list)} where id={guild_id}"
LinkMysql(env.DB_3YV2).perform_mysql(update_sql)
insert_guild_sql = f"insert into guild_authority_record(content, json, create_time, admin_id, guild_id, type, reason) " \
f"values('{content}', '{json.dumps(update)}', {get_now_timestamp()}, {self.user_id}, {guild_id}, 2, '公会缴纳保证金{amount}后保证金总数为{float(guild_info['margin']) + amount}高于配置金额,权限由低位调整到高位');"
LinkMysql(env.DB_3YV2).perform_mysql(insert_guild_sql)
from fastapi import Depends, APIRouter
from typing import Optional
from fastapi import Depends, APIRouter, Request
from sqlalchemy.orm import Session
from app import get_db
from app.api.margin import schemas, crud
......@@ -8,15 +10,23 @@ from libs.token_verify import login_required
router = APIRouter()
@router.post("/list")
def guild_margin_list(data: schemas.GuildMarginList, token=Depends(login_required), db: Session = Depends(get_db)):
@router.get("/list")
def guild_margin_list(page: int,
size: int,
guild_id: Optional[int] = None,
status: Optional[int] = None,
start_time: Optional[str] = "",
end_time: Optional[str] = "",
token=Depends(login_required)):
"""保证金列表"""
result, total = crud.get_margin(db, data.dict(exclude_none=True))
return HttpResultResponse(total=total, data=result)
# result, total = crud.get_margin(data.dict(exclude_none=True))
result, total = crud.get_margin(guild_id, status, page, size, start_time, end_time)
return HttpResultResponse(total=total.get('num'), data=result)
@router.post("/examine")
def margin_examine(data: schemas.MarginExamine, token=Depends(login_required), db: Session = Depends(get_db)):
def margin_examine(data: schemas.MarginExamine, request: Request, db: Session = Depends(get_db), token=Depends(login_required)):
"""保证金确认,拒绝"""
result = crud.margin_dispose(db, data.dict(exclude_none=True))
header_list = request.get("headers")
result = crud.GuildMargin(db, header_list).margin_dispose(data)
return HttpResultResponse(msg=result)
......@@ -15,15 +15,15 @@ def get_id_to_authority(db: Session, role_id: int):
return db.query(models.Role).filter(models.Role.id == role_id).first()
def get_roles(db: Session, param):
def get_roles(db: Session, **param):
result_list = []
name = param.role_name if param.role_name else ""
name = param.get("role_name") if param.get("role_name") else ""
if name:
query_res = db.query(models.Role).filter(models.Role.role_name.like(f'%{name}%')).order_by(models.Role.id.desc()).\
offset((int(param.page) - 1) * param.size).limit(param.size).all()
offset((int(param.get("page")) - 1) * param.get("size")).limit(param.get("size")).all()
count = db.query(func.count(models.Role.id)).filter(models.Role.role_name.like(f'%{name}%')).scalar()
else:
query_res = db.query(models.Role).order_by(models.Role.id.desc()).offset((int(param.page) - 1) * param.size).limit(param.size).all()
query_res = db.query(models.Role).order_by(models.Role.id.desc()).offset((int(param.get("page")) - 1) * param.get("size")).limit(param.get("size")).all()
count = db.query(func.count(models.Role.id)).scalar()
if not query_res:
return [], 0
......
from typing import Optional
from fastapi import Depends, APIRouter
from sqlalchemy.orm import Session
from app import get_db
......@@ -36,8 +38,11 @@ def delete_user(role_id: int, token=Depends(login_required), db: Session = Depen
return HttpResultResponse()
@router.post("/list")
def role_list(data: schemas.RoleList, token=Depends(login_required), db: Session = Depends(get_db)):
@router.get("/list")
def role_list(page: int,
size: int,
role_name: Optional[str] = "",
token=Depends(login_required), db: Session = Depends(get_db)):
"""角色列表"""
result, total = crud.get_roles(db, data)
result, total = crud.get_roles(db, role_name=role_name, page=page, size=size)
return HttpResultResponse(total=total, data=result)
import math
import threading
from concurrent.futures.thread import ThreadPoolExecutor
import pandas as pd
from sqlalchemy import and_, func
from app.api.statement.guild import query_token
from starlette.responses import StreamingResponse
from sqlalchemy.orm import Session
from app.api.export import crud
from core.config.env import env
from libs.db_link import LinkMysql
from libs.functions import time_str_to_timestamp
from libs.orm import QueryAllData
from models.recharge import Recharge, UserWC, GuildWC, FinanceFixLog
......@@ -225,17 +230,39 @@ class WithdrawStatement(object):
class FinanceFix(object):
@staticmethod
def get_finance_fix_data(db: Session, data):
def get_finance_fix_data(page, size, start_time, end_time):
"""财务修复"""
# total = db.query(func.count(FinanceFixLog.id)).scalar()
# output = db.query(FinanceFixLog).order_by(FinanceFixLog.id.desc()).offset((data.page - 1) *
# data.size).limit(data.size).all()
# return [i.to_dict() for i in output], total
finance_filters = []
if data.get("start_time"):
finance_filters.append(FinanceFixLog.create_time >= data.get("start_time") + " 00:00:00")
if data.get("end_time"):
finance_filters.append(FinanceFixLog.create_time <= data.get("end_time") + " 23:59:59")
querydata, count = QueryAllData(db, FinanceFixLog, data, finance_filters).query_data()
data = [QueryAllData.serialization(item) for item in querydata]
return data, count
if start_time:
finance_filters.append(f" create_time <= {time_str_to_timestamp(start_time + ' 00:00:00')} ")
if end_time:
finance_filters.append(f" create_time <= {time_str_to_timestamp(end_time + ' 23:59:59')} ")
if finance_filters:
count_sql = f"select count(id) as num from finance_fix_log where {' and '.join(finance_filters)}"
data_sql = f"select id,type,money,unique_tag,amount_type,operator,create_time,remark from finance_fix_log where {' and '.join(finance_filters)} order by id DESC limit {(int(page) - 1) * size},{size}"
else:
count_sql = f"select count(id) as num from finance_fix_log"
data_sql = f"select id,type,money,unique_tag,amount_type,operator,create_time,remark from finance_fix_log order by id DESC limit {(int(page) - 1) * size},{size}"
with ThreadPoolExecutor(max_workers=2) as pool:
future1 = pool.submit(LinkMysql(env.DB_3YV2).query_mysql, count_sql)
future2 = pool.submit(LinkMysql(env.DB_3YV2).query_mysql, data_sql)
total = future1.result()
res = future2.result()
if res:
result = []
for i in res:
if i['type'] != 0:
# user_sql = f"select nick_name from v2_user where uuid='{i['unique_tag']}' limit 0,1"
# v2_user_res = LinkMysql(env.DB_3YV2).query_mysql(user_sql)
# if not v2_user_res:
# guild_sql = f"select guild_name as name from guild where uuid='{i['unique_tag']}' limit 0,1"
# guild_res = LinkMysql(env.DB_3YV2).query_mysql(guild_sql)
# if not guild_res:
# i['name'] = i['unique_tag']
# i['name'] = guild_res[0]['guild_name']
# continue
# i['name'] = v2_user_res[0]['nick_name']
i['name'] = i['unique_tag']
result.append(i)
return result, total[0]['num']
return [], 0
import json
import math
import socket
import threading
from concurrent.futures.thread import ThreadPoolExecutor
from sqlalchemy import and_, func, engine
from sqlalchemy.orm import Session
from libs.functions import get_now_timestamp, get_now_datetime
from core.config.env import env
from libs.db_link import LinkMysql
from libs.functions import get_now_timestamp, get_now_datetime, search, get_order
from libs.orm import QueryAllData
from libs.token_verify import get_current_user
from models.recharge import Settlement, Fitransferlog, FinanceFixLog, Account_log
......@@ -101,7 +106,7 @@ def paymentset_guild_data(db: Session, dbname, params):
def outon_account_data(db: Session, dbname, params):
querydata, count = QueryAllData(db, dbname, params, None).query_data()
data = [QueryAllData.serialization(item,
remove={'operator', 'unique_tag', 'create_time', 'beneficiary', 'description',
remove={'operator', 'create_time', 'beneficiary', 'description',
'create_time', 'config_key', 'income', 'output', 'operator_id'}) for item
in querydata]
return data, count
......@@ -219,65 +224,137 @@ def transfer_money(db: Session, param, h_list):
def create_fix_table(db: Session, param, h_list):
"""增加修复报表"""
user = query_token(db, h_list)
if param.type == 0:
param.amount_type = 'consumable'
try:
present = FinanceFixLog(type=param.type, account_id=param.account_id, money=param.money,
remark=param.remark, operator=user.get("username"), amount_type=param.amount_type,
create_time=get_now_datetime())
db.add(present)
db.commit()
db.refresh(present)
except Exception as e:
return {}
return present
if param.amount_type == 'backpack':
# if (db('guild')->where(['uuid'= > $uuid])->count()) {
# return ['status'= > 0, 'msg' = > '公会账户不能更改背包账户'];
# }
# $pack_account = db('v2_user_config')->where(['user_id' = > $user['user_id']])->value('pack_account');
# if (!$pack_account) {
# return ['status'= > 0, 'msg' = > '该用户不存在背包账户'];
# $user = db('v2_user')->where(['uuid'=>$uuid])->field('user_id,nick_name,user_number')->find();
guild_sql = f"select count(id) as num from guild where uuid='{param.unique_tag}'"
user_sql = f"select user_id,nick_name,user_number from v2_user where uuid='{param.unique_tag}'"
with ThreadPoolExecutor(max_workers=2) as pool:
future1 = pool.submit(LinkMysql(env.DB_3YV2).query_mysql, guild_sql)
future2 = pool.submit(LinkMysql(env.DB_3YV2).query_mysql, user_sql)
guild_info = future1.result()
user_info = future2.result()
if not guild_info and not user_info:
return 'uuid不存在'
if guild_info:
if guild_info[0]['num']:
return '公会账户不能更改背包账户'
pack_sql = f"select pack_account from v2_user_config where user_id={user_info[0]['user_id']}"
pack_account = LinkMysql(env.DB_3YV2).query_mysql(pack_sql)
if pack_account:
if pack_account[0]['pack_account'] == 0:
return '该用户不存在背包账户'
reference_number = get_order()
ip = socket.gethostbyname(socket.gethostname())
if param.money > 0:
method = 'Server.BaseExecute.Increase'
data = {
"ip": ip,
"dst_uuid": param.unique_tag,
"dst_amount_type": param.amount_type,
"amount": param.money,
"fee": 0,
"reference_number": reference_number,
"reference_type": "finance_admin_fix",
"reference_info": [],
"timestamp": get_now_timestamp()
}
else:
method = 'Server.BaseExecute.Reduce'
data = {
"ip": ip,
"src_uuid": param.unique_tag,
"src_amount_type": param.amount_type,
"amount": param.money,
"fee": 0,
"reference_number": reference_number,
"reference_type": "finance_admin_fix",
"reference_info": [],
"timestamp": get_now_timestamp()
}
clearing_res = search(data, method)
print(clearing_res)
if clearing_res['status']:
user = query_token(db, h_list)
try:
insert_fix_sql = f"insert into finance_fix_log(type, money, create_time, unique_tag, amount_type, remark, operator,operator_id) " \
f"values({param.type}, {param.money}, {get_now_timestamp()}, '{param.unique_tag}', '{param.amount_type}', '{param.remark}', '{user.get('username')}', {user.get('user_id')});"
LinkMysql(env.DB_3YV2).perform_mysql(insert_fix_sql)
except Exception as e:
return e
return ''
return '财务系统出错,请稍后再试!'
def recovery_fix_data(db: Session, dbname, params):
def recovery_fix_data(**params):
recovery_list = []
if params.get("uuid"):
recovery_list.append(dbname.uuid == params.get("uuid"))
recovery_list.append(f"e.uuid = '{params.get('uuid')}'")
if params.get("start_time"):
recovery_list.append(dbname.create_time >= params.get("start_time"))
recovery_list.append(f"e.create_time >= '{params.get('start_time')}'")
if params.get("end_time"):
recovery_list.append(dbname.create_time <= params.get("end_time"))
querydata, count = QueryAllData(db, dbname, params, recovery_list).query_data()
data = [QueryAllData.serialization(item) for item in querydata]
for item in data:
accout_filters = []
accout_filters.append(dbname.uuid == item.get("uuid"))
names = QueryAllData(db, Account, {}, accout_filters).query_filter()
if names:
item['name'] = names.name
item['uuid'] = names.uuid
else:
item['name'] = ''
return data, count
def create_recovery_table(db: Session, param):
recovery_list.append(f"e.create_time < '{params.get('end_time')}'")
if recovery_list:
sum_sql = f"select count(e.id) as num from finance_error_transverse_log as e inner join fi_account as f on e.uuid=f.uuid where {' and '.join(recovery_list)}"
query_sql = f"select e.id,f.name,e.cont,e.out_money,e.entry_money,e.uuid,e.type,e.create_time,e.from_time from finance_error_transverse_log as e inner join fi_account as f on e.uuid=f.uuid where {' and '.join(recovery_list)} order by e.id DESC limit {(int(params.get('page')) - 1) * params.get('size')},{params.get('size')}"
else:
query_sql = f"select e.id,f.name,e.cont,e.out_money,e.entry_money,e.uuid,e.type,e.create_time,e.from_time from finance_error_transverse_log as e inner join fi_account as f on e.uuid=f.uuid order by e.id DESC limit {(int(params.get('page')) - 1) * params.get('size')},{params.get('size')}"
sum_sql = f"select count(e.id) as num from finance_error_transverse_log as e inner join fi_account as f on e.uuid=f.uuid "
result = LinkMysql(env.DB_3YV2).query_mysql(query_sql)
sum = LinkMysql(env.DB_3YV2).query_mysql(sum_sql)
if result:
for i in result:
i['out_money'] = float(i['out_money'])
i['entry_money'] = float(i['entry_money'])
i['create_time'] = str(i['create_time'])
i['from_time'] = str(i['from_time'])
return result, sum[0]['num']
return [], 0
def create_recovery_table(param):
"""增加修复报表"""
query_sql = f"select out_money,entry_money,befor_out_money,befor_entry_money from finance_error_transverse_log where uuid='{param.uuid}' and from_time < '{param.from_time}' and type={param.type} ORDER BY id DESC limit 0,1"
old_info = LinkMysql(env.DB_3YV2).query_mysql(query_sql)
if old_info:
befor_out_money = old_info[0]['befor_out_money'] + param.out_money
befor_entry_money = old_info[0]['befor_entry_money'] + param.entry_money
else:
befor_out_money = param.out_money
befor_entry_money = param.entry_money
# 限制一天没事,对同一个账户修复多条
query_today_sql = f"select id from finance_error_transverse_log where uuid='{param.uuid}' and from_time = '{param.from_time}' and type={param.type} ORDER BY id DESC limit 0,1"
today_info = LinkMysql(env.DB_3YV2).query_mysql(query_today_sql)
if today_info:
return '单个账户每天只能存在一条数据'
try:
present = Account_log(type=param.type, uuid=param.uuid, out_money=param.out_money,
cont=param.cont, entry_money=param.entry_money, from_time=param.from_time,
create_time=get_now_datetime())
db.add(present)
db.commit()
db.refresh(present)
add_sql = f"insert into finance_error_transverse_log(type, uuid, out_money, entry_money, cont, from_time, create_time, befor_out_money, befor_entry_money) " \
f"values({param.type}, '{param.uuid}', {param.out_money}, {param.entry_money}, '{param.cont}', '{param.from_time}',now(), {befor_out_money}, {befor_entry_money});"
LinkMysql(env.DB_3YV2).perform_mysql(add_sql)
except Exception as e:
return {}
return present
return e
return False
def update_recovery_table(db: Session, data):
def update_recovery_table(data):
"""修改修复报表"""
# 查询当前数据的历史数据,看看变动了哪些
old_sql = f"select out_money,entry_money,befor_out_money,befor_entry_money from finance_error_transverse_log where id={data.id} limit 0,1"
old_this_info = LinkMysql(env.DB_3YV2).query_mysql(old_sql)
if not old_this_info:
return '修改数据不存在,请检查'
# 获取到改变的值, 说明修改前跟修改后相差这多值
change_out_price = data.out_money - old_this_info[0]['out_money']
change_entry_money = data.entry_money - old_this_info[0]['entry_money']
try:
accout_filters = []
accout_filters.append(Account_log.id == data.get("id"))
db.query(Account_log).filter(Account_log.id == data.get("id")).update(data)
db.commit()
update_sql = f"update finance_error_transverse_log set uuid='{data.uuid}',type={data.type},out_money={data.out_money},entry_money={data.entry_money},befor_out_money=befor_out_money+{change_out_price},befor_entry_money=befor_entry_money+{change_entry_money},from_time='{data.from_time}',cont='{data.cont}' where id={data.id}"
LinkMysql(env.DB_3YV2).perform_mysql(update_sql)
except Exception as e:
print(e)
return {}
return data
return e
return ''
import json
from datetime import datetime
from typing import Optional
from app.api.statement import crud, schemas, guild
from app.api.account import schemas as acc_schemas
from app import get_db
......@@ -131,10 +133,14 @@ def guild_update_list(data: schemas.GuildUpdate, token=Depends(login_required),
return HttpResultResponse()
@router.post("/finance/fix")
def finance_fix(data: acc_schemas.PublicModel, token=Depends(login_required), db: Session = Depends(get_db)):
"""账户修复报表"""
res, num = crud.FinanceFix.get_finance_fix_data(db, data.dict(exclude_none=True))
@router.get("/finance/fix")
def finance_fix(page: int,
size: int,
start_time: Optional[str] = "",
end_time: Optional[str] = "",
token=Depends(login_required)):
"""账户修复报表列表"""
res, num = crud.FinanceFix.get_finance_fix_data(page, size, start_time, end_time)
return HttpResultResponse(total=num, data=res)
......@@ -144,25 +150,36 @@ def finance_fix(data: acc_schemas.FixTable, request: Request,
"""新增修复报表"""
header_list = request.get("headers")
res = guild.create_fix_table(db, data, header_list)
return HttpResultResponse(data=res.id)
if res:
return HttpResultResponse(code=500, msg=res)
return HttpResultResponse()
@router.post("/data/recovery")
def recovery_fix(data: acc_schemas.Recovery_fix, token=Depends(login_required), db: Session = Depends(get_db)):
"""异常数据修复"""
recovery_list, total = guild.recovery_fix_data(db, Account_log, data.dict(exclude_none=True))
@router.get("/data/recovery")
def recovery_fix(page: int,
size: int,
uuid: Optional[str] = '',
start_time: Optional[str] = "",
end_time: Optional[str] = "",
token=Depends(login_required)):
"""异常数据修复列表"""
recovery_list, total = guild.recovery_fix_data(page=page, size=size, uuid=uuid, start_time=start_time, end_time=end_time)
return HttpResultResponse(total=total, data=recovery_list)
@router.post("/submit/recovery")
def finance_fix(data: acc_schemas.RecoveryTable, token=Depends(login_required), db: Session = Depends(get_db)):
"""新增异常数据修复"""
res = guild.create_recovery_table(db, data)
return HttpResultResponse(data=res.id)
res = guild.create_recovery_table(data)
if res:
return HttpResultResponse(code=500, msg=res)
return HttpResultResponse()
@router.post("/recovery/fix")
def recovery_fix(data: acc_schemas.RecoveryupdateTable, token=Depends(login_required), db: Session = Depends(get_db)):
"""异常数据修复"""
res = guild.update_recovery_table(db, data.dict(exclude_none=True))
return HttpResultResponse(data=res.get("id"))
def recovery_fix(data: acc_schemas.RecoveryupdateTable, token=Depends(login_required)):
"""修改异常数据"""
res = guild.update_recovery_table(data)
if res:
return HttpResultResponse(code=500, msg=res)
return HttpResultResponse()
......@@ -14,18 +14,18 @@ def get_user_by_name(db: Session, username: str):
return db.query(models.User).filter(models.User.username == username).first()
def get_users(db: Session, param):
def get_users(db: Session, **param):
"""
分组统计求和 from sqlalchemy import func as fc
r = db.query(fc.sum(r_model.Rolesfsas.aority), fc.sum(r_model.Rolesfsas.remark)).having(r_model.Rolesfsas.goods >
10).group_by(r_model.Rolesfsas.goods).all()
"""
username = param.username if param.username else ""
username = param.get("username") if param.get("username") else ""
if username:
res = db.query(models.User).filter(models.User.username.like(f'%{username}%')).order_by(models.User.id.desc()).offset((int(param.page) - 1) * param.size).limit(param.size)
res = db.query(models.User).filter(models.User.username.like(f'%{username}%')).order_by(models.User.id.desc()).offset((int(param.get("page")) - 1) * param.get("size")).limit(param.get("size"))
count = db.query(func.count(models.User.id)).filter(models.User.username.like(f'%{username}%')).scalar()
else:
res = db.query(models.User).order_by(models.User.id.desc()).offset((int(param.page) - 1) * param.size).limit(param.size)
res = db.query(models.User).order_by(models.User.id.desc()).offset((int(param.get("page")) - 1) * param.get("size")).limit(param.get("size"))
count = db.query(func.count(models.User.id)).scalar()
if not res:
return [], 0
......
......@@ -28,7 +28,7 @@ class User(UserBase):
class UserLoginForm(BaseModel):
username: str
password: str
verify: str
verify: Optional[str] = ''
class GoogleCode(BaseModel):
......
import pyotp
import socket
from typing import List
from typing import List, Optional
from fastapi import Depends, APIRouter, Request
from datetime import timedelta
from sqlalchemy.orm import Session
......@@ -25,6 +25,8 @@ def img_code():
@router.post("/login")
def login(form_data: UserLoginForm, db: Session = Depends(get_db)):
if not form_data.verify:
return HttpResultResponse(code=500, msg="请输入验证码")
user_info = authenticate_user(db=db, form_data=form_data)
# 函数 gethostname() 返回当前正在执行 Python 的系统主机名
res = socket.gethostbyname(socket.gethostname())
......@@ -90,10 +92,13 @@ def read_user(data: schemas.PermissionCreate, token=Depends(login_required), db:
return HttpResultResponse()
@router.post("/list")
def user_list(data: schemas.UserList, token=Depends(login_required), db: Session = Depends(get_db)):
@router.get("/list")
def user_list(page: int,
size: int,
username: Optional[str] = "",
token=Depends(login_required), db: Session = Depends(get_db)):
"""用户列表"""
result, total = crud.get_users(db, data)
result, total = crud.get_users(db, username=username, page=page, size=size)
return HttpResultResponse(total=total, data=result)
......
......@@ -37,7 +37,14 @@ class Env(BaseSettings):
DATABASE_URI: str = 'sqlite://:memory:'
DATABASE_USER: str = ''
DATABASE_PWD: str = ''
MysqlDB: dict = {
DB_HISTORY: dict = {
"database": "3y_history",
"host": "106.55.103.148",
"port": 3398,
"pwd": "c1ea602311a369f6",
"user": "root"
}
DB_3YV2: dict = {
"database": "3yakj_v2",
"host": "106.55.103.148",
"port": 3398,
......@@ -53,7 +60,7 @@ class Env(BaseSettings):
SECRET_KEY: str = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM: str = "HS256"
PASSWORD: str = "fj123456"
DB_HISTORY = "3y_history"
CLEARING_CENTER_URL: str = 'http://106.55.103.148:6464/'
class TestingEnv(Env):
......
import hashlib
import json
import time
import uuid as u
from datetime import datetime
from datetime import datetime, timedelta
import requests
def get_now_timestamp():
......@@ -17,11 +20,33 @@ def get_now_timestamp():
def get_now_datetime():
"""
获取现在时间
return int eg:1667664000
"""
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
def get_before_timestamp(n):
"""获取n天前时间"""
data_str = datetime.now() - timedelta(days=int(n))
before_n_days = data_str.strftime("%Y-%m-%d")
return before_n_days
def get_yesterday_timestamp():
"""获取昨天时间戳"""
data_str = datetime.now() - timedelta(days=1)
before_n_days = data_str.strftime("%Y-%m-%d")
time_array = time.strptime(f"{before_n_days} 00:00:00", "%Y-%m-%d %H:%M:%S")
today_str = datetime.now().strftime('%Y-%m-%d')
today_array = time.strptime(f"{today_str} 00:00:00", "%Y-%m-%d %H:%M:%S")
return int(time.mktime(time_array)), int(time.mktime(today_array))
def time_str_to_timestamp(time_str):
"""时间字符串类型 转为int"""
time_array = time.strptime(time_str, "%Y-%m-%d %H:%M:%S")
return int(time.mktime(time_array))
def md5(s):
"""md5加密"""
sign_str = hashlib.md5()
......@@ -34,6 +59,12 @@ def uuid():
return str(u.uuid4())
def get_order():
"""生成订单号"""
timestamp = get_now_timestamp()
return datetime.now().strftime('%Y%m%d%H%M%S') + str(timestamp)
# 页数页码校验
def wrapper_out():
def wrapper(func):
......@@ -44,3 +75,24 @@ def wrapper_out():
return ret
return inner
return wrapper
def search(params, method):
"""
调用清算接口
:param params: 传入参数
:param method: 传入方法
"""
url = "http://106.55.103.148:6464"
header = {
"RPC-METHOD": method,
"RPC-ID": '161',
"RPC-PROTOCOL": "jsonrpc",
"RPC-VERSION": "2.0"
}
response = requests.post(url=url, headers=header, json=params)
if response.status_code != 200:
return {"status": 0}
text = json.loads(response.text)
print(text)
return {"status": 1, 'data': text.get('result')}
......@@ -133,7 +133,6 @@ class FinanceFixLog(Base, SerializerMixin):
__table_args__ = {'comment': '财务修复日志'}
class Account_log(Base, SerializerMixin):
"""账户修复记录表"""
__tablename__ = "account_log"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment