Commit b41a1181 authored by xianyang's avatar xianyang

账户类型汇总代码(查dlc)

parent 8829c33e
......@@ -6,6 +6,7 @@ from concurrent.futures.thread import ThreadPoolExecutor
import pandas as pd
from sqlalchemy.orm import Session
from core.config.env import env, red
from libs import dlc
from libs.business import TYPE_NAME, GUILD_NAME
from libs.db_link import LinkMysql
from libs.functions import get_now_timestamp, time_str_to_timestamp, \
......@@ -560,7 +561,7 @@ class AccountStatistics(object):
start_time = time_str_to_timestamp(self.start_time + ' 00:00:00')
end_time = time_str_to_timestamp(self.end_time + ' 23:59:59')
# guild_cond_list.append(f" (income>0 or outcome>0 or initial_money>0) ")
guild_cond_list.append(f" create_time >={start_time} and create_time<= {end_time}")
guild_cond_list.append(f" create_time >={start_time} and create_time< {end_time}")
if self.user_id:
guild_cond_list.append(f" guild_id={self.user_id}")
other_sql = f"select guild_id,initial_money as balance,income,outcome,create_time,amount_type from v3_guild_account_detail_copy where {' and '.join(guild_cond_list)} and amount_type != 1"
......@@ -641,6 +642,9 @@ def delete_specify_field(data, unique_tag):
def delete_guild_specify_field(data):
res_list = []
for i in data:
if not i.get('guild_id'):
Logger(20).logger.info(f'错误数据:{i}')
continue
structure = {
"name": GUILD_NAME[i.get('guild_id')],
"guild_id":i.get('guild_id', ''),
......@@ -735,10 +739,8 @@ class SpecificAccountQuery(object):
class HomePageDisplay(object):
def __init__(self, date, unique_tag):
self.date = 'assets_log_' + date
self.date = date
self.unique_tag = unique_tag
self.account = []
self.guild = []
self.bus_data = []
self.income = 0
self.outcome = 0
......@@ -747,70 +749,64 @@ class HomePageDisplay(object):
res_ads = LinkMysql(env.DB_HISTORY).query_mysql(sql)
if res_ads:
self.bus_data += res_ads
df = pd.DataFrame(res_ads)
df_data = df.groupby(['reference_type', 'type'], as_index=False).sum()
child_thread_data = df_data.to_dict(orient='records')
Logger(20).logger.info(f'子线程{threading.Thread().getName()}查询结束!!!共{len(child_thread_data)}条')
self.bus_data += child_thread_data
def get_month_data(self):
is_data = red.get('account_sum-' + self.unique_tag + '-' + str(self.date))
if not is_data:
acc_sql = "select unique_tag,uuid from fi_account"
guild_sql = "select uuid from guild"
count_sql = f"select count(id) as num from {self.date}"
with ThreadPoolExecutor(max_workers=2) as pool:
future1 = pool.submit(LinkMysql(env.DB_3YV2).query_mysql, acc_sql)
future2 = pool.submit(LinkMysql(env.DB_3YV2).query_mysql, guild_sql)
future3 = pool.submit(LinkMysql(env.DB_HISTORY).query_mysql, count_sql)
acc_data = future1.result()
guild_data = future2.result()
count = future3.result()
account = [i['uuid'] for i in acc_data]
guild = [i['uuid'] for i in guild_data]
assets_cond = ['type in(0,1)']
if self.unique_tag == 'guild_account':
assets_cond.append(f" uuid in{tuple(guild)}")
elif self.unique_tag == 'user_account':
assets_cond.append(f" uuid not in{tuple(guild + account)} and amount_type in('consumable','withdrawable')")
elif self.unique_tag == 'knapsack_account':
assets_cond.append(f" amount_type='backpack'")
elif self.unique_tag == 'pledgeDeduction':
assets_cond.append(f" reference_type in('marginRecharge','pledgeDeduction')")
else:
acc_uuid = [i['uuid'] for i in acc_data if i['unique_tag'] == self.unique_tag]
if not acc_uuid:
Logger(20).logger.info('没找到系统账户')
return [], 0, 0
assets_cond.append(f" uuid='{acc_uuid[0]}'")
ths_count = math.ceil(count[0]['num']/5000000)
ths = []
for x in range(ths_count):
assets_sql = f"select reference_type,type,sum(cast(amount as FLOAT(3))) as amount from {self.date} where {' and '.join(assets_cond)} GROUP BY reference_type,type LIMIT {x * 5000000},{(x+1) * 5000000}"
ths.append(threading.Thread(target=self.th_task, args=[assets_sql]))
for y in range(ths_count):
ths[y].start()
for z in range(ths_count):
ths[z].join()
redis_dict = {"my_data": self.bus_data}
red.set('account_sum-' + self.unique_tag + '-' + str(self.date), json.dumps(redis_dict), 3600)
Logger(20).logger.info("开始查询!!")
acc_sql = "select unique_tag,uuid from fi_account"
guild_sql = "select uuid from guild"
with ThreadPoolExecutor(max_workers=2) as pool:
future1 = pool.submit(LinkMysql(env.DB_3YV2).query_mysql, acc_sql)
future2 = pool.submit(LinkMysql(env.DB_3YV2).query_mysql, guild_sql)
acc_data = future1.result()
guild_data = future2.result()
account = [i['uuid'] for i in acc_data]
guild = [i['uuid'] for i in guild_data]
Logger(20).logger.info("系统,公会账户查询完毕!!")
assets_cond = []
assets_cond.append(f"year={self.date}")
if self.unique_tag == 'guild_account':
assets_cond.append(f" uuid in{tuple(guild)}")
elif self.unique_tag == 'user_account':
assets_cond.append(f" uuid not in{tuple(guild + account)} and amount_type in('consumable','withdrawable')")
elif self.unique_tag == 'knapsack_account':
assets_cond.append(f" amount_type='backpack'")
elif self.unique_tag == 'pledgeDeduction':
assets_cond.append(f" reference_type in('marginRecharge','pledgeDeduction')")
else:
self.bus_data = json.loads(is_data)['my_data']
acc_uuid = [i['uuid'] for i in acc_data if i['unique_tag'] == self.unique_tag]
if not acc_uuid:
Logger(20).logger.info('没找到系统账户')
return [], 0, 0
assets_cond.append(f" uuid='{acc_uuid[0]}'")
Logger(20).logger.info("查询数据中!!!!")
assets_sql = f"select reference_type,type,sum(cast(amount as decimal(20,6))) as amount from assets_log where {' and '.join(assets_cond)} GROUP BY reference_type,type"
total_data = dlc.tx_query_list("clearing_center", assets_sql, ["reference_type", "type", "amount"])
# 数据分类
income = []
outcome = []
for i in self.bus_data:
Logger(20).logger.info("数据查询完成!!!!")
for i in total_data:
op = {}
if TYPE_NAME.get(i['reference_type']):
op['name'] = TYPE_NAME.get(i['reference_type'])
else:
op['name'] = i['reference_type']
op['money'] = round(i['amount'] / 1000, 3)
op['money'] = i['amount'] / 1000
if i['type'] == 0:
op['type'] = '出账'
self.outcome = self.outcome + i['amount']
outcome.append(op)
else:
elif i['type'] == 1:
op['type'] = '入账'
self.income = self.income + i['amount']
income.append(op)
Logger(20).logger.info("返回!!!!")
res_list = outcome + income
return res_list, self.outcome/1000, self.income/1000
......
......@@ -71,6 +71,7 @@ class TestingEnv(Env):
CLEARING_CENTER_PORT: int = 5454
KEY = "dK8tZ1jM0wA6oE3j"
PHP_URL = "http://106.55.103.148:8787"
TX_URL = "http://dev.jdbc.3yakj.com/dlc/resultset"
TX_SECRET_ID = tencent.get("secret_id")
TX_SECRET_KEY = tencent.get("secret_key")
TX_REGION = tencent.get("region")
......@@ -100,6 +101,7 @@ class ProdEnv(Env):
CLEARING_CENTER_PORT: int = 5454
KEY = "dK8tZ1jM0wA6oE3j"
PHP_URL = "http://219.152.95.226:6750"
TX_URL = "http://etl-rds.3yakj.com/prod-api/dlc/resultset"
TX_SECRET_ID = tencent.get("secret_id")
TX_SECRET_KEY = tencent.get("secret_key")
TX_REGION = tencent.get("region")
......
import datetime
import json
import time
import requests
from core.config.env import env
from libs import functions
from libs.log_utils import Logger
def retry(number=1):
"""
最外层传递装饰器参数(这一层可以不写)
中间层传递被装饰器装饰的函数,这里相当于get()
内层传递被装饰器装饰函数的参数,这里是get()的参数a
装饰器作用:限制request重复请求的次数
"""
def outer(func):
def inner(database_name, mon_sql, field_list):
num = 1
res_data = 0
for i in range(number):
try:
res_data = func(database_name, mon_sql, field_list)
break
except Exception as e:
Logger(21).logger.info(f"------------JDBC超时或者失败{i + 1}次-----------")
time.sleep(1)
num += 1
if num == 3:
now_time = functions.get_now_datetime()
environment = "正式环境"
if 'dev' in env.TX_URL:
environment = "测试环境"
msg = {
"error_time": now_time,
"error_env": environment,
"error_line": str(e.__traceback__.tb_frame.f_globals["__file__"]) + ':' + str(
e.__traceback__.tb_lineno),
"error_info": str(e),
"query_sql": str(mon_sql)
}
Logger(21).logger.error("腾讯数据库查询异常:" + str(msg))
return res_data
return res_data
return inner
return outer
@retry(number=2)
def tx_query_num(database_name, mon_sql, field_list):
"""
查询腾讯sql总数语句
:param database_name 数据库
:param mon_sql 查询sql
:param field_list 要返回的字段
:return 0
"""
data = {
"sql_sentence": mon_sql,
"database_name": database_name,
"expiretime": 10,
"fields": field_list
}
start_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
res = requests.post(url=env.TX_URL, json=data)
end_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
if res.status_code != 200:
raise Exception(res.text)
content = json.loads(res.text)
Logger(21).logger.info(f"数据库查询sql:{str(mon_sql)},结果:{str(content)},开始时间:{str(start_time)},结束时间:{str(end_time)}")
if content.get("code") == 200:
row = content.get("rows")[0]
amount = row.get("amount")
if not amount:
amount = 0
else:
amount = 0
return amount
@retry(number=2)
def tx_query_list(database_name, mon_sql, field_list):
"""
查询sql语句
:param database_name
:param mon_sql
:param field_list
:return []
"""
data = {
"sql_sentence": mon_sql,
"database_name": database_name,
"expiretime": 10,
"fields": field_list
}
start_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
res = requests.post(url=env.TX_URL, json=data)
end_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
if res.status_code != 200:
raise Exception(res.text)
content = json.loads(res.text)
Logger(21).logger.info(
f"数据库查询sql:{database_name}---结果:{content.get('code')}---,开始时间:{start_time},结束时间:{end_time}")
if res.status_code == 200:
row = content.get("rows")
if not row:
row = []
else:
row = []
return row
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment