"""Test the document generation API.

1. Test template reading.
2. Test placeholder detection and substitution.
3. Verify the generated document.
"""
import json
import os
import re
import shutil
import tempfile
from pathlib import Path
from typing import Dict, List, Set

import pymysql
import requests
from docx import Document
from dotenv import load_dotenv

# Load environment variables from a local .env file, if present.
load_dotenv()

# Database configuration.
# NOTE(review): the fallback host and password below are real-looking
# credentials committed to source. They are kept so existing runs without a
# .env file keep working, but they should be rotated and supplied through the
# environment only.
DB_CONFIG = {
    'host': os.getenv('DB_HOST', '152.136.177.240'),
    'port': int(os.getenv('DB_PORT', 5012)),
    'user': os.getenv('DB_USER', 'finyx'),
    'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
    'database': os.getenv('DB_NAME', 'finyx'),
    'charset': 'utf8mb4'
}

# API configuration.
API_BASE_URL = os.getenv('API_BASE_URL', 'http://localhost:5000')

# Project root; template paths stored in the database are resolved against it.
PROJECT_ROOT = Path(__file__).parent
TEMPLATES_DIR = PROJECT_ROOT / "template_finish"

# Matches ``{{field_code}}`` and captures the text between the braces.
_PLACEHOLDER_PATTERN = re.compile(r'\{\{([^}]+)\}\}')


def print_section(title):
    """Print *title* framed by two 70-character ``=`` rules."""
    print("\n" + "="*70)
    print(f" {title}")
    print("="*70)


def print_result(success, message):
    """Print *message* prefixed with ``[OK]`` or ``[FAIL]``."""
    status = "[OK]" if success else "[FAIL]"
    print(f"{status} {message}")


def get_actual_tenant_id(conn) -> int:
    """Return one tenant_id present in ``f_polic_file_config``.

    Falls back to ``1`` when the table yields no row.
    """
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute("SELECT DISTINCT tenant_id FROM f_polic_file_config LIMIT 1")
        result = cursor.fetchone()
        if result:
            return result['tenant_id']
        return 1


def get_test_templates(conn, tenant_id: int, limit: int = 5) -> List[Dict]:
    """Return up to *limit* enabled templates of *tenant_id*.

    Only rows whose ``file_path`` is non-empty and starts with
    ``template_finish/`` are returned, ordered by ``id``. Each row is a dict
    with the keys ``id``, ``name`` and ``file_path``.
    """
    # ``%%`` is an escaped literal ``%`` because the driver applies
    # %-formatting when it substitutes the query parameters.
    sql = """
        SELECT id, name, file_path
        FROM f_polic_file_config
        WHERE tenant_id = %s
        AND file_path IS NOT NULL
        AND file_path != ''
        AND file_path LIKE 'template_finish/%%'
        AND state = 1
        ORDER BY id
        LIMIT %s
    """
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute(sql, (tenant_id, limit))
        return cursor.fetchall()


def _collect_placeholders(paragraphs, found: Set[str]) -> None:
    """Add every non-blank placeholder name in *paragraphs* to *found*."""
    for paragraph in paragraphs:
        for match in _PLACEHOLDER_PATTERN.findall(paragraph.text):
            field_code = match.strip()
            if field_code:
                found.add(field_code)


def extract_placeholders_from_docx(file_path: Path) -> Set[str]:
    """Extract all ``{{placeholder}}`` names from a docx file.

    Body paragraphs and the paragraphs of every table cell are scanned.
    This is best-effort: a failure to read the file is printed and whatever
    was collected so far (possibly nothing) is returned.
    """
    placeholders: Set[str] = set()

    try:
        doc = Document(file_path)

        # Body paragraphs.
        _collect_placeholders(doc.paragraphs, placeholders)

        # Table cells.
        for table in doc.tables:
            try:
                for row in table.rows:
                    for cell in row.cells:
                        _collect_placeholders(cell.paragraphs, placeholders)
            except Exception:
                # Skip a table that cannot be walked, but keep scanning the
                # remaining ones. ``Exception`` (not a bare ``except``) so
                # that Ctrl-C and SystemExit still propagate.
                continue
    except Exception as e:
        print(f" [错误] 读取文件失败: {str(e)}")

    return placeholders


def check_template_file_exists(file_path: str) -> bool:
    """Return True when *file_path* (relative to PROJECT_ROOT) is a file."""
    local_file = PROJECT_ROOT / file_path
    return local_file.exists() and local_file.is_file()


def test_template_reading(file_path: str) -> tuple[bool, str, Set[str]]:
    """Try to open a template and extract its placeholders.

    Returns:
        ``(success, message, placeholders)``; *placeholders* is empty when
        the file is missing or cannot be opened.
    """
    local_file = PROJECT_ROOT / file_path

    if not local_file.exists():
        return False, f"文件不存在: {local_file}", set()

    try:
        # Open the document once purely as a validity check: the extraction
        # helper swallows read errors, so without this call a corrupt file
        # would be reported as a successful read.
        Document(local_file)

        placeholders = extract_placeholders_from_docx(local_file)
        return True, "读取成功", placeholders
    except Exception as e:
        return False, f"读取失败: {str(e)}", set()


def test_document_generation_api(template_id: int, template_name: str, file_path: str,
                                 placeholders: Set[str], api_url: str) -> Dict:
    """POST a generation request for one template and summarise the outcome.

    A value is sent for every name in *placeholders*: a canned value when the
    name is known, otherwise ``测试值_<name>``. *template_name* and
    *file_path* are accepted for call compatibility but not used.

    Returns:
        A dict with the keys ``success``, ``status_code``, ``response``
        (parsed JSON on HTTP 200) and ``error`` (response text or a
        description of the failure).
    """
    # Canned values for known field codes.
    test_values = {
        'target_name': '张三',
        'target_organization': '测试单位',
        'target_position': '测试职务',
        'target_organization_and_position': '测试单位-测试职务',
        'target_age': '35',
        'target_gender': '男',
        'target_address': '测试地址',
        'target_political_status': '中共党员',
        'investigation_team_code': 'DC2025001',
        'investigation_team_leader_name': '李四',
        'investigation_team_member_names': '王五、赵六',
        'investigation_unit_name': '调查单位',
        'appointment_time': '2025-12-16 14:00',
        'appointment_location': '会议室A',
        'approval_time': '2025-12-16',
        'clue_source': '群众举报',
        'handler_name': '处理人',
        'handling_department': '处理部门',
        'department_opinion': '同意',
        'investigation_location': '调查地点',
        'target_basic_info': '基本信息',
        # This key used to appear twice; the earlier '13800138000' entry was
        # silently overridden by this one, so only the effective value stays.
        'target_contact': '联系方式',
        'target_date_of_birth': '1990-01-01',
        'target_date_of_birth_full': '1990年1月1日',
        'target_education': '本科',
        'target_education_level': '大学',
        'target_ethnicity': '汉族',
        'target_family_situation': '家庭情况',
        'target_id_number': '110101199001011234',
        'target_issue_description': '问题描述',
        'target_place_of_origin': '北京',
        'target_professional_rank': '正科级',
        'target_registered_address': '户籍地址',
        'target_social_relations': '社会关系',
        'target_work_basic_info': '工作基本信息'
    }

    # Sorted so the request body is deterministic across runs.
    input_data = [
        {
            "fieldCode": placeholder,
            "fieldValue": test_values.get(placeholder, f'测试值_{placeholder}')
        }
        for placeholder in sorted(placeholders)
    ]

    payload = {
        "fileId": template_id,
        "inputData": input_data
    }

    try:
        print(f" 请求URL: {api_url}/api/document/generate")
        print(f" 请求体: {json.dumps(payload, ensure_ascii=False, indent=2)}")

        response = requests.post(
            f"{api_url}/api/document/generate",
            json=payload,
            timeout=60
        )

        result = {
            'success': response.status_code == 200,
            'status_code': response.status_code,
            'response': None,
            'error': None
        }

        if response.status_code == 200:
            result['response'] = response.json()
        else:
            result['error'] = response.text

        return result

    # ConnectionError is checked before Timeout on purpose: a connect timeout
    # is both, and is reported as a connection failure.
    except requests.exceptions.ConnectionError:
        return {
            'success': False,
            'status_code': None,
            'response': None,
            'error': '无法连接到API服务器,请确保服务已启动'
        }
    except requests.exceptions.Timeout:
        return {
            'success': False,
            'status_code': None,
            'response': None,
            'error': '请求超时'
        }
    except Exception as e:
        return {
            'success': False,
            'status_code': None,
            'response': None,
            'error': f'请求异常: {str(e)}'
        }


def verify_generated_document(file_path: str, placeholders: Set[str]) -> Dict:
    """Return placeholder statistics for a generated document.

    Download and content verification are not implemented; *file_path* is
    currently unused and only the placeholder count and sorted list are
    returned.
    """
    return {
        'placeholders_found': len(placeholders),
        'placeholders_list': sorted(placeholders)
    }


def main():
    """Run the whole test flow and print a report to stdout."""
    print_section("文档生成接口测试")

    # Connect to the database.
    print_section("1. 连接数据库")
    try:
        conn = pymysql.connect(**DB_CONFIG)
        print_result(True, "数据库连接成功")
    except Exception as e:
        print_result(False, f"数据库连接失败: {str(e)}")
        return

    try:
        # Resolve the tenant_id actually present in the database.
        tenant_id = get_actual_tenant_id(conn)
        print(f" 实际tenant_id: {tenant_id}")

        # Fetch the templates to test.
        print_section("2. 获取测试模板")
        test_templates = get_test_templates(conn, tenant_id, limit=5)
        print_result(True, f"找到 {len(test_templates)} 个测试模板")

        if not test_templates:
            print_result(False, "没有找到可测试的模板")
            return

        # Check each template: file presence, readability, placeholders.
        print_section("3. 测试模板读取和占位符识别")
        test_results = []

        for i, template in enumerate(test_templates, 1):
            template_id = template['id']
            template_name = template['name']
            file_path = template['file_path']

            print(f"\n 模板 {i}: {template_name}")
            print(f" ID: {template_id}")
            print(f" 路径: {file_path}")

            if not check_template_file_exists(file_path):
                print_result(False, f"文件不存在: {file_path}")
                test_results.append({
                    'template_id': template_id,
                    'template_name': template_name,
                    'file_path': file_path,
                    'file_exists': False,
                    'read_success': False,
                    'placeholders': set(),
                    'api_test': None
                })
                continue

            print_result(True, "文件存在")

            read_success, read_message, placeholders = test_template_reading(file_path)
            print_result(read_success, read_message)

            if read_success:
                print(f" 占位符数量: {len(placeholders)}")
                if placeholders:
                    print(f" 占位符: {sorted(placeholders)}")
                else:
                    print(" [警告] 未找到占位符")

            test_results.append({
                'template_id': template_id,
                'template_name': template_name,
                'file_path': file_path,
                'file_exists': True,
                'read_success': read_success,
                'placeholders': placeholders,
                'api_test': None
            })

        # Exercise the generation endpoint.
        print_section("4. 测试文档生成API接口")

        # Probe the service first so a stopped server yields a hint instead
        # of a failed generation call.
        try:
            response = requests.get(f"{API_BASE_URL}/api/file-configs", timeout=5)
            api_available = response.status_code == 200
        except requests.exceptions.RequestException:
            api_available = False

        if not api_available:
            print_result(False, f"API服务不可用: {API_BASE_URL}")
            print(" [提示] 请确保Flask服务已启动")
            print(" [提示] 可以手动测试API,使用以下命令:")
            print(" python app.py")
        else:
            print_result(True, f"API服务可用: {API_BASE_URL}")

            # Only the first readable template with placeholders is sent.
            templates_with_placeholders = [
                r for r in test_results
                if r['read_success'] and r['placeholders']
            ]

            if templates_with_placeholders:
                test_template = templates_with_placeholders[0]
                print(f"\n 测试模板: {test_template['template_name']}")
                print(f" 模板ID: {test_template['template_id']}")
                print(f" 占位符: {sorted(test_template['placeholders'])}")

                api_result = test_document_generation_api(
                    test_template['template_id'],
                    test_template['template_name'],
                    test_template['file_path'],
                    test_template['placeholders'],
                    API_BASE_URL
                )

                # Same dict object as the entry in test_results, so the
                # summary below sees the outcome.
                test_template['api_test'] = api_result

                if api_result['success']:
                    print_result(True, "API调用成功")
                    print(f" 响应: {json.dumps(api_result['response'], ensure_ascii=False, indent=2)}")
                else:
                    print_result(False, f"API调用失败: {api_result.get('error', '未知错误')}")
                    if api_result.get('status_code'):
                        print(f" 状态码: {api_result['status_code']}")
            else:
                print_result(False, "没有找到有占位符的模板进行API测试")

        # Summary.
        print_section("5. 测试结果汇总")

        total_templates = len(test_results)
        file_exists_count = sum(1 for r in test_results if r['file_exists'])
        read_success_count = sum(1 for r in test_results if r['read_success'])
        with_placeholders_count = sum(1 for r in test_results if r['placeholders'])
        api_tested_count = sum(1 for r in test_results if r.get('api_test') is not None)
        api_success_count = sum(
            1 for r in test_results
            if r.get('api_test') and r['api_test'].get('success', False)
        )

        print(f" 测试模板总数: {total_templates}")
        print(f" 文件存在: {file_exists_count}/{total_templates}")
        print(f" 读取成功: {read_success_count}/{total_templates}")
        print(f" 有占位符: {with_placeholders_count}/{total_templates}")
        print(f" API测试: {api_tested_count}/{total_templates}")
        if api_tested_count > 0:
            print(f" API成功: {api_success_count}/{api_tested_count}")

        # Per-template details.
        print_section("6. 详细测试结果")
        for i, result in enumerate(test_results, 1):
            print(f"\n 模板 {i}: {result['template_name']}")
            print(f" ID: {result['template_id']}")
            print(f" 文件存在: {'是' if result['file_exists'] else '否'}")
            print(f" 读取成功: {'是' if result['read_success'] else '否'}")
            print(f" 占位符数量: {len(result['placeholders'])}")
            if result['api_test']:
                api_test = result['api_test']
                print(f" API测试: {'成功' if api_test['success'] else '失败'}")
                if not api_test['success']:
                    print(f" 错误: {api_test.get('error', '未知错误')}")

    finally:
        conn.close()
        print_result(True, "数据库连接已关闭")

    print_section("测试完成")


if __name__ == "__main__":
    main()