Commit b9412a11 authored by xupeng's avatar xupeng

Merge remote-tracking branch 'origin/development' into development

# Conflicts:
#	libs/functions.py
parents 000df4fc f038c72d
...@@ -7,7 +7,7 @@ from sqlalchemy import and_, func ...@@ -7,7 +7,7 @@ from sqlalchemy import and_, func
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.api.account import schemas from app.api.account import schemas
from core.config.env import env from core.config.env import env
from libs.business import TYPE_NAME from libs.business import TYPE_NAME, query_fi_account_type
from libs.db_link import LinkMysql from libs.db_link import LinkMysql
from libs.functions import wrapper_out, get_now_timestamp, uuid, get_before_timestamp, time_str_to_timestamp, \ from libs.functions import wrapper_out, get_now_timestamp, uuid, get_before_timestamp, time_str_to_timestamp, \
get_yesterday_timestamp, search, get_last_month get_yesterday_timestamp, search, get_last_month
...@@ -187,7 +187,7 @@ def get_finance_group_by(date, condition): ...@@ -187,7 +187,7 @@ def get_finance_group_by(date, condition):
return result return result
def get_finance_info(unique_tag, page, size, start_time, end_time, is_list=None): def get_finance_info(unique_tag, id, page, size, start_time, end_time, is_list=None):
"""账户财务信息""" """账户财务信息"""
finance_condition = [] finance_condition = []
month, last_month, before_last_month = get_last_month() month, last_month, before_last_month = get_last_month()
...@@ -233,21 +233,8 @@ def get_finance_info(unique_tag, page, size, start_time, end_time, is_list=None) ...@@ -233,21 +233,8 @@ def get_finance_info(unique_tag, page, size, start_time, end_time, is_list=None)
count = future1.result() count = future1.result()
res = future2.result() res = future2.result()
else: else:
total_list = [] sys_sql = f"select initial_money as balance,income,outcome,create_time from v2_system_account_statistics_copy where fi_account_id={id} ORDER BY create_time DESC"
fi_sql = f"select uuid from fi_account where unique_tag='{unique_tag}'" result = LinkMysql(env.DB_3YV2).query_mysql(sys_sql)
fi_res = LinkMysql(env.DB_3YV2).query_mysql(fi_sql)
finance_condition.append(f" uuid='{fi_res[0]['uuid']}'")
with ThreadPoolExecutor(max_workers=3) as pool:
future1 = pool.submit(get_finance_group_by, 'assets_log_' + month, finance_condition)
future2 = pool.submit(get_finance_group_by, 'assets_log_' + last_month, finance_condition)
future3 = pool.submit(get_finance_group_by, 'assets_log_' + before_last_month, finance_condition)
month_data = future1.result()
last_data = future2.result()
before_last_data = future3.result()
total_list = total_list + month_data if month_data else total_list
total_list = total_list + last_data if last_data else total_list
ultimately_data = total_list + before_last_data if before_last_data else total_list
result = out_income_combine(ultimately_data)
res = result[int(page - 1) * size: page * size] res = result[int(page - 1) * size: page * size]
count = result count = result
# 判断是列表还是导出接口 # 判断是列表还是导出接口
...@@ -359,14 +346,14 @@ class AccountStatistics(object): ...@@ -359,14 +346,14 @@ class AccountStatistics(object):
if self.user_list: if self.user_list:
user_uuid = self.user_list[0] user_uuid = self.user_list[0]
condition.append(f" uuid='{user_uuid.get('uuid')}'") condition.append(f" uuid='{user_uuid.get('uuid')}'")
u_sql = f"select uuid,type,reference_type,SUM(cast(amount as decimal(20,6)))/1000 as amount,create_time,amount_type from {date} WHERE {' and '.join(condition)} GROUP BY uuid,type,reference_type,amount_type ORDER BY create_time DESC" u_sql = f"select uuid,type,reference_type,SUM(cast(amount as decimal(20,6)))/1000 as amount,reference_number,create_time,amount_type from {date} WHERE {' and '.join(condition)} GROUP BY uuid,type,reference_type,amount_type ORDER BY create_time DESC"
else: else:
if self.uuid: if self.uuid:
condition.append(f" uuid='{self.uuid}'") condition.append(f" uuid='{self.uuid}'")
if condition: if condition:
u_sql = f"select id,uuid,order_number,type,reference_type,amount/1000 as amount,create_time from {date} WHERE {' and '.join(condition)} ORDER BY create_time DESC" u_sql = f"select id,uuid,order_number,type,reference_type,amount/1000 as amount,reference_number,create_time from {date} WHERE {' and '.join(condition)} ORDER BY create_time DESC"
else: else:
u_sql = f"select id,uuid,order_number,type,reference_type,amount/1000 as amount,create_time from {date} ORDER BY create_time DESC" u_sql = f"select id,uuid,order_number,type,reference_type,amount/1000 as amount,reference_number,create_time from {date} ORDER BY create_time DESC"
result = LinkMysql(env.DB_HISTORY).query_mysql(u_sql) result = LinkMysql(env.DB_HISTORY).query_mysql(u_sql)
return result return result
...@@ -452,11 +439,12 @@ class AccountStatistics(object): ...@@ -452,11 +439,12 @@ class AccountStatistics(object):
total = len(ultimately_data) total = len(ultimately_data)
res = ultimately_data[int(self.page - 1) * self.size: self.page * self.size] res = ultimately_data[int(self.page - 1) * self.size: self.page * self.size]
# 判断是列表还是导出接口 # 判断是列表还是导出接口
type_name = query_fi_account_type()
if is_list: if is_list:
if not res: if not res:
return [], 0, 0 return [], 0, 0
for i in res: for i in res:
i['reference_name'] = TYPE_NAME[i['reference_type']] if TYPE_NAME.get(i['reference_type']) else i[ i['reference_name'] = type_name[i['reference_type']] if type_name.get(i['reference_type']) else i[
'reference_type'] 'reference_type']
data_pd = pd.DataFrame(ultimately_data) data_pd = pd.DataFrame(ultimately_data)
amount_total = data_pd['amount'].sum() amount_total = data_pd['amount'].sum()
...@@ -481,7 +469,7 @@ class SpecificAccountQuery(object): ...@@ -481,7 +469,7 @@ class SpecificAccountQuery(object):
def condition_query(self, date, cond_list): def condition_query(self, date, cond_list):
sql = f"select uuid,type,cast(amount as decimal(20,6))/1000 as amount,reference_type,create_time,amount_type from {date} WHERE {' and '.join(cond_list)} ORDER BY create_time DESC" sql = f"select uuid,type,cast(amount as decimal(20,6))/1000 as amount,reference_type,reference_number,order_number,create_time,amount_type from {date} WHERE {' and '.join(cond_list)} ORDER BY create_time DESC"
result = LinkMysql(env.DB_HISTORY).query_mysql(sql) result = LinkMysql(env.DB_HISTORY).query_mysql(sql)
return result return result
...@@ -517,8 +505,9 @@ class SpecificAccountQuery(object): ...@@ -517,8 +505,9 @@ class SpecificAccountQuery(object):
res = self.total_list[int(self.page - 1) * self.size: self.page * self.size] res = self.total_list[int(self.page - 1) * self.size: self.page * self.size]
if not res: if not res:
return [], 0, 0 return [], 0, 0
type_name = query_fi_account_type()
for i in res: for i in res:
i['reference_name'] = TYPE_NAME[i['reference_type']] if TYPE_NAME.get(i['reference_type']) else i[ i['reference_name'] = type_name[i['reference_type']] if type_name.get(i['reference_type']) else i[
'reference_type'] 'reference_type']
data_pd = pd.DataFrame(self.total_list) data_pd = pd.DataFrame(self.total_list)
amount_total = data_pd['amount'].sum() amount_total = data_pd['amount'].sum()
......
...@@ -53,12 +53,13 @@ def read_account(data: schemas.AccountUpdate, token=Depends(login_required)): ...@@ -53,12 +53,13 @@ def read_account(data: schemas.AccountUpdate, token=Depends(login_required)):
def finance_information(unique_tag: str, def finance_information(unique_tag: str,
page: int, page: int,
size: int, size: int,
id: Optional[int] = None,
start_time: Optional[str] = "", start_time: Optional[str] = "",
end_time: Optional[str] = "", end_time: Optional[str] = "",
token=Depends(login_required)): token=Depends(login_required)):
# unique_tag:Optional[str] = Query(None, min_length=3, max_length=50, regex="^xiao\d+$") 参数验证 # unique_tag:Optional[str] = Query(None, min_length=3, max_length=50, regex="^xiao\d+$") 参数验证
"""账户财务详情列表""" """账户财务详情列表"""
res, total = crud.get_finance_info(unique_tag, page, size, start_time, end_time, 1) res, total = crud.get_finance_info(unique_tag, id, page, size, start_time, end_time, 1)
return HttpResultResponse(total=total, data=res) return HttpResultResponse(total=total, data=res)
...@@ -114,8 +115,7 @@ def finance_fourth_info(page: int, ...@@ -114,8 +115,7 @@ def finance_fourth_info(page: int,
"""账户财务明细 第四层""" """账户财务明细 第四层"""
if not all([uuid, time, unique_tag]): if not all([uuid, time, unique_tag]):
return HttpResultResponse(code=500, msg="缺少必传参数") return HttpResultResponse(code=500, msg="缺少必传参数")
res, total, count = SpecificAccountQuery(page, size, uuid, time, type, res, total, count = SpecificAccountQuery(page, size, uuid, time, type, reference_type, unique_tag).business_logic()
reference_type, unique_tag).business_logic()
return HttpResultResponse(total=total, data=res, count=count) return HttpResultResponse(total=total, data=res, count=count)
...@@ -153,7 +153,7 @@ def create_user(data: schemas.CreateType, token=Depends(login_required), db: Ses ...@@ -153,7 +153,7 @@ def create_user(data: schemas.CreateType, token=Depends(login_required), db: Ses
res = crud.create_type(data) res = crud.create_type(data)
if not res: if not res:
return HttpResultResponse(code=500, msg=res) return HttpResultResponse(code=500, msg=res)
return HttpResultResponse(data=res.id) return HttpResultResponse()
@router.get("/userInfo") @router.get("/userInfo")
......
...@@ -2,7 +2,7 @@ from concurrent.futures.thread import ThreadPoolExecutor ...@@ -2,7 +2,7 @@ from concurrent.futures.thread import ThreadPoolExecutor
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from core.config.env import env from core.config.env import env
from libs.business import TYPE_NAME from libs.business import TYPE_NAME, query_fi_account_type
from libs.db_link import LinkMysql from libs.db_link import LinkMysql
from libs.functions import get_now_datetime from libs.functions import get_now_datetime
from libs.orm import QueryAllData from libs.orm import QueryAllData
...@@ -79,7 +79,8 @@ class CalculationMonthlyBill(object): ...@@ -79,7 +79,8 @@ class CalculationMonthlyBill(object):
assert_list = [] assert_list = []
if name: if name:
k_list = [] k_list = []
for k, v in TYPE_NAME.items(): type_name = query_fi_account_type()
for k, v in type_name.items():
if v == name or name in v: if v == name or name in v:
k_list.append(k) k_list.append(k)
if len(k_list) > 1: if len(k_list) > 1:
...@@ -92,31 +93,32 @@ class CalculationMonthlyBill(object): ...@@ -92,31 +93,32 @@ class CalculationMonthlyBill(object):
if key_type: if key_type:
assert_list.append(f" reference_type like '%{key_type}%'") assert_list.append(f" reference_type like '%{key_type}%'")
if assert_list: if assert_list:
sql = f"SELECT reference_type, type, SUM(cast(amount as decimal(20,6)))/1000 as money FROM {date} where {' and '.join(assert_list)} GROUP BY reference_type, type ORDER BY reference_type" sql = f"SELECT reference_type, type, SUM(amount)/1000 as money FROM {date} where {' and '.join(assert_list)} GROUP BY reference_type, type ORDER BY reference_type"
else: else:
sql = f"SELECT reference_type, type, SUM(cast(amount as decimal(20,6)))/1000 as money FROM {date} GROUP BY reference_type, type ORDER BY reference_type" sql = f"SELECT reference_type, type, SUM(amount)/1000 as money FROM {date} GROUP BY reference_type, type ORDER BY reference_type"
try: try:
res_data = LinkMysql(env.DB_HISTORY).query_mysql(sql) res_data = LinkMysql(env.DB_HISTORY).query_mysql(sql)
except Exception as e: except Exception as e:
return [], 0 return [], 0
type_name = query_fi_account_type()
for res in res_data: for res in res_data:
if res["reference_type"] in self.structure_key: if res["reference_type"] in self.structure_key:
continue continue
if res["reference_type"] in TYPE_NAME: if res["reference_type"] in type_name:
name = TYPE_NAME[res["reference_type"]] name = type_name[res["reference_type"]]
else: else:
name = res["reference_type"] name = res["reference_type"]
out = [i['money'] for i in res_data if i['reference_type'] == res["reference_type"] and i['type'] == 0] out = [i['money'] for i in res_data if i['reference_type'] == res["reference_type"] and i['type'] == 0]
income = [i['money'] for i in res_data if i['reference_type'] == res["reference_type"] and i['type'] == 1] income = [i['money'] for i in res_data if i['reference_type'] == res["reference_type"] and i['type'] == 1]
out_value = float(out[0]) if out else 0 out_value = out[0] if out else 0
income_value = float(income[0]) if income else 0 income_value = income[0] if income else 0
a = { a = {
"name": name, "name": name,
"type": res["reference_type"], "type": res["reference_type"],
"expenditure": out_value, "expenditure": out_value,
"income": income_value, "income": income_value,
"is_error": 0 if out_value == income_value else 1, "is_error": 0 if out_value == income_value else 1,
"error_money": float('%.2f' % (out_value - income_value)) "error_money": float('%.3f' % (out_value - income_value))
} }
self.structure_key.append(res["reference_type"]) self.structure_key.append(res["reference_type"])
self.structure_list.append(a) self.structure_list.append(a)
...@@ -138,14 +140,24 @@ class CalculationMonthlyDetails(object): ...@@ -138,14 +140,24 @@ class CalculationMonthlyDetails(object):
result = LinkMysql(db).query_mysql(sql) result = LinkMysql(db).query_mysql(sql)
return len(result) return len(result)
@staticmethod
def calculation_total(db, date, reference_type, is_out):
sql = f"SELECT sum(amount)/1000 as amount FROM {date} where reference_type='{reference_type}' and type={is_out}"
result = LinkMysql(db).query_mysql(sql)
return result
def statement_income_expenditure(self, **param): def statement_income_expenditure(self, **param):
with ThreadPoolExecutor(max_workers=2) as pool: with ThreadPoolExecutor(max_workers=3) as pool:
future1 = pool.submit(self.data_query, env.DB_HISTORY, 'assets_log_' + param.get("date"), param.get("key"), param.get("is_income"), param.get("page"), param.get("size")) future1 = pool.submit(self.data_query, env.DB_HISTORY, 'assets_log_' + param.get("date"), param.get("key"), param.get("is_income"), param.get("page"), param.get("size"))
future2 = pool.submit(self.acquired_total, env.DB_HISTORY, 'assets_log_' + param.get("date"), param.get("key"), param.get("is_income")) future2 = pool.submit(self.acquired_total, env.DB_HISTORY, 'assets_log_' + param.get("date"), param.get("key"), param.get("is_income"))
future3 = pool.submit(self.calculation_total, env.DB_HISTORY, 'assets_log_' + param.get("date"), param.get("key"), param.get("is_income"))
data = future1.result() data = future1.result()
num = future2.result() num = future2.result()
return data, num total = future3.result()
if data:
return data, num, float(total[0]['amount'])
return [], 0, 0
@staticmethod @staticmethod
def query_error_data(date, reference_type): def query_error_data(date, reference_type):
...@@ -183,7 +195,8 @@ class MonthDataDerive(object): ...@@ -183,7 +195,8 @@ class MonthDataDerive(object):
assert_list = [] assert_list = []
if name: if name:
k_list = [] k_list = []
for k, v in TYPE_NAME.items(): type_name = query_fi_account_type()
for k, v in type_name.items():
if v == name or name in v: if v == name or name in v:
k_list.append(k) k_list.append(k)
if len(k_list) > 1: if len(k_list) > 1:
...@@ -203,8 +216,9 @@ class MonthDataDerive(object): ...@@ -203,8 +216,9 @@ class MonthDataDerive(object):
for res in res_data: for res in res_data:
if res["reference_type"] in self.derive_key: if res["reference_type"] in self.derive_key:
continue continue
if res["reference_type"] in TYPE_NAME: type_name = query_fi_account_type()
name = TYPE_NAME[res["reference_type"]] if res["reference_type"] in type_name:
name = type_name[res["reference_type"]]
else: else:
name = res["reference_type"] name = res["reference_type"]
out = [i['money'] for i in res_data if i['reference_type'] == res["reference_type"] and i['type'] == 0] out = [i['money'] for i in res_data if i['reference_type'] == res["reference_type"] and i['type'] == 0]
...@@ -235,7 +249,7 @@ class ReferenceTypeClassification(): ...@@ -235,7 +249,7 @@ class ReferenceTypeClassification():
def classification_summary(self): def classification_summary(self):
data_sql = f"select uuid,type,sum(amount)/1000 as amount,create_time from {self.date} where reference_type='{self.reference_type}' GROUP BY uuid,type" data_sql = f"select uuid,type,sum(amount) as amount,create_time from {self.date} where reference_type='{self.reference_type}' GROUP BY uuid,type"
guild_sql = f"select uuid from guild" guild_sql = f"select uuid from guild"
account_sql = f"select uuid,name from fi_account" account_sql = f"select uuid,name from fi_account"
anchor_sql = f"select uuid from v2_user where is_achor in(1,2)" anchor_sql = f"select uuid from v2_user where is_achor in(1,2)"
...@@ -277,10 +291,37 @@ class ReferenceTypeClassification(): ...@@ -277,10 +291,37 @@ class ReferenceTypeClassification():
outcome = pd.DataFrame(self.outcome) outcome = pd.DataFrame(self.outcome)
ogs = outcome.groupby("nickname").sum() ogs = outcome.groupby("nickname").sum()
for k, v in ogs.to_dict().get('money').items(): for k, v in ogs.to_dict().get('money').items():
res_list.append({"type": "出账", "name": k, "money": round(float(v), 3)}) res_list.append({"type": "出账", "name": k, "money": round(float(v/1000), 3)})
if self.income: if self.income:
income = pd.DataFrame(self.income) income = pd.DataFrame(self.income)
igs = income.groupby("nickname").sum() igs = income.groupby("nickname").sum()
for k, v in igs.to_dict().get('money').items(): for k, v in igs.to_dict().get('money').items():
res_list.append({"type": "入账", "name": k, "money": round(float(v), 3)}) res_list.append({"type": "入账", "name": k, "money": round(float(v/1000), 3)})
return res_list return res_list
class AbnormalDataDetails(object):
"""业务类型汇总异常数据详情"""
def __init__(self, date, reference_type):
self.date = 'assets_log_' + date
self.reference_type = reference_type
def abnormal_task(self):
out_sql = f"select order_number from {self.date} where reference_type='{self.reference_type}' and type=0"
income_sql = f"select order_number from {self.date} where reference_type='{self.reference_type}' and type=1"
with ThreadPoolExecutor(max_workers=2) as pool:
out_res = pool.submit(LinkMysql(env.DB_HISTORY).query_mysql, out_sql)
income_res = pool.submit(LinkMysql(env.DB_HISTORY).query_mysql, income_sql)
out_data = out_res.result()
income_data = income_res.result()
out_order_number = [i['order_number'] for i in out_data]
income_order_number = [i['order_number'] for i in income_data]
error_order_number = list(set(out_order_number) ^ set(income_order_number))
if len(error_order_number) == 1:
error_sql = f"select uuid,type,reference_type,order_number,amount/1000 as amount,create_time from {self.date} where order_number='{error_order_number[0]}'"
elif len(error_order_number) > 1:
error_sql = f"select uuid,type,reference_type,order_number,amount/1000 as amount,create_time from {self.date} where order_number in{tuple(error_order_number)}"
else:
return []
res = LinkMysql(env.DB_HISTORY).query_mysql(error_sql)
return res
...@@ -75,8 +75,8 @@ def month_query_total_export(key: str, ...@@ -75,8 +75,8 @@ def month_query_total_export(key: str,
"""月度计算,详情""" """月度计算,详情"""
if not date: if not date:
return HttpResultResponse(code=500, msg='查询月份不能为空') return HttpResultResponse(code=500, msg='查询月份不能为空')
result, num = crud.CalculationMonthlyDetails().statement_income_expenditure(key=key, is_income=is_income, page=page, size=size, date=date) result, total, count = crud.CalculationMonthlyDetails().statement_income_expenditure(key=key, is_income=is_income, page=page, size=size, date=date)
return HttpResultResponse(total=num, data=result) return HttpResultResponse(total=total, data=result, count=count)
@router.post("/error/data") @router.post("/error/data")
...@@ -107,3 +107,12 @@ def reference_type_total(date: str, type: str): ...@@ -107,3 +107,12 @@ def reference_type_total(date: str, type: str):
return HttpResultResponse(code=500, msg='缺少必传参数') return HttpResultResponse(code=500, msg='缺少必传参数')
result = crud.ReferenceTypeClassification(date, type).classification_summary() result = crud.ReferenceTypeClassification(date, type).classification_summary()
return HttpResultResponse(data=result) return HttpResultResponse(data=result)
@router.get("/abnormal/data")
def abnormal_total(date: str, type: str, token=Depends(login_required)):
"""异常数据详情"""
if not all([date, type]):
return HttpResultResponse(code=500, msg='缺少必传参数')
result = crud.AbnormalDataDetails(date, type).abnormal_task()
return HttpResultResponse(data=result)
from core.config.env import env from core.config.env import env
from libs.db_link import LinkMysql from libs.db_link import LinkMysql
sql = f"SELECT keyName,keyValue,type FROM fi_account_type"
res_data = LinkMysql(env.DB_3YV2).query_mysql(sql)
TYPE_NAME = {} def query_fi_account_type():
for i in res_data:
if not TYPE_NAME.get(i['keyValue']): fi_type = {}
TYPE_NAME[i['keyValue']] = i['keyName'] sql = f"SELECT keyName,keyValue,type FROM fi_account_type"
res_data = LinkMysql(env.DB_3YV2).query_mysql(sql)
for i in res_data:
if not fi_type.get(i['keyValue']):
fi_type[i['keyValue']] = i['keyName']
return fi_type
TYPE_NAME = query_fi_account_type()
# TYPE_NAME_T = { # TYPE_NAME_T = {
# "updateUserNameFee": "用户昵称修改", # "updateUserNameFee": "用户昵称修改",
......
...@@ -113,7 +113,6 @@ def search(params, method): ...@@ -113,7 +113,6 @@ def search(params, method):
if response.status_code != 200: if response.status_code != 200:
return {} return {}
text = json.loads(response.text) text = json.loads(response.text)
print(text)
return text return text
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment