Commit 227fae2e authored by xianyang's avatar xianyang

Merge branch 'development'

# Conflicts:
#	core/config/env.py
parents ce181c18 950a59a1
import threading import threading
import time import time
from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor
from datetime import datetime
import pandas as pd import pandas as pd
from sqlalchemy import and_, func
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.api.account import schemas
from core.config.env import env from core.config.env import env
from libs.business import TYPE_NAME, query_fi_account_type from libs.business import TYPE_NAME, GUILD_NAME
from libs.db_link import LinkMysql from libs.db_link import LinkMysql
from libs.functions import wrapper_out, get_now_timestamp, uuid, get_before_timestamp, time_str_to_timestamp, \ from libs.functions import get_now_timestamp, time_str_to_timestamp, \
get_yesterday_timestamp, get_last_month, get_date_list, send_json_rpc_request, ymd_str_to_timestamp get_yesterday_timestamp, get_last_month, get_date_list, send_json_rpc_request, ymd_str_to_timestamp, \
judge_time_period
from libs.log_utils import Logger from libs.log_utils import Logger
from libs.orm import QueryAllData
from models import account as models from models import account as models
from models.account import AccountFinance, AccountFinanceDetails, AccountType
locka = threading.Lock() locka = threading.Lock()
...@@ -48,7 +45,7 @@ class HDUd(): ...@@ -48,7 +45,7 @@ class HDUd():
if i['unique_tag']: if i['unique_tag']:
start, end = get_yesterday_timestamp() start, end = get_yesterday_timestamp()
if i['unique_tag'] == 'guild_account': if i['unique_tag'] == 'guild_account':
sql = f"select sum(initial_money) as number from v3_guild_account_statistics where create_time>={start} and create_time<{end}" sql = f"select initial_money as number from v3_guild_account_statistics_copy where create_time>={start} and create_time<{end} ORDER BY create_time DESC"
money_res = LinkMysql(env.DB_3YV2).query_mysql(sql) money_res = LinkMysql(env.DB_3YV2).query_mysql(sql)
i['consumable'] = money_res[0]['number'] if money_res and money_res[0]['number'] else 0 i['consumable'] = money_res[0]['number'] if money_res and money_res[0]['number'] else 0
elif i['unique_tag'] == 'anchor_account': elif i['unique_tag'] == 'anchor_account':
...@@ -84,22 +81,22 @@ class HDUd(): ...@@ -84,22 +81,22 @@ class HDUd():
def get_account_list(self, name, page, size): def get_account_list(self, name, page, size):
"""账户列表,查询""" """账户列表,查询"""
if name: if name:
count_sql = f"select count(id) as num from fi_account where name like '%{name}%' and unique_tag!='anchor_account'" count_sql = f"select count(id) as num from fi_account where name like '%{name}%' order by create_time"
number = LinkMysql(env.DB_3YV2).query_mysql(count_sql) number = LinkMysql(env.DB_3YV2).query_mysql(count_sql)
if number: if number:
count = number[0].get("num") count = number[0].get("num")
else: else:
count = 0 count = 0
data_sql = f"select id,name,unique_tag,uuid,config_key,beneficiary,description,create_time, income, output from fi_account where name like '%{name}%' and unique_tag!='anchor_account' ORDER BY id DESC LIMIT {(int(page) - 1) * size},{size}" data_sql = f"select id,name,unique_tag,uuid,config_key,beneficiary,description,create_time, income, output from fi_account where name like '%{name}%' ORDER BY create_time LIMIT {(int(page) - 1) * size},{size}"
query_res = LinkMysql(env.DB_3YV2).query_mysql(data_sql) query_res = LinkMysql(env.DB_3YV2).query_mysql(data_sql)
else: else:
count_sql = f"select count(id) as num from fi_account where unique_tag!='anchor_account'" count_sql = f"select count(id) as num from fi_account"
number = LinkMysql(env.DB_3YV2).query_mysql(count_sql) number = LinkMysql(env.DB_3YV2).query_mysql(count_sql)
if number: if number:
count = number[0].get("num") count = number[0].get("num")
else: else:
count = 0 count = 0
data_sql = f"select id,name,unique_tag,uuid,config_key,beneficiary,description,create_time, income, output from fi_account where unique_tag!='anchor_account' ORDER BY id DESC LIMIT {(int(page) - 1) * size},{size}" data_sql = f"select id,name,unique_tag,uuid,config_key,beneficiary,description,create_time, income, output from fi_account ORDER BY create_time LIMIT {(int(page) - 1) * size},{size}"
query_res = LinkMysql(env.DB_3YV2).query_mysql(data_sql) query_res = LinkMysql(env.DB_3YV2).query_mysql(data_sql)
if not query_res: if not query_res:
return [], 0 return [], 0
...@@ -114,7 +111,7 @@ class HDUd(): ...@@ -114,7 +111,7 @@ class HDUd():
# 等待全部结束,再结束 # 等待全部结束,再结束
for z in range(len(query_res)): for z in range(len(query_res)):
ths[z].join() ths[z].join()
self.result_list.sort(key=lambda q: q['id'], reverse=True) self.result_list.sort(key=lambda q: q['id'], reverse=False)
return self.result_list, count return self.result_list, count
...@@ -210,7 +207,7 @@ def get_finance_info(unique_tag, id, page, size, start_time, end_time, is_list=N ...@@ -210,7 +207,7 @@ def get_finance_info(unique_tag, id, page, size, start_time, end_time, is_list=N
finance_condition.append(f"create_time >= {time_str_to_timestamp(start_time + ' 00:00:00')} ") finance_condition.append(f"create_time >= {time_str_to_timestamp(start_time + ' 00:00:00')} ")
if end_time: if end_time:
finance_condition.append(f"create_time < {time_str_to_timestamp(end_time + ' 23:59:59')} ") finance_condition.append(f"create_time < {time_str_to_timestamp(end_time + ' 23:59:59')} ")
if unique_tag in ['guild_account', 'user_account', 'knapsack_account', 'pledgeDeduction']: if unique_tag in ['guild_account', 'user_account', 'knapsack_account', 'pledgeDeduction', 'anchor_account']:
if unique_tag == 'guild_account': if unique_tag == 'guild_account':
if finance_condition: if finance_condition:
count_sql = f"select create_time from v3_guild_account_statistics_copy where {' and '.join(finance_condition)} GROUP BY create_time" count_sql = f"select create_time from v3_guild_account_statistics_copy where {' and '.join(finance_condition)} GROUP BY create_time"
...@@ -218,6 +215,13 @@ def get_finance_info(unique_tag, id, page, size, start_time, end_time, is_list=N ...@@ -218,6 +215,13 @@ def get_finance_info(unique_tag, id, page, size, start_time, end_time, is_list=N
else: else:
count_sql = f"select create_time from v3_guild_account_statistics_copy GROUP BY create_time" count_sql = f"select create_time from v3_guild_account_statistics_copy GROUP BY create_time"
data_sql = f"select id,initial_money as balance,income,outcome,create_time from v3_guild_account_statistics_copy GROUP BY create_time order by create_time DESC limit {(int(page) - 1) * size},{size}" data_sql = f"select id,initial_money as balance,income,outcome,create_time from v3_guild_account_statistics_copy GROUP BY create_time order by create_time DESC limit {(int(page) - 1) * size},{size}"
if unique_tag == 'anchor_account':
if finance_condition:
count_sql = f"select date as create_time from v3_user_account_statistics where {' and '.join(finance_condition)} GROUP BY date"
data_sql = f"select id,initial_money as balance,income,outcome,date as create_time from v3_user_account_statistics where {' and '.join(finance_condition)} GROUP BY date order by date limit {(int(page) - 1) * size},{size}"
else:
count_sql = f"select date as create_time from v3_user_account_statistics GROUP BY date"
data_sql = f"select id,initial_money as balance,income,pay as outcome,date as create_time from v3_user_account_statistics GROUP BY date order by date limit {(int(page) - 1) * size},{size}"
if unique_tag == 'user_account': if unique_tag == 'user_account':
if finance_condition: if finance_condition:
condition = [i.replace('create_time', 'calculation_time') for i in finance_condition] condition = [i.replace('create_time', 'calculation_time') for i in finance_condition]
...@@ -247,6 +251,9 @@ def get_finance_info(unique_tag, id, page, size, start_time, end_time, is_list=N ...@@ -247,6 +251,9 @@ def get_finance_info(unique_tag, id, page, size, start_time, end_time, is_list=N
future2 = pool.submit(LinkMysql(env.DB_3YV2).query_mysql, data_sql) future2 = pool.submit(LinkMysql(env.DB_3YV2).query_mysql, data_sql)
count = future1.result() count = future1.result()
res = future2.result() res = future2.result()
else:
if finance_condition:
sys_sql = f"select initial_money as balance,income,outcome,create_time from v2_system_account_statistics_copy where fi_account_id={id} and {' and '.join(finance_condition)} ORDER BY create_time DESC"
else: else:
sys_sql = f"select initial_money as balance,income,outcome,create_time from v2_system_account_statistics_copy where fi_account_id={id} ORDER BY create_time DESC" sys_sql = f"select initial_money as balance,income,outcome,create_time from v2_system_account_statistics_copy where fi_account_id={id} ORDER BY create_time DESC"
result = LinkMysql(env.DB_3YV2).query_mysql(sys_sql) result = LinkMysql(env.DB_3YV2).query_mysql(sys_sql)
...@@ -359,7 +366,7 @@ class AccountStatistics(object): ...@@ -359,7 +366,7 @@ class AccountStatistics(object):
self.type = type self.type = type
self.gift_type = gift_type self.gift_type = gift_type
self.unique = unique self.unique = unique
self.guild_uuid = [] self.guild_dict = {}
self.user_list = [] self.user_list = []
def business_query(self, date, condition): def business_query(self, date, condition):
...@@ -376,6 +383,7 @@ class AccountStatistics(object): ...@@ -376,6 +383,7 @@ class AccountStatistics(object):
u_sql = f"select id,uuid,order_number,type,reference_type,amount/1000 as amount,reference_number,create_time from {date} WHERE {' and '.join(condition)} ORDER BY create_time DESC" u_sql = f"select id,uuid,order_number,type,reference_type,amount/1000 as amount,reference_number,create_time from {date} WHERE {' and '.join(condition)} ORDER BY create_time DESC"
else: else:
u_sql = f"select id,uuid,order_number,type,reference_type,amount/1000 as amount,reference_number,create_time from {date} ORDER BY create_time DESC" u_sql = f"select id,uuid,order_number,type,reference_type,amount/1000 as amount,reference_number,create_time from {date} ORDER BY create_time DESC"
Logger().logger.info(f"查询sql:{u_sql}")
result = LinkMysql(env.DB_HISTORY).query_mysql(u_sql) result = LinkMysql(env.DB_HISTORY).query_mysql(u_sql)
return result return result
...@@ -430,14 +438,21 @@ class AccountStatistics(object): ...@@ -430,14 +438,21 @@ class AccountStatistics(object):
total_list = [] total_list = []
yw_condition = self.public_query() yw_condition = self.public_query()
if get_now_timestamp() - time_str_to_timestamp(self.start_time + ' 00:00:00') > 90 * 86400:
month, last_month, before_last_month = judge_time_period(self.start_time)
else:
month, last_month, before_last_month = get_last_month() month, last_month, before_last_month = get_last_month()
if self.unique == 'guild_account': if self.unique == 'guild_account':
guild_sql = f"select id,guild_name from guild"
guild_info = LinkMysql(env.DB_3YV2).query_mysql(guild_sql)
for i in guild_info:
self.guild_dict[i['id']] = i['guild_name']
data, total = self.guild_calculation() data, total = self.guild_calculation()
return data, total, 0 return data, total, 0
if self.unique == "pledgeDeduction": if self.unique == "pledgeDeduction":
data, total, money = self. guild_calculation_pledge() data, total, money = self. guild_calculation_pledge()
return data, total, money return data, total, money
if self.unique == "user_account": if self.unique == "user_account" or self.unique == "knapsack_account":
if self.user_id: if self.user_id:
user_sql = f"select uuid from v2_user where user_id={self.user_id}" user_sql = f"select uuid from v2_user where user_id={self.user_id}"
self.user_list = LinkMysql(env.DB_3YV2).query_mysql(user_sql) self.user_list = LinkMysql(env.DB_3YV2).query_mysql(user_sql)
...@@ -469,17 +484,18 @@ class AccountStatistics(object): ...@@ -469,17 +484,18 @@ class AccountStatistics(object):
total = len(ultimately_data) total = len(ultimately_data)
res = ultimately_data[int(self.page - 1) * self.size: self.page * self.size] res = ultimately_data[int(self.page - 1) * self.size: self.page * self.size]
# 判断是列表还是导出接口 # 判断是列表还是导出接口
type_name = query_fi_account_type()
if is_list: if is_list:
if not res: if not res:
return [], 0, 0 return [], 0, 0
for i in res: for i in res:
i['reference_name'] = type_name[i['reference_type']] if type_name.get(i['reference_type']) else i[ i['reference_name'] = TYPE_NAME[i['reference_type']] if TYPE_NAME.get(i['reference_type']) else i[
'reference_type'] 'reference_type']
data_pd = pd.DataFrame(ultimately_data) data_pd = pd.DataFrame(ultimately_data)
amount_total = data_pd['amount'].sum() amount_total = data_pd['amount'].sum()
return res, total, float(amount_total) return res, total, float(amount_total)
else: else:
for i in res:
i['reference_type'] = TYPE_NAME.get(i['reference_type'], i['reference_type'])
return res return res
def guild_calculation(self): def guild_calculation(self):
...@@ -492,6 +508,8 @@ class AccountStatistics(object): ...@@ -492,6 +508,8 @@ class AccountStatistics(object):
guild_cond_list.append(f" guild_id={self.user_id}") guild_cond_list.append(f" guild_id={self.user_id}")
sql = f"select guild_id,initial_money,income,outcome,create_time from v3_guild_account_detail_copy where {' and '.join(guild_cond_list)} order by create_time DESC" sql = f"select guild_id,initial_money,income,outcome,create_time from v3_guild_account_detail_copy where {' and '.join(guild_cond_list)} order by create_time DESC"
mysql_data = LinkMysql(env.DB_3YV2).query_mysql(sql) mysql_data = LinkMysql(env.DB_3YV2).query_mysql(sql)
for info in mysql_data:
info['guild_name'] = self.guild_dict.get(info['guild_id'])
return mysql_data[int(self.page - 1) * self.size: self.page * self.size], len(mysql_data) return mysql_data[int(self.page - 1) * self.size: self.page * self.size], len(mysql_data)
def guild_calculation_pledge(self): def guild_calculation_pledge(self):
...@@ -531,18 +549,41 @@ def delete_specify_field(data, unique_tag): ...@@ -531,18 +549,41 @@ def delete_specify_field(data, unique_tag):
for i in data: for i in data:
if unique_tag in ["knapsack_account", "user_account", "guild_account", "pledgeDeduction"]: if unique_tag in ["knapsack_account", "user_account", "guild_account", "pledgeDeduction"]:
structure = { structure = {
"uuid": i["uuid"], "uuid": i.get('uuid', ''),
"income": i["income"], "income": i.get('income', ''),
"outcome": i["outcome"], "outcome": i.get('outcome', ''),
"create_time": i["create_time"] "create_time": i.get('create_time', '')
}
if unique_tag == 'guild_account':
structure = {
"uuid": i.get('guild_id', ''),
"income": i.get('income', ''),
"outcome": i.get('outcome', ''),
"initial_money": i.get('initial_money', ''),
"create_time": i.get('create_time', '')
} }
else: else:
structure = { structure = {
"reference_number": i["reference_number"], "reference_number": i.get('reference_number', ''),
"type": '出账' if i["type"] == 0 else '入账', "type": '出账' if i["type"] == 0 else '入账',
"reference_type": i["reference_type"], "reference_type": i.get('reference_type', ''),
"amount": round(float(i["amount"]), 3), "amount": round(float(i["amount"]), 3),
"create_time": i["create_time"] "create_time": i.get('create_time', '')
}
res_list.append(structure)
return res_list
def delete_guild_specify_field(data):
res_list = []
for i in data:
structure = {
"name": GUILD_NAME[i.get('guild_id')],
"guild_id":i.get('guild_id', ''),
"income": i.get('income', ''),
"outcome": i.get('outcome', ''),
"initial_money": i.get('initial_money', ''),
"create_time": i.get('create_time', '')
} }
res_list.append(structure) res_list.append(structure)
return res_list return res_list
...@@ -606,9 +647,8 @@ class SpecificAccountQuery(object): ...@@ -606,9 +647,8 @@ class SpecificAccountQuery(object):
res = self.total_list[int(self.page - 1) * self.size: self.page * self.size] res = self.total_list[int(self.page - 1) * self.size: self.page * self.size]
if not res: if not res:
return [], 0, 0 return [], 0, 0
type_name = query_fi_account_type()
for i in res: for i in res:
i['reference_name'] = type_name[i['reference_type']] if type_name.get(i['reference_type']) else i[ i['reference_name'] = TYPE_NAME[i['reference_type']] if TYPE_NAME.get(i['reference_type']) else i[
'reference_type'] 'reference_type']
data_pd = pd.DataFrame(self.total_list) data_pd = pd.DataFrame(self.total_list)
amount_total = data_pd['amount'].sum() amount_total = data_pd['amount'].sum()
...@@ -647,7 +687,7 @@ class HomePageDisplay(object): ...@@ -647,7 +687,7 @@ class HomePageDisplay(object):
else: else:
acc_uuid = [i['uuid'] for i in acc_data if i['unique_tag'] == self.unique_tag] acc_uuid = [i['uuid'] for i in acc_data if i['unique_tag'] == self.unique_tag]
if not acc_uuid: if not acc_uuid:
print('没找到系统账户') Logger(20).logger.info('没找到系统账户')
return [], 0, 0 return [], 0, 0
assets_cond.append(f" uuid='{acc_uuid[0]}'") assets_cond.append(f" uuid='{acc_uuid[0]}'")
assets_sql = f"select reference_type,type,sum(amount) as amount from {self.date} where {' and '.join(assets_cond)} GROUP BY reference_type,type" assets_sql = f"select reference_type,type,sum(amount) as amount from {self.date} where {' and '.join(assets_cond)} GROUP BY reference_type,type"
...@@ -672,3 +712,19 @@ class HomePageDisplay(object): ...@@ -672,3 +712,19 @@ class HomePageDisplay(object):
income.append(op) income.append(op)
res_list = outcome + income res_list = outcome + income
return res_list, self.outcome/1000, self.income/1000 return res_list, self.outcome/1000, self.income/1000
class AccountAnchor(object):
def __init__(self, anchor_id):
self.anchor_id = anchor_id
def anchor_balance(self):
if self.anchor_id:
sql = f"SELECT sum(pearl) as num FROM v2_user_account where user_id in(SELECT user_id from v2_user where is_achor=1) and user_id = {self.anchor_id} "
else:
sql = "SELECT sum(pearl) as num FROM v2_user_account where user_id in(SELECT user_id from v2_user where is_achor=1)"
res = LinkMysql(env.DB_3YV2).query_mysql(sql)
money = res[0]['num']
if not money:
return 0
return money/100
...@@ -7,6 +7,8 @@ from app.api.account import schemas, crud ...@@ -7,6 +7,8 @@ from app.api.account import schemas, crud
from app.api.account.crud import AccountStatistics, SpecificAccountQuery, HomePageDisplay from app.api.account.crud import AccountStatistics, SpecificAccountQuery, HomePageDisplay
from app.api.statement import crud as statement_crud from app.api.statement import crud as statement_crud
from libs import functions from libs import functions
from libs.business import GUILD_NAME
from libs.export import Export, TableToFile
from libs.functions import get_date_list from libs.functions import get_date_list
from libs.log_utils import Logger from libs.log_utils import Logger
from libs.result_format import HttpResultResponse, HttpMessage from libs.result_format import HttpResultResponse, HttpMessage
...@@ -71,7 +73,7 @@ def finance_info_excel(data: schemas.FinanceInfo, request: Request, ...@@ -71,7 +73,7 @@ def finance_info_excel(data: schemas.FinanceInfo, request: Request,
token=Depends(login_required), db: Session = Depends(get_db)): token=Depends(login_required), db: Session = Depends(get_db)):
"""账户财务详情导出""" """账户财务详情导出"""
headers = request.get("headers") headers = request.get("headers")
statement_list = crud.get_finance_info(data.unique_tag, data.id, data.page, 99999, data.start_time, data.end_time) statement_list = crud.get_finance_info(data.unique_tag, data.id, data.page, 99999999, data.start_time, data.end_time)
df = ['账户余额', '入账', '出账', '时间'] df = ['账户余额', '入账', '出账', '时间']
return statement_crud.data_to_file(db, statement_list, "财务信息", headers, df) return statement_crud.data_to_file(db, statement_list, "财务信息", headers, df)
...@@ -100,15 +102,20 @@ def finance_info_excel(data: schemas.FinanceDetails, request: Request, ...@@ -100,15 +102,20 @@ def finance_info_excel(data: schemas.FinanceDetails, request: Request,
token=Depends(login_required), db: Session = Depends(get_db)): token=Depends(login_required), db: Session = Depends(get_db)):
"""账户财务明细导出""" """账户财务明细导出"""
headers = request.get("headers") headers = request.get("headers")
statement_list = AccountStatistics(data.page, 99999, data.uuid, data.user_id, data.start_time, data.end_time, data.type, statement_list = AccountStatistics(data.page, 99999999, data.uuid, data.user_id, data.start_time, data.end_time, data.type,
data.gift_type, data.unique_tag).get_finance_details() data.gift_type, data.unique_tag).get_finance_details()
if data.unique_tag in ["knapsack_account", "user_account", "guild_account", "pledgeDeduction"]: if data.unique_tag in ["knapsack_account", "user_account", "guild_account", "pledgeDeduction"]:
field_head = ['uuid', '入账', '出账', '时间'] field_head = ['uuid', '入账', '出账', '时间']
statement_list = statement_list[0] statement_list = statement_list[0]
if data.unique_tag == 'guild_account':
field_head = ['公会名', '公会id', '入账', '出账', '余额', '时间']
data = crud.delete_guild_specify_field(statement_list)
return TableToFile(db, data, "财务明细", headers, field_head).main_method()
else: else:
field_head = ['订单号', '出入账方式', '礼物类型', '金额', '时间'] field_head = ['订单号', '出入账方式', '礼物类型', '金额', '时间']
data = crud.delete_specify_field(statement_list, data.unique_tag) data = crud.delete_specify_field(statement_list, data.unique_tag)
return statement_crud.data_to_file(db, data, "财务明细", headers, field_head) # return statement_crud.data_to_file(db, data, "财务明细", headers, field_head)
return TableToFile(db, data, "财务明细", headers, field_head).main_method()
@router.get("/finance/fourth/details") @router.get("/finance/fourth/details")
...@@ -226,3 +233,10 @@ def outon_account(token=Depends(login_required)): ...@@ -226,3 +233,10 @@ def outon_account(token=Depends(login_required)):
"""系统账户列表""" """系统账户列表"""
account_list = crud.query_account_data() account_list = crud.query_account_data()
return HttpResultResponse(data=account_list) return HttpResultResponse(data=account_list)
@router.get("/anchor")
def anchor_account(anchor_id: Optional[int] = None, ):
"""主播账户余额"""
anchor_money = crud.AccountAnchor(anchor_id).anchor_balance()
return HttpResultResponse(data=anchor_money)
...@@ -127,8 +127,7 @@ class CalculationMonthlyBill(object): ...@@ -127,8 +127,7 @@ class CalculationMonthlyBill(object):
def kv_search(self): def kv_search(self):
"""查询筛选的key, value""" """查询筛选的key, value"""
k_list = [] k_list = []
type_name = query_fi_account_type() for k, v in TYPE_NAME.items():
for k, v in type_name.items():
if v == self.name or self.name in v: if v == self.name or self.name in v:
k_list.append(k) k_list.append(k)
return k_list return k_list
...@@ -186,7 +185,7 @@ class CalculationMonthlyBill(object): ...@@ -186,7 +185,7 @@ class CalculationMonthlyBill(object):
v['error_money'] = float('%.3f' % (v['expenditure'] - v['income'])) v['error_money'] = float('%.3f' % (v['expenditure'] - v['income']))
res_all_data.append(v) res_all_data.append(v)
# 存入redis # 存入redis
red.set('business_type_sum-' + str(self.date), str(res_all_data), 3600) red.set('business_type_sum-' + str(self.date), str(res_all_data), 1800)
else: else:
if assert_list: if assert_list:
res_all_data = self.search_red_data(business_type_sum_data) res_all_data = self.search_red_data(business_type_sum_data)
...@@ -225,6 +224,14 @@ class CalculationMonthlyDetails(object): ...@@ -225,6 +224,14 @@ class CalculationMonthlyDetails(object):
num = future2.result() num = future2.result()
total = future3.result() total = future3.result()
if data: if data:
for i in data:
if i['amount_type'] == 'consumable':
i['amount_type'] = '钻石'
if i['amount_type'] == 'withdrawable':
i['amount_type'] = '珍珠'
if i['amount_type'] == 'backpack':
i['amount_type'] = '背包'
i['reference_type'] = TYPE_NAME.get(param.get('key'), param.get('key'))
return data, num, float(total[0]['amount']) return data, num, float(total[0]['amount'])
return [], 0, 0 return [], 0, 0
...@@ -264,8 +271,7 @@ class MonthDataDerive(object): ...@@ -264,8 +271,7 @@ class MonthDataDerive(object):
assert_list = [] assert_list = []
if name: if name:
k_list = [] k_list = []
type_name = query_fi_account_type() for k, v in TYPE_NAME.items():
for k, v in type_name.items():
if v == name or name in v: if v == name or name in v:
k_list.append(k) k_list.append(k)
if len(k_list) > 1: if len(k_list) > 1:
...@@ -285,9 +291,8 @@ class MonthDataDerive(object): ...@@ -285,9 +291,8 @@ class MonthDataDerive(object):
for res in res_data: for res in res_data:
if res["reference_type"] in self.derive_key: if res["reference_type"] in self.derive_key:
continue continue
type_name = query_fi_account_type() if res["reference_type"] in TYPE_NAME:
if res["reference_type"] in type_name: name = TYPE_NAME[res["reference_type"]]
name = type_name[res["reference_type"]]
else: else:
name = res["reference_type"] name = res["reference_type"]
out = [i['money'] for i in res_data if i['reference_type'] == res["reference_type"] and i['type'] == 0] out = [i['money'] for i in res_data if i['reference_type'] == res["reference_type"] and i['type'] == 0]
...@@ -318,7 +323,7 @@ class ReferenceTypeClassification(): ...@@ -318,7 +323,7 @@ class ReferenceTypeClassification():
def classification_summary(self): def classification_summary(self):
data_sql = f"select uuid,type,sum(amount) as amount,reference_type from {self.date} where reference_type='{self.reference_type}' GROUP BY uuid,type" data_sql = f"select uuid,type,sum(amount) as amount,reference_type,amount_type from {self.date} where reference_type='{self.reference_type}' GROUP BY uuid,type,amount_type"
guild_sql = f"select uuid from guild" guild_sql = f"select uuid from guild"
account_sql = f"select uuid,name from fi_account" account_sql = f"select uuid,name from fi_account"
anchor_sql = f"select uuid from v2_user where is_achor in(1,2)" anchor_sql = f"select uuid from v2_user where is_achor in(1,2)"
...@@ -381,6 +386,7 @@ class AbnormalDataDetails(object): ...@@ -381,6 +386,7 @@ class AbnormalDataDetails(object):
self.size = size self.size = size
def abnormal_task(self): def abnormal_task(self):
Logger().logger.info('开始查询异常数据')
out_sql = f"select order_number from {self.date} where reference_type='{self.reference_type}' and type=0" out_sql = f"select order_number from {self.date} where reference_type='{self.reference_type}' and type=0"
income_sql = f"select order_number from {self.date} where reference_type='{self.reference_type}' and type=1" income_sql = f"select order_number from {self.date} where reference_type='{self.reference_type}' and type=1"
with ThreadPoolExecutor(max_workers=2) as pool: with ThreadPoolExecutor(max_workers=2) as pool:
......
...@@ -12,6 +12,7 @@ from core.config.env import env, COS_PATH, COS_RERURN_PATH ...@@ -12,6 +12,7 @@ from core.config.env import env, COS_PATH, COS_RERURN_PATH
from libs.db_link import LinkMysql from libs.db_link import LinkMysql
from libs.functions import get_now_timestamp, get_now_datetime, get_order, get_ip, time_str_to_timestamp, \ from libs.functions import get_now_timestamp, get_now_datetime, get_order, get_ip, time_str_to_timestamp, \
time_int_timestamp, send_json_rpc_request, AES_Decrypt, AES_Encrypt time_int_timestamp, send_json_rpc_request, AES_Decrypt, AES_Encrypt
from libs.log_utils import Logger
from libs.orm import QueryAllData from libs.orm import QueryAllData
from libs.token_verify import get_current_user from libs.token_verify import get_current_user
from models.recharge import Settlement, Fitransferlog, FinanceFixLog, Account_log, Paymentlog from models.recharge import Settlement, Fitransferlog, FinanceFixLog, Account_log, Paymentlog
...@@ -270,6 +271,7 @@ def query_token(db, h_list): ...@@ -270,6 +271,7 @@ def query_token(db, h_list):
def transfer_trigger_task(uuid, user_id, balance, type, amount_type, remark='用户转账', money_data=[], def transfer_trigger_task(uuid, user_id, balance, type, amount_type, remark='用户转账', money_data=[],
isUser=1, dst_uuid="", reference_number=''): isUser=1, dst_uuid="", reference_number=''):
"""转账验证""" """转账验证"""
Logger(20).logger.info("转账验证")
request_data = { request_data = {
"ip": get_ip(), "ip": get_ip(),
"uuid": uuid, "uuid": uuid,
...@@ -281,13 +283,17 @@ def transfer_trigger_task(uuid, user_id, balance, type, amount_type, remark='用 ...@@ -281,13 +283,17 @@ def transfer_trigger_task(uuid, user_id, balance, type, amount_type, remark='用
"amount_type": amount_type if amount_type else 'consumable', "amount_type": amount_type if amount_type else 'consumable',
"notify_url": "" "notify_url": ""
} }
res = send_json_rpc_request(request_data, 'Server.UserExecute.Transfer')
try: try:
if not res['data']['result']['status']: res = send_json_rpc_request(request_data, 'Server.UserExecute.Transfer')
if not res:
return '清算系统调用失败'
Logger(20).logger.info("数据验证,是否成功")
if not res['status']:
if 'Insufficient assets' in res['data']['result']['msg']: if 'Insufficient assets' in res['data']['result']['msg']:
return 200,"资产不足,无法转账" return 200,"资产不足,无法转账"
return res['data']['result']['msg'] return res['data']['result']['msg']
except Exception as e: except Exception as e:
Logger(40).logger.error(f"错误数据格式:{res}")
return f"清算系统异常:{str(e)}" return f"清算系统异常:{str(e)}"
status = 2 status = 2
if res['status'] == 0: if res['status'] == 0:
......
...@@ -20,7 +20,7 @@ class StatementCreate(StatementBase): ...@@ -20,7 +20,7 @@ class StatementCreate(StatementBase):
class StatementList(BaseModel): class StatementList(BaseModel):
page: int = 1 page: int = 1
size: int = 9999999 size: int = 99999999
start_time: Optional[str] = "" start_time: Optional[str] = ""
end_time: Optional[str] = "" end_time: Optional[str] = ""
order_number: Optional[str] = "" order_number: Optional[str] = ""
......
...@@ -57,7 +57,7 @@ class TestingEnv(Env): ...@@ -57,7 +57,7 @@ class TestingEnv(Env):
NACOS_URL = YAML_DATA.get('config_url') NACOS_URL = YAML_DATA.get('config_url')
NACOSCONFIG = "show=all&dataId=fj-finance-test&group=DEFAULT_GROUP&tenant=cw-test&namespaceId=cw-test" NACOSCONFIG = "show=all&dataId=fj-finance-test&group=DEFAULT_GROUP&tenant=cw-test&namespaceId=cw-test"
NACOS_NAME = YAML_DATA.get('name') NACOS_NAME = YAML_DATA.get('name')
NACOS_PWD = YAML_DATA.get('pwd') NACOS_PWD = YAML_DATA.get('password')
DB_HISTORY = apo.get('history') DB_HISTORY = apo.get('history')
DB_3YV2 = apo.get('business') DB_3YV2 = apo.get('business')
Redis = apo.get('redis') Redis = apo.get('redis')
...@@ -90,11 +90,11 @@ class ProdEnv(Env): ...@@ -90,11 +90,11 @@ class ProdEnv(Env):
SECRET_KEY: str = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" SECRET_KEY: str = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM: str = "HS256" ALGORITHM: str = "HS256"
PASSWORD: str = "fj123456" PASSWORD: str = "fj123456"
CLEARING_CENTER_URL: str = 'http://47.103.144.36:5454/' CLEARING_CENTER_URL: str = 'http://219.152.95.226:5454/'
CLEARING_CENTER_HOST: str = '47.103.144.36' CLEARING_CENTER_HOST: str = '219.152.95.226'
CLEARING_CENTER_PORT: int = 5454 CLEARING_CENTER_PORT: int = 5454
KEY = "dK8tZ1jM0wA6oE3j" KEY = "dK8tZ1jM0wA6oE3j"
PHP_URL = "http://47.103.97.109:6750" PHP_URL = "http://219.152.95.226:6750"
# env = TestingEnv() # 开发环境 # env = TestingEnv() # 开发环境
......
...@@ -9,11 +9,24 @@ def query_fi_account_type(): ...@@ -9,11 +9,24 @@ def query_fi_account_type():
res_data = LinkMysql(env.DB_3YV2).query_mysql(sql) res_data = LinkMysql(env.DB_3YV2).query_mysql(sql)
for i in res_data: for i in res_data:
if not fi_type.get(i['keyValue']): if not fi_type.get(i['keyValue']):
fi_type[i['keyValue']] = i['keyName'] try:
name, type = i['keyName'].split('-')
except Exception as e:
name = i['keyName']
fi_type[i['keyValue']] = name
return fi_type return fi_type
def query_fi_guild_name():
guild_name = {}
sql = f"SELECT id,guild_name,uuid FROM guild"
res_data = LinkMysql(env.DB_3YV2).query_mysql(sql)
for i in res_data:
guild_name[i['id']] = i['guild_name']
return guild_name
TYPE_NAME = query_fi_account_type() TYPE_NAME = query_fi_account_type()
GUILD_NAME = query_fi_guild_name()
# TYPE_NAME_T = { # TYPE_NAME_T = {
......
import math
import time
import openpyxl
import threading
import pandas as pd
from app.api.statement.guild import query_token
from starlette.responses import StreamingResponse
from app.api.export import crud
from libs.log_utils import Logger
class Export(object):
def __init__(self, db, data, name, header, field_list):
self.db = db
self.data = data
self.name = name
self.header = header
self.field_list = field_list
self.lock = threading.Lock()
self.wb = openpyxl.Workbook() # 创建一个新的 Excel 文件
self.sheet = self.wb.active
def write_data(self, sheet, row, col, data):
sheet.cell(row=row, column=col, value=data)
# 定义写入任务
def write_task(self, start_row, end_row, data):
for row in range(start_row, end_row):
with self.lock:
index = 0
col = 1
if index < len(data):
for k, v in data[index].items():
self.write_data(self.sheet, row, col, v)
col += 1
index += 1
def data_to_file(self):
# 获取操作人
user = query_token(self.db, self.header)
params = {"source": self.name, "method": "data_to_file", "status": 1}
if len(self.data) == 0:
params["status"] = 3
try:
bk = pd.DataFrame(self.data)
if self.data[0].get('create_time'):
if isinstance(self.data[0]['create_time'], int):
bk['create_time'] = bk['create_time'].apply(
lambda x: time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(x)))
bk.columns = self.field_list # 修改pandas头
write_data = bk.to_dict(orient='records')
# 创建线程列表
threads = []
rows_per_thread = math.ceil(len(write_data) / 10)
# 写入头部
# self.wb
for i in self.field_list:
self.write_data(self.sheet, 1, self.field_list.index(i)+1, i)
# 启动线程
for i in range(10):
start_row = i*rows_per_thread + 2 + i
end_row = (i+1)*rows_per_thread + 2 + i
thread = threading.Thread(target=self.write_task, args=(start_row, end_row, write_data[i*rows_per_thread:(i+1)*rows_per_thread]))
thread.start()
threads.append(thread)
# 等待所有线程完成
for thread in threads:
thread.join()
crud.create_export_data(self.db, params, user)
# 保存 Excel 文件
self.wb.save(f'static/{self.name}.xlsx')
self.wb.close()
file = open(f'static/{self.name}.xlsx', 'rb')
return StreamingResponse(file,
media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
except Exception as e:
Logger(40).logger.error("导出失败:%s" % str(e))
params["status"] = 2
crud.create_export_data(self.db, params, user)
class TableToFile(object):
def __init__(self, db, data, name, header, field_list):
self.db = db
self.data = data
self.name = name
self.header = header
self.field_list = field_list
self.lock = threading.Lock()
self.wb = openpyxl.Workbook() # 创建一个新的 Excel 文件
self.lock = threading.Lock()
def thread_task(self,bk, writer, sheet_name):
"""线程执行方法"""
self.lock.acquire()
bk.to_excel(writer, sheet_name=sheet_name, index=False)
self.lock.release()
def main_method(self):
"""主函数"""
Logger().logger.info('开始导出')
user = query_token(self.db, self.header)
params = {"source": self.name, "method": "data_to_file", "status": 1}
if len(self.data) == 0:
params["status"] = 3
crud.create_export_data(self.db, params, user)
Logger().logger.info(f'导出没有数据')
return None
try:
bk = pd.DataFrame(self.data)
if self.data[0].get('create_time'):
if isinstance(self.data[0]['create_time'], int):
bk['create_time'] = bk['create_time'].apply(
lambda x: time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(x)))
bk.columns = self.field_list # 修改pandas头
write_data = bk.to_dict(orient='records')
with pd.ExcelWriter(f'static/{self.name}.xlsx') as writer:
# bk.to_excel(writer, sheet_name='Sheet1', index=False)
threads = []
rows_per_thread = math.ceil(len(write_data) / 5)
for i in range(5):
sheet_name = 'sheet' + str(i + 1)
threads.append(threading.Thread(target=self.thread_task,
args=[bk.iloc[i * rows_per_thread: rows_per_thread * (i+1)], writer, sheet_name]))
# 启动线程
for y in threads:
y.start()
# 等待所有线程完成
for z in threads:
z.join()
file = open(writer, 'rb')
# 记录导出
crud.create_export_data(self.db, params, user)
return StreamingResponse(file,
media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
except Exception as e:
Logger().logger.info(f'导出异常:{str(e)}')
params["status"] = 2
crud.create_export_data(self.db, params, user)
...@@ -79,6 +79,13 @@ def get_last_month(): ...@@ -79,6 +79,13 @@ def get_last_month():
return datetime.now().strftime("%Y%m"), last_month.strftime("%Y%m"), before_last_month.strftime("%Y%m") return datetime.now().strftime("%Y%m"), last_month.strftime("%Y%m"), before_last_month.strftime("%Y%m")
def judge_time_period(start_day):
"""判断时间段有多少天"""
last_month = datetime.strptime(start_day, '%Y-%m-%d') + relativedelta(months=1)
before_last_month = datetime.strptime(start_day, '%Y-%m-%d') + relativedelta(months=2)
return datetime.strptime(start_day, '%Y-%m-%d').strftime("%Y%m"), last_month.strftime("%Y%m"), before_last_month.strftime("%Y%m")
def md5(s): def md5(s):
"""md5加密""" """md5加密"""
sign_str = hashlib.md5() sign_str = hashlib.md5()
...@@ -153,11 +160,10 @@ def send_json_rpc_request(params, method): ...@@ -153,11 +160,10 @@ def send_json_rpc_request(params, method):
s.sendall(request_str.encode()) s.sendall(request_str.encode())
# 接收服务器响应 # 接收服务器响应
response_str = s.recv(1024).decode() response_str = s.recv(10240).decode()
# 将响应字符串解码为 JSON 对象 # 将响应字符串解码为 JSON 对象
response = json.loads(response_str) response = json.loads(response_str)
if "error" in response: if "error" in response:
return {} return {}
return response["result"] return response["result"]
...@@ -259,7 +265,9 @@ def AES_Decrypt(data): ...@@ -259,7 +265,9 @@ def AES_Decrypt(data):
except Exception as e: except Exception as e:
Logger(40).logger.error(f"php数据解密异常:{str(e)},数据:{plaintext}") Logger(40).logger.error(f"php数据解密异常:{str(e)},数据:{plaintext}")
coding_data = str(plaintext, encoding="utf-8") coding_data = str(plaintext, encoding="utf-8")
return list(eval(coding_data)) num = coding_data.index(']')
return list(eval(coding_data[:num + 1]))
return res_data return res_data
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment