Commit 1373742b authored by xianyang's avatar xianyang

优化tcp清算

parent 1633515b
......@@ -10,7 +10,7 @@ from core.config.env import env
from libs.business import TYPE_NAME, query_fi_account_type
from libs.db_link import LinkMysql
from libs.functions import wrapper_out, get_now_timestamp, uuid, get_before_timestamp, time_str_to_timestamp, \
get_yesterday_timestamp, search, get_last_month, get_date_list
get_yesterday_timestamp, get_last_month, get_date_list, send_json_rpc_request
from libs.log_utils import Logger
from libs.orm import QueryAllData
from models import account as models
......@@ -68,13 +68,14 @@ class HDUd():
money_res = LinkMysql(env.DB_3YV2).query_mysql(sql)
i['consumable'] = money_res[0]['number'] if money_res and money_res[0]['number'] else 0
else:
platform_data = search({"uuid": i['uuid']}, 'Server.UserQuery.GetUserAsset')
platform_data = send_json_rpc_request({"uuid": i['uuid']}, 'Server.UserQuery.GetUserAsset')
if platform_data['status']:
try:
i['consumable'] = platform_data['data']['result']['data']['asset_balance']['consumable'][
'current_amount']
# i['consumable'] = platform_data['data']['result']['data']['asset_balance']['consumable'][
# 'current_amount'] # http调用清算返回结果
i['consumable'] = platform_data['data']['asset_balance']['consumable']['current_amount']
except Exception as e:
print(e)
Logger(40).logger.error(f"账户列表获取余额异常,原因:{str(e)},uuid:{i['uuid']}")
i['consumable'] = 0
else:
i['consumable'] = 0
......@@ -165,13 +166,15 @@ def create_account(param):
'name': param.name,
}
try:
result = search(clearing_dict, 'Server.UserExecute.CreateBalanceUser')
result = send_json_rpc_request(clearing_dict, 'Server.UserExecute.CreateBalanceUser')
Logger().logger.info(f"创建账户清算结果:{str(result)}")
if result['data']['result']['status']:
account_uuid = result['data']['result']['data']['uuid']
# if result['data']['result']['status']: # http调用清算返回结果
# account_uuid = result['data']['result']['data']['uuid']
if result['status']:
account_uuid = result['data']['uuid']
sql = f"insert into fi_account(name, unique_tag, config_key, description, uuid, income, output, create_time) " \
f"values('{param.name}', '{param.unique_tag}', '{param.config_key}', '{param.remark}', '{account_uuid}', '{income}', '{output}', {get_now_timestamp()});"
account = LinkMysql(env.DB_3YV2).perform_mysql(sql)
LinkMysql(env.DB_3YV2).perform_mysql(sql)
except Exception as e:
Logger(40).logger.error(f"创建新账户失败,原因:{str(e)}")
if 'config_key' in str(e):
......
......@@ -5,7 +5,8 @@ from concurrent.futures.thread import ThreadPoolExecutor
from app.api.statement.guild import query_token
from core.config.env import env
from libs.db_link import LinkMysql
from libs.functions import time_str_to_timestamp, get_now_timestamp, get_order, search, get_now_datetime
from libs.functions import time_str_to_timestamp, get_now_timestamp, get_order, get_now_datetime, \
send_json_rpc_request
from libs.log_utils import Logger
from libs.token_verify import get_current_user
......@@ -93,7 +94,7 @@ class GuildMargin(object):
'reference_type': reference_type,
'reference_info': '保证金充值',
}
res = search(request_data, 'Server.UserExecute.Recharge')
res = send_json_rpc_request(request_data, 'Server.UserExecute.Recharge')
Logger().logger.info(f"清算recharge_user结果:{str(res)}")
insert_sql = f"insert into all_record_table(user_id, type, status, uuid, reference_number, money, amount_type, money_data, is_add, create_time, errmsg) " \
f"values({guild_id}, '{reference_type}', 2, '{ice_uuid}', {reference_number}, {money * 10}, 1, '保证金充值',1,{timestamp}, '{json.dumps(res)}');"
......
......@@ -10,8 +10,8 @@ from sqlalchemy import and_, func, engine
from sqlalchemy.orm import Session
from core.config.env import env, COS_PATH, COS_RERURN_PATH
from libs.db_link import LinkMysql
from libs.functions import get_now_timestamp, get_now_datetime, search, get_order, get_ip, time_str_to_timestamp, \
time_int_timestamp
from libs.functions import get_now_timestamp, get_now_datetime, get_order, get_ip, time_str_to_timestamp, \
time_int_timestamp, send_json_rpc_request
from libs.orm import QueryAllData
from libs.token_verify import get_current_user
from models.recharge import Settlement, Fitransferlog, FinanceFixLog, Account_log, Paymentlog
......@@ -238,10 +238,11 @@ def query_uuid_or_user_number(param):
def account_money(uuid, amount_type):
"""查询账户余额"""
platform_data = search({"uuid": uuid}, 'Server.UserQuery.GetUserAsset')
platform_data = send_json_rpc_request({"uuid": uuid}, 'Server.UserQuery.GetUserAsset')
if platform_data['status']:
try:
platform_money = platform_data['data']['result']['data']['asset_balance'][amount_type]['current_amount']
# platform_money = platform_data['data']['result']['data']['asset_balance'][amount_type]['current_amount']
platform_money = platform_data['data']['asset_balance'][amount_type]['current_amount']
except Exception as e:
print(f"清算系统异常:{e}")
platform_money = '清算系统异常'
......@@ -277,7 +278,7 @@ def transfer_trigger_task(uuid, user_id, balance, type, amount_type, remark='用
"amount_type": amount_type if amount_type else 'consumable',
"notify_url": ""
}
res = search(request_data, 'Server.UserExecute.Transfer')
res = send_json_rpc_request(request_data, 'Server.UserExecute.Transfer')
try:
if not res['data']['result']['status']:
if 'Insufficient assets' in res['data']['result']['msg']:
......@@ -389,7 +390,7 @@ def create_fix_table(db: Session, param, h_list):
"reference_info": [],
"timestamp": get_now_timestamp()
}
clearing_res = search(data, method)
clearing_res = send_json_rpc_request(data, method)
if clearing_res['status']:
unique_res = []
if param.type == 0:
......@@ -513,7 +514,7 @@ def transfer_query(data):
params['src_uuid'] = uuid
params['ip'] =ip
params['timestamp'] = get_now_timestamp()
status =search(params=params, method='Server.BaseExecute.Reduce')
status = send_json_rpc_request(params=params, method='Server.BaseExecute.Reduce')
return status
......
......@@ -89,11 +89,11 @@ async def uploadFile(a_file: bytes = File(...), token=Depends(login_required), )
@router.post("/payment/add")
def guild_payment_list(data: PaymentAdd, db: Session = Depends(get_db),token=Depends(login_required)):
"""提交打款"""
status=transfer_query(data)
status = transfer_query(data)
if status:
return HttpResultResponse(code=200,data='')
return HttpResultResponse(code=200, data='')
else:
return HttpResultResponse(code=500,msg="打款失败")
return HttpResultResponse(code=500, msg="打款失败")
@router.get("/onaccount")
......
......@@ -65,6 +65,8 @@ class TestingEnv(Env):
PASSWORD: str = "fj123456"
oss_url = 'http://oss.3yakj.com/application_static_data'
CLEARING_CENTER_URL: str = 'http://106.55.103.148:6464/'
CLEARING_CENTER_HOST: str = '106.55.103.148'
CLEARING_CENTER_PORT: int = 5454
class ProdEnv(Env):
......@@ -86,6 +88,8 @@ class ProdEnv(Env):
ALGORITHM: str = "HS256"
PASSWORD: str = "fj123456"
CLEARING_CENTER_URL: str = 'http://47.103.144.36:5454/'
CLEARING_CENTER_HOST: str = '47.103.144.36'
CLEARING_CENTER_PORT: int = 5454
env = TestingEnv() # 测试环境
......
......@@ -98,7 +98,7 @@ def wrapper_out():
def search(params, method):
"""
调用清算接口
调用清算接口http
:param params: 传入参数
:param method: 传入方法
"""
......@@ -116,6 +116,39 @@ def search(params, method):
return text
def send_json_rpc_request(params, method):
"""
调用清算接口tcp
:param params: 传入参数
:param method: 传入方法
"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((env.CLEARING_CENTER_HOST, env.CLEARING_CENTER_PORT))
request = {
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": '125'
}
# 将请求对象编码为 JSON 字符串
request_str = json.dumps(request) + "\n" # test协议一定要加\n
# 发送 JSON-RPC 请求
s.sendall(request_str.encode())
# 接收服务器响应
response_str = s.recv(1024).decode()
# 将响应字符串解码为 JSON 对象
response = json.loads(response_str)
if "error" in response:
return {}
return response["result"]
def get_ip():
res = socket.gethostbyname(socket.gethostname())
return res
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment