Commit 9d877115 authored by 郑磊's avatar 郑磊

更新到最新代码

parent f6b4831e
*-cache
*.bak
**/__pycache__
*.pyc
/logs/
/test/
*.~*
*-workspace
*.db
fs-env/
.idea/
static/
bin/
\ No newline at end of file
/static/*
!/static/DejaVuSans-BoldOblique.ttf
/core/config/*.yaml
This diff is collapsed.
......@@ -16,6 +16,7 @@ class AccountCreate(BaseModel):
remark: Optional[str] = ''
unique_tag: Optional[str] = ''
config_key: Optional[str] = ''
beneficiary: Optional[str] = ''
income: Optional[list] = []
output: Optional[list] = []
......@@ -32,6 +33,7 @@ class AccountUpdate(BaseModel):
remark: str
income: list
output: list
beneficiary: Optional[str] = ''
class FinanceInfo(PublicModel):
......@@ -45,6 +47,7 @@ class FinanceDetails(PublicModel):
uuid: str
user_id: Optional[int] = None
unique_tag: Optional[str] = ""
amount_type: Optional[str] = ""
class FixTable(BaseModel):
......
......@@ -7,10 +7,7 @@ from app.api.account import schemas, crud
from app.api.account.crud import AccountStatistics, SpecificAccountQuery, HomePageDisplay
from app.api.statement import crud as statement_crud
from libs import functions
from libs.business import GUILD_NAME
from libs.export import Export, TableToFile
from libs.functions import get_date_list
from libs.log_utils import Logger
from libs.result_format import HttpResultResponse, HttpMessage
from libs.token_verify import login_required
......@@ -75,7 +72,13 @@ def finance_info_excel(data: schemas.FinanceInfo, request: Request,
headers = request.get("headers")
statement_list = crud.get_finance_info(data.unique_tag, data.id, data.page, 99999999, data.start_time, data.end_time)
df = ['账户余额', '入账', '出账', '时间']
return statement_crud.data_to_file(db, statement_list, "财务信息", headers, df)
if data.unique_tag in ['guild_account', 'user_account']:
df = ['id', '账户余额', '入账', '出账', '时间', '可消费入账', '可消费出账', '可提现入账', '可提现出账']
if data.unique_tag in ['anchor_account', 'knapsack_account', 'pledgeDeduction']:
df.insert(0, 'id')
# return statement_crud.data_to_file(db, statement_list, "财务信息", headers, df)
url = TableToFile(db, statement_list, data.unique_tag, headers, df).main_method()
return HttpResultResponse(data=url)
@router.get("/finance/details")
......@@ -88,12 +91,13 @@ def finance_details(page: int,
type: Optional[int] = None,
gift_type: Optional[str] = "",
unique_tag: Optional[str] = "",
amount_type: Optional[str] = "",
token=Depends(login_required)):
"""账户财务明细列表"""
if not start_time and not end_time:
return HttpResultResponse(code=500, msg="请输入你要查询的时间段")
res, total, count = AccountStatistics(page, size, uuid, user_id, start_time, end_time, type, gift_type,
unique_tag).get_finance_details(is_list=1)
unique_tag, amount_type).get_finance_details(is_list=1)
return HttpResultResponse(total=total, data=res, count=count)
......@@ -103,19 +107,21 @@ def finance_info_excel(data: schemas.FinanceDetails, request: Request,
"""账户财务明细导出"""
headers = request.get("headers")
statement_list = AccountStatistics(data.page, 99999999, data.uuid, data.user_id, data.start_time, data.end_time, data.type,
data.gift_type, data.unique_tag).get_finance_details()
data.gift_type, data.unique_tag, data.amount_type).get_finance_details()
if data.unique_tag in ["knapsack_account", "user_account", "guild_account", "pledgeDeduction"]:
field_head = ['uuid', '入账', '出账', '时间']
statement_list = statement_list[0]
if data.unique_tag == 'guild_account':
field_head = ['公会名', '公会id', '入账', '出账', '余额', '时间']
field_head = ['公会名', '公会id', '总入账', '总出账', '可消费入账', '可消费出账', '可提现入账', '可提现出账', '余额', '时间']
data = crud.delete_guild_specify_field(statement_list)
return TableToFile(db, data, "财务明细", headers, field_head).main_method()
url = TableToFile(db, data, "财务明细", headers, field_head).main_method()
return HttpResultResponse(data=url)
else:
field_head = ['订单号', '出入账方式', '礼物类型', '金额', '时间']
data = crud.delete_specify_field(statement_list, data.unique_tag)
# return statement_crud.data_to_file(db, data, "财务明细", headers, field_head)
return TableToFile(db, data, "财务明细", headers, field_head).main_method()
url = TableToFile(db, data, "财务明细", headers, field_head).main_method()
return HttpResultResponse(data=url)
@router.get("/finance/fourth/details")
......@@ -125,14 +131,15 @@ def finance_fourth_info(page: int,
guild_id: Optional[str] = "", # 针对公会账户,没有uuid,传公会id过来,再查uuid。
time: Optional[str] = "",
type: Optional[int] = None,
reference_type: Optional[str] = "",
gift_type: Optional[str] = "",
unique_tag: Optional[str] = "",
amount_type: Optional[str] = "",
token=Depends(login_required)):
"""账户财务明细 第四层"""
if not all([time, unique_tag]):
return HttpResultResponse(code=500, msg="缺少必传参数")
res, total, count = SpecificAccountQuery(page, size, uuid, time, type, reference_type, unique_tag,
guild_id).business_logic()
res, total, count = SpecificAccountQuery(page, size, uuid, time, type, gift_type, unique_tag,
guild_id, amount_type).business_logic()
return HttpResultResponse(total=total, data=res, count=count)
......@@ -203,8 +210,9 @@ def query_guilds_info(uuid: str, token=Depends(login_required)):
def read_account(date: Optional[str] = "",
unique_tag: Optional[str] = "",
account_type: Optional[str] = "",
token=Depends(login_required)):
"""月,业务类型,消费类型,出入账目统计"""
token=Depends(login_required)
):
"""月,业务类型,消费类型,出入账目统计(账户类型汇总)"""
if not date or (not unique_tag and not account_type):
return HttpResultResponse(code=500, msg=HttpMessage.MISSING_PARAMETER)
if account_type and not unique_tag:
......
......@@ -82,12 +82,12 @@ class CalculationMonthlyBill(object):
self.structure_key = []
self.query_data = []
def thead_task(self, as_list, page, size):
if as_list:
q_sql = f"SELECT reference_type, type, sum(cast(amount as decimal(20,3))) as money FROM (select reference_type,type,amount FROM {self.date} where {' and '.join(as_list)} limit {page},{size}) as a GROUP BY reference_type,type ORDER BY reference_type"
else:
q_sql = f"SELECT reference_type, type, sum(cast(amount as decimal(20,3))) as money FROM (select reference_type,type,amount FROM {self.date} limit {page},{size}) as a GROUP BY reference_type, type ORDER BY reference_type"
def thead_task(self, page, size):
q_sql = f"SELECT reference_type, type, sum(cast(amount as decimal(20,3))) as money FROM (select reference_type,type,amount FROM {self.date} limit {page},{size}) as a GROUP BY reference_type, type ORDER BY reference_type"
count_data = LinkMysql(env.DB_HISTORY).query_mysql(q_sql)
if not count_data:
count_data = []
self.query_data = self.query_data + count_data
def data_deal_with(self):
......@@ -135,7 +135,10 @@ class CalculationMonthlyBill(object):
def search_red_data(self, red_str):
"""redis缓存数据筛选"""
res_list = []
red_data_list = list(eval(red_str))
if isinstance(red_str, list):
red_data_list = red_str
else:
red_data_list = list(eval(red_str))
for reds in red_data_list:
if self.name and not self.key_type:
if self.name in reds.get('name'):
......@@ -171,7 +174,7 @@ class CalculationMonthlyBill(object):
# 创建线程
for i in range(10):
ths.append(threading.Thread(target=self.thead_task,
args=[assert_list, num*i, num]))
args=[num*i, num]))
# 启动线程
for i in range(10):
ths[i].start()
......@@ -186,6 +189,10 @@ class CalculationMonthlyBill(object):
res_all_data.append(v)
# 存入redis
red.set('business_type_sum-' + str(self.date), str(res_all_data), 1800)
if assert_list:
res_all_data = self.search_red_data(res_all_data)
else:
res_all_data = self.date
else:
if assert_list:
res_all_data = self.search_red_data(business_type_sum_data)
......
......@@ -112,6 +112,7 @@ def reference_type_total(date: str, type: str):
def abnormal_total(date: str, type: str, page: Optional[int] = None, size: Optional[int] = None,
token=Depends(login_required)):
"""异常数据详情"""
# 我们的第二个情人节,也陪你走过了春夏秋冬,从陌生的彼此走到了一起,完成了爱你所爱,喜你所喜,弃你所恶这样的转变
if not all([date, type]):
return HttpResultResponse(code=500, msg='缺少必传参数')
result, tota = crud.AbnormalDataDetails(date, type, page, size).abnormal_task()
......
......@@ -96,6 +96,7 @@ class GuildMargin(object):
}
res = send_json_rpc_request(request_data, 'Server.UserExecute.Recharge')
Logger().logger.info(f"清算recharge_user结果:{str(res)}")
res['_sql'] = []
insert_sql = f"insert into all_record_table(user_id, type, status, uuid, reference_number, money, amount_type, money_data, is_add, create_time, errmsg) " \
f"values({guild_id}, '{reference_type}', 2, '{ice_uuid}', {reference_number}, {money * 10}, 1, '保证金充值',1,{timestamp}, '{json.dumps(res)}');"
if res['status'] == 9:
......@@ -104,7 +105,7 @@ class GuildMargin(object):
if res['status'] == False:
insert_sql = f"insert into all_record_table(user_id, type, status, uuid, reference_number, money, amount_type, money_data, is_add, create_time, errmsg) " \
f"values({guild_id}, '{reference_type}', 3, '{ice_uuid}', {reference_number}, {money * 10}, 1, '保证金充值',1,{timestamp}, '{json.dumps(res)}');"
print(f'保证金错误,订单号:{reference_number}')
Logger().logger.info(f'保证金错误,订单号:{reference_number}')
LinkMysql(env.DB_3YV2).perform_mysql(insert_sql)
def guild_authority(self, guild_id, amount, guild_info):
......
This diff is collapsed.
......@@ -7,10 +7,12 @@ from app.api.account import schemas as acc_schemas
from app import get_db
from fastapi import Depends, APIRouter, File, Request
from sqlalchemy.orm import Session
from app.api.statement.crud import RechargeStatement, WithdrawStatement, get_menu_list, get_menu_config
from app.api.statement.crud import RechargeStatement, WithdrawStatement, get_menu_list, get_menu_config, \
BlindBoxRecharge
from app.api.statement.guild import GuildSet, paymentset_guild_data, outon_account_data, accout_list_data, \
query_uuid_or_user_number, account_money, transfer_money, transfer_query, GuildSettlementAdd, GuildSettlementmodify
from app.api.statement.schemas import PaymentWithdrawalList, PaymentAdd, PaymentAccountlList, UserNumber, CreateBill
from libs.export import TableToFile
from libs.img_code import new_upload_file, random_number
from libs.result_format import HttpResultResponse, HttpMessage
from libs.token_verify import login_required
......@@ -27,20 +29,18 @@ def statement_recharge_list(request: Request,db: Session = Depends(get_db),page:
return HttpResultResponse(code=500, msg='时间为必传参数')
query_params = request.query_params
menu_id=query_params.getlist("menu_id[]")
total,statement_list,money= RechargeStatement().query_data(db,page,size,order_number,uuid,sid,start_time,end_time,types,menu_id,month_type,'')
return HttpResultResponse(total=total,count=float(money),data=statement_list)
total,statement_list,money, sun_m = RechargeStatement().query_data(db,page,size,order_number,uuid,sid,start_time,end_time,types,menu_id,month_type,'')
return HttpResultResponse(total=total,count=float(money),data=statement_list, sum_money=sun_m)
@router.post("/derive/excel")
def statement_derive_excel(request:Request,data: schemas.StatementList,db: Session = Depends(get_db),token=Depends(login_required)):
"""充值报表导出"""
header_list = request.get("headers")
export_status =1
total,statement_list,money= RechargeStatement().query_data(db, data.page, data.size, data.order_number, data.uuid,
data.sid, data.start_time, data.end_time, data.types,
data.menu_id, data.month_type,export_status)
field_list = ["id", "uuid", "充值金额(元)", "支付时间", "类型", "订单号"]
return crud.data_to_file(db, statement_list, "充值报表", header_list, field_list)
statement_list = RechargeStatement().get_statements(data)
field_list = ["用户Id", "昵称", "充值金额(元)", "充值状态", "渠道", "商户订单号", "充值时间"]
url = TableToFile(db, statement_list, "充值报表", header_list, field_list).main_method()
return HttpResultResponse(data=url)
@router.get("/userWithdrawal/list")
......@@ -218,7 +218,7 @@ def recovery_fix(data: acc_schemas.RecoveryupdateTable, token=Depends(login_requ
@router.post("/menu/add")
def menu_add(data: schemas.MenuAdd, db: Session = Depends(get_db), token=Depends(login_required)):
"""新增财务菜单配置"""
"""新增财务菜单配置"""
db_user = crud.get_menu_name(db,menu_name=data.menu_name)
if db_user:
return HttpResultResponse(code=400, msg=HttpMessage.TYPE_NAME)
......@@ -240,7 +240,7 @@ def menu_edit(data: schemas.MenuEdit,db: Session = Depends(get_db),page: Optiona
"""修改配置列表"""
db_menu = crud.get_menu_id(db, id=data.id)
if db_menu:
db_menu = crud.get_menu_update(db, data)
db_menu = crud.get_menu_update(db,data)
else:
return HttpResultResponse(code=400, msg=HttpMessage.MENU_NOT_EXIST)
return HttpResultResponse(data=db_menu)
......@@ -256,7 +256,7 @@ def menu_delte(id: Optional[int] = '',db: Session = Depends(get_db),page: Option
@router.get("/menu/config")
def menu_list(db: Session = Depends(get_db),menu_type: Optional[int] = ""):
"""菜单配置下拉"""
menu_list = get_menu_config(db, menu_type)
menu_list = get_menu_config(db,menu_type)
return HttpResultResponse(data=menu_list)
......@@ -271,4 +271,75 @@ def guild_add(data: schemas.GuildAddLog, db: Session = Depends(get_db)):
def guild_modify(db: Session = Depends(get_db)):
"""公会结算同步"""
code, data = GuildSettlementmodify(db)
return HttpResultResponse(code=code, msg=HttpMessage.SUCCESS)
\ No newline at end of file
return HttpResultResponse(code=code, msg=HttpMessage.SUCCESS)
@router.get("/userWithdrawal/excel")
def user_withdrawal_excel(request: Request,
db: Session = Depends(get_db),
page: Optional[int] = 1,
size: Optional[int] = 9999999,
uuid: Optional[str] = '',
status: Optional[int] = '',
start_time: Optional[str] = '',
end_time: Optional[str] = "",
month_type: Optional[int] = "",
token=Depends(login_required)):
"""用户提现列表导出"""
query_params = request.query_params
menu_id = query_params.getlist("menu_id[]")
header_list = request.get("headers")
to, statement_list, mo, re = WithdrawStatement().get_user_withdraw_cash(db, page, 9999999, uuid, status, start_time,
end_time, month_type, menu_id)
field_list = ["订单号", "提现金额", "业务类型", "uuid", "账户类型", "提现时间"]
res_data = crud.user_data_handle(statement_list)
url = TableToFile(db, res_data, "用户提现", header_list, field_list).main_method()
return HttpResultResponse(data=url)
@router.get("/guildWithdrawal/excel")
def guild_withdrawal_excel(request: Request,
db: Session = Depends(get_db),
page: Optional[int] = 1,
size: Optional[int] = 9999999,
guild_id: Optional[int] = '',
status: Optional[int] = '',
start_time: Optional[str] = '',
end_time: Optional[str] = "",
month_type: Optional[int] = "",
token=Depends(login_required)):
"""公会提现导出"""
query_params = request.query_params
menu_id=query_params.getlist("menu_id[]")
header_list = request.get("headers")
res = WithdrawStatement().get_guild_withdraw_cash(db,page,9999999,guild_id,status,start_time,end_time,month_type,menu_id)
field_list = ["提现时间", "处理时间", "公会ID", "公会名称", "提现金额", "扣除金额", "实际到账金额"]
res_data = crud.guild_data_handle(res[1])
url = TableToFile(db, res_data, "公会提现", header_list, field_list).main_method()
return HttpResultResponse(data=url)
@router.get("/blind_box/recharge")
def blind_box_recharge(request: Request, token=Depends(login_required)):
"""盲盒充值"""
query_params = request.query_params
if not all([query_params.get('start_time'), query_params.get('end_time')]):
return HttpResultResponse(code=500, msg='时间为必传参数')
data, money, count = BlindBoxRecharge(query_params).box_recharge()
return HttpResultResponse(total=count, money=money, data=data)
@router.get("/blind_box/excel")
def blind_box_excel(request: Request,
db: Session = Depends(get_db),
token=Depends(login_required)):
"""盲盒充值导出"""
query_params = request.query_params
header_list = request.get("headers")
if not all([query_params.get('start_time'), query_params.get('end_time')]):
return HttpResultResponse(code=500, msg='时间为必传参数')
data, _, _ = BlindBoxRecharge(query_params).box_recharge(1)
field_list = ["订单号", "商户订单号", "渠道", "UUID", "昵称", "充值状态", "业务类型", "充值金额(元)", "充值时间"]
res_data = crud.blind_box_handle(data)
url = TableToFile(db, res_data, "盲盒充值", header_list, field_list).main_method()
return HttpResultResponse(data=url)
......@@ -20,7 +20,7 @@ def get_user(db, username: str):
try:
result = db.query(users.User).filter(users.User.username == username).first()
except Exception as e:
result = db.query(users.User).filter(users.User.username == username).first()
return None
return result
......@@ -91,6 +91,8 @@ def authenticate_pwd(db: Session, form_data: GoogleLogin):
"""只验证密码"""
user_data = get_user(db=db, username=form_data.username)
# 如果密码不正确,也是返回False
if not user_data:
return {"result": False, "msg": "密码错误"}
md5_password = md5(form_data.password)
if md5_password != user_data.hashed_password:
return {"result": False, "msg": "密码错误"}
......
......@@ -34,6 +34,7 @@ config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Toke
client = CosS3Client(config)
COS_PATH = 'https://fj-dc-test-1256890024.cos.ap-guangzhou.myqcloud.com' # 测试
COS_RERURN_PATH = '/images/'
tencent = apo.get("TencentCloud")
class Env(BaseSettings):
......@@ -57,19 +58,20 @@ class TestingEnv(Env):
NACOS_URL = YAML_DATA.get('config_url')
NACOSCONFIG = "show=all&dataId=fj-finance-test&group=DEFAULT_GROUP&tenant=cw-test&namespaceId=cw-test"
NACOS_NAME = YAML_DATA.get('name')
NACOS_PWD = YAML_DATA.get('password')
NACOS_PWD = YAML_DATA.get('pwd')
DB_HISTORY = apo.get('history')
DB_3YV2 = apo.get('business')
Redis = apo.get('redis')
SECRET_KEY: str = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM: str = "HS256"
PASSWORD: str = "fj123456"
PASSWORD: str = "fj147sy258.#"
oss_url = 'http://oss.3yakj.com/application_static_data'
CLEARING_CENTER_URL: str = 'http://106.55.103.148:6464/'
CLEARING_CENTER_HOST: str = '106.55.103.148'
CLEARING_CENTER_PORT: int = 5454
KEY = "dK8tZ1jM0wA6oE3j"
PHP_URL = "http://106.55.103.148:8787"
class ProdEnv(Env):
......@@ -80,7 +82,7 @@ class ProdEnv(Env):
PEM_PATH = os.path.join(SITE_ROOT_YAML, "config", "")
YAML_DATA = apo.get('yaml')
NACOS_NAME = YAML_DATA.get('name')
NACOS_PWD = YAML_DATA.get('password')
NACOS_PWD = YAML_DATA.get('pwd')
LOGIN_URL = YAML_DATA.get('login_url')
NACOS_URL = YAML_DATA.get('config_url')
NACOSCONFIG = "dataId=fj-finance&group=DEFAULT_GROUP&namespaceId=cw-pro&tenant=cw-pro&show=all&username=fj_finance"
......@@ -89,18 +91,19 @@ class ProdEnv(Env):
Redis = apo.get('redis')
SECRET_KEY: str = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM: str = "HS256"
PASSWORD: str = "fj123456"
PASSWORD: str = "fj147sy258.#"
CLEARING_CENTER_URL: str = 'http://219.152.95.226:5454/'
CLEARING_CENTER_HOST: str = '219.152.95.226'
CLEARING_CENTER_PORT: int = 5454
KEY = "dK8tZ1jM0wA6oE3j"
PHP_URL = "http://219.152.95.226:6750"
# env = TestingEnv() # 开发环境
env = ProdEnv() # 生产环境
env = ProdEnv() # 生产环境docke
redis_data = env.Redis
pool = redis.ConnectionPool(host=redis_data.get("host"), port=redis_data.get("port"), password=redis_data.get("password"),
db=redis_data.get("redis_db"), decode_responses=True)
red = redis.StrictRedis(connection_pool=pool)
\ No newline at end of file
red = redis.StrictRedis(connection_pool=pool)
......@@ -21,9 +21,9 @@ SQLALCHEMY_DATABASE_URL = f'sqlite:///{modul_path}/sql_app.db'
# SQLALCHEMY_DATABASE_URL, encoding='utf-8', echo=True, connect_args={'check_same_thread': False}
# )
#数据中心测试服
engine = create_engine('mysql+mysqldb://data_center:KCMBfAjeJhbJXsSe@43.138.132.9:3398/finance')
engine = create_engine('mysql+mysqldb://data_center:KCMBfAjeJhbJXsSe@43.138.132.9:3398/finance', pool_size=20)
#财务原数据数据库
engine_3yakj = create_engine('mysql+mysqldb://root:c1ea602311a369f6@106.55.103.148:3398/3yakj_v2')
engine_3yakj = create_engine('mysql+mysqldb://root:c1ea602311a369f6@106.55.103.148:3398/3yakj_v2', pool_size=20)
# 数据库 session 类,用于创建 session 实例
# autoflush 是指发送数据库语句到数据库,但数据库不一定执行写入到磁盘
......
#version: "3"
#services:
# app:
# restart: always
# container_name: financial-system
# build: .
# ports:
# - "8001:8001"
# volumes:
# - /www/wwwroot/financial-system/:/financial-system/
# stdin_open: true
# command: python main.py
version: "3"
version: '3'
services:
financial-system:
app:
restart: always
container_name: financial-system
build:
context: ./../_base/python-38/
image: python-38
build: .
ports:
- "8009:8009"
deploy:
resources:
limits:
cpus: "4.00"
memory: 16G
reservations:
memory: 500M
- '8009:8009'
volumes:
- /www/python-38/site-packages/financial-system/:/usr/local/lib/python3.8/site-packages
- /www/wwwroot/financial-system/:/var/www/
logging:
driver: json-file
options:
max-size: "20m"
max-file: "10"
stdin_open: true
command: >
sh -c "python -m ensurepip &&
python -m pip install --upgrade pip &&
pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple &&
python main.py"
\ No newline at end of file
- .:/financial-system
command: python main.py
......@@ -60,3 +60,31 @@ class LinkMysql(object):
# rb_info = pika.PlainCredentials(rb.get('username'), rb.get('password'))
# self.connection = pika.BlockingConnection(parameters=pika.ConnectionParameters(rb.get('host'), rb.get('port'), rb.get('vhost'), rb_info))
# self.channel = self.connection.channel()
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
# 连接服务器
class LinkMonitor(object):
def __init__(self):
conn = CosConfig(Region=env.TX_REGION, SecretId=env.TX_SECRET_ID, SecretKey=env.TX_SECRET_KEY,
Token=None, Scheme='https')
self.client = CosS3Client(conn)
# 断点上传
def upload_block(self, path, key, folder=""):
# 正常情况日志级别使用INFO,需要定位时可以修改为DEBUG,此时SDK会打印和服务端的通信信息
# logging.basicConfig(level=logging.INFO, stream=sys.stdout)
token = None
scheme = 'https' # 指定使用 http/https 协议来访问 COS,默认为 https,可不填
response = self.client.upload_file(
Bucket=env.TX_BUCKET,
Key=folder + key + '.xlsx',
LocalFilePath=path,
EnableMD5=False,
progress_callback=None
)
import datetime
import json
import time
import requests
from core.config.env import env
from libs import functions
from libs.log_utils import Logger
def retry(number=1):
"""
最外层传递装饰器参数(这一层可以不写)
中间层传递被装饰器装饰的函数,这里相当于get()
内层传递被装饰器装饰函数的参数,这里是get()的参数a
装饰器作用:限制request重复请求的次数
"""
def outer(func):
def inner(database_name, mon_sql, field_list):
num = 1
res_data = 0
for i in range(number):
try:
res_data = func(database_name, mon_sql, field_list)
break
except Exception as e:
Logger(21).logger.info(f"------------JDBC超时或者失败{i + 1}次-----------")
time.sleep(1)
num += 1
if num == 3:
now_time = functions.get_now_datetime()
environment = "正式环境"
if 'dev' in env.TX_URL:
environment = "测试环境"
msg = {
"error_time": now_time,
"error_env": environment,
"error_line": str(e.__traceback__.tb_frame.f_globals["__file__"]) + ':' + str(
e.__traceback__.tb_lineno),
"error_info": str(e),
"query_sql": str(mon_sql)
}
Logger(21).logger.error("腾讯数据库查询异常:" + str(msg))
return res_data
return res_data
return inner
return outer
@retry(number=2)
def tx_query_num(database_name, mon_sql, field_list):
"""
查询腾讯sql总数语句
:param database_name 数据库
:param mon_sql 查询sql
:param field_list 要返回的字段
:return 0
"""
data = {
"sql_sentence": mon_sql,
"database_name": database_name,
"expiretime": 10,
"fields": field_list
}
start_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
res = requests.post(url=env.TX_URL, json=data)
end_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
if res.status_code != 200:
raise Exception(res.text)
content = json.loads(res.text)
Logger(21).logger.info(f"数据库查询sql:{str(mon_sql)},结果:{str(content)},开始时间:{str(start_time)},结束时间:{str(end_time)}")
if content.get("code") == 200:
row = content.get("rows")[0]
amount = row.get("amount")
if not amount:
amount = 0
else:
amount = 0
return amount
@retry(number=2)
def tx_query_list(database_name, mon_sql, field_list):
"""
查询sql语句
:param database_name
:param mon_sql
:param field_list
:return []
"""
data = {
"sql_sentence": mon_sql,
"database_name": database_name,
"expiretime": 10,
"fields": field_list
}
start_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
res = requests.post(url=env.TX_URL, json=data)
end_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
if res.status_code != 200:
raise Exception(res.text)
content = json.loads(res.text)
Logger(21).logger.info(f"---结果:{content}--")
Logger(21).logger.info(
f"数据库查询sql:{database_name}---结果:{content.get('code')}---,开始时间:{start_time},结束时间:{end_time}")
if res.status_code == 200:
row = content.get("rows")
if not row:
row = []
else:
row = []
return row
import math
import os
import random
import time
import openpyxl
import threading
import pandas as pd
from fastapi import Response
from app.api.statement.guild import query_token
from starlette.responses import StreamingResponse
from datetime import datetime
from app.api.export import crud
from core.config.env import env
from libs.db_link import LinkMonitor
from libs.log_utils import Logger
......@@ -101,6 +107,137 @@ class TableToFile(object):
bk.to_excel(writer, sheet_name=sheet_name, index=False)
self.lock.release()
def th_task(self, branch_data, f_name, num):
try:
bk = pd.DataFrame(branch_data)
if branch_data[0].get('create_time'):
if isinstance(branch_data[0]['create_time'], int):
bk['create_time'] = bk['create_time'].apply(
lambda x: time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(x)))
bk.columns = self.field_list # 修改pandas头
with pd.ExcelWriter(f'static/{f_name}/{self.name}-{num}.xlsx') as writer:
bk.to_excel(writer, sheet_name='Sheet1', index=False)
except Exception as e:
Logger(40).logger.error(f'导出线程{threading.Thread().getName()}失败,原因:{e}')
def th_number(self, total):
if total <= 500000:
strip = 5
data = math.ceil(total/5)
elif 500000 < total <= 1000000:
strip = 7
data = math.ceil(total/strip)
else:
# 当数量超过100w条时,每个xlsx只写10w条数据
data = 100000
strip = math.ceil(total/data)
return strip, data
def compress_folder(self, name):
import zipfile
# 定义需要压缩的文件夹路径和名称
directory_name = f"static/{name}"
zip_file_name = f"static/{self.name}.zip"
# 创建 ZipFile 对象,用于写入压缩文件
with zipfile.ZipFile(zip_file_name, 'w', compression=zipfile.ZIP_DEFLATED) as zip_file:
# 遍历需要压缩的文件夹中的所有子目录和文件
for root, dirs, files in os.walk(directory_name):
for file in files:
# 构造文件的完整路径
file_path = os.path.join(root, file)
# 在压缩文件中添加文件
zip_file.write(file_path)
return zip_file_name
# def main_method(self):
# """主函数"""
# Logger().logger.info('开始导出')
# user = query_token(self.db, self.header)
# params = {"source": self.name, "method": "data_to_file", "status": 1}
# if len(self.data) == 0:
# params["status"] = 3
# crud.create_export_data(self.db, params, user)
# Logger().logger.info(f'导出没有数据')
# return None
# folder_name = datetime.now().strftime('%m%d%H%M%S')
# try:
# os.mkdir(f"static/{folder_name}")
# Logger().logger.info("文件夹已创建!")
# except OSError as error:
# uid = random.randint(1, 1000)
# Logger().logger.info(f"无法创建目录:{folder_name},原因:{error},重新创建随机文件夹")
# folder_name = folder_name + str(uid)
# os.mkdir(f"static/{folder_name}")
# # 判断多少条线程
# number, count = self.th_number(len(self.data))
# Logger().logger.info(f"开启线程:{number}, 每个数量:{count}")
# # 起线程
# ths = []
# for x in range(number):
# ths.append(threading.Thread(target=self.th_task,
# args=[self.data[x * count:(1 + x) * count], folder_name, x]))
# # 启动线程
# for y in range(number):
# ths[y].start()
# # 等待所有线程完成
# for z in range(number):
# ths[z].join()
# Logger().logger.info(f"线程结束,压缩文件!!!")
# zip_folder = self.compress_folder(folder_name)
# # 记录导出
# crud.create_export_data(self.db, params, user)
# with open(zip_folder, 'rb') as f:
# data = f.read()
# response = Response(content=data)
# response.headers["Content-Disposition"] = "attachment; filename=example.zip"
# Logger().logger.info(f"返回压缩文件!!!")
# return response
# def main_method(self):
# """主函数"""
# Logger().logger.info('开始导出')
# user = query_token(self.db, self.header)
# params = {"source": self.name, "method": "data_to_file", "status": 1}
# if len(self.data) == 0:
# params["status"] = 3
# crud.create_export_data(self.db, params, user)
# Logger().logger.info(f'导出没有数据')
# return None
# try:
# bk = pd.DataFrame(self.data)
# if self.data[0].get('create_time'):
# if isinstance(self.data[0]['create_time'], int):
# bk['create_time'] = bk['create_time'].apply(
# lambda x: time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(x)))
# bk.columns = self.field_list # 修改pandas头
# write_data = bk.to_dict(orient='records')
# with pd.ExcelWriter(f'static/{self.name}.xlsx') as writer:
# # bk.to_excel(writer, sheet_name='Sheet1', index=False)
# threads = []
# rows_per_thread = math.ceil(len(write_data) / 5)
# for i in range(5):
# sheet_name = 'sheet' + str(i + 1)
# threads.append(threading.Thread(target=self.thread_task,
# args=[bk.iloc[i * rows_per_thread: rows_per_thread * (i+1)], writer, sheet_name]))
# # 启动线程
# for y in threads:
# y.start()
# # 等待所有线程完成
# for z in threads:
# z.join()
# # 记录导出
# crud.create_export_data(self.db, params, user)
# import zipfile
# with zipfile.ZipFile('ceshi.zip', 'w') as zip:
# # 将指定文件添加到压缩文件中
# zip.write(f"static/{self.name}.xlsx")
# return
# except Exception as e:
# Logger().logger.info(f'导出异常:{str(e)}')
# params["status"] = 2
# crud.create_export_data(self.db, params, user)
def main_method(self):
"""主函数"""
Logger().logger.info('开始导出')
......@@ -121,25 +258,29 @@ class TableToFile(object):
write_data = bk.to_dict(orient='records')
with pd.ExcelWriter(f'static/{self.name}.xlsx') as writer:
# bk.to_excel(writer, sheet_name='Sheet1', index=False)
threads = []
rows_per_thread = math.ceil(len(write_data) / 5)
for i in range(5):
sheet_name = 'sheet' + str(i + 1)
threads.append(threading.Thread(target=self.thread_task,
args=[bk.iloc[i * rows_per_thread: rows_per_thread * (i+1)], writer, sheet_name]))
# 启动线程
for y in threads:
y.start()
# 等待所有线程完成
for z in threads:
z.join()
file = open(writer, 'rb')
if len(self.data) < 500:
bk.to_excel(writer, sheet_name='sheet', index=False)
else:
threads = []
rows_per_thread = math.ceil(len(write_data) / 5)
for i in range(5):
sheet_name = 'sheet' + str(i + 1)
threads.append(threading.Thread(target=self.thread_task,
args=[bk.iloc[i * rows_per_thread: rows_per_thread * (i+1)], writer, sheet_name]))
# 启动线程
for y in threads:
y.start()
# 等待所有线程完成
for z in threads:
z.join()
SITE_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LinkMonitor().upload_block(f'{SITE_ROOT}/static/{self.name}.xlsx', self.name, 'finance/')
# 记录导出
crud.create_export_data(self.db, params, user)
return StreamingResponse(file,
media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
except Exception as e:
Logger().logger.info(f'导出异常:{str(e)}')
params["status"] = 2
crud.create_export_data(self.db, params, user)
return None
return f"https://{env.TX_BUCKET}.cos.ap-guangzhou.myqcloud.com/finance/{self.name}.xlsx"
......@@ -266,7 +266,8 @@ def AES_Decrypt(data):
Logger(40).logger.error(f"php数据解密异常:{str(e)},数据:{plaintext}")
coding_data = str(plaintext, encoding="utf-8")
num = coding_data.index(']')
return list(eval(coding_data[:num + 1]))
missing_value = coding_data[:num + 1]
return list(eval(missing_value.replace('null', '""')))
return res_data
......
......@@ -13,20 +13,20 @@ session = requests.session()
class imageCode():
'''
"""
验证码处理
'''
"""
def rndColor(self):
'''随机颜色'''
"""随机颜色"""
return (random.randint(32, 127), random.randint(32, 127), random.randint(32, 127))
def geneText(self):
'''生成4位验证码'''
"""生成4位验证码"""
return ''.join(random.sample(string.ascii_letters + string.digits, 4)) # ascii_letters是生成所有字母 digits是生成所有数字0-9
def drawLines(self, draw, num, width, height):
'''划线'''
"""划线"""
for num in range(num):
x1 = random.randint(0, width / 2)
y1 = random.randint(0, height / 2)
......@@ -35,7 +35,7 @@ class imageCode():
draw.line(((x1, y1), (x2, y2)), fill='black', width=1)
def getVerifyCode(self):
'''生成验证码图形'''
"""生成验证码图形"""
code = self.geneText()
# 图片大小120×50
width, height = 120, 50
......@@ -67,9 +67,9 @@ class imageCode():
return img.decode('utf-8')
def new_upload_file(file_object,filename):
'''图片上传cos'''
filename = COS_RERURN_PATH+filename+'.png'
def new_upload_file(file_object, filename):
"""图片上传cos"""
filename = COS_RERURN_PATH + filename + '.png'
try:
response = client.put_object(
Bucket=Bucket,
......@@ -86,4 +86,4 @@ def new_upload_file(file_object,filename):
def random_number():
randomId = ''.join([str(random.randint(1, 999999)).zfill(3) for _ in range(2)])
return randomId
\ No newline at end of file
return randomId
......@@ -23,31 +23,10 @@ app.add_middleware(
allow_headers=['*']) # 允许跨域的headers,可以用来鉴别来源等作用。
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
hs = request.headers
token = hs.get("authorization")
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
url_list = ['/api/users/imgCode', '/api/users/login', '/api/users/goodleCode', '/api/users/googleLogin']
if request.url.path in url_list:
return response
if token:
try:
payload = jwt.decode(token.replace('Bearer','').replace(' ',''), env.SECRET_KEY, algorithms=[env.ALGORITHM])
except Exception as e:
print(e)
return response
timestamp = payload.get("exp")
access_token_expires = timedelta(hours=time_format(timestamp))
create_access_token({'username':payload.get("xup"),'password':payload.get("password")},expires_delta=access_token_expires) #更新token时间
return response
app.include_router(api_router, prefix="/api") # 路由
if __name__ == '__main__':
create_yaml()
uvicorn.run(app=app, host="0.0.0.0", port=8009)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment