Commit 16d7f3a2 authored by 郑磊's avatar 郑磊

更新到最新版本

parent c68df983
import json
import math
import threading
import time
from concurrent.futures.thread import ThreadPoolExecutor
import pandas as pd
from sqlalchemy.orm import Session
from core.config.env import env, red
from libs import dlc
from core.config.env import env
from libs.business import TYPE_NAME, GUILD_NAME
from libs.db_link import LinkMysql
from libs.functions import get_now_timestamp, time_str_to_timestamp, \
......@@ -795,7 +792,6 @@ class HomePageDisplay(object):
Logger(20).logger.info("查询数据中!!!!")
assets_sql = f"select reference_type,type,sum(cast(amount as decimal(20,6))) as amount from assets_log_{self.date} where {' and '.join(assets_cond)} GROUP BY reference_type,type"
Logger(20).logger.info(assets_sql)
# total_data = dlc.tx_query_list("clearing_center", assets_sql, ["reference_type", "type", "amount"])
total_data = LinkMysql(env.DB_HISTORY).query_mysql(assets_sql)
# 数据分类
income = []
......
......@@ -248,13 +248,3 @@ def anchor_account(anchor_id: Optional[int] = None, ):
"""主播账户余额"""
anchor_money = crud.AccountAnchor(anchor_id).anchor_balance()
return HttpResultResponse(data=anchor_money)
@router.get("/ceshi_dlc")
def anchor_account_dlc():
from libs.dlc import dlc_resultset
sql_se = "select reference_type,type,sum(cast(amount as decimal(20,6))) as amount from assets_log where year=202310 and uuid='e7f52d19-4ec4-ae62-0a7e-df11fe61774d' GROUP BY reference_type,type",
abase_name = "clearing_center",
field = ["reference_type", "type", "amount"]
anchor_money = dlc_resultset(sql_se, abase_name, field)
return HttpResultResponse(data=anchor_money)
......@@ -37,59 +37,18 @@ COS_RERURN_PATH = '/images/'
tencent = apo.get("TencentCloud")
class Env(BaseSettings):
"""基类,公共参数"""
class AppEnv(BaseSettings):
"""环境配置"""
DEBUG: bool = True
TESTING: bool = False
DATABASE_URI: str = 'sqlite://:memory:'
DATABASE_USER: str = ''
DATABASE_PWD: str = ''
PAYMENT_URL = 'https://oss.3yakj.com/application_static_data'
class TestingEnv(Env):
"""测试环境配置"""
TESTING: bool = True
LOG_PATH = os.path.join(SITE_ROOT_TOO, "bin", "runtime", "logs", "info", "")
LOG_ERROR_PATH = os.path.join(SITE_ROOT_TOO, "bin", "runtime", "logs", "error", "")
PEM_PATH = os.path.join(SITE_ROOT_YAML, "config", "")
YAML_DATA = apo.get('yaml')
LOGIN_URL = YAML_DATA.get('login_url')
NACOS_URL = YAML_DATA.get('config_url')
NACOSCONFIG = "show=all&dataId=fj-finance-test&group=DEFAULT_GROUP&tenant=cw-test&namespaceId=cw-test"
NACOS_NAME = YAML_DATA.get('name')
NACOS_PWD = YAML_DATA.get('pwd')
DB_HISTORY = apo.get('history')
DB_3YV2 = apo.get('business')
Redis = apo.get('redis')
SECRET_KEY: str = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM: str = "HS256"
PASSWORD: str = "fj147sy258.#"
oss_url = 'http://oss.3yakj.com/application_static_data'
CLEARING_CENTER_URL: str = 'http://106.55.103.148:6464/'
CLEARING_CENTER_HOST: str = '106.55.103.148'
CLEARING_CENTER_PORT: int = 5454
KEY = "dK8tZ1jM0wA6oE3j"
PHP_URL = "http://106.55.103.148:8787"
TX_URL = "http://dev.jdbc.3yakj.com/dlc/resultset"
TX_SECRET_ID = tencent.get("secret_id")
TX_SECRET_KEY = tencent.get("secret_key")
TX_REGION = tencent.get("region")
TX_BUCKET = tencent.get("bucket")
class ProdEnv(Env):
"""生产环境配置"""
LOG_PATH = os.path.join(SITE_ROOT_TOO, "bin", "runtime", "logs", "info", "")
LOG_ERROR_PATH = os.path.join(SITE_ROOT_TOO, "bin", "runtime", "logs", "error", "")
PEM_PATH = os.path.join(SITE_ROOT_YAML, "config", "")
YAML_DATA = apo.get('yaml')
NACOS_NAME = YAML_DATA.get('name')
NACOS_PWD = YAML_DATA.get('pwd')
LOGIN_URL = YAML_DATA.get('login_url')
NACOS_URL = YAML_DATA.get('config_url')
NACOSCONFIG = "dataId=fj-finance&group=DEFAULT_GROUP&namespaceId=cw-pro&tenant=cw-pro&show=all&username=fj_finance"
DB_HISTORY = apo.get('history')
DB_3YV2 = apo.get('business')
Redis = apo.get('redis')
......@@ -101,15 +60,8 @@ class ProdEnv(Env):
CLEARING_CENTER_PORT: int = 5454
KEY = "dK8tZ1jM0wA6oE3j"
PHP_URL = "http://219.152.95.226:6750"
TX_URL = "http://etl-rds.3yakj.com/prod-api/dlc/resultset"
TX_SECRET_ID = tencent.get("secret_id")
TX_SECRET_KEY = tencent.get("secret_key")
TX_REGION = tencent.get("region")
TX_BUCKET = tencent.get("bucket")
# env = TestingEnv() # 开发环境
env = ProdEnv() # 生产环境
env = AppEnv()
redis_data = env.Redis
pool = redis.ConnectionPool(host=redis_data.get("host"), port=redis_data.get("port"), password=redis_data.get("password"),
......
......@@ -4,10 +4,6 @@ import pymysql
from DBUtils.PooledDB import PooledDB
# 连接mysql
from core.config.env import env, red
class LinkMysql(object):
def __init__(self, db_info):
self.POOLMYSQL = PooledDB(
......@@ -52,39 +48,3 @@ class LinkMysql(object):
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
cursor.execute(sql)
# class RabbitMqConn(object):
# """rabbitmq 连接"""
# def __init__(self):
# rb = env.RABBITMQ
# rb_info = pika.PlainCredentials(rb.get('username'), rb.get('password'))
# self.connection = pika.BlockingConnection(parameters=pika.ConnectionParameters(rb.get('host'), rb.get('port'), rb.get('vhost'), rb_info))
# self.channel = self.connection.channel()
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
# 连接服务器
class LinkMonitor(object):
def __init__(self):
conn = CosConfig(Region=env.TX_REGION, SecretId=env.TX_SECRET_ID, SecretKey=env.TX_SECRET_KEY,
Token=None, Scheme='https')
self.client = CosS3Client(conn)
# 断点上传
def upload_block(self, path, key, folder=""):
# 正常情况日志级别使用INFO,需要定位时可以修改为DEBUG,此时SDK会打印和服务端的通信信息
# logging.basicConfig(level=logging.INFO, stream=sys.stdout)
token = None
scheme = 'https' # 指定使用 http/https 协议来访问 COS,默认为 https,可不填
response = self.client.upload_file(
Bucket=env.TX_BUCKET,
Key=folder + key + '.xlsx',
LocalFilePath=path,
EnableMD5=False,
progress_callback=None
)
import datetime
import json
import time
import requests
from core.config.env import env
from libs import functions
from libs.log_utils import Logger
def retry(number=1):
"""
最外层传递装饰器参数(这一层可以不写)
中间层传递被装饰器装饰的函数,这里相当于get()
内层传递被装饰器装饰函数的参数,这里是get()的参数a
装饰器作用:限制request重复请求的次数
"""
def outer(func):
def inner(database_name, mon_sql, field_list):
num = 1
res_data = 0
for i in range(number):
try:
res_data = func(database_name, mon_sql, field_list)
break
except Exception as e:
Logger(21).logger.info(f"------------JDBC超时或者失败{i + 1}次-----------")
time.sleep(1)
num += 1
if num == 3:
now_time = functions.get_now_datetime()
environment = "正式环境"
if 'dev' in env.TX_URL:
environment = "测试环境"
msg = {
"error_time": now_time,
"error_env": environment,
"error_line": str(e.__traceback__.tb_frame.f_globals["__file__"]) + ':' + str(
e.__traceback__.tb_lineno),
"error_info": str(e),
"query_sql": str(mon_sql)
}
Logger(21).logger.error("腾讯数据库查询异常:" + str(msg))
return res_data
return res_data
return inner
return outer
@retry(number=2)
def tx_query_num(database_name, mon_sql, field_list):
"""
查询腾讯sql总数语句
:param database_name 数据库
:param mon_sql 查询sql
:param field_list 要返回的字段
:return 0
"""
data = {
"sql_sentence": mon_sql,
"database_name": database_name,
"expiretime": 10,
"fields": field_list
}
start_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
res = requests.post(url=env.TX_URL, json=data)
end_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
if res.status_code != 200:
raise Exception(res.text)
content = json.loads(res.text)
Logger(21).logger.info(f"数据库查询sql:{str(mon_sql)},结果:{str(content)},开始时间:{str(start_time)},结束时间:{str(end_time)}")
if content.get("code") == 200:
row = content.get("rows")[0]
amount = row.get("amount")
if not amount:
amount = 0
else:
amount = 0
return amount
@retry(number=2)
def tx_query_list(database_name, mon_sql, field_list):
"""
查询sql语句
:param database_name
:param mon_sql
:param field_list
:return []
"""
data = {
"sql_sentence": mon_sql,
"database_name": database_name,
"expiretime": 10,
"fields": field_list
}
start_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
res = requests.post(url=env.TX_URL, json=data)
end_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
if res.status_code != 200:
raise Exception(res.text)
content = json.loads(res.text)
Logger(21).logger.info(
f"数据库查询sql:{database_name}---结果:{content.get('code')}---,开始时间:{start_time},结束时间:{end_time}")
if res.status_code == 200:
row = content.get("rows")
if not row:
row = []
else:
row = []
return row
import jaydebeapi
def dlc_resultset(database_name,sql_sentence, fields):
SecretId = "AKID1sWw86RVyJVSDlBA1e3D9rLbdVDyusAI"
SecretKey = "4va1GH5gCVcWEj3A9wb3ILFxSo24tWw9"
dirver = "com.tencent.cloud.dlc.jdbc.DlcDriver"
jarFile = '../static/dlc-jdbc-2.2.0-jar-with-dependencies.jar'
url = f'jdbc:dlc:dlc.tencentcloudapi.com?task_type=SQLTask&database_name={database_name}&datasource_connection_name=DataLakeCatalog&region=ap-guangzhou&data_engine_name=shared_prestos&result_type=COS'
end = int(time.time())
try:
start = int(time.time())
conn = jaydebeapi.connect(dirver, url, [SecretId, SecretKey], jarFile)
curs = conn.cursor()
curs.execute(sql_sentence)
result = curs.fetchall()
curs.close()
conn.close()
result_datas = datum_datas(sql_sentence, result, fields).datum_dispose()
Logger(21).logger.info(f"数据为:{result_datas}+“-----”+总数为:{len(result)}")
end = int(time.time())
return {'code': 200, 'msg': '创建成功', 'status': 'SUCCESS', 'rows': result_datas, 'total': len(result_datas)}
except Exception as e:
start = int(time.time())
Logger(21).logger.info(f"请求的sql:{sql_sentence}" + "-----" + f"花费时间为+{end - start}" + "-----" + f"异常为{e}")
return {'code': 500, 'msg': '请求异常', 'status': 'SUCCESS', 'rows': '', 'error': str(e)}
import re
class datum_datas():
def __init__(self, sql, data, fields):
self.sql = sql
self.da = ''
self.data = data
self.fields = fields
def datum_dispose(self):
data = re.findall("select(.*?) from", self.sql)
if self.fields:
self.da = self.fields
data_origin = self.daum_detry()
return data_origin
else:
for date in data:
da = date.split(',')
self.da = da
data_origin = self.datum_collect()
return data_origin
def daum_detry(self):
total = []
for i in range(len(self.data)):
b = 0
items = {}
for item in self.da:
items[item] = self.data[i][b]
b += 1
total.append(items)
return total
def datum_collect(self):
total_data = []
for i in range(len(self.data)):
item = {}
s = 0
c = 0
for de in self.da:
if 'as' in de:
if len(self.da) > 1:
name = self.da[s].split('as')[1]
else:
name = de.split('as')[1]
try:
name = name.strip()
if '`' in name:
name = name
c += 1
except Exception as e:
continue
else:
name = de.split('b.')[1]
try:
if self.data[i][s] == 'id':
continue
except Exception as e:
continue
try:
item[name] = self.data[i][s]
except:
item[name] = self.data[i]
s += 1
total_data.append(item)
return total_data
\ No newline at end of file
import math
import os
import random
import time
import openpyxl
import threading
......@@ -8,10 +7,7 @@ import pandas as pd
from fastapi import Response
from app.api.statement.guild import query_token
from starlette.responses import StreamingResponse
from datetime import datetime
from app.api.export import crud
from core.config.env import env
from libs.db_link import LinkMonitor
from libs.log_utils import Logger
......@@ -274,7 +270,6 @@ class TableToFile(object):
for z in threads:
z.join()
SITE_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LinkMonitor().upload_block(f'{SITE_ROOT}/static/{self.name}.xlsx', self.name, 'finance/')
# 记录导出
crud.create_export_data(self.db, params, user)
except Exception as e:
......@@ -282,5 +277,8 @@ class TableToFile(object):
params["status"] = 2
crud.create_export_data(self.db, params, user)
return None
return f"https://{env.TX_BUCKET}.cos.ap-guangzhou.myqcloud.com/finance/{self.name}.xlsx"
with open(f'{SITE_ROOT}/static/{self.name}.xlsx', 'rb') as f:
response = Response(content=f.read())
response.headers["Content-Disposition"] = "attachment; filename=example.zip"
Logger().logger.info(f"返回压缩文件!!!")
return response
......@@ -7,7 +7,7 @@ import socket
from PIL import Image, ImageFont, ImageDraw
from six import BytesIO
from core.config.env import client, Bucket, secret_key, region, secret_id, COS_PATH, COS_RERURN_PATH, red
from core.config.env import client, Bucket, COS_RERURN_PATH, red
session = requests.session()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment