ai-business-write/test_document_generation_api.py
2025-12-26 09:16:31 +08:00

429 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
测试文档生成接口
1. 测试模板读取功能
2. 测试占位符识别和替换
3. 验证生成的文档
"""
import os
import pymysql
from pathlib import Path
from typing import Dict, List, Set
from dotenv import load_dotenv
import re
from docx import Document
import requests
import json
import tempfile
import shutil
# Load environment variables (python-dotenv's load_dotenv) before any
# os.getenv lookup below.
load_dotenv()
# Database connection settings passed to pymysql.connect(**DB_CONFIG).
# Every value except the charset can be overridden through the environment;
# otherwise the literal fallback is used.
# NOTE(review): the fallbacks embed a concrete host and password in source —
# consider requiring the environment variables instead of shipping defaults.
DB_CONFIG = {
'host': os.getenv('DB_HOST', '152.136.177.240'),
'port': int(os.getenv('DB_PORT', 5012)),
'user': os.getenv('DB_USER', 'finyx'),
'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
'database': os.getenv('DB_NAME', 'finyx'),
'charset': 'utf8mb4'
}
# Base URL of the API under test (overridable via API_BASE_URL).
API_BASE_URL = os.getenv('API_BASE_URL', 'http://localhost:5000')
# Directory containing this script; template file paths read from the
# database are resolved relative to it.
PROJECT_ROOT = Path(__file__).parent
# Template directory under the project root (not referenced elsewhere in the
# visible code).
TEMPLATES_DIR = PROJECT_ROOT / "template_finish"
def print_section(title):
    """Print a section heading framed by two 70-character '=' rules.

    A blank line is emitted before the first rule, and the title is
    prefixed with a single space.
    """
    rule = "=" * 70
    print(f"\n{rule}")
    print(f" {title}")
    print(rule)
def print_result(success, message):
    """Print ``message`` prefixed with ``[OK]`` when ``success`` is truthy, else ``[FAIL]``."""
    if success:
        tag = "[OK]"
    else:
        tag = "[FAIL]"
    print(f"{tag} {message}")
def get_actual_tenant_id(conn) -> int:
    """Return one tenant_id that occurs in ``f_polic_file_config``.

    Args:
        conn: An open pymysql connection.

    Returns:
        The ``tenant_id`` of the single row fetched, or ``1`` when the
        query yields no row.
    """
    cur = conn.cursor(pymysql.cursors.DictCursor)
    try:
        cur.execute("SELECT DISTINCT tenant_id FROM f_polic_file_config LIMIT 1")
        row = cur.fetchone()
        # DictCursor rows are keyed by column name.
        return row['tenant_id'] if row else 1
    finally:
        # Release the cursor whether or not the query succeeded.
        cur.close()
def get_test_templates(conn, tenant_id: int, limit: int = 5) -> List[Dict]:
    """Fetch up to ``limit`` enabled template records for a tenant.

    Only rows with ``state = 1`` and a non-empty ``file_path`` starting with
    ``template_finish/`` are selected, ordered by ``id``.

    Args:
        conn: An open pymysql connection.
        tenant_id: Tenant whose templates are listed.
        limit: Maximum number of rows to return.

    Returns:
        The rows from ``fetchall()``, each carrying ``id``, ``name`` and
        ``file_path``.
    """
    # '%%' is an escaped literal '%' because the driver formats the query
    # with the parameter tuple.
    query = (
        "\n"
        "SELECT id, name, file_path\n"
        "FROM f_polic_file_config\n"
        "WHERE tenant_id = %s\n"
        "AND file_path IS NOT NULL\n"
        "AND file_path != ''\n"
        "AND file_path LIKE 'template_finish/%%'\n"
        "AND state = 1\n"
        "ORDER BY id\n"
        "LIMIT %s\n"
    )
    cur = conn.cursor(pymysql.cursors.DictCursor)
    try:
        cur.execute(query, (tenant_id, limit))
        return cur.fetchall()
    finally:
        cur.close()
def _placeholders_in_paragraphs(paragraphs, pattern) -> Set[str]:
    """Return the stripped, non-empty placeholder names found in ``paragraphs``."""
    found = set()
    for paragraph in paragraphs:
        for raw in pattern.findall(paragraph.text):
            field_code = raw.strip()
            if field_code:
                found.add(field_code)
    return found


def extract_placeholders_from_docx(file_path: Path) -> Set[str]:
    """Collect every ``{{name}}`` placeholder from a .docx file.

    Both body paragraphs and paragraphs inside top-level table cells are
    scanned. Names are whitespace-stripped; empty names are ignored.

    Args:
        file_path: Path of the .docx document to scan.

    Returns:
        The set of placeholder names found. If the document cannot be
        opened or read, the error is printed and whatever was collected so
        far (possibly an empty set) is returned; no exception propagates.
    """
    placeholders: Set[str] = set()
    placeholder_pattern = re.compile(r'\{\{([^}]+)\}\}')
    try:
        doc = Document(file_path)
        # Body paragraphs.
        placeholders |= _placeholders_in_paragraphs(doc.paragraphs, placeholder_pattern)
        # Table cells.
        for table in doc.tables:
            try:
                for row in table.rows:
                    for cell in row.cells:
                        placeholders |= _placeholders_in_paragraphs(
                            cell.paragraphs, placeholder_pattern
                        )
            except Exception:
                # Best effort: a table that cannot be walked is skipped, but
                # KeyboardInterrupt/SystemExit are no longer swallowed.
                continue
    except Exception as e:
        print(f" [错误] 读取文件失败: {str(e)}")
    return placeholders
def check_template_file_exists(file_path: str) -> bool:
    """Report whether ``file_path``, resolved against PROJECT_ROOT, is an existing regular file."""
    candidate = PROJECT_ROOT / file_path
    if not candidate.exists():
        return False
    return candidate.is_file()
def test_template_reading(file_path: str) -> tuple[bool, str, Set[str]]:
    """Check that a template can be opened and list its placeholders.

    Args:
        file_path: Template path relative to PROJECT_ROOT.

    Returns:
        A ``(success, message, placeholders)`` tuple. On failure the
        placeholder set is empty and the message describes the problem.
    """
    target = PROJECT_ROOT / file_path
    if not target.exists():
        return False, f"文件不存在: {target}", set()
    try:
        # Opening the document here is the actual readability check: the
        # placeholder extractor reports its own read errors by printing
        # instead of raising.
        Document(target)
        found = extract_placeholders_from_docx(target)
    except Exception as exc:
        return False, f"读取失败: {str(exc)}", set()
    return True, "读取成功", found
def test_document_generation_api(template_id: int, template_name: str, file_path: str, placeholders: Set[str], api_url: str) -> Dict:
    """Call the document generation endpoint with sample data for a template.

    One ``{"fieldCode", "fieldValue"}`` entry is built per placeholder (in
    sorted order), using a canned value when one is known and
    ``测试值_<placeholder>`` otherwise, and POSTed to
    ``{api_url}/api/document/generate`` together with the template id.

    Args:
        template_id: Sent as ``fileId`` in the request body.
        template_name: Unused; kept for interface compatibility.
        file_path: Unused; kept for interface compatibility.
        placeholders: Placeholder names to supply values for.
        api_url: Base URL of the API (no trailing slash expected).

    Returns:
        A dict with keys ``success`` (HTTP 200 with a JSON body),
        ``status_code`` (``None`` if no response was received), ``response``
        (decoded JSON on success) and ``error`` (description on failure).
        No exception propagates to the caller.
    """
    # Canned sample values keyed by placeholder name.
    # The original literal listed 'target_contact' twice; only the later
    # value ever took effect, so just that one is kept here.
    test_values = {
        'target_name': '张三',
        'target_organization': '测试单位',
        'target_position': '测试职务',
        'target_organization_and_position': '测试单位-测试职务',
        'target_age': '35',
        # NOTE(review): empty string as written in the source — confirm it is intended.
        'target_gender': '',
        'target_address': '测试地址',
        'target_political_status': '中共党员',
        'investigation_team_code': 'DC2025001',
        'investigation_team_leader_name': '李四',
        'investigation_team_member_names': '王五、赵六',
        'investigation_unit_name': '调查单位',
        'appointment_time': '2025-12-16 14:00',
        'appointment_location': '会议室A',
        'approval_time': '2025-12-16',
        'clue_source': '群众举报',
        'handler_name': '处理人',
        'handling_department': '处理部门',
        'department_opinion': '同意',
        'investigation_location': '调查地点',
        'target_basic_info': '基本信息',
        'target_contact': '联系方式',
        'target_date_of_birth': '1990-01-01',
        'target_date_of_birth_full': '1990年1月1日',
        'target_education': '本科',
        'target_education_level': '大学',
        'target_ethnicity': '汉族',
        'target_family_situation': '家庭情况',
        'target_id_number': '110101199001011234',
        'target_issue_description': '问题描述',
        'target_place_of_origin': '北京',
        'target_professional_rank': '正科级',
        'target_registered_address': '户籍地址',
        'target_social_relations': '社会关系',
        'target_work_basic_info': '工作基本信息',
    }
    input_data = [
        {
            "fieldCode": placeholder,
            "fieldValue": test_values.get(placeholder, f'测试值_{placeholder}'),
        }
        for placeholder in sorted(placeholders)
    ]
    payload = {
        "fileId": template_id,
        "inputData": input_data,
    }
    endpoint = f"{api_url}/api/document/generate"
    try:
        print(f" 请求URL: {endpoint}")
        print(f" 请求体: {json.dumps(payload, ensure_ascii=False, indent=2)}")
        response = requests.post(endpoint, json=payload, timeout=60)
        result = {
            'success': response.status_code == 200,
            'status_code': response.status_code,
            'response': None,
            'error': None,
        }
        if not result['success']:
            result['error'] = response.text
            return result
        try:
            result['response'] = response.json()
        except ValueError as exc:
            # A 200 whose body is not JSON is still a failure, but keep the
            # status code so the caller can report it.
            result['success'] = False
            result['error'] = f'响应不是有效的JSON: {exc}'
        return result
    except requests.exceptions.ConnectionError:
        return {
            'success': False,
            'status_code': None,
            'response': None,
            'error': '无法连接到API服务器请确保服务已启动',
        }
    except requests.exceptions.Timeout:
        return {
            'success': False,
            'status_code': None,
            'response': None,
            'error': '请求超时',
        }
    except Exception as e:
        # Last-resort boundary: this is a diagnostic script, so any other
        # failure is reported in the result instead of aborting the run.
        return {
            'success': False,
            'status_code': None,
            'response': None,
            'error': f'请求异常: {str(e)}',
        }
def verify_generated_document(file_path: str, placeholders: Set[str]) -> Dict:
    """Summarise the placeholders associated with a generated document.

    No download or content check is performed; ``file_path`` is currently
    unused and only the placeholder count and sorted list are reported.

    Returns:
        A dict with ``placeholders_found`` (count) and ``placeholders_list``
        (sorted names).
    """
    report = {}
    report['placeholders_found'] = len(placeholders)
    report['placeholders_list'] = sorted(placeholders)
    return report
def _inspect_template(index: int, template: Dict) -> Dict:
    """Print and return the file-existence / readability findings for one template row."""
    template_id = template['id']
    template_name = template['name']
    file_path = template['file_path']
    print(f"\n 模板 {index}: {template_name}")
    print(f" ID: {template_id}")
    print(f" 路径: {file_path}")
    result = {
        'template_id': template_id,
        'template_name': template_name,
        'file_path': file_path,
        'file_exists': False,
        'read_success': False,
        'placeholders': set(),
        'api_test': None,
    }
    # Check that the file exists.
    if not check_template_file_exists(file_path):
        print_result(False, f"文件不存在: {file_path}")
        return result
    print_result(True, "文件存在")
    # Try to read the template.
    read_success, read_message, placeholders = test_template_reading(file_path)
    print_result(read_success, read_message)
    if read_success:
        print(f" 占位符数量: {len(placeholders)}")
        if placeholders:
            print(f" 占位符: {sorted(placeholders)}")
        else:
            print(" [警告] 未找到占位符")
    result.update(file_exists=True, read_success=read_success, placeholders=placeholders)
    return result


def _api_is_available(base_url: str) -> bool:
    """Return True when GET ``{base_url}/api/file-configs`` answers with HTTP 200."""
    try:
        response = requests.get(f"{base_url}/api/file-configs", timeout=5)
    except requests.exceptions.RequestException:
        # Any transport-level failure means "not available"; interrupts
        # such as Ctrl-C are deliberately not caught.
        return False
    return response.status_code == 200


def _run_api_step(test_results: List[Dict]) -> None:
    """Exercise the generation API with the first readable template that has placeholders."""
    if not _api_is_available(API_BASE_URL):
        print_result(False, f"API服务不可用: {API_BASE_URL}")
        print(" [提示] 请确保Flask服务已启动")
        print(" [提示] 可以手动测试API使用以下命令:")
        print(" python app.py")
        return
    print_result(True, f"API服务可用: {API_BASE_URL}")
    # Pick a template that has placeholders for the API test.
    candidates = [r for r in test_results if r['read_success'] and r['placeholders']]
    if not candidates:
        print_result(False, "没有找到有占位符的模板进行API测试")
        return
    test_template = candidates[0]
    print(f"\n 测试模板: {test_template['template_name']}")
    print(f" 模板ID: {test_template['template_id']}")
    print(f" 占位符: {sorted(test_template['placeholders'])}")
    api_result = test_document_generation_api(
        test_template['template_id'],
        test_template['template_name'],
        test_template['file_path'],
        test_template['placeholders'],
        API_BASE_URL,
    )
    # Stored on the shared result dict so the summary and details see it.
    test_template['api_test'] = api_result
    if api_result['success']:
        print_result(True, "API调用成功")
        print(f" 响应: {json.dumps(api_result['response'], ensure_ascii=False, indent=2)}")
    else:
        print_result(False, f"API调用失败: {api_result.get('error', '未知错误')}")
        if api_result.get('status_code'):
            print(f" 状态码: {api_result['status_code']}")


def _print_summary(test_results: List[Dict]) -> None:
    """Print aggregate counts over all template results."""
    total_templates = len(test_results)
    file_exists_count = sum(1 for r in test_results if r['file_exists'])
    read_success_count = sum(1 for r in test_results if r['read_success'])
    with_placeholders_count = sum(1 for r in test_results if r['placeholders'])
    api_runs = [r['api_test'] for r in test_results if r.get('api_test') is not None]
    api_tested_count = len(api_runs)
    api_success_count = sum(1 for run in api_runs if run and run.get('success', False))
    print(f" 测试模板总数: {total_templates}")
    print(f" 文件存在: {file_exists_count}/{total_templates}")
    print(f" 读取成功: {read_success_count}/{total_templates}")
    print(f" 有占位符: {with_placeholders_count}/{total_templates}")
    print(f" API测试: {api_tested_count}/{total_templates}")
    if api_tested_count > 0:
        print(f" API成功: {api_success_count}/{api_tested_count}")


def _print_details(test_results: List[Dict]) -> None:
    """Print the per-template findings."""
    for i, result in enumerate(test_results, 1):
        print(f"\n 模板 {i}: {result['template_name']}")
        print(f" ID: {result['template_id']}")
        # Both branches of these conditionals were empty strings, so the
        # lines carried no information; print an explicit yes/no instead.
        print(f" 文件存在: {'是' if result['file_exists'] else '否'}")
        print(f" 读取成功: {'是' if result['read_success'] else '否'}")
        print(f" 占位符数量: {len(result['placeholders'])}")
        api_test = result['api_test']
        if api_test:
            print(f" API测试: {'成功' if api_test['success'] else '失败'}")
            if not api_test['success']:
                print(f" 错误: {api_test.get('error', '未知错误')}")


def main():
    """Run the end-to-end check and print a report.

    Steps: connect to the database, pick a tenant and up to five of its
    templates, verify each template file exists and can be read, call the
    generation API for the first template with placeholders (when the API
    is reachable), then print a summary and per-template details. The
    database connection is always closed. Returns ``None``; all outcomes
    are reported on stdout.
    """
    print_section("文档生成接口测试")
    # 1. Connect to the database.
    print_section("1. 连接数据库")
    try:
        conn = pymysql.connect(**DB_CONFIG)
    except Exception as e:
        print_result(False, f"数据库连接失败: {str(e)}")
        return
    print_result(True, "数据库连接成功")
    try:
        # 2. Look up a tenant_id that exists in the data.
        tenant_id = get_actual_tenant_id(conn)
        print(f" 实际tenant_id: {tenant_id}")
        # 3. Fetch the templates to test.
        print_section("2. 获取测试模板")
        test_templates = get_test_templates(conn, tenant_id, limit=5)
        print_result(True, f"找到 {len(test_templates)} 个测试模板")
        if not test_templates:
            print_result(False, "没有找到可测试的模板")
            return
        # 4. Inspect every template.
        print_section("3. 测试模板读取和占位符识别")
        test_results = [
            _inspect_template(i, template)
            for i, template in enumerate(test_templates, 1)
        ]
        # 5. Exercise the API.
        print_section("4. 测试文档生成API接口")
        _run_api_step(test_results)
        # 6. Summary report.
        print_section("5. 测试结果汇总")
        _print_summary(test_results)
        # 7. Detailed results.
        print_section("6. 详细测试结果")
        _print_details(test_results)
    finally:
        conn.close()
        print_result(True, "数据库连接已关闭")
    print_section("测试完成")
if __name__ == "__main__":
main()