Commit 445011df authored by xianyang's avatar xianyang

优化业务类型汇总

parent 0198c5e6
import math
import threading
from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
...@@ -70,60 +72,149 @@ def get_source_data(db): ...@@ -70,60 +72,149 @@ def get_source_data(db):
class CalculationMonthlyBill(object): class CalculationMonthlyBill(object):
"""月度计算,出入账目""" """月度计算,出入账目"""
def __init__(self): def __init__(self, date, key_type, name, page, size):
self.structure_list = [] self.date = date
self.key_type = key_type
self.name = name
self.page = page
self.size = size
self.structure_dict = {}
self.structure_key = [] self.structure_key = []
self.query_data = []
def thead_task(self, as_list, page, size):
if as_list:
q_sql = f"SELECT reference_type, type, sum(cast(amount as decimal(20,3))) as money FROM (select reference_type,type,amount FROM {self.date} where {' and '.join(as_list)} limit {page},{size}) as a GROUP BY reference_type,type ORDER BY reference_type"
else:
q_sql = f"SELECT reference_type, type, sum(cast(amount as decimal(20,3))) as money FROM (select reference_type,type,amount FROM {self.date} limit {page},{size}) as a GROUP BY reference_type, type ORDER BY reference_type"
count_data = LinkMysql(env.DB_HISTORY).query_mysql(q_sql)
self.query_data = self.query_data + count_data
def data_deal_with(self):
pd_data = pd.DataFrame(self.query_data)
print(pd_data)
re_data = pd_data.groupby(['reference_type', 'type']).sum()
all_data = re_data.to_dict()
all_data_dict = all_data.get('money')
for k, v in all_data_dict.items():
reference_type, type = k[0], k[1]
# reference_type第二次进来 self.structure_dict有拼接好的数据
if reference_type in self.structure_key:
if type == 0:
self.structure_dict[reference_type]['expenditure'] = v / 1000
else:
self.structure_dict[reference_type]['income'] = v / 1000
else:
# reference_type类型第一次进来 类型加入self.structure_key列表,拼接数据加入self.structure_dict下次计算异常数据
if reference_type in TYPE_NAME:
name = TYPE_NAME[reference_type]
else:
name = reference_type
constructed = {
"name": name,
"type": reference_type,
"expenditure": 0, # 支出
"income": 0,
"is_error": 0,
"error_money": 0
}
if type == 0:
constructed['expenditure'] = v / 1000
else:
constructed['income'] = v / 1000
self.structure_key.append(reference_type)
self.structure_dict[reference_type] = constructed
def month_statistics_task(self, date, key_type, name, page, size): def month_statistics_task(self):
"""主函数"""
assert_list = [] assert_list = []
if name: if self.name:
k_list = [] k_list = []
type_name = query_fi_account_type() type_name = query_fi_account_type()
for k, v in type_name.items(): for k, v in type_name.items():
if v == name or name in v: if v == self.name or self.name in v:
k_list.append(k) k_list.append(k)
if len(k_list) > 1: if len(k_list) > 1:
assert_list.append(f" reference_type in{tuple(k_list)}") assert_list.append(f" reference_type in{tuple(k_list)}")
if len(k_list) == 1: if len(k_list) == 1:
assert_list.append(f" reference_type='{k_list[0]}'") assert_list.append(f" reference_type='{k_list[0]}'")
if not k_list: if not k_list:
py = Pinyin().get_pinyin(name) py = Pinyin().get_pinyin(self.name)
assert_list.append(f" reference_type='{py.replace('-', '')}'") assert_list.append(f" reference_type='{py.replace('-', '')}'")
if key_type: if self.key_type:
assert_list.append(f" reference_type like '%{key_type}%'") assert_list.append(f" reference_type like '%{self.key_type}%'")
if assert_list: count_sql = f"select count(id) as num from {self.date}"
sql = f"SELECT reference_type, type, SUM(amount)/1000 as money FROM {date} where {' and '.join(assert_list)} GROUP BY reference_type, type ORDER BY reference_type" count_data = LinkMysql(env.DB_HISTORY).query_mysql(count_sql)
else: num = math.ceil(count_data[0]['num'] / 10)
sql = f"SELECT reference_type, type, SUM(amount)/1000 as money FROM {date} GROUP BY reference_type, type ORDER BY reference_type" # 多线程
try: ths = []
res_data = LinkMysql(env.DB_HISTORY).query_mysql(sql) # 创建线程
except Exception as e: for i in range(10):
return [], 0 ths.append(threading.Thread(target=self.thead_task,
type_name = query_fi_account_type() args=[assert_list, num*i, num]))
for res in res_data: # 启动线程
if res["reference_type"] in self.structure_key: for i in range(10):
continue ths[i].start()
if res["reference_type"] in type_name: # 等待子线程
name = type_name[res["reference_type"]] for i in range(10):
else: ths[i].join()
name = res["reference_type"] self.data_deal_with()
out = [i['money'] for i in res_data if i['reference_type'] == res["reference_type"] and i['type'] == 0] res_all_data = []
income = [i['money'] for i in res_data if i['reference_type'] == res["reference_type"] and i['type'] == 1] for k, v in self.structure_dict.items():
out_value = out[0] if out else 0 v['is_error'] = 0 if v['expenditure'] == v['income'] else 1
income_value = income[0] if income else 0 v['error_money'] = float('%.3f' % (v['expenditure'] - v['income']))
a = { res_all_data.append(v)
"name": name, return res_all_data[(self.page - 1) * self.size:self.page * self.size], len(res_all_data)
"type": res["reference_type"],
"expenditure": out_value, # def month_statistics_task(self, date, key_type, name, page, size):
"income": income_value, # """主函数"""
"is_error": 0 if out_value == income_value else 1, # assert_list = []
"error_money": float('%.3f' % (out_value - income_value)) # if name:
} # k_list = []
self.structure_key.append(res["reference_type"]) # type_name = query_fi_account_type()
self.structure_list.append(a) # for k, v in type_name.items():
# return self.data_to_table(self.structure_list) # 导出接口 # if v == name or name in v:
return self.structure_list[(page-1)*size:page*size], len(self.structure_list) # k_list.append(k)
# if len(k_list) > 1:
# assert_list.append(f" reference_type in{tuple(k_list)}")
# if len(k_list) == 1:
# assert_list.append(f" reference_type='{k_list[0]}'")
# if not k_list:
# py = Pinyin().get_pinyin(name)
# assert_list.append(f" reference_type='{py.replace('-', '')}'")
# if key_type:
# assert_list.append(f" reference_type like '%{key_type}%'")
# if assert_list:
# sql = f"SELECT reference_type, type, SUM(amount)/1000 as money FROM {date} where {' and '.join(assert_list)} GROUP BY reference_type, type ORDER BY reference_type"
# else:
# sql = f"SELECT reference_type, type, SUM(amount)/1000 as money FROM {date} GROUP BY reference_type, type ORDER BY reference_type"
# try:
# res_data = LinkMysql(env.DB_HISTORY).query_mysql(sql)
# except Exception as e:
# return [], 0
# type_name = query_fi_account_type()
# for res in res_data:
# if res["reference_type"] in self.structure_key:
# continue
# if res["reference_type"] in type_name:
# name = type_name[res["reference_type"]]
# else:
# name = res["reference_type"]
# out = [i['money'] for i in res_data if i['reference_type'] == res["reference_type"] and i['type'] == 0]
# income = [i['money'] for i in res_data if i['reference_type'] == res["reference_type"] and i['type'] == 1]
# out_value = out[0] if out else 0
# income_value = income[0] if income else 0
# a = {
# "name": name,
# "type": res["reference_type"],
# "expenditure": out_value,
# "income": income_value,
# "is_error": 0 if out_value == income_value else 1,
# "error_money": float('%.3f' % (out_value - income_value))
# }
# self.structure_key.append(res["reference_type"])
# self.structure_list.append(a)
# # return self.data_to_table(self.structure_list) # 导出接口
# return self.structure_list[(page-1)*size:page*size], len(self.structure_list)
class CalculationMonthlyDetails(object): class CalculationMonthlyDetails(object):
......
...@@ -60,8 +60,7 @@ def month_query_total_export(page: int, ...@@ -60,8 +60,7 @@ def month_query_total_export(page: int,
if not date: if not date:
month_date = datetime.now().date() - relativedelta(months=1) month_date = datetime.now().date() - relativedelta(months=1)
date = month_date.strftime("%Y%m") date = month_date.strftime("%Y%m")
result, num = crud.CalculationMonthlyBill().month_statistics_task('assets_log_' + date, type, name, result, num = crud.CalculationMonthlyBill('assets_log_' + date, type, name, page, size).month_statistics_task()
page, size)
return HttpResultResponse(total=num, data=result) return HttpResultResponse(total=num, data=result)
......
...@@ -93,8 +93,8 @@ class ProdEnv(Env): ...@@ -93,8 +93,8 @@ class ProdEnv(Env):
CLEARING_CENTER_PORT: int = 5454 CLEARING_CENTER_PORT: int = 5454
# env = TestingEnv() # 开发环境 env = TestingEnv() # 开发环境
env = ProdEnv() # 生产环境 # env = ProdEnv() # 生产环境
redis_data = env.Redis redis_data = env.Redis
pool = redis.ConnectionPool(host=redis_data.get("host"), port=redis_data.get("port"), password=redis_data.get("password"), pool = redis.ConnectionPool(host=redis_data.get("host"), port=redis_data.get("port"), password=redis_data.get("password"),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment