Commit 519198c2 authored by xianyang

优化导出问题

parent 950a59a1
import math
import os
import random
import time
import openpyxl
import threading
import pandas as pd
from fastapi import Response
from app.api.statement.guild import query_token
from starlette.responses import StreamingResponse
from datetime import datetime
from app.api.export import crud
from libs.log_utils import Logger
......@@ -101,6 +105,49 @@ class TableToFile(object):
bk.to_excel(writer, sheet_name=sheet_name, index=False)
self.lock.release()
def th_task(self, branch_data, f_name, num):
    """Write one chunk of export rows to its own xlsx file.

    Intended to run as a worker-thread target. The chunk is written to
    ``static/{f_name}/{self.name}-{num}.xlsx`` on a single sheet named
    ``Sheet1``, with ``self.field_list`` as the column headers.

    Args:
        branch_data: Sequence of row dicts for this chunk. An empty chunk
            is skipped without creating a file.
        f_name: Name of the folder under ``static/`` that receives the file.
        num: Chunk index, used as the file-name suffix.

    Returns:
        None. Failures are logged, never raised, so one failing chunk does
        not propagate an exception out of its thread.
    """
    if len(branch_data) == 0:
        # Nothing to write; indexing branch_data[0] below would otherwise
        # raise IndexError and be reported as an export failure.
        return
    try:
        bk = pd.DataFrame(branch_data)
        # Only the first row is inspected to decide whether create_time
        # holds integer timestamps that need formatting as local time.
        first_created = branch_data[0].get('create_time')
        if first_created and isinstance(first_created, int):
            bk['create_time'] = bk['create_time'].apply(
                lambda x: time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(x)))
        bk.columns = self.field_list  # replace the pandas header row
        with pd.ExcelWriter(f'static/{f_name}/{self.name}-{num}.xlsx') as writer:
            bk.to_excel(writer, sheet_name='Sheet1', index=False)
    except Exception as e:
        # Report the thread that actually failed; threading.Thread() would
        # create a fresh, unrelated Thread object and log its name instead.
        # NOTE(review): 40 presumably selects the ERROR log level - confirm
        # against libs.log_utils.Logger.
        Logger(40).logger.error(f'导出线程{threading.current_thread().name}失败,原因:{e}')
def th_number(self, total):
    """Decide how many worker threads to start and how many rows each gets.

    Chunk size rules:
      * up to 500,000 rows: split into at most 5 chunks;
      * up to 1,000,000 rows: split into at most 7 chunks;
      * above 1,000,000 rows: each xlsx holds only 100,000 rows.

    The thread count is derived from the chunk size, so it never exceeds
    the number of non-empty slices. With a fixed count, small totals such
    as 3 rows would start threads on empty slices.

    Args:
        total: Total number of rows to export.

    Returns:
        A ``(strip, data)`` tuple: ``strip`` is the number of threads/files
        and ``data`` is the number of rows per chunk. ``(0, 0)`` is returned
        when there is nothing to export.
    """
    if total <= 0:
        # No rows means no threads; also avoids dividing by a zero chunk size.
        return 0, 0
    if total <= 500000:
        data = math.ceil(total / 5)
    elif total <= 1000000:
        data = math.ceil(total / 7)
    else:
        # Above 1,000,000 rows, each xlsx holds only 100,000 rows.
        data = 100000
    # Number of chunks actually needed to cover `total` rows.
    strip = math.ceil(total / data)
    return strip, data
def compress_folder(self, name):
    """Zip every file found under ``static/{name}`` into one archive.

    The archive is written to ``static/{self.name}.zip`` using DEFLATE
    compression. Each file is stored under the path it was found at
    (including the ``static/{name}`` prefix).

    Args:
        name: Folder name under ``static/`` whose files are archived.

    Returns:
        The path of the created zip file.
    """
    import zipfile

    # Source folder to archive and destination archive path.
    source_dir = f"static/{name}"
    zip_file_name = f"static/{self.name}.zip"

    archive = zipfile.ZipFile(zip_file_name, 'w', compression=zipfile.ZIP_DEFLATED)
    with archive:
        # Walk the folder tree and add each file by its full path.
        for current_dir, _subdirs, filenames in os.walk(source_dir):
            for filename in filenames:
                archive.write(os.path.join(current_dir, filename))
    return zip_file_name
