329 lines
12 KiB
Python
329 lines
12 KiB
Python
"""
|
||
导出模板和字段关系到Excel表格
|
||
用于汇总整理模板和字段关系,后续可以基于这个Excel表格新增数据并增加导入脚本
|
||
"""
|
||
import pymysql
|
||
import os
|
||
from dotenv import load_dotenv
|
||
from openpyxl import Workbook
|
||
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
|
||
from openpyxl.utils import get_column_letter
|
||
from datetime import datetime
|
||
import re
|
||
|
||
# 加载环境变量
|
||
load_dotenv()
|
||
|
||
# 数据库配置
|
||
DB_CONFIG = {
|
||
'host': os.getenv('DB_HOST'),
|
||
'port': int(os.getenv('DB_PORT', 3306)),
|
||
'user': os.getenv('DB_USER'),
|
||
'password': os.getenv('DB_PASSWORD'),
|
||
'database': os.getenv('DB_NAME'),
|
||
'charset': 'utf8mb4'
|
||
}
|
||
|
||
TENANT_ID = 615873064429507639
|
||
|
||
|
||
def clean_query_result(data):
|
||
"""清理查询结果,将 bytes 类型转换为字符串"""
|
||
if isinstance(data, bytes):
|
||
if len(data) == 1:
|
||
return int.from_bytes(data, byteorder='big')
|
||
try:
|
||
return data.decode('utf-8')
|
||
except UnicodeDecodeError:
|
||
return data.decode('utf-8', errors='ignore')
|
||
elif isinstance(data, dict):
|
||
return {key: clean_query_result(value) for key, value in data.items()}
|
||
elif isinstance(data, list):
|
||
return [clean_query_result(item) for item in data]
|
||
elif isinstance(data, (int, float, str, bool, type(None))):
|
||
return data
|
||
else:
|
||
return str(data)
|
||
|
||
|
||
def extract_template_category(file_path, template_name):
|
||
"""
|
||
从文件路径或模板名称提取模板的上级分类
|
||
例如:/615873064429507639/TEMPLATE/2025/12/2-初核模版/2.谈话审批/走读式谈话审批/2谈话审批表.docx
|
||
提取为:2-初核模版/2.谈话审批/走读式谈话审批
|
||
"""
|
||
category = ""
|
||
|
||
# 首先尝试从文件路径提取
|
||
if file_path:
|
||
# 移除开头的斜杠和租户ID部分
|
||
path = file_path.lstrip('/')
|
||
# 移除租户ID/TEMPLATE/年份/月份/部分
|
||
pattern = r'^\d+/TEMPLATE/\d+/\d+/(.+)'
|
||
match = re.match(pattern, path)
|
||
if match:
|
||
full_path = match.group(1)
|
||
# 移除文件名,只保留目录路径
|
||
if '/' in full_path:
|
||
category = '/'.join(full_path.split('/')[:-1])
|
||
|
||
# 如果路径格式不匹配,尝试其他方式
|
||
if not category and ('template_finish' in path.lower() or '初核' in path or '谈话' in path or '函询' in path):
|
||
# 尝试提取目录结构
|
||
parts = path.split('/')
|
||
result_parts = []
|
||
for part in parts:
|
||
if any(keyword in part for keyword in ['初核', '谈话', '函询', '模版', '模板']):
|
||
result_parts.append(part)
|
||
if result_parts:
|
||
category = '/'.join(result_parts[:-1]) if len(result_parts) > 1 else result_parts[0]
|
||
|
||
# 如果从路径无法提取,尝试从模板名称推断
|
||
if not category and template_name:
|
||
# 根据模板名称中的关键词推断分类
|
||
if '初核' in template_name:
|
||
if '谈话' in template_name:
|
||
category = '2-初核模版/2.谈话审批'
|
||
elif '请示' in template_name or '审批' in template_name:
|
||
category = '2-初核模版/1.初核请示'
|
||
elif '结论' in template_name or '报告' in template_name:
|
||
category = '2-初核模版/3.初核结论'
|
||
else:
|
||
category = '2-初核模版'
|
||
elif '谈话' in template_name:
|
||
if '函询' in template_name:
|
||
category = '1-谈话函询模板/函询模板'
|
||
else:
|
||
category = '1-谈话函询模板/谈话模版'
|
||
elif '函询' in template_name:
|
||
category = '1-谈话函询模板/函询模板'
|
||
|
||
return category
|
||
|
||
|
||
def get_all_templates_with_fields():
|
||
"""
|
||
获取所有模板及其关联的输入和输出字段
|
||
|
||
Returns:
|
||
list: 模板列表,每个模板包含字段信息
|
||
"""
|
||
conn = pymysql.connect(**DB_CONFIG)
|
||
cursor = conn.cursor(pymysql.cursors.DictCursor)
|
||
|
||
try:
|
||
# 查询所有启用的模板
|
||
cursor.execute("""
|
||
SELECT
|
||
fc.id AS template_id,
|
||
fc.name AS template_name,
|
||
fc.file_path
|
||
FROM f_polic_file_config fc
|
||
WHERE fc.tenant_id = %s
|
||
AND fc.state = 1
|
||
ORDER BY fc.name
|
||
""", (TENANT_ID,))
|
||
|
||
templates = cursor.fetchall()
|
||
templates = [clean_query_result(t) for t in templates]
|
||
|
||
result = []
|
||
|
||
for template in templates:
|
||
template_id = template['template_id']
|
||
template_name = template['template_name']
|
||
file_path = template.get('file_path', '')
|
||
|
||
# 提取模板上级分类
|
||
template_category = extract_template_category(file_path, template_name)
|
||
|
||
# 查询该模板关联的输入字段
|
||
cursor.execute("""
|
||
SELECT
|
||
f.id AS field_id,
|
||
f.name AS field_name,
|
||
f.filed_code AS field_code
|
||
FROM f_polic_file_field fff
|
||
INNER JOIN f_polic_field f ON fff.filed_id = f.id
|
||
WHERE fff.file_id = %s
|
||
AND fff.tenant_id = %s
|
||
AND fff.state = 1
|
||
AND f.state = 1
|
||
AND f.field_type = 1
|
||
ORDER BY f.name
|
||
""", (template_id, TENANT_ID))
|
||
|
||
input_fields = cursor.fetchall()
|
||
input_fields = [clean_query_result(f) for f in input_fields]
|
||
|
||
# 查询该模板关联的输出字段
|
||
cursor.execute("""
|
||
SELECT
|
||
f.id AS field_id,
|
||
f.name AS field_name,
|
||
f.filed_code AS field_code
|
||
FROM f_polic_file_field fff
|
||
INNER JOIN f_polic_field f ON fff.filed_id = f.id
|
||
WHERE fff.file_id = %s
|
||
AND fff.tenant_id = %s
|
||
AND fff.state = 1
|
||
AND f.state = 1
|
||
AND f.field_type = 2
|
||
ORDER BY f.name
|
||
""", (template_id, TENANT_ID))
|
||
|
||
output_fields = cursor.fetchall()
|
||
output_fields = [clean_query_result(f) for f in output_fields]
|
||
|
||
# 格式化字段信息
|
||
input_fields_str = '; '.join([f"{f['field_name']}({f['field_code']})" for f in input_fields])
|
||
output_fields_str = '; '.join([f"{f['field_name']}({f['field_code']})" for f in output_fields])
|
||
|
||
result.append({
|
||
'template_id': template_id,
|
||
'template_name': template_name,
|
||
'template_category': template_category,
|
||
'input_fields': input_fields,
|
||
'output_fields': output_fields,
|
||
'input_fields_str': input_fields_str,
|
||
'output_fields_str': output_fields_str,
|
||
'input_field_count': len(input_fields),
|
||
'output_field_count': len(output_fields)
|
||
})
|
||
|
||
return result
|
||
|
||
finally:
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
|
||
def create_excel_file(templates_data, output_file='template_fields_export.xlsx'):
|
||
"""
|
||
创建Excel文件
|
||
|
||
Args:
|
||
templates_data: 模板数据列表
|
||
output_file: 输出文件名
|
||
"""
|
||
wb = Workbook()
|
||
ws = wb.active
|
||
ws.title = "模板字段关系"
|
||
|
||
# 设置表头
|
||
headers = ['模板ID', '模板名称', '模板上级', '输入字段', '输出字段', '输入字段数量', '输出字段数量']
|
||
ws.append(headers)
|
||
|
||
# 设置表头样式
|
||
header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid")
|
||
header_font = Font(bold=True, color="FFFFFF", size=11)
|
||
header_alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
|
||
border = Border(
|
||
left=Side(style='thin'),
|
||
right=Side(style='thin'),
|
||
top=Side(style='thin'),
|
||
bottom=Side(style='thin')
|
||
)
|
||
|
||
for col_num, header in enumerate(headers, 1):
|
||
cell = ws.cell(row=1, column=col_num)
|
||
cell.fill = header_fill
|
||
cell.font = header_font
|
||
cell.alignment = header_alignment
|
||
cell.border = border
|
||
|
||
# 填充数据
|
||
data_font = Font(size=10)
|
||
data_alignment = Alignment(horizontal="left", vertical="top", wrap_text=True)
|
||
|
||
for template in templates_data:
|
||
row = [
|
||
template['template_id'],
|
||
template['template_name'],
|
||
template['template_category'],
|
||
template['input_fields_str'],
|
||
template['output_fields_str'],
|
||
template['input_field_count'],
|
||
template['output_field_count']
|
||
]
|
||
ws.append(row)
|
||
|
||
# 设置数据行样式
|
||
for col_num in range(1, len(headers) + 1):
|
||
cell = ws.cell(row=ws.max_row, column=col_num)
|
||
cell.font = data_font
|
||
cell.alignment = data_alignment
|
||
cell.border = border
|
||
|
||
# 设置列宽
|
||
ws.column_dimensions['A'].width = 18 # 模板ID
|
||
ws.column_dimensions['B'].width = 40 # 模板名称
|
||
ws.column_dimensions['C'].width = 50 # 模板上级
|
||
ws.column_dimensions['D'].width = 60 # 输入字段
|
||
ws.column_dimensions['E'].width = 80 # 输出字段
|
||
ws.column_dimensions['F'].width = 15 # 输入字段数量
|
||
ws.column_dimensions['G'].width = 15 # 输出字段数量
|
||
|
||
# 设置行高
|
||
ws.row_dimensions[1].height = 30 # 表头行高
|
||
for row_num in range(2, ws.max_row + 1):
|
||
ws.row_dimensions[row_num].height = 60 # 数据行高
|
||
|
||
# 冻结首行
|
||
ws.freeze_panes = 'A2'
|
||
|
||
# 保存文件
|
||
wb.save(output_file)
|
||
print(f"Excel文件已生成: {output_file}")
|
||
print(f"共导出 {len(templates_data)} 个模板")
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
print("开始导出模板和字段关系...")
|
||
print("=" * 80)
|
||
|
||
try:
|
||
# 获取所有模板及其字段
|
||
templates_data = get_all_templates_with_fields()
|
||
|
||
if not templates_data:
|
||
print("未找到任何模板数据")
|
||
return
|
||
|
||
print(f"共找到 {len(templates_data)} 个模板")
|
||
|
||
# 生成Excel文件
|
||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
output_file = f"template_fields_export_{timestamp}.xlsx"
|
||
create_excel_file(templates_data, output_file)
|
||
|
||
# 打印统计信息
|
||
print("\n统计信息:")
|
||
print(f" 模板总数: {len(templates_data)}")
|
||
total_input_fields = sum(t['input_field_count'] for t in templates_data)
|
||
total_output_fields = sum(t['output_field_count'] for t in templates_data)
|
||
print(f" 输入字段总数: {total_input_fields}")
|
||
print(f" 输出字段总数: {total_output_fields}")
|
||
|
||
# 打印前几个模板的信息
|
||
print("\n前5个模板预览:")
|
||
for i, template in enumerate(templates_data[:5], 1):
|
||
print(f"\n{i}. {template['template_name']}")
|
||
print(f" 上级: {template['template_category']}")
|
||
print(f" 输入字段: {template['input_field_count']} 个")
|
||
print(f" 输出字段: {template['output_field_count']} 个")
|
||
|
||
if len(templates_data) > 5:
|
||
print(f"\n... 还有 {len(templates_data) - 5} 个模板")
|
||
|
||
except Exception as e:
|
||
print(f"导出失败: {str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|
||
|