ai-business-write/test_scripts/test_api_endpoints.py

261 lines
9.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
测试API接口是否可以正确解析数据和生成文档
"""
import requests
import json
from typing import Dict, Any
# API基础URL
BASE_URL = "http://localhost:7500"
def test_extract_api():
"""测试AI解析接口"""
print("=" * 80)
print("测试 AI解析接口 (/ai/extract)")
print("=" * 80)
url = f"{BASE_URL}/ai/extract"
# 测试数据
request_data = {
"inputData": [
{
"fieldCode": "clue_info",
"fieldValue": "被举报用户名称是张三年龄44岁某公司总经理男性1980年5月出生本科文化程度中共党员正处级。主要问题线索违反国家计划生育有关政策规定于2010年10月生育二胎。线索来源群众举报。"
},
{
"fieldCode": "target_basic_info_clue",
"fieldValue": "被核查人员工作基本情况张三1980年5月生本科文化中共党员现为某公司总经理正处级。"
}
],
"outputData": [
{"fieldCode": "target_name"},
{"fieldCode": "target_gender"},
{"fieldCode": "target_age"},
{"fieldCode": "target_date_of_birth"},
{"fieldCode": "target_organization_and_position"},
{"fieldCode": "target_organization"},
{"fieldCode": "target_position"},
{"fieldCode": "target_education_level"},
{"fieldCode": "target_political_status"},
{"fieldCode": "target_professional_rank"},
{"fieldCode": "clue_source"},
{"fieldCode": "target_issue_description"}
]
}
print("\n请求数据:")
print(json.dumps(request_data, ensure_ascii=False, indent=2))
print("\n正在发送请求...")
try:
response = requests.post(url, json=request_data, timeout=60)
print(f"\n响应状态码: {response.status_code}")
if response.status_code == 200:
result = response.json()
print("\n响应数据:")
print(json.dumps(result, ensure_ascii=False, indent=2))
if result.get('isSuccess'):
print("\n✓ 解析成功!")
if result.get('data', {}).get('outData'):
print("\n提取的字段:")
for item in result['data']['outData']:
value = item.get('fieldValue', '(空)')
print(f" - {item.get('fieldCode')}: {value}")
return True
else:
print(f"\n✗ 解析失败: {result.get('errorMsg', '未知错误')}")
return False
else:
print(f"\n✗ HTTP错误: {response.status_code}")
print(f"响应内容: {response.text}")
return False
except requests.exceptions.Timeout:
print("\n✗ 请求超时60秒")
return False
except requests.exceptions.ConnectionError:
print(f"\n✗ 连接错误: 无法连接到 {BASE_URL}")
print("请确保服务已启动: python app.py")
return False
except Exception as e:
print(f"\n✗ 发生错误: {str(e)}")
return False
def test_generate_document_api():
"""测试文档生成接口"""
print("\n" + "=" * 80)
print("测试 文档生成接口 (/ai/generate-document)")
print("=" * 80)
url = f"{BASE_URL}/ai/generate-document"
# 测试数据
request_data = {
"inputData": [
{"fieldCode": "target_name", "fieldValue": "张三"},
{"fieldCode": "target_gender", "fieldValue": ""},
{"fieldCode": "target_age", "fieldValue": "44"},
{"fieldCode": "target_date_of_birth", "fieldValue": "198005"},
{"fieldCode": "target_organization_and_position", "fieldValue": "某公司总经理"},
{"fieldCode": "target_organization", "fieldValue": "某公司"},
{"fieldCode": "target_position", "fieldValue": "总经理"},
{"fieldCode": "target_education_level", "fieldValue": "本科"},
{"fieldCode": "target_political_status", "fieldValue": "中共党员"},
{"fieldCode": "target_professional_rank", "fieldValue": "正处级"},
{"fieldCode": "clue_source", "fieldValue": "群众举报"},
{"fieldCode": "target_issue_description", "fieldValue": "违反国家计划生育有关政策规定于2010年10月生育二胎。"},
{"fieldCode": "department_opinion", "fieldValue": "建议进行初步核实"},
{"fieldCode": "filler_name", "fieldValue": "李四"}
],
"fpolicFieldParamFileList": [
{
"fileId": 1,
"fileName": "初步核实审批表.doc",
"templateCode": "PRELIMINARY_VERIFICATION_APPROVAL"
}
]
}
print("\n请求数据:")
print(json.dumps(request_data, ensure_ascii=False, indent=2))
print("\n正在发送请求...")
try:
response = requests.post(url, json=request_data, timeout=120)
print(f"\n响应状态码: {response.status_code}")
if response.status_code == 200:
result = response.json()
print("\n响应数据:")
print(json.dumps(result, ensure_ascii=False, indent=2))
if result.get('isSuccess'):
print("\n✓ 文档生成成功!")
if result.get('data'):
data = result['data']
print(f"\n文档ID: {data.get('documentId', 'N/A')}")
print(f"文档名称: {data.get('documentName', 'N/A')}")
if data.get('fpolicFieldParamFileList'):
print("\n生成的文件:")
for file_info in data['fpolicFieldParamFileList']:
print(f" - 文件ID: {file_info.get('fileId')}")
print(f" 文件名: {file_info.get('fileName')}")
print(f" 文件路径: {file_info.get('filePath')}")
return True
else:
print(f"\n✗ 文档生成失败: {result.get('errorMsg', '未知错误')}")
return False
else:
print(f"\n✗ HTTP错误: {response.status_code}")
print(f"响应内容: {response.text}")
return False
except requests.exceptions.Timeout:
print("\n✗ 请求超时120秒")
return False
except requests.exceptions.ConnectionError:
print(f"\n✗ 连接错误: 无法连接到 {BASE_URL}")
print("请确保服务已启动: python app.py")
return False
except Exception as e:
print(f"\n✗ 发生错误: {str(e)}")
return False
def test_fields_api():
"""测试字段配置接口"""
print("\n" + "=" * 80)
print("测试 字段配置接口 (/api/fields)")
print("=" * 80)
url = f"{BASE_URL}/api/fields"
params = {"businessType": "INVESTIGATION"}
print(f"\n请求URL: {url}?{params}")
print("正在发送请求...")
try:
response = requests.get(url, params=params, timeout=30)
print(f"\n响应状态码: {response.status_code}")
if response.status_code == 200:
result = response.json()
print("\n响应数据:")
print(json.dumps(result, ensure_ascii=False, indent=2))
if result.get('isSuccess'):
print("\n✓ 获取字段配置成功!")
if result.get('data', {}).get('fields'):
fields = result['data']['fields']
input_count = len(fields.get('input_fields', []))
output_count = len(fields.get('output_fields', []))
print(f"\n输入字段数: {input_count}")
print(f"输出字段数: {output_count}")
return True
else:
print(f"\n✗ 获取字段配置失败: {result.get('errorMsg', '未知错误')}")
return False
else:
print(f"\n✗ HTTP错误: {response.status_code}")
print(f"响应内容: {response.text}")
return False
except requests.exceptions.ConnectionError:
print(f"\n✗ 连接错误: 无法连接到 {BASE_URL}")
print("请确保服务已启动: python app.py")
return False
except Exception as e:
print(f"\n✗ 发生错误: {str(e)}")
return False
def main():
"""主函数"""
print("API接口测试工具")
print("=" * 80)
print(f"测试目标: {BASE_URL}")
print("=" * 80)
results = []
# 测试字段配置接口
results.append(("字段配置接口", test_fields_api()))
# 测试AI解析接口
results.append(("AI解析接口", test_extract_api()))
# 测试文档生成接口
results.append(("文档生成接口", test_generate_document_api()))
# 打印汇总
print("\n" + "=" * 80)
print("测试汇总")
print("=" * 80)
for name, success in results:
status = "✓ 通过" if success else "✗ 失败"
print(f"{name}: {status}")
total = len(results)
passed = sum(1 for _, success in results if success)
print(f"\n总计: {passed}/{total} 通过")
if passed == total:
print("\n✓ 所有测试通过!")
else:
print(f"\n{total - passed} 个测试失败,请检查上述错误信息")
if __name__ == '__main__':
main()