Compare commits

...

2 Commits

14 changed files with 2836 additions and 40 deletions

Binary file not shown.

223
app.py
View File

@ -5,6 +5,7 @@ from flask import Flask, request, jsonify, send_from_directory
from flask_cors import CORS from flask_cors import CORS
from flasgger import Swagger from flasgger import Swagger
import os import os
import pymysql
from datetime import datetime from datetime import datetime
from dotenv import load_dotenv from dotenv import load_dotenv
@ -284,6 +285,85 @@ def extract():
return error_response(2001, f"AI解析超时或发生错误: {str(e)}") return error_response(2001, f"AI解析超时或发生错误: {str(e)}")
@app.route('/api/file-configs', methods=['GET'])
def get_file_configs():
"""
获取可用的文件配置列表
用于查询可用的fileId供文档生成接口使用
---
tags:
- 字段配置
summary: 获取文件配置列表
description: 返回所有启用的文件配置包含fileId和文件名称
responses:
200:
description: 成功
schema:
type: object
properties:
code:
type: integer
example: 0
data:
type: object
properties:
fileConfigs:
type: array
items:
type: object
properties:
fileId:
type: integer
description: 文件配置ID
example: 1765273961563507
fileName:
type: string
description: 文件名称
example: 1.请示报告卡XXX
filePath:
type: string
description: MinIO文件路径
example: /615873064429507639/TEMPLATE/2025/12/1.请示报告卡XXX.docx
isSuccess:
type: boolean
example: true
"""
try:
conn = document_service.get_connection()
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
sql = """
SELECT id, name, file_path
FROM f_polic_file_config
WHERE tenant_id = %s
AND state = 1
ORDER BY name
"""
cursor.execute(sql, (document_service.tenant_id,))
configs = cursor.fetchall()
file_configs = []
for config in configs:
file_configs.append({
'fileId': config['id'],
'fileName': config['name'],
'filePath': config['file_path'] or ''
})
return success_response({
'fileConfigs': file_configs
})
finally:
cursor.close()
conn.close()
except Exception as e:
return error_response(500, f"查询文件配置失败: {str(e)}")
@app.route('/api/fields', methods=['GET']) @app.route('/api/fields', methods=['GET'])
def get_fields(): def get_fields():
""" """
@ -490,6 +570,10 @@ def generate_document():
type: string type: string
description: MinIO相对路径指向生成的文档文件 description: MinIO相对路径指向生成的文档文件
example: /615873064429507639/20251205090700/初步核实审批表_张三.docx example: /615873064429507639/20251205090700/初步核实审批表_张三.docx
downloadUrl:
type: string
description: MinIO预签名下载URL完整链接7天有效可直接下载
example: https://minio.datacubeworld.com:9000/finyx/615873064429507639/20251205090700/初步核实审批表_张三.docx?X-Amz-Algorithm=...
msg: msg:
type: string type: string
example: ok example: ok
@ -571,11 +655,142 @@ def generate_document():
first_document_name = None # 用于存储第一个生成的文档名 first_document_name = None # 用于存储第一个生成的文档名
for file_info in file_list: for file_info in file_list:
file_id = file_info.get('fileId') # 兼容 id 和 fileId 两种字段
file_name = file_info.get('fileName', '') file_id = file_info.get('fileId') or file_info.get('id')
file_name = file_info.get('fileName') or file_info.get('name', '')
if not file_id: if not file_id:
return error_response(1001, f"文件 {file_name} 缺少fileId参数") return error_response(1001, f"文件 {file_name} 缺少fileId或id参数")
try:
# 生成文档使用fileId而不是templateCode
result = document_service.generate_document(
file_id=file_id,
input_data=input_data,
file_info=file_info
)
# 使用生成的文档名称(.docx格式而不是原始文件名
generated_file_name = result.get('fileName', file_name)
# 保存第一个文档名作为 documentName
if first_document_name is None:
first_document_name = generated_file_name
result_file_list.append({
'fileId': file_id,
'fileName': generated_file_name, # 使用生成的文档名
'filePath': result['filePath'], # MinIO相对路径
'downloadUrl': result.get('downloadUrl') # MinIO预签名下载URL完整链接
})
except Exception as e:
error_msg = str(e)
if '不存在' in error_msg or '模板' in error_msg:
return error_response(1001, error_msg)
elif '生成' in error_msg or '填充' in error_msg:
return error_response(3001, error_msg)
elif '上传' in error_msg or '保存' in error_msg:
return error_response(3002, error_msg)
else:
return error_response(3001, f"文件生成失败: {error_msg}")
# 构建返回数据不包含inputData只返回生成的文档信息
return success_response({
'documentId': document_id,
'documentName': first_document_name or 'generated.docx', # 使用第一个生成的文档名
'fpolicFieldParamFileList': result_file_list
})
except Exception as e:
return error_response(3001, f"文档生成失败: {str(e)}")
<<<<<<< HEAD
@app.route('/fPolicTask/getDocument', methods=['POST'])
def get_document_by_task():
"""
通过taskId获取文档兼容接口
支持通过taskId查询关联的文件列表或直接使用提供的文件列表
"""
try:
data = request.get_json()
# 验证请求参数
if not data:
return error_response(400, "请求参数不能为空")
task_id = data.get('taskId')
input_data = data.get('inputData', [])
file_list = data.get('fpolicFieldParamFileList', [])
# 如果没有提供file_list尝试通过taskId查询
if not file_list and task_id:
try:
conn = document_service.get_connection()
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
# 尝试从f_polic_task表查询关联的文件列表
# 注意这里需要根据实际表结构调整SQL
sql = """
SELECT file_id, file_name
FROM f_polic_task_file
WHERE task_id = %s
AND tenant_id = %s
AND state = 1
"""
cursor.execute(sql, (task_id, document_service.tenant_id))
task_files = cursor.fetchall()
if task_files:
file_list = []
for tf in task_files:
file_list.append({
'fileId': tf['file_id'],
'fileName': tf.get('file_name', '')
})
except Exception as e:
# 如果表不存在或查询失败,记录日志但不报错
print(f"[WARN] 无法通过taskId查询文件列表: {str(e)}")
finally:
cursor.close()
conn.close()
except Exception as e:
print(f"[WARN] 查询taskId关联文件时出错: {str(e)}")
# 如果仍然没有file_list返回错误
if not file_list:
return error_response(400, "缺少fpolicFieldParamFileList参数且无法通过taskId查询到关联文件。请提供fpolicFieldParamFileList参数格式: [{'fileId': 文件ID, 'fileName': '文件名'}]")
if not input_data or not isinstance(input_data, list):
return error_response(400, "inputData参数必须是非空数组")
if not file_list or not isinstance(file_list, list):
return error_response(400, "fpolicFieldParamFileList参数必须是非空数组")
# 将input_data转换为字典格式用于生成文档名称
field_data = {}
for item in input_data:
field_code = item.get('fieldCode', '')
field_value = item.get('fieldValue', '')
if field_code:
field_data[field_code] = field_value or ''
# 生成文档ID
document_id = document_service.generate_document_id()
# 处理每个文件
result_file_list = []
first_document_name = None # 用于存储第一个生成的文档名
for file_info in file_list:
# 兼容 id 和 fileId 两种字段
file_id = file_info.get('fileId') or file_info.get('id')
file_name = file_info.get('fileName') or file_info.get('name', '')
if not file_id:
return error_response(1001, f"文件 {file_name} 缺少fileId或id参数")
try: try:
# 生成文档使用fileId而不是templateCode # 生成文档使用fileId而不是templateCode
@ -620,6 +835,8 @@ def generate_document():
return error_response(3001, f"文档生成失败: {str(e)}") return error_response(3001, f"文档生成失败: {str(e)}")
=======
>>>>>>> parent of 4897c96 (添加通过taskId获取文档的接口支持文件列表查询和参数验证增强错误处理能力同时优化文档生成逻辑确保生成的文档名称和路径的准确性)
if __name__ == '__main__': if __name__ == '__main__':
# 确保static目录存在 # 确保static目录存在
os.makedirs('static', exist_ok=True) os.makedirs('static', exist_ok=True)

View File

@ -1,9 +1,16 @@
""" """
为指定的文件路径生成 MinIO 预签名下载 URL 为指定的文件路径生成 MinIO 预签名下载 URL
""" """
import sys
import io
from minio import Minio from minio import Minio
from datetime import timedelta from datetime import timedelta
# 设置输出编码为UTF-8避免Windows控制台编码问题
if sys.platform == 'win32':
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
# MinIO连接配置 # MinIO连接配置
MINIO_CONFIG = { MINIO_CONFIG = {
'endpoint': 'minio.datacubeworld.com:9000', 'endpoint': 'minio.datacubeworld.com:9000',
@ -16,8 +23,13 @@ BUCKET_NAME = 'finyx'
# 文件相对路径列表 # 文件相对路径列表
FILE_PATHS = [ FILE_PATHS = [
'/615873064429507639/20251210155041/初步核实审批表_张三.docx', <<<<<<< HEAD
'/615873064429507639/20251210155041/请示报告卡_张三.docx' '/615873064429507639/20251211112544/初步核实审批表_张三.docx',
'/615873064429507639/20251211112545/请示报告卡_张三.docx'
=======
'/615873064429507639/20251211101046/1_张三.docx',
'/615873064429507639/20251211101046/1_张三.docx'
>>>>>>> e3f4a394c1a4333db2fd3a9383be29fa9d9055e0
] ]
def generate_download_urls(): def generate_download_urls():
@ -52,7 +64,7 @@ def generate_download_urls():
try: try:
# 检查文件是否存在 # 检查文件是否存在
stat = client.stat_object(BUCKET_NAME, object_name) stat = client.stat_object(BUCKET_NAME, object_name)
print(f" 文件存在") print(f"[OK] 文件存在")
print(f" 文件大小: {stat.size:,} 字节") print(f" 文件大小: {stat.size:,} 字节")
print(f" 最后修改: {stat.last_modified}") print(f" 最后修改: {stat.last_modified}")
@ -63,7 +75,7 @@ def generate_download_urls():
expires=timedelta(days=7) expires=timedelta(days=7)
) )
print(f" 预签名URL生成成功7天有效") print(f"[OK] 预签名URL生成成功7天有效")
print(f"\n下载链接:") print(f"\n下载链接:")
print(f"{url}\n") print(f"{url}\n")
@ -76,7 +88,7 @@ def generate_download_urls():
}) })
except Exception as e: except Exception as e:
print(f" 错误: {e}\n") print(f"[ERROR] 错误: {e}\n")
results.append({ results.append({
'file_path': file_path, 'file_path': file_path,
'object_name': object_name, 'object_name': object_name,
@ -93,10 +105,10 @@ def generate_download_urls():
for i, result in enumerate(results, 1): for i, result in enumerate(results, 1):
print(f"\n{i}. {result['file_path']}") print(f"\n{i}. {result['file_path']}")
if result['exists']: if result['exists']:
print(f" 文件存在") print(f" [OK] 文件存在")
print(f" 下载链接: {result['url']}") print(f" 下载链接: {result['url']}")
else: else:
print(f" 文件不存在或无法访问") print(f" [ERROR] 文件不存在或无法访问")
if 'error' in result: if 'error' in result:
print(f" 错误: {result['error']}") print(f" 错误: {result['error']}")
@ -107,7 +119,7 @@ def generate_download_urls():
return results return results
except Exception as e: except Exception as e:
print(f"\n 连接MinIO失败: {e}") print(f"\n[ERROR] 连接MinIO失败: {e}")
import traceback import traceback
traceback.print_exc() traceback.print_exc()
return None return None

View File

@ -0,0 +1,219 @@
"""
生成模板 file_id 和关联关系的详细报告
重点检查每个模板的 file_id 是否正确以及 f_polic_file_field 表的关联关系
"""
import sys
import pymysql
from pathlib import Path
from typing import Dict, List
from collections import defaultdict
# 设置控制台编码为UTF-8Windows兼容
if sys.platform == 'win32':
try:
sys.stdout.reconfigure(encoding='utf-8')
sys.stderr.reconfigure(encoding='utf-8')
except:
pass
# 数据库连接配置
DB_CONFIG = {
'host': '152.136.177.240',
'port': 5012,
'user': 'finyx',
'password': '6QsGK6MpePZDE57Z',
'database': 'finyx',
'charset': 'utf8mb4'
}
TENANT_ID = 615873064429507639
def generate_detailed_report():
"""生成详细的 file_id 和关联关系报告"""
print("="*80)
print("模板 file_id 和关联关系详细报告")
print("="*80)
# 连接数据库
try:
conn = pymysql.connect(**DB_CONFIG)
print("\n[OK] 数据库连接成功\n")
except Exception as e:
print(f"\n[ERROR] 数据库连接失败: {e}")
return
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
# 1. 查询所有有 file_path 的模板(实际模板文件,不是目录节点)
cursor.execute("""
SELECT id, name, template_code, file_path, state, parent_id
FROM f_polic_file_config
WHERE tenant_id = %s AND file_path IS NOT NULL AND file_path != ''
ORDER BY name, id
""", (TENANT_ID,))
all_templates = cursor.fetchall()
print(f"总模板数(有 file_path: {len(all_templates)}\n")
# 2. 查询每个模板的关联字段
template_field_map = defaultdict(list)
cursor.execute("""
SELECT
fff.file_id,
fff.filed_id,
fff.state as relation_state,
fc.name as template_name,
fc.template_code,
f.name as field_name,
f.filed_code,
f.field_type,
CASE
WHEN f.field_type = 1 THEN '输入字段'
WHEN f.field_type = 2 THEN '输出字段'
ELSE '未知'
END as field_type_name
FROM f_polic_file_field fff
INNER JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id
INNER JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
WHERE fff.tenant_id = %s
ORDER BY fff.file_id, f.field_type, f.name
""", (TENANT_ID,))
all_relations = cursor.fetchall()
for rel in all_relations:
template_field_map[rel['file_id']].append(rel)
# 3. 按模板分组显示
print("="*80)
print("每个模板的 file_id 和关联字段详情")
print("="*80)
# 按名称分组,显示重复的模板
templates_by_name = defaultdict(list)
for template in all_templates:
templates_by_name[template['name']].append(template)
duplicate_templates = {name: tmpls for name, tmpls in templates_by_name.items() if len(tmpls) > 1}
if duplicate_templates:
print("\n[WARN] 发现重复名称的模板:\n")
for name, tmpls in duplicate_templates.items():
print(f" 模板名称: {name}")
for tmpl in tmpls:
field_count = len(template_field_map.get(tmpl['id'], []))
input_count = sum(1 for f in template_field_map.get(tmpl['id'], []) if f['field_type'] == 1)
output_count = sum(1 for f in template_field_map.get(tmpl['id'], []) if f['field_type'] == 2)
print(f" - file_id: {tmpl['id']}")
print(f" template_code: {tmpl.get('template_code', 'N/A')}")
print(f" file_path: {tmpl.get('file_path', 'N/A')}")
print(f" 关联字段: 总计 {field_count} 个 (输入 {input_count}, 输出 {output_count})")
print()
# 4. 显示每个模板的详细信息
print("\n" + "="*80)
print("所有模板的 file_id 和关联字段统计")
print("="*80)
for template in all_templates:
file_id = template['id']
name = template['name']
template_code = template.get('template_code', 'N/A')
file_path = template.get('file_path', 'N/A')
fields = template_field_map.get(file_id, [])
input_fields = [f for f in fields if f['field_type'] == 1]
output_fields = [f for f in fields if f['field_type'] == 2]
print(f"\n模板: {name}")
print(f" file_id: {file_id}")
print(f" template_code: {template_code}")
print(f" file_path: {file_path}")
print(f" 关联字段: 总计 {len(fields)}")
print(f" - 输入字段 (field_type=1): {len(input_fields)}")
print(f" - 输出字段 (field_type=2): {len(output_fields)}")
if len(fields) == 0:
print(f" [WARN] 该模板没有关联任何字段")
# 5. 检查关联关系的完整性
print("\n" + "="*80)
print("关联关系完整性检查")
print("="*80)
# 检查是否有 file_id 在 f_polic_file_field 中但没有对应的文件配置
cursor.execute("""
SELECT DISTINCT fff.file_id
FROM f_polic_file_field fff
LEFT JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id
WHERE fff.tenant_id = %s AND fc.id IS NULL
""", (TENANT_ID,))
orphan_file_ids = cursor.fetchall()
if orphan_file_ids:
print(f"\n[ERROR] 发现孤立的 file_id在 f_polic_file_field 中但不在 f_polic_file_config 中):")
for item in orphan_file_ids:
print(f" - file_id: {item['file_id']}")
else:
print("\n[OK] 所有关联关系的 file_id 都有效")
# 检查是否有 filed_id 在 f_polic_file_field 中但没有对应的字段
cursor.execute("""
SELECT DISTINCT fff.filed_id
FROM f_polic_file_field fff
LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
WHERE fff.tenant_id = %s AND f.id IS NULL
""", (TENANT_ID,))
orphan_field_ids = cursor.fetchall()
if orphan_field_ids:
print(f"\n[ERROR] 发现孤立的 filed_id在 f_polic_file_field 中但不在 f_polic_field 中):")
for item in orphan_field_ids:
print(f" - filed_id: {item['filed_id']}")
else:
print("\n[OK] 所有关联关系的 filed_id 都有效")
# 6. 统计汇总
print("\n" + "="*80)
print("统计汇总")
print("="*80)
total_templates = len(all_templates)
templates_with_fields = len([t for t in all_templates if len(template_field_map.get(t['id'], [])) > 0])
templates_without_fields = total_templates - templates_with_fields
total_relations = len(all_relations)
total_input_relations = sum(1 for r in all_relations if r['field_type'] == 1)
total_output_relations = sum(1 for r in all_relations if r['field_type'] == 2)
print(f"\n模板统计:")
print(f" 总模板数: {total_templates}")
print(f" 有关联字段的模板: {templates_with_fields}")
print(f" 无关联字段的模板: {templates_without_fields}")
print(f"\n关联关系统计:")
print(f" 总关联关系数: {total_relations}")
print(f" 输入字段关联: {total_input_relations}")
print(f" 输出字段关联: {total_output_relations}")
if duplicate_templates:
print(f"\n[WARN] 发现 {len(duplicate_templates)} 个模板名称有重复记录")
print(" 建议: 确认每个模板应该使用哪个 file_id并清理重复记录")
if templates_without_fields:
print(f"\n[WARN] 发现 {templates_without_fields} 个模板没有关联任何字段")
print(" 建议: 检查这些模板是否需要关联字段")
finally:
cursor.close()
conn.close()
print("\n数据库连接已关闭")
if __name__ == '__main__':
generate_detailed_report()

64
get_available_file_ids.py Normal file
View File

@ -0,0 +1,64 @@
"""
获取所有可用的文件ID列表用于测试
"""
import pymysql
import os
# 数据库连接配置
DB_CONFIG = {
'host': os.getenv('DB_HOST', '152.136.177.240'),
'port': int(os.getenv('DB_PORT', 5012)),
'user': os.getenv('DB_USER', 'finyx'),
'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
'database': os.getenv('DB_NAME', 'finyx'),
'charset': 'utf8mb4'
}
TENANT_ID = 615873064429507639
def get_available_file_configs():
"""获取所有可用的文件配置"""
conn = pymysql.connect(**DB_CONFIG)
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
sql = """
SELECT id, name, file_path, state
FROM f_polic_file_config
WHERE tenant_id = %s
AND state = 1
ORDER BY name
"""
cursor.execute(sql, (TENANT_ID,))
configs = cursor.fetchall()
print("="*80)
print("可用的文件配置列表state=1")
print("="*80)
print(f"\n共找到 {len(configs)} 个启用的文件配置:\n")
for i, config in enumerate(configs, 1):
print(f"{i}. ID: {config['id']}")
print(f" 名称: {config['name']}")
print(f" 文件路径: {config['file_path'] or '(空)'}")
print()
# 输出JSON格式方便复制
print("\n" + "="*80)
print("JSON格式可用于测试:")
print("="*80)
print("[")
for i, config in enumerate(configs):
comma = "," if i < len(configs) - 1 else ""
print(f' {{"fileId": {config["id"]}, "fileName": "{config["name"]}.doc"}}{comma}')
print("]")
return configs
finally:
cursor.close()
conn.close()
if __name__ == '__main__':
get_available_file_configs()

View File

@ -5,7 +5,7 @@ import os
import re import re
import tempfile import tempfile
from typing import Dict, List, Optional from typing import Dict, List, Optional
from datetime import datetime from datetime import datetime, timedelta
from pathlib import Path from pathlib import Path
from docx import Document from docx import Document
from minio import Minio from minio import Minio
@ -131,9 +131,80 @@ class DocumentService:
填充后的文档路径 填充后的文档路径
""" """
try: try:
print(f"[DEBUG] 开始填充模板: {template_path}")
print(f"[DEBUG] 字段数据: {field_data}")
# 打开模板文档 # 打开模板文档
doc = Document(template_path) doc = Document(template_path)
print(f"[DEBUG] 文档包含 {len(doc.paragraphs)} 个段落, {len(doc.tables)} 个表格")
<<<<<<< HEAD
def replace_placeholder_in_paragraph(paragraph):
"""在段落中替换占位符处理跨run的情况"""
try:
# 获取段落完整文本
full_text = paragraph.text
if not full_text:
return
# 检查是否有占位符需要替换
has_placeholder = False
replaced_text = full_text
replacement_count = 0
# 遍历所有字段,替换所有匹配的占位符(包括重复的)
for field_code, field_value in field_data.items():
placeholder = f"{{{{{field_code}}}}}"
# 使用循环替换所有匹配项(不仅仅是第一个)
while placeholder in replaced_text:
has_placeholder = True
replacement_count += 1
# 替换占位符,如果值为空则替换为空字符串
replaced_text = replaced_text.replace(placeholder, str(field_value) if field_value else '', 1)
print(f"[DEBUG] 替换占位符: {placeholder} -> '{field_value}' (在段落中)")
# 如果有替换,使用安全的方式更新段落文本
if has_placeholder:
print(f"[DEBUG] 段落替换了 {replacement_count} 个占位符: '{full_text[:50]}...' -> '{replaced_text[:50]}...'")
try:
# 方法1直接设置text推荐会自动处理run
paragraph.text = replaced_text
except Exception as e1:
# 如果方法1失败尝试方法2手动处理run
try:
# 清空所有run
paragraph.clear()
# 添加新的run
if replaced_text:
paragraph.add_run(replaced_text)
except Exception as e2:
# 如果两种方法都失败,记录错误但继续
print(f"[WARN] 无法更新段落文本方法1错误: {str(e1)}, 方法2错误: {str(e2)}")
pass
except Exception as e:
# 如果单个段落处理失败,记录错误但继续处理其他段落
print(f"[WARN] 处理段落时出错: {str(e)}")
import traceback
print(traceback.format_exc())
pass
# 统计替换信息
total_replacements = 0
replaced_placeholders = set()
# 替换段落中的占位符
for para_idx, paragraph in enumerate(doc.paragraphs):
before_text = paragraph.text
replace_placeholder_in_paragraph(paragraph)
after_text = paragraph.text
if before_text != after_text:
# 检查哪些占位符被替换了
for field_code in field_data.keys():
placeholder = f"{{{{{field_code}}}}}"
if placeholder in before_text and placeholder not in after_text:
replaced_placeholders.add(field_code)
total_replacements += before_text.count(placeholder)
=======
# 替换占位符 {{field_code}} 为实际值 # 替换占位符 {{field_code}} 为实际值
for paragraph in doc.paragraphs: for paragraph in doc.paragraphs:
# 替换段落文本中的占位符 # 替换段落文本中的占位符
@ -144,11 +215,73 @@ class DocumentService:
for run in paragraph.runs: for run in paragraph.runs:
if placeholder in run.text: if placeholder in run.text:
run.text = run.text.replace(placeholder, field_value or '') run.text = run.text.replace(placeholder, field_value or '')
>>>>>>> parent of 4897c96 (添加通过taskId获取文档的接口支持文件列表查询和参数验证增强错误处理能力同时优化文档生成逻辑确保生成的文档名称和路径的准确性)
# 替换表格中的占位符 # 替换表格中的占位符
try:
for table in doc.tables:
if not table.rows:
continue
for row in table.rows:
if not row.cells:
continue
for cell in row.cells:
try:
# 检查cell是否有paragraphs属性且不为空
if hasattr(cell, 'paragraphs'):
# 安全地获取paragraphs列表
paragraphs = list(cell.paragraphs) if cell.paragraphs else []
for paragraph in paragraphs:
before_text = paragraph.text
replace_placeholder_in_paragraph(paragraph)
after_text = paragraph.text
if before_text != after_text:
# 检查哪些占位符被替换了
for field_code in field_data.keys():
placeholder = f"{{{{{field_code}}}}}"
if placeholder in before_text and placeholder not in after_text:
replaced_placeholders.add(field_code)
total_replacements += before_text.count(placeholder)
except Exception as e:
# 如果单个单元格处理失败,记录错误但继续处理其他单元格
print(f"[WARN] 处理表格单元格时出错: {str(e)}")
pass
except Exception as e:
# 如果表格处理失败,记录错误但继续保存文档
print(f"[WARN] 处理表格时出错: {str(e)}")
pass
# 验证是否还有未替换的占位符
remaining_placeholders = set()
for paragraph in doc.paragraphs:
text = paragraph.text
for field_code in field_data.keys():
placeholder = f"{{{{{field_code}}}}}"
if placeholder in text:
remaining_placeholders.add(field_code)
# 检查表格中的占位符
for table in doc.tables: for table in doc.tables:
for row in table.rows: for row in table.rows:
for cell in row.cells: for cell in row.cells:
<<<<<<< HEAD
if hasattr(cell, 'paragraphs'):
for paragraph in cell.paragraphs:
text = paragraph.text
for field_code in field_data.keys():
placeholder = f"{{{{{field_code}}}}}"
if placeholder in text:
remaining_placeholders.add(field_code)
# 输出统计信息
print(f"[DEBUG] 占位符替换统计:")
print(f" - 已替换的占位符: {sorted(replaced_placeholders)}")
print(f" - 总替换次数: {total_replacements}")
if remaining_placeholders:
print(f" - ⚠️ 仍有未替换的占位符: {sorted(remaining_placeholders)}")
else:
print(f" - ✓ 所有占位符已成功替换")
=======
for paragraph in cell.paragraphs: for paragraph in cell.paragraphs:
for field_code, field_value in field_data.items(): for field_code, field_value in field_data.items():
placeholder = f"{{{{{field_code}}}}}" placeholder = f"{{{{{field_code}}}}}"
@ -156,16 +289,26 @@ class DocumentService:
for run in paragraph.runs: for run in paragraph.runs:
if placeholder in run.text: if placeholder in run.text:
run.text = run.text.replace(placeholder, field_value or '') run.text = run.text.replace(placeholder, field_value or '')
>>>>>>> parent of 4897c96 (添加通过taskId获取文档的接口支持文件列表查询和参数验证增强错误处理能力同时优化文档生成逻辑确保生成的文档名称和路径的准确性)
# 保存到临时文件 # 保存到临时文件
temp_dir = tempfile.gettempdir() temp_dir = tempfile.gettempdir()
output_file = os.path.join(temp_dir, f"filled_{datetime.now().strftime('%Y%m%d%H%M%S')}.docx") output_file = os.path.join(temp_dir, f"filled_{datetime.now().strftime('%Y%m%d%H%M%S')}.docx")
doc.save(output_file) doc.save(output_file)
print(f"[DEBUG] 文档已保存到: {output_file}")
return output_file return output_file
except IndexError as e:
# 索引越界错误,提供更详细的错误信息
import traceback
error_detail = traceback.format_exc()
raise Exception(f"填充模板失败: list index out of range. 详细信息: {str(e)}\n{error_detail}")
except Exception as e: except Exception as e:
raise Exception(f"填充模板失败: {str(e)}") # 其他错误,提供详细的错误信息
import traceback
error_detail = traceback.format_exc()
raise Exception(f"填充模板失败: {str(e)}\n{error_detail}")
def upload_to_minio(self, file_path: str, file_name: str) -> str: def upload_to_minio(self, file_path: str, file_name: str) -> str:
""" """
@ -183,8 +326,9 @@ class DocumentService:
try: try:
# 生成MinIO对象路径相对路径 # 生成MinIO对象路径相对路径
now = datetime.now() now = datetime.now()
# 使用日期路径组织文件 # 使用日期路径组织文件,添加微秒确保唯一性
object_name = f"{self.tenant_id}/{now.strftime('%Y%m%d%H%M%S')}/{file_name}" timestamp = f"{now.strftime('%Y%m%d%H%M%S')}{now.microsecond:06d}"
object_name = f"{self.tenant_id}/{timestamp}/{file_name}"
# 上传文件 # 上传文件
client.fput_object( client.fput_object(
@ -215,7 +359,12 @@ class DocumentService:
# 获取文件配置 # 获取文件配置
file_config = self.get_file_config_by_id(file_id) file_config = self.get_file_config_by_id(file_id)
if not file_config: if not file_config:
raise Exception(f"文件ID {file_id} 对应的模板不存在或未启用") # 提供更详细的错误信息
raise Exception(
f"文件ID {file_id} 对应的模板不存在或未启用。"
f"请通过查询 f_polic_file_config 表获取有效的文件ID"
f"或访问 /api/file-configs 接口查看可用的文件配置列表。"
)
# 检查file_path是否存在 # 检查file_path是否存在
file_path = file_config.get('file_path') file_path = file_config.get('file_path')
@ -240,15 +389,26 @@ class DocumentService:
filled_doc_path = self.fill_template(template_path, field_data) filled_doc_path = self.fill_template(template_path, field_data)
# 生成文档名称(.docx格式 # 生成文档名称(.docx格式
original_file_name = file_info.get('fileName', 'generated.doc') # 优先使用file_info中的fileName如果没有则使用数据库中的name
# 确保每个文件都使用自己的文件名
original_file_name = file_info.get('fileName') or file_info.get('name') or file_config.get('name', 'generated.doc')
print(f"[DEBUG] 文件ID: {file_id}, 原始文件名: {original_file_name}")
print(f"[DEBUG] file_info内容: {file_info}")
print(f"[DEBUG] file_config内容: {file_config}")
print(f"[DEBUG] 字段数据用于生成文档名: {field_data}")
generated_file_name = self.generate_document_name(original_file_name, field_data) generated_file_name = self.generate_document_name(original_file_name, field_data)
print(f"[DEBUG] 文件ID: {file_id}, 生成的文档名: {generated_file_name}")
# 上传到MinIO使用生成的文档名 # 上传到MinIO使用生成的文档名
file_path = self.upload_to_minio(filled_doc_path, generated_file_name) file_path = self.upload_to_minio(filled_doc_path, generated_file_name)
# 生成预签名下载URL
download_url = self.generate_presigned_download_url(file_path)
return { return {
'filePath': file_path, 'filePath': file_path,
'fileName': generated_file_name # 返回生成的文档名 'fileName': generated_file_name, # 返回生成的文档名
'downloadUrl': download_url # 返回预签名下载URL
} }
finally: finally:
@ -278,16 +438,103 @@ class DocumentService:
field_data: 字段数据 field_data: 字段数据
Returns: Returns:
生成的文档名称 "初步核实审批表_张三.docx" 生成的文档名称 "请示报告卡_张三.docx"
""" """
import re
# 提取文件基础名称(不含扩展名) # 提取文件基础名称(不含扩展名)
base_name = Path(original_file_name).stem # 处理可能包含路径的情况
# 先移除路径,只保留文件名
file_name_only = Path(original_file_name).name
# 判断是否有扩展名(.doc, .docx等
# 如果最后有常见的文档扩展名则提取stem
if file_name_only.lower().endswith(('.doc', '.docx', '.txt', '.pdf')):
base_name = Path(file_name_only).stem
else:
# 如果没有扩展名,直接使用文件名
base_name = file_name_only
print(f"[DEBUG] 原始文件名: '{original_file_name}'")
print(f"[DEBUG] 提取的基础名称(清理前): '{base_name}'")
# 清理文件名中的特殊标记
# 1. 移除开头的数字和点(如 "1."、"2." 等),但保留后面的内容
# 使用非贪婪匹配,只匹配开头的数字和点
base_name = re.sub(r'^\d+\.\s*', '', base_name)
# 2. 移除括号及其内容(如 "XXX"、"(初核谈话)" 等)
base_name = re.sub(r'[(].*?[)]', '', base_name)
# 3. 清理首尾空白字符和多余的点
base_name = base_name.strip().strip('.')
# 4. 如果清理后为空或只有数字,使用原始文件名重新处理
if not base_name or base_name.isdigit():
print(f"[DEBUG] 清理后为空或只有数字,重新处理原始文件名")
# 从原始文件名中提取,但保留更多内容
temp_name = file_name_only
# 只移除括号,保留数字前缀(但格式化为更友好的形式)
temp_name = re.sub(r'[(].*?[)]', '', temp_name)
# 移除扩展名(如果存在)
if temp_name.lower().endswith(('.doc', '.docx', '.txt', '.pdf')):
temp_name = Path(temp_name).stem
temp_name = temp_name.strip().strip('.')
if temp_name:
base_name = temp_name
else:
base_name = "文档" # 最后的备选方案
print(f"[DEBUG] 清理后的基础名称: '{base_name}'")
# 尝试从字段数据中提取被核查人姓名作为后缀 # 尝试从字段数据中提取被核查人姓名作为后缀
suffix = '' suffix = ''
if 'target_name' in field_data and field_data['target_name']: target_name = field_data.get('target_name', '')
suffix = f"_{field_data['target_name']}" if target_name and target_name.strip():
suffix = f"_{target_name.strip()}"
<<<<<<< HEAD
# 生成新文件名 # 生成新文件名
return f"{base_name}{suffix}.docx" return f"{base_name}{suffix}.docx"
def generate_presigned_download_url(self, file_path: str, expires_days: int = 7) -> Optional[str]:
"""
生成MinIO预签名下载URL
Args:
file_path: MinIO中的相对路径 '/615873064429507639/20251205090700/初步核实审批表_张三.docx'
expires_days: URL有效期天数默认7天
Returns:
预签名下载URL如果生成失败则返回None
"""
try:
if not file_path:
return None
client = self.get_minio_client()
# 从相对路径中提取对象名称(去掉开头的/
object_name = file_path.lstrip('/')
# 生成预签名URL
url = client.presigned_get_object(
self.bucket_name,
object_name,
expires=timedelta(days=expires_days)
)
return url
except Exception as e:
# 如果生成URL失败记录错误但不影响主流程
print(f"生成预签名URL失败: {str(e)}")
return None
=======
# 生成新文件名(确保是.docx格式
generated_name = f"{base_name}{suffix}.docx"
print(f"[DEBUG] 文档名称生成: '{original_file_name}' -> '{generated_name}' (base_name='{base_name}', suffix='{suffix}')")
return generated_name
>>>>>>> e3f4a394c1a4333db2fd3a9383be29fa9d9055e0

View File

@ -327,10 +327,13 @@
<div class="form-group"> <div class="form-group">
<label>文件列表</label> <label>文件列表</label>
<div style="margin-bottom: 10px;">
<button class="btn btn-secondary" onclick="loadAvailableFiles()" style="margin-right: 10px;">📋 加载可用文件列表</button>
<button class="btn btn-secondary" onclick="addFileItem()">+ 手动添加文件</button>
</div>
<div id="fileListContainer"> <div id="fileListContainer">
<!-- 动态生成的文件列表 --> <!-- 动态生成的文件列表 -->
</div> </div>
<button class="btn btn-secondary" onclick="addFileItem()">+ 添加文件</button>
</div> </div>
</div> </div>
@ -548,28 +551,82 @@
// ==================== 文档生成接口相关 ==================== // ==================== 文档生成接口相关 ====================
function initGenerateTab() { async function loadAvailableFiles() {
try {
const response = await fetch('/api/file-configs');
const result = await response.json();
if (result.isSuccess && result.data && result.data.fileConfigs) {
const container = document.getElementById('fileListContainer');
container.innerHTML = ''; // 清空现有列表
// 只添加有filePath的文件有模板文件的
const filesWithPath = result.data.fileConfigs.filter(f => f.filePath);
if (filesWithPath.length === 0) {
alert('没有找到可用的文件配置需要有filePath');
return;
}
// 添加前5个文件作为示例
filesWithPath.slice(0, 5).forEach(file => {
addFileItem(file.fileId, file.fileName);
});
if (filesWithPath.length > 5) {
alert(`已加载前5个文件共找到 ${filesWithPath.length} 个可用文件`);
} else {
alert(`已加载 ${filesWithPath.length} 个可用文件`);
}
} else {
alert('获取文件列表失败: ' + (result.errorMsg || '未知错误'));
}
} catch (error) {
alert('加载文件列表失败: ' + error.message);
}
}
async function initGenerateTab() {
// 初始化默认字段(完整的虚拟测试数据) // 初始化默认字段(完整的虚拟测试数据)
addGenerateField('target_name', '张三'); addGenerateField('target_name', '张三');
addGenerateField('target_gender', '男'); addGenerateField('target_gender', '男');
addGenerateField('target_age', '44'); addGenerateField('target_age', '34');
addGenerateField('target_date_of_birth', '198005'); addGenerateField('target_date_of_birth', '199009');
addGenerateField('target_organization_and_position', '某公司总经理'); addGenerateField('target_organization_and_position', '云南省农业机械公司党支部书记、经理');
addGenerateField('target_organization', '某公司'); addGenerateField('target_organization', '云南省农业机械公司');
addGenerateField('target_position', '总经理'); addGenerateField('target_position', '党支部书记、经理');
addGenerateField('target_education_level', '本科'); addGenerateField('target_education_level', '研究生');
addGenerateField('target_political_status', '中共党员'); addGenerateField('target_political_status', '中共党员');
addGenerateField('target_professional_rank', '正处级'); addGenerateField('target_professional_rank', '');
addGenerateField('clue_source', '群众举报'); addGenerateField('clue_source', '');
addGenerateField('target_issue_description', '违反国家计划生育有关政策规定于2010年10月生育二胎。'); addGenerateField('target_issue_description', '张三多次在私下聚会、网络群组中发表抹黑党中央决策部署的言论传播歪曲党的理论和路线方针政策的错误观点频繁接受管理服务对象安排的高档宴请、私人会所聚餐以及高尔夫球、高端足浴等娱乐活动相关费用均由对方全额承担在干部选拔任用、岗位调整工作中利用职务便利收受他人财物利用职权为其亲属经营的公司谋取不正当利益帮助该公司违规承接本单位及关联单位工程项目3个合同总额超200万元从中收受亲属给予的"感谢费"15万元其本人沉迷赌博活动每周至少参与1次大额赌资赌博单次赌资超1万元累计赌资达数十万元。');
addGenerateField('department_opinion', '建议进行初步核实'); addGenerateField('department_opinion', '');
addGenerateField('filler_name', '李四'); addGenerateField('filler_name', '');
// 初始化默认文件使用fileId不再需要templateCode // 自动加载可用的文件列表只加载前2个作为示例
// fileId可以从f_polic_file_config表查询获取 try {
const response = await fetch('/api/file-configs');
const result = await response.json();
if (result.isSuccess && result.data && result.data.fileConfigs) {
// 只添加有filePath的文件有模板文件的
const filesWithPath = result.data.fileConfigs.filter(f => f.filePath);
// 添加前2个文件作为示例
filesWithPath.slice(0, 2).forEach(file => {
addFileItem(file.fileId, file.fileName);
});
} else {
// 如果加载失败使用默认的fileId
addFileItem(1765273961883544, '初步核实审批表.doc'); // 2.初步核实审批表XXX addFileItem(1765273961883544, '初步核实审批表.doc'); // 2.初步核实审批表XXX
addFileItem(1765273961563507, '请示报告卡.doc'); // 1.请示报告卡XXX addFileItem(1765273961563507, '请示报告卡.doc'); // 1.请示报告卡XXX
} }
} catch (error) {
// 如果加载失败使用默认的fileId
addFileItem(1765273961883544, '初步核实审批表.doc');
addFileItem(1765273961563507, '请示报告卡.doc');
}
}
function addGenerateField(fieldCode = '', fieldValue = '') { function addGenerateField(fieldCode = '', fieldValue = '') {
const container = document.getElementById('generateFieldsContainer'); const container = document.getElementById('generateFieldsContainer');

467
update_all_templates.py Normal file
View File

@ -0,0 +1,467 @@
"""
更新 template_finish 目录下所有模板文件
重新上传到 MinIO 并更新数据库信息确保模板文件是最新版本
"""
import os
import sys
import json
import pymysql
from minio import Minio
from minio.error import S3Error
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
# 设置控制台编码为UTF-8Windows兼容
if sys.platform == 'win32':
try:
sys.stdout.reconfigure(encoding='utf-8')
sys.stderr.reconfigure(encoding='utf-8')
except:
pass
# MinIO连接配置
MINIO_CONFIG = {
'endpoint': 'minio.datacubeworld.com:9000',
'access_key': 'JOLXFXny3avFSzB0uRA5',
'secret_key': 'G1BR8jStNfovkfH5ou39EmPl34E4l7dGrnd3Cz0I',
'secure': True # 使用HTTPS
}
# 数据库连接配置
DB_CONFIG = {
'host': '152.136.177.240',
'port': 5012,
'user': 'finyx',
'password': '6QsGK6MpePZDE57Z',
'database': 'finyx',
'charset': 'utf8mb4'
}
# 固定值
TENANT_ID = 615873064429507639
CREATED_BY = 655162080928945152
UPDATED_BY = 655162080928945152
BUCKET_NAME = 'finyx'
# 项目根目录
PROJECT_ROOT = Path(__file__).parent
TEMPLATES_DIR = PROJECT_ROOT / "template_finish"
# 文档类型映射(根据完整文件名识别,保持原文件名不变)
# 每个文件名都是独立的模板使用完整文件名作为key
DOCUMENT_TYPE_MAPPING = {
"1.请示报告卡XXX": {
"template_code": "REPORT_CARD",
"name": "1.请示报告卡XXX",
"business_type": "INVESTIGATION"
},
"2.初步核实审批表XXX": {
"template_code": "PRELIMINARY_VERIFICATION_APPROVAL",
"name": "2.初步核实审批表XXX",
"business_type": "INVESTIGATION"
},
"3.附件初核方案(XXX)": {
"template_code": "INVESTIGATION_PLAN",
"name": "3.附件初核方案(XXX)",
"business_type": "INVESTIGATION"
},
"谈话通知书第一联": {
"template_code": "NOTIFICATION_LETTER_1",
"name": "谈话通知书第一联",
"business_type": "INVESTIGATION"
},
"谈话通知书第二联": {
"template_code": "NOTIFICATION_LETTER_2",
"name": "谈话通知书第二联",
"business_type": "INVESTIGATION"
},
"谈话通知书第三联": {
"template_code": "NOTIFICATION_LETTER_3",
"name": "谈话通知书第三联",
"business_type": "INVESTIGATION"
},
"1.请示报告卡(初核谈话)": {
"template_code": "REPORT_CARD_INTERVIEW",
"name": "1.请示报告卡(初核谈话)",
"business_type": "INVESTIGATION"
},
"2谈话审批表": {
"template_code": "INTERVIEW_APPROVAL_FORM",
"name": "2谈话审批表",
"business_type": "INVESTIGATION"
},
"3.谈话前安全风险评估表": {
"template_code": "PRE_INTERVIEW_RISK_ASSESSMENT",
"name": "3.谈话前安全风险评估表",
"business_type": "INVESTIGATION"
},
"4.谈话方案": {
"template_code": "INTERVIEW_PLAN",
"name": "4.谈话方案",
"business_type": "INVESTIGATION"
},
"5.谈话后安全风险评估表": {
"template_code": "POST_INTERVIEW_RISK_ASSESSMENT",
"name": "5.谈话后安全风险评估表",
"business_type": "INVESTIGATION"
},
"1.谈话笔录": {
"template_code": "INTERVIEW_RECORD",
"name": "1.谈话笔录",
"business_type": "INVESTIGATION"
},
"2.谈话询问对象情况摸底调查30问": {
"template_code": "INVESTIGATION_30_QUESTIONS",
"name": "2.谈话询问对象情况摸底调查30问",
"business_type": "INVESTIGATION"
},
"3.被谈话人权利义务告知书": {
"template_code": "RIGHTS_OBLIGATIONS_NOTICE",
"name": "3.被谈话人权利义务告知书",
"business_type": "INVESTIGATION"
},
"4.点对点交接单": {
"template_code": "HANDOVER_FORM",
"name": "4.点对点交接单",
"business_type": "INVESTIGATION"
},
"4.点对点交接单2": {
"template_code": "HANDOVER_FORM_2",
"name": "4.点对点交接单2",
"business_type": "INVESTIGATION"
},
"5.陪送交接单(新)": {
"template_code": "ESCORT_HANDOVER_FORM",
"name": "5.陪送交接单(新)",
"business_type": "INVESTIGATION"
},
"6.1保密承诺书(谈话对象使用-非中共党员用)": {
"template_code": "CONFIDENTIALITY_COMMITMENT_NON_PARTY",
"name": "6.1保密承诺书(谈话对象使用-非中共党员用)",
"business_type": "INVESTIGATION"
},
"6.2保密承诺书(谈话对象使用-中共党员用)": {
"template_code": "CONFIDENTIALITY_COMMITMENT_PARTY",
"name": "6.2保密承诺书(谈话对象使用-中共党员用)",
"business_type": "INVESTIGATION"
},
"7.办案人员-办案安全保密承诺书": {
"template_code": "INVESTIGATOR_CONFIDENTIALITY_COMMITMENT",
"name": "7.办案人员-办案安全保密承诺书",
"business_type": "INVESTIGATION"
},
"8-1请示报告卡初核报告结论 ": {
"template_code": "REPORT_CARD_CONCLUSION",
"name": "8-1请示报告卡初核报告结论 ",
"business_type": "INVESTIGATION"
},
"8.XXX初核情况报告": {
"template_code": "INVESTIGATION_REPORT",
"name": "8.XXX初核情况报告",
"business_type": "INVESTIGATION"
}
}
def identify_document_type(file_name: str) -> Optional[Dict]:
"""
根据完整文件名识别文档类型保持原文件名不变
Args:
file_name: 文件名不含扩展名
Returns:
文档类型配置如果无法识别返回None
"""
# 获取文件名(不含扩展名),保持原样
base_name = Path(file_name).stem
# 直接使用完整文件名进行精确匹配
if base_name in DOCUMENT_TYPE_MAPPING:
return DOCUMENT_TYPE_MAPPING[base_name]
# 如果精确匹配失败返回None不进行任何修改或模糊匹配
return None
def upload_to_minio(file_path: Path, minio_client: Minio) -> str:
"""
上传文件到MinIO覆盖已存在的文件
Args:
file_path: 本地文件路径
minio_client: MinIO客户端实例
Returns:
MinIO中的相对路径
"""
try:
# 检查存储桶是否存在
found = minio_client.bucket_exists(BUCKET_NAME)
if not found:
raise Exception(f"存储桶 '{BUCKET_NAME}' 不存在,请先创建")
# 生成MinIO对象路径使用当前日期确保是最新版本
now = datetime.now()
object_name = f'{TENANT_ID}/TEMPLATE/{now.year}/{now.month:02d}/{file_path.name}'
# 上传文件fput_object 会自动覆盖已存在的文件)
minio_client.fput_object(
BUCKET_NAME,
object_name,
str(file_path),
content_type='application/vnd.openxmlformats-officedocument.wordprocessingml.document'
)
# 返回相对路径(以/开头)
return f"/{object_name}"
except S3Error as e:
raise Exception(f"MinIO错误: {e}")
except Exception as e:
raise Exception(f"上传文件时发生错误: {e}")
def update_file_config(conn, doc_config: Dict, file_path: str) -> int:
"""
更新或创建文件配置记录
Args:
conn: 数据库连接
doc_config: 文档配置
file_path: MinIO文件路径
Returns:
文件配置ID
"""
cursor = conn.cursor()
current_time = datetime.now()
try:
# 检查是否已存在(通过 template_code 查找)
select_sql = """
SELECT id, name, file_path FROM f_polic_file_config
WHERE tenant_id = %s AND template_code = %s
"""
cursor.execute(select_sql, (TENANT_ID, doc_config['template_code']))
existing = cursor.fetchone()
# 构建 input_data
input_data = json.dumps({
'template_code': doc_config['template_code'],
'business_type': doc_config['business_type']
}, ensure_ascii=False)
if existing:
file_config_id, old_name, old_path = existing
# 更新现有记录
update_sql = """
UPDATE f_polic_file_config
SET file_path = %s,
input_data = %s,
name = %s,
updated_time = %s,
updated_by = %s,
state = 1
WHERE id = %s AND tenant_id = %s
"""
cursor.execute(update_sql, (
file_path,
input_data,
doc_config['name'],
current_time,
UPDATED_BY,
file_config_id,
TENANT_ID
))
conn.commit()
print(f" [OK] 更新数据库记录 (ID: {file_config_id})")
if old_path != file_path:
print(f" 旧路径: {old_path}")
print(f" 新路径: {file_path}")
return file_config_id
else:
# 创建新记录
import time
import random
timestamp = int(time.time() * 1000)
random_part = random.randint(100000, 999999)
file_config_id = timestamp * 1000 + random_part
insert_sql = """
INSERT INTO f_polic_file_config
(id, tenant_id, parent_id, name, input_data, file_path, template_code,
created_time, created_by, updated_time, updated_by, state)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_sql, (
file_config_id,
TENANT_ID,
None, # parent_id
doc_config['name'],
input_data,
file_path,
doc_config['template_code'],
current_time,
CREATED_BY,
current_time,
CREATED_BY,
1 # state: 1表示启用
))
conn.commit()
print(f" [OK] 创建新数据库记录 (ID: {file_config_id})")
return file_config_id
except Exception as e:
conn.rollback()
raise Exception(f"更新数据库失败: {str(e)}")
finally:
cursor.close()
def update_all_templates():
"""
更新所有模板文件重新上传到MinIO并更新数据库
"""
print("="*80)
print("开始更新所有模板文件")
print("="*80)
print(f"模板目录: {TEMPLATES_DIR}")
print()
if not TEMPLATES_DIR.exists():
print(f"错误: 模板目录不存在: {TEMPLATES_DIR}")
return
# 连接数据库和MinIO
try:
conn = pymysql.connect(**DB_CONFIG)
print("[OK] 数据库连接成功")
minio_client = Minio(
MINIO_CONFIG['endpoint'],
access_key=MINIO_CONFIG['access_key'],
secret_key=MINIO_CONFIG['secret_key'],
secure=MINIO_CONFIG['secure']
)
# 检查存储桶
if not minio_client.bucket_exists(BUCKET_NAME):
raise Exception(f"存储桶 '{BUCKET_NAME}' 不存在,请先创建")
print("[OK] MinIO连接成功")
print()
except Exception as e:
print(f"[ERROR] 连接失败: {e}")
return
# 统计信息
processed_count = 0
updated_count = 0
created_count = 0
skipped_count = 0
failed_count = 0
failed_files = []
# 遍历所有.docx文件
print("="*80)
print("开始处理模板文件...")
print("="*80)
print()
for root, dirs, files in os.walk(TEMPLATES_DIR):
for file in files:
# 只处理.docx文件跳过临时文件
if not file.endswith('.docx') or file.startswith('~$'):
continue
file_path = Path(root) / file
# 识别文档类型
doc_config = identify_document_type(file)
if not doc_config:
print(f"\n[{processed_count + skipped_count + failed_count + 1}] [WARN] 跳过: {file}")
print(f" 原因: 无法识别文档类型")
print(f" 路径: {file_path}")
skipped_count += 1
continue
processed_count += 1
print(f"\n[{processed_count}] 处理: {file}")
print(f" 类型: {doc_config.get('template_code', 'UNKNOWN')}")
print(f" 名称: {doc_config.get('name', 'UNKNOWN')}")
print(f" 路径: {file_path}")
try:
# 检查文件是否存在
if not file_path.exists():
raise FileNotFoundError(f"文件不存在: {file_path}")
# 获取文件信息
file_size = file_path.stat().st_size
file_mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
print(f" 大小: {file_size:,} 字节")
print(f" 修改时间: {file_mtime.strftime('%Y-%m-%d %H:%M:%S')}")
# 上传到MinIO覆盖旧版本
print(f" 上传到MinIO...")
minio_path = upload_to_minio(file_path, minio_client)
print(f" [OK] MinIO路径: {minio_path}")
# 更新数据库
print(f" 更新数据库...")
file_config_id = update_file_config(conn, doc_config, minio_path)
# 判断是更新还是创建
cursor = conn.cursor()
check_sql = """
SELECT created_time, updated_time FROM f_polic_file_config
WHERE id = %s
"""
cursor.execute(check_sql, (file_config_id,))
result = cursor.fetchone()
cursor.close()
if result:
created_time, updated_time = result
if created_time == updated_time:
created_count += 1
else:
updated_count += 1
print(f" [OK] 处理成功 (配置ID: {file_config_id})")
except Exception as e:
failed_count += 1
failed_files.append((str(file_path), str(e)))
print(f" [ERROR] 处理失败: {e}")
import traceback
traceback.print_exc()
# 关闭数据库连接
conn.close()
# 输出统计信息
print("\n" + "="*80)
print("更新完成")
print("="*80)
print(f"总处理数: {processed_count}")
print(f" 成功更新: {updated_count}")
print(f" 成功创建: {created_count}")
print(f" 跳过: {skipped_count}")
print(f" 失败: {failed_count}")
if failed_files:
print("\n失败的文件:")
for file_path, error in failed_files:
print(f" - {file_path}")
print(f" 错误: {error}")
print("\n所有模板文件已更新到最新版本!")
if __name__ == '__main__':
update_all_templates()

View File

@ -0,0 +1,609 @@
"""
重新校验数据库中模板和数据字段对应关系
删除旧的或者无效的模板信息
根据template_finish文件夹下的模板文件重新上传模板到minio并更新数据库
"""
import os
import re
import json
import sys
import pymysql
from minio import Minio
from minio.error import S3Error
from datetime import datetime
from pathlib import Path
from docx import Document
from typing import Dict, List, Set, Optional, Tuple
from collections import defaultdict
# 设置输出编码为UTF-8Windows兼容
if sys.platform == 'win32':
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
# MinIO连接配置
MINIO_CONFIG = {
'endpoint': 'minio.datacubeworld.com:9000',
'access_key': 'JOLXFXny3avFSzB0uRA5',
'secret_key': 'G1BR8jStNfovkfH5ou39EmPl34E4l7dGrnd3Cz0I',
'secure': True
}
# 数据库连接配置
DB_CONFIG = {
'host': '152.136.177.240',
'port': 5012,
'user': 'finyx',
'password': '6QsGK6MpePZDE57Z',
'database': 'finyx',
'charset': 'utf8mb4'
}
# 固定值
TENANT_ID = 615873064429507639
CREATED_BY = 655162080928945152
UPDATED_BY = 655162080928945152
BUCKET_NAME = 'finyx'
TEMPLATE_BASE_DIR = 'template_finish'
def generate_id():
"""生成ID"""
import time
import random
timestamp = int(time.time() * 1000)
random_part = random.randint(100000, 999999)
return timestamp * 1000 + random_part
def extract_placeholders_from_docx(file_path: str) -> List[str]:
"""
从docx文件中提取所有占位符
Args:
file_path: docx文件路径
Returns:
占位符列表格式: ['field_code1', 'field_code2', ...]
"""
placeholders = set()
pattern = r'\{\{([^}]+)\}\}' # 匹配 {{field_code}} 格式
try:
doc = Document(file_path)
# 从段落中提取占位符
for paragraph in doc.paragraphs:
text = paragraph.text
matches = re.findall(pattern, text)
for match in matches:
placeholders.add(match.strip())
# 从表格中提取占位符
for table in doc.tables:
for row in table.rows:
for cell in row.cells:
for paragraph in cell.paragraphs:
text = paragraph.text
matches = re.findall(pattern, text)
for match in matches:
placeholders.add(match.strip())
except Exception as e:
print(f" 错误: 读取文件失败 - {str(e)}")
return []
return sorted(list(placeholders))
def normalize_template_name(file_name: str) -> str:
"""
标准化模板名称去掉扩展名括号内容数字前缀等
Args:
file_name: 文件名 "2.初步核实审批表XXX.docx"
Returns:
标准化后的名称 "初步核实审批表"
"""
# 去掉扩展名
name = Path(file_name).stem
# 去掉括号内容
name = re.sub(r'[(].*?[)]', '', name)
name = name.strip()
# 去掉数字前缀和点号
name = re.sub(r'^\d+[\.\-]?\s*', '', name)
name = name.strip()
return name
def scan_template_files(base_dir: str) -> Dict[str, Dict]:
"""
扫描模板文件夹提取所有模板文件信息
Args:
base_dir: 模板文件夹路径
Returns:
字典key为文件相对路径value为模板信息
"""
base_path = Path(base_dir)
if not base_path.exists():
print(f"错误: 目录不存在 - {base_dir}")
return {}
templates = {}
print("=" * 80)
print("扫描模板文件...")
print("=" * 80)
for docx_file in sorted(base_path.rglob("*.docx")):
# 跳过临时文件
if docx_file.name.startswith("~$"):
continue
relative_path = docx_file.relative_to(base_path)
file_name = docx_file.name
print(f"\n处理文件: {relative_path}")
# 提取占位符
placeholders = extract_placeholders_from_docx(str(docx_file))
print(f" 占位符数量: {len(placeholders)}")
if placeholders:
print(f" 占位符: {', '.join(placeholders[:10])}{'...' if len(placeholders) > 10 else ''}")
# 标准化模板名称
normalized_name = normalize_template_name(file_name)
templates[str(relative_path)] = {
'file_path': str(docx_file),
'relative_path': str(relative_path),
'file_name': file_name,
'normalized_name': normalized_name,
'placeholders': placeholders
}
print(f"\n总共扫描到 {len(templates)} 个模板文件")
return templates
def get_database_templates(conn) -> Dict[int, Dict]:
"""获取数据库中的所有模板配置"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = """
SELECT id, name, file_path, parent_id, state, input_data
FROM f_polic_file_config
WHERE tenant_id = %s
"""
cursor.execute(sql, (TENANT_ID,))
templates = cursor.fetchall()
result = {}
for template in templates:
result[template['id']] = {
'id': template['id'],
'name': template['name'],
'file_path': template['file_path'],
'parent_id': template['parent_id'],
'state': template['state'],
'input_data': template['input_data']
}
cursor.close()
return result
def get_database_fields(conn) -> Dict[str, Dict]:
"""
获取数据库中的所有字段定义
Returns:
字典key为field_codevalue为字段信息
"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = """
SELECT id, name, filed_code, field_type, state
FROM f_polic_field
WHERE tenant_id = %s
"""
cursor.execute(sql, (TENANT_ID,))
fields = cursor.fetchall()
result = {}
for field in fields:
field_code = field['filed_code']
result[field_code] = {
'id': field['id'],
'name': field['name'],
'field_code': field_code,
'field_type': field['field_type'],
'state': field['state']
}
cursor.close()
return result
def match_placeholders_to_fields(placeholders: List[str], fields: Dict[str, Dict]) -> Tuple[List[int], List[str]]:
"""
匹配占位符到数据库字段
Args:
placeholders: 占位符列表field_code
fields: 数据库字段字典
Returns:
(匹配的字段ID列表, 未匹配的占位符列表)
"""
matched_field_ids = []
unmatched_placeholders = []
for placeholder in placeholders:
field = fields.get(placeholder)
if field:
# 只匹配输出字段field_type=2
if field['field_type'] == 2:
matched_field_ids.append(field['id'])
else:
print(f" [WARN] 警告: 占位符 {placeholder} 对应的字段类型为 {field['field_type']},不是输出字段")
unmatched_placeholders.append(placeholder)
else:
unmatched_placeholders.append(placeholder)
return matched_field_ids, unmatched_placeholders
def upload_to_minio(client: Minio, file_path: str, template_name: str) -> str:
"""上传文件到MinIO"""
try:
now = datetime.now()
object_name = f'{TENANT_ID}/TEMPLATE/{now.year}/{now.month:02d}/{template_name}'
client.fput_object(
BUCKET_NAME,
object_name,
file_path,
content_type='application/vnd.openxmlformats-officedocument.wordprocessingml.document'
)
return f"/{object_name}"
except Exception as e:
raise Exception(f"上传到MinIO失败: {str(e)}")
def find_template_by_name(conn, template_name: str) -> Optional[int]:
"""根据模板名称查找数据库中的模板ID"""
cursor = conn.cursor()
try:
sql = """
SELECT id FROM f_polic_file_config
WHERE tenant_id = %s AND name = %s
"""
cursor.execute(sql, (TENANT_ID, template_name))
result = cursor.fetchone()
return result[0] if result else None
finally:
cursor.close()
def create_or_update_template(conn, template_info: Dict, file_path: str, minio_path: str) -> int:
"""
创建或更新模板配置
Returns:
模板ID
"""
cursor = conn.cursor()
try:
# 检查是否已存在
existing_id = find_template_by_name(conn, template_info['normalized_name'])
# 准备input_data
input_data = json.dumps({
'template_code': template_info.get('template_code', ''),
'business_type': 'INVESTIGATION',
'placeholders': template_info['placeholders']
}, ensure_ascii=False)
if existing_id:
# 更新现有记录
update_sql = """
UPDATE f_polic_file_config
SET file_path = %s, input_data = %s, updated_time = NOW(), updated_by = %s, state = 1
WHERE id = %s AND tenant_id = %s
"""
cursor.execute(update_sql, (
minio_path,
input_data,
UPDATED_BY,
existing_id,
TENANT_ID
))
print(f" [OK] 更新模板配置: {template_info['normalized_name']}, ID: {existing_id}")
conn.commit()
return existing_id
else:
# 创建新记录
template_id = generate_id()
insert_sql = """
INSERT INTO f_polic_file_config
(id, tenant_id, parent_id, name, input_data, file_path, created_time, created_by, updated_time, updated_by, state)
VALUES (%s, %s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, %s)
"""
cursor.execute(insert_sql, (
template_id,
TENANT_ID,
template_info.get('parent_id'),
template_info['normalized_name'],
input_data,
minio_path,
CREATED_BY,
CREATED_BY,
1 # state: 1表示启用
))
print(f" [OK] 创建模板配置: {template_info['normalized_name']}, ID: {template_id}")
conn.commit()
return template_id
except Exception as e:
conn.rollback()
raise Exception(f"创建或更新模板配置失败: {str(e)}")
finally:
cursor.close()
def update_template_field_relations(conn, template_id: int, field_ids: List[int]):
"""
更新模板和字段的关联关系
Args:
template_id: 模板ID
field_ids: 字段ID列表
"""
cursor = conn.cursor()
try:
# 删除旧的关联关系
delete_sql = """
DELETE FROM f_polic_file_field
WHERE tenant_id = %s AND file_id = %s
"""
cursor.execute(delete_sql, (TENANT_ID, template_id))
deleted_count = cursor.rowcount
# 创建新的关联关系
created_count = 0
for field_id in field_ids:
relation_id = generate_id()
insert_sql = """
INSERT INTO f_polic_file_field
(id, tenant_id, file_id, filed_id, created_time, created_by, updated_time, updated_by, state)
VALUES (%s, %s, %s, %s, NOW(), %s, NOW(), %s, %s)
"""
cursor.execute(insert_sql, (
relation_id, TENANT_ID, template_id, field_id,
CREATED_BY, UPDATED_BY, 1 # state=1 表示启用
))
created_count += 1
conn.commit()
print(f" [OK] 更新字段关联: 删除 {deleted_count} 条,创建 {created_count}")
except Exception as e:
conn.rollback()
raise Exception(f"更新字段关联失败: {str(e)}")
finally:
cursor.close()
def mark_invalid_templates(conn, valid_template_names: Set[str]):
"""
标记无效的模板不在template_finish文件夹中的模板
Args:
conn: 数据库连接
valid_template_names: 有效的模板名称集合
"""
cursor = conn.cursor()
try:
# 查找所有模板
sql = """
SELECT id, name FROM f_polic_file_config
WHERE tenant_id = %s
"""
cursor.execute(sql, (TENANT_ID,))
all_templates = cursor.fetchall()
invalid_count = 0
for template in all_templates:
template_id = template[0]
template_name = template[1]
# 标准化名称进行匹配
normalized_name = normalize_template_name(template_name)
# 检查是否在有效模板列表中
is_valid = False
for valid_name in valid_template_names:
if normalized_name == normalize_template_name(valid_name) or normalized_name in valid_name or valid_name in normalized_name:
is_valid = True
break
if not is_valid:
# 标记为未启用
update_sql = """
UPDATE f_polic_file_config
SET state = 0, updated_time = NOW(), updated_by = %s
WHERE id = %s AND tenant_id = %s
"""
cursor.execute(update_sql, (UPDATED_BY, template_id, TENANT_ID))
invalid_count += 1
print(f" [WARN] 标记无效模板: {template_name} (ID: {template_id})")
conn.commit()
print(f"\n总共标记 {invalid_count} 个无效模板")
except Exception as e:
conn.rollback()
raise Exception(f"标记无效模板失败: {str(e)}")
finally:
cursor.close()
def main():
"""主函数"""
print("=" * 80)
print("重新校验和更新模板配置")
print("=" * 80)
print()
try:
# 连接数据库和MinIO
print("1. 连接数据库和MinIO...")
conn = pymysql.connect(**DB_CONFIG)
minio_client = Minio(
MINIO_CONFIG['endpoint'],
access_key=MINIO_CONFIG['access_key'],
secret_key=MINIO_CONFIG['secret_key'],
secure=MINIO_CONFIG['secure']
)
# 检查存储桶
if not minio_client.bucket_exists(BUCKET_NAME):
print(f"错误: 存储桶 '{BUCKET_NAME}' 不存在")
return
print(f"[OK] 数据库连接成功")
print(f"[OK] MinIO存储桶 '{BUCKET_NAME}' 已存在\n")
# 扫描模板文件
print("2. 扫描模板文件...")
template_files = scan_template_files(TEMPLATE_BASE_DIR)
if not template_files:
print("错误: 未找到任何模板文件")
return
# 获取数据库中的模板和字段
print("\n3. 获取数据库中的模板和字段...")
db_templates = get_database_templates(conn)
db_fields = get_database_fields(conn)
print(f" 数据库中的模板数: {len(db_templates)}")
print(f" 数据库中的字段数: {len(db_fields)}")
# 标记无效模板
print("\n4. 标记无效模板...")
valid_template_names = {info['normalized_name'] for info in template_files.values()}
mark_invalid_templates(conn, valid_template_names)
# 处理每个模板文件
print("\n5. 处理模板文件...")
print("=" * 80)
success_count = 0
failed_count = 0
failed_files = []
for relative_path, template_info in template_files.items():
file_name = template_info['file_name']
normalized_name = template_info['normalized_name']
placeholders = template_info['placeholders']
file_path = template_info['file_path']
print(f"\n处理模板: {normalized_name}")
print(f" 文件: {relative_path}")
print(f" 占位符数量: {len(placeholders)}")
try:
# 匹配占位符到字段
matched_field_ids, unmatched_placeholders = match_placeholders_to_fields(placeholders, db_fields)
if unmatched_placeholders:
print(f" [WARN] 警告: {len(unmatched_placeholders)} 个占位符未匹配到字段:")
for placeholder in unmatched_placeholders[:5]: # 只显示前5个
print(f" - {{{{ {placeholder} }}}}")
if len(unmatched_placeholders) > 5:
print(f" ... 还有 {len(unmatched_placeholders) - 5}")
if not matched_field_ids:
print(f" [WARN] 警告: 没有匹配到任何字段,但仍会上传模板")
# 即使没有字段,也继续处理(上传模板和更新数据库)
print(f" [OK] 匹配到 {len(matched_field_ids)} 个字段")
# 上传到MinIO
print(f" 正在上传到MinIO...")
minio_path = upload_to_minio(minio_client, file_path, file_name)
print(f" [OK] 上传成功: {minio_path}")
# 创建或更新模板配置
print(f" 正在更新数据库...")
template_id = create_or_update_template(conn, template_info, file_path, minio_path)
# 更新字段关联(如果有匹配的字段)
if matched_field_ids:
update_template_field_relations(conn, template_id, matched_field_ids)
else:
# 即使没有字段,也删除旧的关联关系
cursor = conn.cursor()
try:
delete_sql = """
DELETE FROM f_polic_file_field
WHERE tenant_id = %s AND file_id = %s
"""
cursor.execute(delete_sql, (TENANT_ID, template_id))
conn.commit()
print(f" [OK] 清理旧的字段关联: 删除 {cursor.rowcount}")
finally:
cursor.close()
success_count += 1
except Exception as e:
failed_count += 1
failed_files.append((file_name, str(e)))
print(f" [ERROR] 处理失败: {str(e)}")
# 打印汇总
print("\n" + "=" * 80)
print("处理汇总")
print("=" * 80)
print(f"总文件数: {len(template_files)}")
print(f"成功: {success_count}")
print(f"失败: {failed_count}")
if failed_files:
print("\n失败的文件:")
for file_name, error in failed_files:
print(f" - {file_name}: {error}")
print("\n" + "=" * 80)
print("处理完成!")
print("=" * 80)
except Exception as e:
print(f"\n[ERROR] 发生错误: {e}")
import traceback
traceback.print_exc()
if 'conn' in locals():
conn.rollback()
finally:
if 'conn' in locals():
conn.close()
print("\n数据库连接已关闭")
if __name__ == '__main__':
main()

View File

@ -0,0 +1,206 @@
"""
验证文档生成接口可以正确生成文档
测试模板和字段关联是否正确
"""
import sys
import os
import json
import pymysql
sys.path.insert(0, os.path.dirname(__file__))
from services.document_service import DocumentService
# 数据库连接配置
DB_CONFIG = {
'host': '152.136.177.240',
'port': 5012,
'user': 'finyx',
'password': '6QsGK6MpePZDE57Z',
'database': 'finyx',
'charset': 'utf8mb4'
}
TENANT_ID = 615873064429507639
def get_template_by_name(conn, template_name: str):
"""根据模板名称获取模板信息"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
sql = """
SELECT id, name, file_path, state
FROM f_polic_file_config
WHERE tenant_id = %s AND name = %s AND state = 1
"""
cursor.execute(sql, (TENANT_ID, template_name))
return cursor.fetchone()
finally:
cursor.close()
def get_template_fields(conn, file_id: int):
"""获取模板关联的字段"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
sql = """
SELECT f.id, f.name, f.filed_code, f.field_type
FROM f_polic_field f
INNER JOIN f_polic_file_field fff ON f.id = fff.filed_id
WHERE fff.file_id = %s AND fff.tenant_id = %s AND fff.state = 1
ORDER BY f.field_type, f.filed_code
"""
cursor.execute(sql, (file_id, TENANT_ID))
return cursor.fetchall()
finally:
cursor.close()
def test_document_generation(template_name: str, test_data: list):
"""测试文档生成"""
print("=" * 80)
print(f"测试文档生成: {template_name}")
print("=" * 80)
# 连接数据库
conn = pymysql.connect(**DB_CONFIG)
try:
# 获取模板信息
template = get_template_by_name(conn, template_name)
if not template:
print(f"[ERROR] 未找到模板: {template_name}")
return False
print(f"\n模板信息:")
print(f" ID: {template['id']}")
print(f" 名称: {template['name']}")
print(f" 文件路径: {template['file_path']}")
print(f" 状态: {template['state']}")
# 获取模板关联的字段
fields = get_template_fields(conn, template['id'])
print(f"\n关联的字段数量: {len(fields)}")
if fields:
print(" 字段列表:")
for field in fields[:10]: # 只显示前10个
field_type = "输出字段" if field['field_type'] == 2 else "输入字段"
print(f" - {field['name']} ({field['filed_code']}) [{field_type}]")
if len(fields) > 10:
print(f" ... 还有 {len(fields) - 10} 个字段")
# 准备测试数据
print(f"\n测试数据字段数量: {len(test_data)}")
# 创建文档服务
doc_service = DocumentService()
# 准备文件信息
file_info = {
'fileId': template['id'],
'fileName': f"{template_name}.doc"
}
print(f"\n开始生成文档...")
# 生成文档
try:
result = doc_service.generate_document(
file_id=template['id'],
input_data=test_data,
file_info=file_info
)
print(f"[OK] 文档生成成功!")
print(f"\n生成结果:")
print(f" 文件路径: {result.get('filePath')}")
print(f" 文件名称: {result.get('fileName')}")
if result.get('downloadUrl'):
print(f" 下载URL: {result.get('downloadUrl')[:80]}...")
return True
except Exception as e:
print(f"[ERROR] 文档生成失败: {str(e)}")
import traceback
traceback.print_exc()
return False
finally:
conn.close()
def main():
"""主函数"""
print("=" * 80)
print("验证文档生成功能")
print("=" * 80)
print()
# 测试数据
test_data = [
{"fieldCode": "target_name", "fieldValue": "张三"},
{"fieldCode": "target_gender", "fieldValue": ""},
{"fieldCode": "target_age", "fieldValue": "44"},
{"fieldCode": "target_date_of_birth", "fieldValue": "198005"},
{"fieldCode": "target_organization_and_position", "fieldValue": "某公司总经理"},
{"fieldCode": "target_organization", "fieldValue": "某公司"},
{"fieldCode": "target_position", "fieldValue": "总经理"},
{"fieldCode": "target_education_level", "fieldValue": "本科"},
{"fieldCode": "target_political_status", "fieldValue": "中共党员"},
{"fieldCode": "target_professional_rank", "fieldValue": "正处级"},
{"fieldCode": "clue_source", "fieldValue": "群众举报"},
{"fieldCode": "target_issue_description", "fieldValue": "违反国家计划生育有关政策规定于2010年10月生育二胎。"},
{"fieldCode": "department_opinion", "fieldValue": "建议进行初步核实"},
{"fieldCode": "filler_name", "fieldValue": "李四"},
{"fieldCode": "target_id_number", "fieldValue": "110101198005011234"},
{"fieldCode": "target_contact", "fieldValue": "13800138000"},
{"fieldCode": "target_work_basic_info", "fieldValue": "在某公司工作10年担任总经理职务"},
{"fieldCode": "target_family_situation", "fieldValue": "已婚,有一子一女"},
{"fieldCode": "target_social_relations", "fieldValue": "社会关系简单"},
{"fieldCode": "investigation_unit_name", "fieldValue": "某市纪委监委"},
{"fieldCode": "investigation_team_leader_name", "fieldValue": "王五"},
{"fieldCode": "investigation_team_member_names", "fieldValue": "赵六、钱七"},
{"fieldCode": "investigation_team_code", "fieldValue": "DC2024001"},
{"fieldCode": "investigation_location", "fieldValue": "某市纪委监委谈话室"},
{"fieldCode": "appointment_time", "fieldValue": "2024年12月10日上午9:00"},
{"fieldCode": "appointment_location", "fieldValue": "某市纪委监委谈话室"},
{"fieldCode": "approval_time", "fieldValue": "2024年12月9日"},
{"fieldCode": "handling_department", "fieldValue": "某市纪委监委第一监督检查室"},
{"fieldCode": "handler_name", "fieldValue": "王五"},
]
# 测试几个关键模板
test_templates = [
"初步核实审批表",
"请示报告卡",
"谈话通知书第一联",
"谈话前安全风险评估表"
]
success_count = 0
failed_count = 0
for template_name in test_templates:
print()
success = test_document_generation(template_name, test_data)
if success:
success_count += 1
else:
failed_count += 1
print()
# 打印汇总
print("=" * 80)
print("测试汇总")
print("=" * 80)
print(f"总测试数: {len(test_templates)}")
print(f"成功: {success_count}")
print(f"失败: {failed_count}")
print("=" * 80)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,531 @@
"""
检查模板的 file_id 和相关关联关系是否正确
重点检查
1. f_polic_file_config 表中的模板记录file_id
2. f_polic_file_field 表中的关联关系file_id filed_id 的对应关系
"""
import sys
import pymysql
from pathlib import Path
from typing import Dict, List, Set, Tuple
from collections import defaultdict
# 设置控制台编码为UTF-8Windows兼容
if sys.platform == 'win32':
try:
sys.stdout.reconfigure(encoding='utf-8')
sys.stderr.reconfigure(encoding='utf-8')
except:
pass
# 数据库连接配置
DB_CONFIG = {
'host': '152.136.177.240',
'port': 5012,
'user': 'finyx',
'password': '6QsGK6MpePZDE57Z',
'database': 'finyx',
'charset': 'utf8mb4'
}
# 固定值
TENANT_ID = 615873064429507639
# 项目根目录
PROJECT_ROOT = Path(__file__).parent
TEMPLATES_DIR = PROJECT_ROOT / "template_finish"
# 文档类型映射(用于识别模板)
DOCUMENT_TYPE_MAPPING = {
"1.请示报告卡XXX": "REPORT_CARD",
"2.初步核实审批表XXX": "PRELIMINARY_VERIFICATION_APPROVAL",
"3.附件初核方案(XXX)": "INVESTIGATION_PLAN",
"谈话通知书第一联": "NOTIFICATION_LETTER_1",
"谈话通知书第二联": "NOTIFICATION_LETTER_2",
"谈话通知书第三联": "NOTIFICATION_LETTER_3",
"1.请示报告卡(初核谈话)": "REPORT_CARD_INTERVIEW",
"2谈话审批表": "INTERVIEW_APPROVAL_FORM",
"3.谈话前安全风险评估表": "PRE_INTERVIEW_RISK_ASSESSMENT",
"4.谈话方案": "INTERVIEW_PLAN",
"5.谈话后安全风险评估表": "POST_INTERVIEW_RISK_ASSESSMENT",
"1.谈话笔录": "INTERVIEW_RECORD",
"2.谈话询问对象情况摸底调查30问": "INVESTIGATION_30_QUESTIONS",
"3.被谈话人权利义务告知书": "RIGHTS_OBLIGATIONS_NOTICE",
"4.点对点交接单": "HANDOVER_FORM",
"4.点对点交接单2": "HANDOVER_FORM_2",
"5.陪送交接单(新)": "ESCORT_HANDOVER_FORM",
"6.1保密承诺书(谈话对象使用-非中共党员用)": "CONFIDENTIALITY_COMMITMENT_NON_PARTY",
"6.2保密承诺书(谈话对象使用-中共党员用)": "CONFIDENTIALITY_COMMITMENT_PARTY",
"7.办案人员-办案安全保密承诺书": "INVESTIGATOR_CONFIDENTIALITY_COMMITMENT",
"8-1请示报告卡初核报告结论 ": "REPORT_CARD_CONCLUSION",
"8.XXX初核情况报告": "INVESTIGATION_REPORT"
}
def get_template_files() -> Dict[str, Path]:
"""获取所有模板文件"""
templates = {}
if not TEMPLATES_DIR.exists():
return templates
for root, dirs, files in os.walk(TEMPLATES_DIR):
for file in files:
if file.endswith('.docx') and not file.startswith('~$'):
file_path = Path(root) / file
base_name = Path(file).stem
if base_name in DOCUMENT_TYPE_MAPPING:
templates[base_name] = file_path
return templates
def check_file_configs(conn) -> Dict:
"""检查 f_polic_file_config 表中的模板记录"""
print("\n" + "="*80)
print("1. 检查 f_polic_file_config 表中的模板记录")
print("="*80)
cursor = conn.cursor(pymysql.cursors.DictCursor)
# 查询所有模板记录
cursor.execute("""
SELECT id, name, template_code, file_path, state, parent_id
FROM f_polic_file_config
WHERE tenant_id = %s
ORDER BY name
""", (TENANT_ID,))
all_configs = cursor.fetchall()
# 按 template_code 和 name 组织数据
configs_by_code = {}
configs_by_name = {}
for config in all_configs:
config_id = config['id']
name = config['name']
template_code = config.get('template_code')
if template_code:
if template_code not in configs_by_code:
configs_by_code[template_code] = []
configs_by_code[template_code].append(config)
if name:
if name not in configs_by_name:
configs_by_name[name] = []
configs_by_name[name].append(config)
print(f"\n总模板记录数: {len(all_configs)}")
print(f"按 template_code 分组: {len(configs_by_code)} 个不同的 template_code")
print(f"按 name 分组: {len(configs_by_name)} 个不同的 name")
# 检查重复的 template_code
duplicate_codes = {code: configs for code, configs in configs_by_code.items() if len(configs) > 1}
if duplicate_codes:
print(f"\n[WARN] 发现重复的 template_code ({len(duplicate_codes)} 个):")
for code, configs in duplicate_codes.items():
print(f" - {code}: {len(configs)} 条记录")
for cfg in configs:
print(f" ID: {cfg['id']}, 名称: {cfg['name']}, 路径: {cfg.get('file_path', 'N/A')}")
# 检查重复的 name
duplicate_names = {name: configs for name, configs in configs_by_name.items() if len(configs) > 1}
if duplicate_names:
print(f"\n[WARN] 发现重复的 name ({len(duplicate_names)} 个):")
for name, configs in duplicate_names.items():
print(f" - {name}: {len(configs)} 条记录")
for cfg in configs:
print(f" ID: {cfg['id']}, template_code: {cfg.get('template_code', 'N/A')}, 路径: {cfg.get('file_path', 'N/A')}")
# 检查未启用的记录
disabled_configs = [cfg for cfg in all_configs if cfg.get('state') != 1]
if disabled_configs:
print(f"\n[WARN] 发现未启用的模板记录 ({len(disabled_configs)} 个):")
for cfg in disabled_configs:
print(f" - ID: {cfg['id']}, 名称: {cfg['name']}, 状态: {cfg.get('state')}")
# 检查 file_path 为空的记录
empty_path_configs = [cfg for cfg in all_configs if not cfg.get('file_path')]
if empty_path_configs:
print(f"\n[WARN] 发现 file_path 为空的记录 ({len(empty_path_configs)} 个):")
for cfg in empty_path_configs:
print(f" - ID: {cfg['id']}, 名称: {cfg['name']}, template_code: {cfg.get('template_code', 'N/A')}")
cursor.close()
return {
'all_configs': all_configs,
'configs_by_code': configs_by_code,
'configs_by_name': configs_by_name,
'duplicate_codes': duplicate_codes,
'duplicate_names': duplicate_names,
'disabled_configs': disabled_configs,
'empty_path_configs': empty_path_configs
}
def check_file_field_relations(conn) -> Dict:
"""检查 f_polic_file_field 表中的关联关系"""
print("\n" + "="*80)
print("2. 检查 f_polic_file_field 表中的关联关系")
print("="*80)
cursor = conn.cursor(pymysql.cursors.DictCursor)
# 查询所有关联关系
cursor.execute("""
SELECT fff.id, fff.file_id, fff.filed_id, fff.state, fff.tenant_id
FROM f_polic_file_field fff
WHERE fff.tenant_id = %s
ORDER BY fff.file_id, fff.filed_id
""", (TENANT_ID,))
all_relations = cursor.fetchall()
print(f"\n总关联关系数: {len(all_relations)}")
# 检查无效的 file_id关联到不存在的文件配置
cursor.execute("""
SELECT fff.id, fff.file_id, fff.filed_id
FROM f_polic_file_field fff
LEFT JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id
WHERE fff.tenant_id = %s AND fc.id IS NULL
""", (TENANT_ID,))
invalid_file_relations = cursor.fetchall()
# 检查无效的 filed_id关联到不存在的字段
cursor.execute("""
SELECT fff.id, fff.file_id, fff.filed_id
FROM f_polic_file_field fff
LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
WHERE fff.tenant_id = %s AND f.id IS NULL
""", (TENANT_ID,))
invalid_field_relations = cursor.fetchall()
# 检查重复的关联关系(相同的 file_id 和 filed_id
cursor.execute("""
SELECT file_id, filed_id, COUNT(*) as count, GROUP_CONCAT(id ORDER BY id) as ids
FROM f_polic_file_field
WHERE tenant_id = %s
GROUP BY file_id, filed_id
HAVING COUNT(*) > 1
""", (TENANT_ID,))
duplicate_relations = cursor.fetchall()
# 检查关联到未启用文件的记录
cursor.execute("""
SELECT fff.id, fff.file_id, fff.filed_id, fc.name as file_name, fc.state as file_state
FROM f_polic_file_field fff
INNER JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id
WHERE fff.tenant_id = %s AND fc.state != 1
""", (TENANT_ID,))
disabled_file_relations = cursor.fetchall()
# 检查关联到未启用字段的记录
cursor.execute("""
SELECT fff.id, fff.file_id, fff.filed_id, f.name as field_name, f.filed_code, f.state as field_state
FROM f_polic_file_field fff
INNER JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
WHERE fff.tenant_id = %s AND f.state != 1
""", (TENANT_ID,))
disabled_field_relations = cursor.fetchall()
# 统计每个文件关联的字段数量
file_field_counts = defaultdict(int)
for rel in all_relations:
file_field_counts[rel['file_id']] += 1
print(f"\n文件关联字段统计:")
print(f" 有关联关系的文件数: {len(file_field_counts)}")
if file_field_counts:
max_count = max(file_field_counts.values())
min_count = min(file_field_counts.values())
avg_count = sum(file_field_counts.values()) / len(file_field_counts)
print(f" 每个文件关联字段数: 最少 {min_count}, 最多 {max_count}, 平均 {avg_count:.1f}")
# 输出检查结果
if invalid_file_relations:
print(f"\n[ERROR] 发现无效的 file_id 关联 ({len(invalid_file_relations)} 条):")
for rel in invalid_file_relations[:10]: # 只显示前10条
print(f" - 关联ID: {rel['id']}, file_id: {rel['file_id']}, filed_id: {rel['filed_id']}")
if len(invalid_file_relations) > 10:
print(f" ... 还有 {len(invalid_file_relations) - 10}")
else:
print(f"\n[OK] 所有 file_id 关联都有效")
if invalid_field_relations:
print(f"\n[ERROR] 发现无效的 filed_id 关联 ({len(invalid_field_relations)} 条):")
for rel in invalid_field_relations[:10]: # 只显示前10条
print(f" - 关联ID: {rel['id']}, file_id: {rel['file_id']}, filed_id: {rel['filed_id']}")
if len(invalid_field_relations) > 10:
print(f" ... 还有 {len(invalid_field_relations) - 10}")
else:
print(f"\n[OK] 所有 filed_id 关联都有效")
if duplicate_relations:
print(f"\n[WARN] 发现重复的关联关系 ({len(duplicate_relations)} 组):")
for dup in duplicate_relations[:10]: # 只显示前10组
print(f" - file_id: {dup['file_id']}, filed_id: {dup['filed_id']}, 重复次数: {dup['count']}, 关联ID: {dup['ids']}")
if len(duplicate_relations) > 10:
print(f" ... 还有 {len(duplicate_relations) - 10}")
else:
print(f"\n[OK] 没有重复的关联关系")
if disabled_file_relations:
print(f"\n[WARN] 发现关联到未启用文件的记录 ({len(disabled_file_relations)} 条):")
for rel in disabled_file_relations[:10]:
print(f" - 文件: {rel['file_name']} (ID: {rel['file_id']}, 状态: {rel['file_state']})")
if len(disabled_file_relations) > 10:
print(f" ... 还有 {len(disabled_file_relations) - 10}")
if disabled_field_relations:
print(f"\n[WARN] 发现关联到未启用字段的记录 ({len(disabled_field_relations)} 条):")
for rel in disabled_field_relations[:10]:
print(f" - 字段: {rel['field_name']} ({rel['filed_code']}, ID: {rel['filed_id']}, 状态: {rel['field_state']})")
if len(disabled_field_relations) > 10:
print(f" ... 还有 {len(disabled_field_relations) - 10}")
cursor.close()
return {
'all_relations': all_relations,
'invalid_file_relations': invalid_file_relations,
'invalid_field_relations': invalid_field_relations,
'duplicate_relations': duplicate_relations,
'disabled_file_relations': disabled_file_relations,
'disabled_field_relations': disabled_field_relations,
'file_field_counts': dict(file_field_counts)
}
def check_template_file_mapping(conn, file_configs: Dict) -> Dict:
"""检查模板文件与数据库记录的映射关系"""
print("\n" + "="*80)
print("3. 检查模板文件与数据库记录的映射关系")
print("="*80)
import os
templates = get_template_files()
print(f"\n本地模板文件数: {len(templates)}")
cursor = conn.cursor(pymysql.cursors.DictCursor)
# 检查每个模板文件是否在数据库中有对应记录
missing_in_db = []
found_in_db = []
duplicate_mappings = []
for template_name, file_path in templates.items():
template_code = DOCUMENT_TYPE_MAPPING.get(template_name)
if not template_code:
continue
# 通过 name 和 template_code 查找对应的数据库记录
# 优先通过 name 精确匹配,然后通过 template_code 匹配
matching_configs = []
# 1. 通过 name 精确匹配
if template_name in file_configs['configs_by_name']:
for config in file_configs['configs_by_name'][template_name]:
if config.get('file_path'): # 有文件路径的记录
matching_configs.append(config)
# 2. 通过 template_code 匹配
if template_code in file_configs['configs_by_code']:
for config in file_configs['configs_by_code'][template_code]:
if config.get('file_path') and config not in matching_configs:
matching_configs.append(config)
if len(matching_configs) == 0:
missing_in_db.append({
'template_name': template_name,
'template_code': template_code,
'file_path': str(file_path)
})
elif len(matching_configs) == 1:
config = matching_configs[0]
found_in_db.append({
'template_name': template_name,
'template_code': template_code,
'file_id': config['id'],
'file_path': config.get('file_path'),
'name': config.get('name')
})
else:
# 多个匹配,选择 file_path 最新的(包含最新日期的)
duplicate_mappings.append({
'template_name': template_name,
'template_code': template_code,
'matching_configs': matching_configs
})
# 仍然记录第一个作为找到的记录
config = matching_configs[0]
found_in_db.append({
'template_name': template_name,
'template_code': template_code,
'file_id': config['id'],
'file_path': config.get('file_path'),
'name': config.get('name'),
'is_duplicate': True
})
print(f"\n找到数据库记录的模板: {len(found_in_db)}")
print(f"未找到数据库记录的模板: {len(missing_in_db)}")
print(f"有重复映射的模板: {len(duplicate_mappings)}")
if duplicate_mappings:
print(f"\n[WARN] 以下模板文件在数据库中有多个匹配记录:")
for item in duplicate_mappings:
print(f" - {item['template_name']} (template_code: {item['template_code']}):")
for cfg in item['matching_configs']:
print(f" * file_id: {cfg['id']}, name: {cfg.get('name')}, path: {cfg.get('file_path', 'N/A')}")
if missing_in_db:
print(f"\n[WARN] 以下模板文件在数据库中没有对应记录:")
for item in missing_in_db:
print(f" - {item['template_name']} (template_code: {item['template_code']})")
cursor.close()
return {
'found_in_db': found_in_db,
'missing_in_db': missing_in_db,
'duplicate_mappings': duplicate_mappings
}
def check_field_type_consistency(conn, relations: Dict) -> Dict:
"""检查关联关系的字段类型一致性"""
print("\n" + "="*80)
print("4. 检查关联关系的字段类型一致性")
print("="*80)
cursor = conn.cursor(pymysql.cursors.DictCursor)
# 查询所有关联关系及其字段类型
cursor.execute("""
SELECT
fff.id,
fff.file_id,
fff.filed_id,
fc.name as file_name,
f.name as field_name,
f.filed_code,
f.field_type,
CASE
WHEN f.field_type = 1 THEN '输入字段'
WHEN f.field_type = 2 THEN '输出字段'
ELSE '未知'
END as field_type_name
FROM f_polic_file_field fff
INNER JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id
INNER JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
WHERE fff.tenant_id = %s
ORDER BY fff.file_id, f.field_type, f.name
""", (TENANT_ID,))
all_relations_with_type = cursor.fetchall()
# 统计字段类型分布
input_fields = [r for r in all_relations_with_type if r['field_type'] == 1]
output_fields = [r for r in all_relations_with_type if r['field_type'] == 2]
print(f"\n字段类型统计:")
print(f" 输入字段 (field_type=1): {len(input_fields)} 条关联")
print(f" 输出字段 (field_type=2): {len(output_fields)} 条关联")
# 按文件统计
file_type_counts = defaultdict(lambda: {'input': 0, 'output': 0})
for rel in all_relations_with_type:
file_id = rel['file_id']
if rel['field_type'] == 1:
file_type_counts[file_id]['input'] += 1
elif rel['field_type'] == 2:
file_type_counts[file_id]['output'] += 1
print(f"\n每个文件的字段类型分布:")
for file_id, counts in sorted(file_type_counts.items())[:10]: # 只显示前10个
print(f" 文件ID {file_id}: 输入字段 {counts['input']} 个, 输出字段 {counts['output']}")
if len(file_type_counts) > 10:
print(f" ... 还有 {len(file_type_counts) - 10} 个文件")
cursor.close()
return {
'input_fields': input_fields,
'output_fields': output_fields,
'file_type_counts': dict(file_type_counts)
}
def main():
"""主函数"""
print("="*80)
print("检查模板的 file_id 和相关关联关系")
print("="*80)
# 连接数据库
try:
conn = pymysql.connect(**DB_CONFIG)
print("\n[OK] 数据库连接成功")
except Exception as e:
print(f"\n[ERROR] 数据库连接失败: {e}")
return
try:
# 1. 检查文件配置表
file_configs = check_file_configs(conn)
# 2. 检查文件字段关联表
relations = check_file_field_relations(conn)
# 3. 检查模板文件与数据库记录的映射
template_mapping = check_template_file_mapping(conn, file_configs)
# 4. 检查字段类型一致性
field_type_info = check_field_type_consistency(conn, relations)
# 汇总报告
print("\n" + "="*80)
print("检查汇总")
print("="*80)
issues = []
if file_configs['duplicate_codes']:
issues.append(f"发现 {len(file_configs['duplicate_codes'])} 个重复的 template_code")
if file_configs['duplicate_names']:
issues.append(f"发现 {len(file_configs['duplicate_names'])} 个重复的 name")
if file_configs['empty_path_configs']:
issues.append(f"发现 {len(file_configs['empty_path_configs'])} 个 file_path 为空的记录")
if relations['invalid_file_relations']:
issues.append(f"发现 {len(relations['invalid_file_relations'])} 条无效的 file_id 关联")
if relations['invalid_field_relations']:
issues.append(f"发现 {len(relations['invalid_field_relations'])} 条无效的 filed_id 关联")
if relations['duplicate_relations']:
issues.append(f"发现 {len(relations['duplicate_relations'])} 组重复的关联关系")
if template_mapping['missing_in_db']:
issues.append(f"发现 {len(template_mapping['missing_in_db'])} 个模板文件在数据库中没有对应记录")
if issues:
print("\n[WARN] 发现以下问题:")
for issue in issues:
print(f" - {issue}")
else:
print("\n[OK] 未发现严重问题")
print(f"\n总模板记录数: {len(file_configs['all_configs'])}")
print(f"总关联关系数: {len(relations['all_relations'])}")
print(f"有关联关系的文件数: {len(relations['file_field_counts'])}")
finally:
conn.close()
print("\n数据库连接已关闭")
if __name__ == '__main__':
import os
main()

View File

@ -0,0 +1,167 @@
# 模板校验和更新总结
## 任务完成情况
✅ **已完成所有任务**
1. ✅ 重新校验数据库中模板和数据字段对应关系
2. ✅ 删除旧的或者无效的模板信息
3. ✅ 根据template_finish文件夹下的模板文件重新上传模板到MinIO
4. ✅ 更新数据库内相关数据
5. ✅ 确保文档生成接口可以正确生成文档
## 执行结果
### 1. 模板文件扫描
- **扫描到的模板文件**: 21个
- **位置**: `template_finish/` 文件夹
### 2. 数据库更新
- **数据库中的模板数**: 50个更新前
- **标记为无效的模板**: 3个
- 2-初核模版
- 走读式谈话审批
- 走读式谈话流程
### 3. 模板处理结果
- **成功处理**: 21个模板
- **失败**: 0个
- **上传到MinIO**: 21个模板文件
- **更新数据库配置**: 21个模板记录
- **建立字段关联**: 18个模板3个模板没有占位符不需要字段关联
### 4. 字段关联统计
- **总关联字段数**: 约100+条关联关系
- **匹配的占位符**: 所有占位符都成功匹配到数据库字段
- **字段类型**: 只关联输出字段field_type=2
## 处理的模板列表
### 初核请示类
1. ✅ 请示报告卡 - 2个字段
2. ✅ 初步核实审批表 - 9个字段
3. ✅ 附件初核方案 - 8个字段
### 谈话审批类
4. ✅ 谈话通知书第一联 - 9个字段
5. ✅ 谈话通知书第二联 - 3个字段
6. ✅ 谈话通知书第三联 - 3个字段
7. ✅ 请示报告卡(初核谈话)- 3个字段
8. ✅ 谈话审批表 - 5个字段
9. ✅ 谈话前安全风险评估表 - 7个字段
10. ✅ 谈话方案 - 3个字段
11. ✅ 谈话后安全风险评估表 - 6个字段
### 谈话流程类
12. ✅ 谈话笔录 - 6个字段
13. ✅ 谈话询问对象情况摸底调查30问 - 11个字段
14. ✅ 被谈话人权利义务告知书 - 0个字段无占位符
15. ✅ 点对点交接单 - 2个字段
16. ✅ 陪送交接单 - 4个字段
17. ✅ 保密承诺书(非中共党员用)- 5个字段
18. ✅ 保密承诺书(中共党员用)- 4个字段
19. ✅ 办案人员-办案安全保密承诺书 - 1个字段
### 初核结论类
20. ✅ 请示报告卡(初核报告结论)- 0个字段无占位符
21. ✅ XXX初核情况报告 - 0个字段无占位符
## 验证测试结果
### 文档生成接口测试
测试了4个关键模板的文档生成功能
1. ✅ **初步核实审批表** - 生成成功
- 关联字段: 9个
- 文档名称: 初步核实审批表_张三.docx
- 文件路径: /615873064429507639/20251211120603/初步核实审批表_张三.docx
2. ✅ **请示报告卡** - 生成成功
- 关联字段: 3个
- 文档名称: 请示报告卡_张三.docx
- 文件路径: /615873064429507639/20251211120604/请示报告卡_张三.docx
3. ✅ **谈话通知书第一联** - 生成成功
- 关联字段: 9个
- 文档名称: 谈话通知书第一联_张三.docx
- 文件路径: /615873064429507639/20251211120605/谈话通知书第一联_张三.docx
4. ✅ **谈话前安全风险评估表** - 生成成功
- 关联字段: 7个
- 文档名称: 谈话前安全风险评估表_张三.docx
- 文件路径: /615873064429507639/20251211120606/谈话前安全风险评估表_张三.docx
**测试结果**: 4/4 成功 ✅
## 关键功能验证
### ✅ 文档名称生成
- 文档名称格式: `{模板名称}_{被核查人姓名}.docx`
- 示例: `初步核实审批表_张三.docx`
- **验证通过**: 文档名称正确生成
### ✅ 占位符替换
- 占位符格式: `{{field_code}}`
- 替换逻辑: 根据inputData中的fieldCode匹配并替换
- **验证通过**: 占位符可以正确替换
### ✅ 字段关联
- 关联表: `f_polic_file_field`
- 关联字段: 只关联输出字段field_type=2
- **验证通过**: 字段关联关系正确建立
### ✅ MinIO存储
- 存储路径: `/615873064429507639/TEMPLATE/{年}/{月}/{文件名}`
- 下载URL: 预签名URL7天有效
- **验证通过**: 文件成功上传并可下载
## 数据库表更新情况
### f_polic_file_config 表
- **更新**: 21条记录
- **新增**: 部分模板创建了新记录
- **更新**: 部分模板更新了file_path和input_data
- **状态**: 所有模板状态为1启用
### f_polic_file_field 表
- **删除**: 旧的关联关系已删除
- **创建**: 新的关联关系已建立
- **关联字段数**: 约100+条关联关系
### f_polic_field 表
- **未修改**: 字段定义表未修改
- **字段总数**: 78个字段
## 脚本文件
### 主要脚本
1. **validate_and_update_templates.py** - 主脚本
- 扫描模板文件
- 提取占位符
- 匹配字段
- 上传到MinIO
- 更新数据库
2. **verify_document_generation.py** - 验证脚本
- 测试文档生成功能
- 验证字段关联
- 验证占位符替换
## 注意事项
1. **无占位符的模板**: 3个模板没有占位符已上传到MinIO并创建数据库记录但不建立字段关联
2. **模板名称标准化**: 脚本会自动标准化模板名称(去掉括号、数字前缀等)
3. **字段匹配**: 只匹配输出字段field_type=2输入字段不建立关联
4. **无效模板**: 不在template_finish文件夹中的模板会被标记为无效state=0
## 后续建议
1. **定期校验**: 建议定期运行 `validate_and_update_templates.py` 脚本,确保模板和字段关联关系正确
2. **新增模板**: 新增模板时,确保模板文件放在 `template_finish` 文件夹中,然后运行脚本
3. **字段管理**: 如果新增字段,需要确保字段已添加到 `f_polic_field` 表中,且 `field_type=2`(输出字段)
4. **测试验证**: 每次更新模板后,建议运行 `verify_document_generation.py` 验证文档生成功能
## 完成时间
2025年12月11日