def main_method(self):
"""主函数"""
Logger().logger.info('开始导出')
......@@ -111,35 +158,79 @@ class TableToFile(object):
crud.create_export_data(self.db, params, user)
Logger().logger.info(f'导出没有数据')
return None
folder_name = datetime.now().strftime('%m%d%H%M%S')
try:
bk = pd.DataFrame(self.data)
if self.data[0].get('create_time'):
if isinstance(self.data[0]['create_time'], int):
bk['create_time'] = bk['create_time'].apply(
lambda x: time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(x)))
bk.columns = self.field_list # 修改pandas头
write_data = bk.to_dict(orient='records')
with pd.ExcelWriter(f'static/{self.name}.xlsx') as writer:
# bk.to_excel(writer, sheet_name='Sheet1', index=False)
threads = []
rows_per_thread = math.ceil(len(write_data) / 5)
for i in range(5):
sheet_name = 'sheet' + str(i + 1)
threads.append(threading.Thread(target=self.thread_task,
args=[bk.iloc[i * rows_per_thread: rows_per_thread * (i+1)], writer, sheet_name]))
os.mkdir(f"static/{folder_name}")
Logger().logger.info("文件夹已创建!")
except OSError as error:
uid = random.randint(1, 1000)
Logger().logger.info(f"无法创建目录:{folder_name},原因:{error},重新创建随机文件夹")
folder_name = folder_name + str(uid)
os.mkdir(f"static/{folder_name}")
# 判断多少条线程
number, count = self.th_number(len(self.data))
Logger().logger.info(f"开启线程:{number}, 每个数量:{count}")
# 起线程
ths = []
for x in range(number):
ths.append(threading.Thread(target=self.th_task,
args=[self.data[x * count:(1 + x) * count], folder_name, x]))
# 启动线程
for y in threads:
y.start()
for y in range(number):
ths[y].start()
# 等待所有线程完成
for z in threads:
z.join()
file = open(writer, 'rb')
for z in range(number):
ths[z].join()
Logger().logger.info(f"线程结束,压缩文件!!!")
zip_folder = self.compress_folder(folder_name)
# 记录导出
crud.create_export_data(self.db, params, user)
return StreamingResponse(file,
media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
except Exception as e:
Logger().logger.info(f'导出异常:{str(e)}')
params["status"] = 2
crud.create_export_data(self.db, params, user)
with open(zip_folder, 'rb') as f:
data = f.read()
response = Response(content=data)
response.headers["Content-Disposition"] = "attachment; filename=example.zip"
Logger().logger.info(f"返回压缩文件!!!")
return response
# def main_method(self):
# """主函数"""
# Logger().logger.info('开始导出')
# user = query_token(self.db, self.header)
# params = {"source": self.name, "method": "data_to_file", "status": 1}
# if len(self.data) == 0:
# params["status"] = 3
# crud.create_export_data(self.db, params, user)
# Logger().logger.info(f'导出没有数据')
# return None
# try:
# bk = pd.DataFrame(self.data)
# if self.data[0].get('create_time'):
# if isinstance(self.data[0]['create_time'], int):
# bk['create_time'] = bk['create_time'].apply(
# lambda x: time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(x)))
# bk.columns = self.field_list # 修改pandas头
# write_data = bk.to_dict(orient='records')
# with pd.ExcelWriter(f'static/{self.name}.xlsx') as writer:
# # bk.to_excel(writer, sheet_name='Sheet1', index=False)
# threads = []
# rows_per_thread = math.ceil(len(write_data) / 5)
# for i in range(5):
# sheet_name = 'sheet' + str(i + 1)
# threads.append(threading.Thread(target=self.thread_task,
# args=[bk.iloc[i * rows_per_thread: rows_per_thread * (i+1)], writer, sheet_name]))
# # 启动线程
# for y in threads:
# y.start()
# # 等待所有线程完成
# for z in threads:
# z.join()
# file = open(writer, 'rb')
# # 记录导出
# crud.create_export_data(self.db, params, user)
# return StreamingResponse(file,
# media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
# except Exception as e:
# Logger().logger.info(f'导出异常:{str(e)}')
# params["status"] = 2
# crud.create_export_data(self.db, params, user)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment