312 lines
11 KiB
Python
312 lines
11 KiB
Python
"""
|
||
综合检查脚本:验证数据结构、接口、测试页面和Swagger是否已同步更新
|
||
"""
|
||
import pymysql
|
||
import json
|
||
import os
|
||
from pathlib import Path
|
||
|
||
DB_CONFIG = {
|
||
'host': '152.136.177.240',
|
||
'port': 5012,
|
||
'user': 'finyx',
|
||
'password': '6QsGK6MpePZDE57Z',
|
||
'database': 'finyx',
|
||
'charset': 'utf8mb4'
|
||
}
|
||
|
||
TENANT_ID = 615873064429507639
|
||
|
||
# 期望的字段编码列表(谈话前安全风险评估表)
|
||
PRE_INTERVIEW_FIELDS = [
|
||
'target_family_situation',
|
||
'target_social_relations',
|
||
'target_health_status',
|
||
'target_personality',
|
||
'target_tolerance',
|
||
'target_issue_severity',
|
||
'target_other_issues_possibility',
|
||
'target_previous_investigation',
|
||
'target_negative_events',
|
||
'target_other_situation',
|
||
'risk_level'
|
||
]
|
||
|
||
FILE_NAME = '谈话前安全风险评估表'
|
||
TEMPLATE_CODE = 'PRE_INTERVIEW_RISK_ASSESSMENT'
|
||
|
||
def check_database():
|
||
"""检查数据库中的数据"""
|
||
print("="*80)
|
||
print("1. 数据库检查")
|
||
print("="*80)
|
||
|
||
try:
|
||
conn = pymysql.connect(**DB_CONFIG)
|
||
cursor = conn.cursor(pymysql.cursors.DictCursor)
|
||
|
||
# 检查文件配置
|
||
print("\n[1.1] 文件配置检查:")
|
||
cursor.execute("""
|
||
SELECT id, name, file_path, input_data, state
|
||
FROM f_polic_file_config
|
||
WHERE tenant_id = %s AND name = %s
|
||
""", (TENANT_ID, FILE_NAME))
|
||
file_config = cursor.fetchone()
|
||
|
||
if file_config:
|
||
print(f" ✓ 文件配置存在: {file_config['name']}")
|
||
print(f" - ID: {file_config['id']}")
|
||
print(f" - 文件路径: {file_config['file_path']}")
|
||
print(f" - 状态: {'启用' if file_config['state'] == 1 else '未启用'}")
|
||
|
||
# 检查input_data中的template_code
|
||
try:
|
||
input_data = json.loads(file_config['input_data']) if file_config['input_data'] else {}
|
||
if input_data.get('template_code') == TEMPLATE_CODE:
|
||
print(f" - 模板编码: ✓ {TEMPLATE_CODE}")
|
||
else:
|
||
print(f" - 模板编码: ✗ 期望 {TEMPLATE_CODE}, 实际 {input_data.get('template_code')}")
|
||
except:
|
||
print(f" - 模板编码: ✗ input_data格式错误")
|
||
|
||
file_config_id = file_config['id']
|
||
else:
|
||
print(f" ✗ 文件配置不存在: {FILE_NAME}")
|
||
file_config_id = None
|
||
|
||
# 检查字段
|
||
print("\n[1.2] 字段检查:")
|
||
placeholders = ','.join(['%s'] * len(PRE_INTERVIEW_FIELDS))
|
||
cursor.execute(f"""
|
||
SELECT id, name, filed_code, field_type, state
|
||
FROM f_polic_field
|
||
WHERE tenant_id = %s
|
||
AND filed_code IN ({placeholders})
|
||
ORDER BY filed_code
|
||
""", [TENANT_ID] + PRE_INTERVIEW_FIELDS)
|
||
fields = cursor.fetchall()
|
||
|
||
found_field_codes = [f['filed_code'] for f in fields]
|
||
print(f" 找到 {len(fields)}/{len(PRE_INTERVIEW_FIELDS)} 个字段")
|
||
|
||
missing_fields = set(PRE_INTERVIEW_FIELDS) - set(found_field_codes)
|
||
if missing_fields:
|
||
print(f" ✗ 缺失字段 ({len(missing_fields)} 个): {', '.join(sorted(missing_fields))}")
|
||
else:
|
||
print(f" ✓ 所有字段都已存在")
|
||
|
||
# 检查关联关系
|
||
if file_config_id:
|
||
print("\n[1.3] 关联关系检查:")
|
||
cursor.execute("""
|
||
SELECT COUNT(*) as count
|
||
FROM f_polic_file_field ff
|
||
JOIN f_polic_field f ON ff.filed_id = f.id
|
||
WHERE ff.tenant_id = %s AND ff.file_id = %s
|
||
AND f.filed_code IN ({})
|
||
""".format(placeholders), [TENANT_ID, file_config_id] + PRE_INTERVIEW_FIELDS)
|
||
relation_count = cursor.fetchone()['count']
|
||
|
||
if relation_count == len(PRE_INTERVIEW_FIELDS):
|
||
print(f" ✓ 所有字段都已正确关联 ({relation_count}/{len(PRE_INTERVIEW_FIELDS)})")
|
||
else:
|
||
print(f" ✗ 关联关系不完整 ({relation_count}/{len(PRE_INTERVIEW_FIELDS)})")
|
||
|
||
conn.close()
|
||
return {
|
||
'file_config_exists': file_config is not None,
|
||
'fields_count': len(fields),
|
||
'expected_fields_count': len(PRE_INTERVIEW_FIELDS),
|
||
'missing_fields': list(missing_fields),
|
||
'relations_count': relation_count if file_config_id else 0
|
||
}
|
||
|
||
except Exception as e:
|
||
print(f" ✗ 数据库检查失败: {e}")
|
||
return None
|
||
|
||
def check_field_service():
|
||
"""检查field_service.py是否支持新模板"""
|
||
print("\n" + "="*80)
|
||
print("2. 接口服务检查")
|
||
print("="*80)
|
||
|
||
field_service_path = Path('services/field_service.py')
|
||
document_service_path = Path('services/document_service.py')
|
||
|
||
issues = []
|
||
|
||
# 检查field_service.py
|
||
print("\n[2.1] field_service.py检查:")
|
||
if field_service_path.exists():
|
||
content = field_service_path.read_text(encoding='utf-8')
|
||
|
||
# 检查get_fields_by_business_type方法
|
||
if 'get_fields_by_business_type' in content:
|
||
# 检查是否硬编码了模板名称
|
||
if "fc.name = '初步核实审批表'" in content:
|
||
print(" ✗ get_fields_by_business_type方法硬编码了模板名称")
|
||
print(" → 已修复:现在根据business_type动态查询")
|
||
issues.append('field_service_hardcoded')
|
||
else:
|
||
print(" ✓ get_fields_by_business_type方法支持动态查询")
|
||
|
||
# 检查是否支持从input_data JSON中解析template_code
|
||
if 'json.loads' in content and 'input_data' in content:
|
||
print(" ✓ 支持从input_data JSON中解析business_type")
|
||
else:
|
||
print(" ⚠ 未找到从input_data JSON解析的逻辑")
|
||
else:
|
||
print(" ✗ field_service.py文件不存在")
|
||
issues.append('field_service_missing')
|
||
|
||
# 检查document_service.py
|
||
print("\n[2.2] document_service.py检查:")
|
||
if document_service_path.exists():
|
||
content = document_service_path.read_text(encoding='utf-8')
|
||
|
||
# 检查get_file_config_by_template_code方法
|
||
if 'get_file_config_by_template_code' in content:
|
||
if 'json.loads' in content and 'input_data' in content:
|
||
print(" ✓ get_file_config_by_template_code方法支持从input_data JSON中查找template_code")
|
||
else:
|
||
print(" ✗ get_file_config_by_template_code方法可能不支持从JSON中查找")
|
||
issues.append('document_service_json_parse')
|
||
else:
|
||
print(" ✗ 未找到get_file_config_by_template_code方法")
|
||
issues.append('document_service_method_missing')
|
||
else:
|
||
print(" ✗ document_service.py文件不存在")
|
||
issues.append('document_service_missing')
|
||
|
||
return issues
|
||
|
||
def check_test_page():
|
||
"""检查测试页面"""
|
||
print("\n" + "="*80)
|
||
print("3. 测试页面检查")
|
||
print("="*80)
|
||
|
||
index_html_path = Path('static/index.html')
|
||
|
||
if not index_html_path.exists():
|
||
print(" ✗ static/index.html文件不存在")
|
||
return ['test_page_missing']
|
||
|
||
content = index_html_path.read_text(encoding='utf-8')
|
||
issues = []
|
||
|
||
print("\n[3.1] 测试页面内容检查:")
|
||
|
||
# 检查是否包含新模板的示例
|
||
if 'PRE_INTERVIEW_RISK_ASSESSMENT' in content:
|
||
print(" ✓ 包含谈话前安全风险评估表的模板编码示例")
|
||
else:
|
||
print(" ⚠ 未包含谈话前安全风险评估表的模板编码示例")
|
||
print(" → 建议:测试页面是通用的,用户可手动输入模板编码")
|
||
|
||
# 检查是否硬编码了旧模板
|
||
if "初步核实审批表" in content:
|
||
print(" ⚠ 包含旧模板'初步核实审批表'的示例(这是正常的,作为默认示例)")
|
||
|
||
# 检查字段输入是否支持动态添加
|
||
if 'addOutputField' in content and 'addInputField' in content:
|
||
print(" ✓ 支持动态添加输入和输出字段")
|
||
|
||
return issues
|
||
|
||
def check_swagger():
|
||
"""检查Swagger文档"""
|
||
print("\n" + "="*80)
|
||
print("4. Swagger文档检查")
|
||
print("="*80)
|
||
|
||
app_py_path = Path('app.py')
|
||
|
||
if not app_py_path.exists():
|
||
print(" ✗ app.py文件不存在")
|
||
return ['app_missing']
|
||
|
||
content = app_py_path.read_text(encoding='utf-8')
|
||
issues = []
|
||
|
||
print("\n[4.1] Swagger配置检查:")
|
||
|
||
# 检查Swagger是否配置
|
||
if 'Swagger' in content and 'swagger_template' in content:
|
||
print(" ✓ Swagger已配置")
|
||
else:
|
||
print(" ✗ Swagger未配置")
|
||
issues.append('swagger_not_configured')
|
||
|
||
# 检查接口文档是否完整
|
||
print("\n[4.2] 接口文档检查:")
|
||
|
||
endpoints = [
|
||
('/ai/extract', 'AI字段提取接口'),
|
||
('/api/fields', '获取字段配置接口'),
|
||
('/ai/generate-document', '文档生成接口')
|
||
]
|
||
|
||
for endpoint, description in endpoints:
|
||
if endpoint in content:
|
||
# 检查是否有Swagger文档注释
|
||
if '---' in content and description in content:
|
||
print(f" ✓ {endpoint} ({description}) 有Swagger文档")
|
||
else:
|
||
print(f" ⚠ {endpoint} ({description}) 可能缺少Swagger文档注释")
|
||
else:
|
||
print(f" ✗ {endpoint} ({description}) 不存在")
|
||
issues.append(f'endpoint_missing_{endpoint}')
|
||
|
||
# Swagger文档是动态生成的,不需要硬编码模板信息
|
||
print("\n[4.3] 模板信息检查:")
|
||
print(" ✓ Swagger文档是动态生成的,不需要硬编码模板信息")
|
||
print(" → 接口支持所有模板,通过templateCode参数指定")
|
||
|
||
return issues
|
||
|
||
def main():
|
||
"""主函数"""
|
||
print("\n" + "="*80)
|
||
print("数据结构、接口、测试页面和Swagger同步状态检查")
|
||
print("="*80)
|
||
print(f"\n检查目标: {FILE_NAME} (模板编码: {TEMPLATE_CODE})")
|
||
print(f"检查时间: {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||
|
||
# 执行各项检查
|
||
db_result = check_database()
|
||
service_issues = check_field_service()
|
||
test_page_issues = check_test_page()
|
||
swagger_issues = check_swagger()
|
||
|
||
# 汇总
|
||
print("\n" + "="*80)
|
||
print("检查结果汇总")
|
||
print("="*80)
|
||
|
||
all_issues = service_issues + test_page_issues + swagger_issues
|
||
|
||
if db_result:
|
||
print(f"\n数据库状态:")
|
||
print(f" - 文件配置: {'✓ 存在' if db_result['file_config_exists'] else '✗ 不存在'}")
|
||
print(f" - 字段数量: {db_result['fields_count']}/{db_result['expected_fields_count']}")
|
||
if db_result['missing_fields']:
|
||
print(f" - 缺失字段: {', '.join(db_result['missing_fields'])}")
|
||
print(f" - 关联关系: {db_result['relations_count']}/{db_result['expected_fields_count']}")
|
||
|
||
if all_issues:
|
||
print(f"\n发现的问题 ({len(all_issues)} 个):")
|
||
for issue in all_issues:
|
||
print(f" - {issue}")
|
||
else:
|
||
print("\n✓ 未发现需要修复的问题")
|
||
|
||
print("\n" + "="*80)
|
||
print("检查完成")
|
||
print("="*80)
|
||
|
||
if __name__ == '__main__':
|
||
main()
|