ai-business-write/check_data_sync_status.py

312 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
综合检查脚本验证数据结构、接口、测试页面和Swagger是否已同步更新
"""
import pymysql
import json
import os
from pathlib import Path
DB_CONFIG = {
'host': '152.136.177.240',
'port': 5012,
'user': 'finyx',
'password': '6QsGK6MpePZDE57Z',
'database': 'finyx',
'charset': 'utf8mb4'
}
TENANT_ID = 615873064429507639
# 期望的字段编码列表(谈话前安全风险评估表)
PRE_INTERVIEW_FIELDS = [
'target_family_situation',
'target_social_relations',
'target_health_status',
'target_personality',
'target_tolerance',
'target_issue_severity',
'target_other_issues_possibility',
'target_previous_investigation',
'target_negative_events',
'target_other_situation',
'risk_level'
]
FILE_NAME = '谈话前安全风险评估表'
TEMPLATE_CODE = 'PRE_INTERVIEW_RISK_ASSESSMENT'
def check_database():
"""检查数据库中的数据"""
print("="*80)
print("1. 数据库检查")
print("="*80)
try:
conn = pymysql.connect(**DB_CONFIG)
cursor = conn.cursor(pymysql.cursors.DictCursor)
# 检查文件配置
print("\n[1.1] 文件配置检查:")
cursor.execute("""
SELECT id, name, file_path, input_data, state
FROM f_polic_file_config
WHERE tenant_id = %s AND name = %s
""", (TENANT_ID, FILE_NAME))
file_config = cursor.fetchone()
if file_config:
print(f" ✓ 文件配置存在: {file_config['name']}")
print(f" - ID: {file_config['id']}")
print(f" - 文件路径: {file_config['file_path']}")
print(f" - 状态: {'启用' if file_config['state'] == 1 else '未启用'}")
# 检查input_data中的template_code
try:
input_data = json.loads(file_config['input_data']) if file_config['input_data'] else {}
if input_data.get('template_code') == TEMPLATE_CODE:
print(f" - 模板编码: ✓ {TEMPLATE_CODE}")
else:
print(f" - 模板编码: ✗ 期望 {TEMPLATE_CODE}, 实际 {input_data.get('template_code')}")
except:
print(f" - 模板编码: ✗ input_data格式错误")
file_config_id = file_config['id']
else:
print(f" ✗ 文件配置不存在: {FILE_NAME}")
file_config_id = None
# 检查字段
print("\n[1.2] 字段检查:")
placeholders = ','.join(['%s'] * len(PRE_INTERVIEW_FIELDS))
cursor.execute(f"""
SELECT id, name, filed_code, field_type, state
FROM f_polic_field
WHERE tenant_id = %s
AND filed_code IN ({placeholders})
ORDER BY filed_code
""", [TENANT_ID] + PRE_INTERVIEW_FIELDS)
fields = cursor.fetchall()
found_field_codes = [f['filed_code'] for f in fields]
print(f" 找到 {len(fields)}/{len(PRE_INTERVIEW_FIELDS)} 个字段")
missing_fields = set(PRE_INTERVIEW_FIELDS) - set(found_field_codes)
if missing_fields:
print(f" ✗ 缺失字段 ({len(missing_fields)} 个): {', '.join(sorted(missing_fields))}")
else:
print(f" ✓ 所有字段都已存在")
# 检查关联关系
if file_config_id:
print("\n[1.3] 关联关系检查:")
cursor.execute("""
SELECT COUNT(*) as count
FROM f_polic_file_field ff
JOIN f_polic_field f ON ff.filed_id = f.id
WHERE ff.tenant_id = %s AND ff.file_id = %s
AND f.filed_code IN ({})
""".format(placeholders), [TENANT_ID, file_config_id] + PRE_INTERVIEW_FIELDS)
relation_count = cursor.fetchone()['count']
if relation_count == len(PRE_INTERVIEW_FIELDS):
print(f" ✓ 所有字段都已正确关联 ({relation_count}/{len(PRE_INTERVIEW_FIELDS)})")
else:
print(f" ✗ 关联关系不完整 ({relation_count}/{len(PRE_INTERVIEW_FIELDS)})")
conn.close()
return {
'file_config_exists': file_config is not None,
'fields_count': len(fields),
'expected_fields_count': len(PRE_INTERVIEW_FIELDS),
'missing_fields': list(missing_fields),
'relations_count': relation_count if file_config_id else 0
}
except Exception as e:
print(f" ✗ 数据库检查失败: {e}")
return None
def check_field_service():
"""检查field_service.py是否支持新模板"""
print("\n" + "="*80)
print("2. 接口服务检查")
print("="*80)
field_service_path = Path('services/field_service.py')
document_service_path = Path('services/document_service.py')
issues = []
# 检查field_service.py
print("\n[2.1] field_service.py检查:")
if field_service_path.exists():
content = field_service_path.read_text(encoding='utf-8')
# 检查get_fields_by_business_type方法
if 'get_fields_by_business_type' in content:
# 检查是否硬编码了模板名称
if "fc.name = '初步核实审批表'" in content:
print(" ✗ get_fields_by_business_type方法硬编码了模板名称")
print(" → 已修复现在根据business_type动态查询")
issues.append('field_service_hardcoded')
else:
print(" ✓ get_fields_by_business_type方法支持动态查询")
# 检查是否支持从input_data JSON中解析template_code
if 'json.loads' in content and 'input_data' in content:
print(" ✓ 支持从input_data JSON中解析business_type")
else:
print(" ⚠ 未找到从input_data JSON解析的逻辑")
else:
print(" ✗ field_service.py文件不存在")
issues.append('field_service_missing')
# 检查document_service.py
print("\n[2.2] document_service.py检查:")
if document_service_path.exists():
content = document_service_path.read_text(encoding='utf-8')
# 检查get_file_config_by_template_code方法
if 'get_file_config_by_template_code' in content:
if 'json.loads' in content and 'input_data' in content:
print(" ✓ get_file_config_by_template_code方法支持从input_data JSON中查找template_code")
else:
print(" ✗ get_file_config_by_template_code方法可能不支持从JSON中查找")
issues.append('document_service_json_parse')
else:
print(" ✗ 未找到get_file_config_by_template_code方法")
issues.append('document_service_method_missing')
else:
print(" ✗ document_service.py文件不存在")
issues.append('document_service_missing')
return issues
def check_test_page():
"""检查测试页面"""
print("\n" + "="*80)
print("3. 测试页面检查")
print("="*80)
index_html_path = Path('static/index.html')
if not index_html_path.exists():
print(" ✗ static/index.html文件不存在")
return ['test_page_missing']
content = index_html_path.read_text(encoding='utf-8')
issues = []
print("\n[3.1] 测试页面内容检查:")
# 检查是否包含新模板的示例
if 'PRE_INTERVIEW_RISK_ASSESSMENT' in content:
print(" ✓ 包含谈话前安全风险评估表的模板编码示例")
else:
print(" ⚠ 未包含谈话前安全风险评估表的模板编码示例")
print(" → 建议:测试页面是通用的,用户可手动输入模板编码")
# 检查是否硬编码了旧模板
if "初步核实审批表" in content:
print(" ⚠ 包含旧模板'初步核实审批表'的示例(这是正常的,作为默认示例)")
# 检查字段输入是否支持动态添加
if 'addOutputField' in content and 'addInputField' in content:
print(" ✓ 支持动态添加输入和输出字段")
return issues
def check_swagger():
"""检查Swagger文档"""
print("\n" + "="*80)
print("4. Swagger文档检查")
print("="*80)
app_py_path = Path('app.py')
if not app_py_path.exists():
print(" ✗ app.py文件不存在")
return ['app_missing']
content = app_py_path.read_text(encoding='utf-8')
issues = []
print("\n[4.1] Swagger配置检查:")
# 检查Swagger是否配置
if 'Swagger' in content and 'swagger_template' in content:
print(" ✓ Swagger已配置")
else:
print(" ✗ Swagger未配置")
issues.append('swagger_not_configured')
# 检查接口文档是否完整
print("\n[4.2] 接口文档检查:")
endpoints = [
('/ai/extract', 'AI字段提取接口'),
('/api/fields', '获取字段配置接口'),
('/ai/generate-document', '文档生成接口')
]
for endpoint, description in endpoints:
if endpoint in content:
# 检查是否有Swagger文档注释
if '---' in content and description in content:
print(f"{endpoint} ({description}) 有Swagger文档")
else:
print(f"{endpoint} ({description}) 可能缺少Swagger文档注释")
else:
print(f"{endpoint} ({description}) 不存在")
issues.append(f'endpoint_missing_{endpoint}')
# Swagger文档是动态生成的不需要硬编码模板信息
print("\n[4.3] 模板信息检查:")
print(" ✓ Swagger文档是动态生成的不需要硬编码模板信息")
print(" → 接口支持所有模板通过templateCode参数指定")
return issues
def main():
"""主函数"""
print("\n" + "="*80)
print("数据结构、接口、测试页面和Swagger同步状态检查")
print("="*80)
print(f"\n检查目标: {FILE_NAME} (模板编码: {TEMPLATE_CODE})")
print(f"检查时间: {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
# 执行各项检查
db_result = check_database()
service_issues = check_field_service()
test_page_issues = check_test_page()
swagger_issues = check_swagger()
# 汇总
print("\n" + "="*80)
print("检查结果汇总")
print("="*80)
all_issues = service_issues + test_page_issues + swagger_issues
if db_result:
print(f"\n数据库状态:")
print(f" - 文件配置: {'✓ 存在' if db_result['file_config_exists'] else '✗ 不存在'}")
print(f" - 字段数量: {db_result['fields_count']}/{db_result['expected_fields_count']}")
if db_result['missing_fields']:
print(f" - 缺失字段: {', '.join(db_result['missing_fields'])}")
print(f" - 关联关系: {db_result['relations_count']}/{db_result['expected_fields_count']}")
if all_issues:
print(f"\n发现的问题 ({len(all_issues)} 个):")
for issue in all_issues:
print(f" - {issue}")
else:
print("\n✓ 未发现需要修复的问题")
print("\n" + "="*80)
print("检查完成")
print("="*80)
if __name__ == '__main__':
main()