""" 综合检查脚本:验证数据结构、接口、测试页面和Swagger是否已同步更新 """ import pymysql import json import os from pathlib import Path DB_CONFIG = { 'host': '152.136.177.240', 'port': 5012, 'user': 'finyx', 'password': '6QsGK6MpePZDE57Z', 'database': 'finyx', 'charset': 'utf8mb4' } TENANT_ID = 615873064429507639 # 期望的字段编码列表(谈话前安全风险评估表) PRE_INTERVIEW_FIELDS = [ 'target_family_situation', 'target_social_relations', 'target_health_status', 'target_personality', 'target_tolerance', 'target_issue_severity', 'target_other_issues_possibility', 'target_previous_investigation', 'target_negative_events', 'target_other_situation', 'risk_level' ] FILE_NAME = '谈话前安全风险评估表' TEMPLATE_CODE = 'PRE_INTERVIEW_RISK_ASSESSMENT' def check_database(): """检查数据库中的数据""" print("="*80) print("1. 数据库检查") print("="*80) try: conn = pymysql.connect(**DB_CONFIG) cursor = conn.cursor(pymysql.cursors.DictCursor) # 检查文件配置 print("\n[1.1] 文件配置检查:") cursor.execute(""" SELECT id, name, file_path, input_data, state FROM f_polic_file_config WHERE tenant_id = %s AND name = %s """, (TENANT_ID, FILE_NAME)) file_config = cursor.fetchone() if file_config: print(f" ✓ 文件配置存在: {file_config['name']}") print(f" - ID: {file_config['id']}") print(f" - 文件路径: {file_config['file_path']}") print(f" - 状态: {'启用' if file_config['state'] == 1 else '未启用'}") # 检查input_data中的template_code try: input_data = json.loads(file_config['input_data']) if file_config['input_data'] else {} if input_data.get('template_code') == TEMPLATE_CODE: print(f" - 模板编码: ✓ {TEMPLATE_CODE}") else: print(f" - 模板编码: ✗ 期望 {TEMPLATE_CODE}, 实际 {input_data.get('template_code')}") except: print(f" - 模板编码: ✗ input_data格式错误") file_config_id = file_config['id'] else: print(f" ✗ 文件配置不存在: {FILE_NAME}") file_config_id = None # 检查字段 print("\n[1.2] 字段检查:") placeholders = ','.join(['%s'] * len(PRE_INTERVIEW_FIELDS)) cursor.execute(f""" SELECT id, name, filed_code, field_type, state FROM f_polic_field WHERE tenant_id = %s AND filed_code IN ({placeholders}) ORDER BY filed_code """, [TENANT_ID] + PRE_INTERVIEW_FIELDS) fields = cursor.fetchall() found_field_codes = [f['filed_code'] for f in fields] print(f" 找到 {len(fields)}/{len(PRE_INTERVIEW_FIELDS)} 个字段") missing_fields = set(PRE_INTERVIEW_FIELDS) - set(found_field_codes) if missing_fields: print(f" ✗ 缺失字段 ({len(missing_fields)} 个): {', '.join(sorted(missing_fields))}") else: print(f" ✓ 所有字段都已存在") # 检查关联关系 if file_config_id: print("\n[1.3] 关联关系检查:") cursor.execute(""" SELECT COUNT(*) as count FROM f_polic_file_field ff JOIN f_polic_field f ON ff.filed_id = f.id WHERE ff.tenant_id = %s AND ff.file_id = %s AND f.filed_code IN ({}) """.format(placeholders), [TENANT_ID, file_config_id] + PRE_INTERVIEW_FIELDS) relation_count = cursor.fetchone()['count'] if relation_count == len(PRE_INTERVIEW_FIELDS): print(f" ✓ 所有字段都已正确关联 ({relation_count}/{len(PRE_INTERVIEW_FIELDS)})") else: print(f" ✗ 关联关系不完整 ({relation_count}/{len(PRE_INTERVIEW_FIELDS)})") conn.close() return { 'file_config_exists': file_config is not None, 'fields_count': len(fields), 'expected_fields_count': len(PRE_INTERVIEW_FIELDS), 'missing_fields': list(missing_fields), 'relations_count': relation_count if file_config_id else 0 } except Exception as e: print(f" ✗ 数据库检查失败: {e}") return None def check_field_service(): """检查field_service.py是否支持新模板""" print("\n" + "="*80) print("2. 接口服务检查") print("="*80) field_service_path = Path('services/field_service.py') document_service_path = Path('services/document_service.py') issues = [] # 检查field_service.py print("\n[2.1] field_service.py检查:") if field_service_path.exists(): content = field_service_path.read_text(encoding='utf-8') # 检查get_fields_by_business_type方法 if 'get_fields_by_business_type' in content: # 检查是否硬编码了模板名称 if "fc.name = '初步核实审批表'" in content: print(" ✗ get_fields_by_business_type方法硬编码了模板名称") print(" → 已修复:现在根据business_type动态查询") issues.append('field_service_hardcoded') else: print(" ✓ get_fields_by_business_type方法支持动态查询") # 检查是否支持从input_data JSON中解析template_code if 'json.loads' in content and 'input_data' in content: print(" ✓ 支持从input_data JSON中解析business_type") else: print(" ⚠ 未找到从input_data JSON解析的逻辑") else: print(" ✗ field_service.py文件不存在") issues.append('field_service_missing') # 检查document_service.py print("\n[2.2] document_service.py检查:") if document_service_path.exists(): content = document_service_path.read_text(encoding='utf-8') # 检查get_file_config_by_template_code方法 if 'get_file_config_by_template_code' in content: if 'json.loads' in content and 'input_data' in content: print(" ✓ get_file_config_by_template_code方法支持从input_data JSON中查找template_code") else: print(" ✗ get_file_config_by_template_code方法可能不支持从JSON中查找") issues.append('document_service_json_parse') else: print(" ✗ 未找到get_file_config_by_template_code方法") issues.append('document_service_method_missing') else: print(" ✗ document_service.py文件不存在") issues.append('document_service_missing') return issues def check_test_page(): """检查测试页面""" print("\n" + "="*80) print("3. 测试页面检查") print("="*80) index_html_path = Path('static/index.html') if not index_html_path.exists(): print(" ✗ static/index.html文件不存在") return ['test_page_missing'] content = index_html_path.read_text(encoding='utf-8') issues = [] print("\n[3.1] 测试页面内容检查:") # 检查是否包含新模板的示例 if 'PRE_INTERVIEW_RISK_ASSESSMENT' in content: print(" ✓ 包含谈话前安全风险评估表的模板编码示例") else: print(" ⚠ 未包含谈话前安全风险评估表的模板编码示例") print(" → 建议:测试页面是通用的,用户可手动输入模板编码") # 检查是否硬编码了旧模板 if "初步核实审批表" in content: print(" ⚠ 包含旧模板'初步核实审批表'的示例(这是正常的,作为默认示例)") # 检查字段输入是否支持动态添加 if 'addOutputField' in content and 'addInputField' in content: print(" ✓ 支持动态添加输入和输出字段") return issues def check_swagger(): """检查Swagger文档""" print("\n" + "="*80) print("4. Swagger文档检查") print("="*80) app_py_path = Path('app.py') if not app_py_path.exists(): print(" ✗ app.py文件不存在") return ['app_missing'] content = app_py_path.read_text(encoding='utf-8') issues = [] print("\n[4.1] Swagger配置检查:") # 检查Swagger是否配置 if 'Swagger' in content and 'swagger_template' in content: print(" ✓ Swagger已配置") else: print(" ✗ Swagger未配置") issues.append('swagger_not_configured') # 检查接口文档是否完整 print("\n[4.2] 接口文档检查:") endpoints = [ ('/ai/extract', 'AI字段提取接口'), ('/api/fields', '获取字段配置接口'), ('/ai/generate-document', '文档生成接口') ] for endpoint, description in endpoints: if endpoint in content: # 检查是否有Swagger文档注释 if '---' in content and description in content: print(f" ✓ {endpoint} ({description}) 有Swagger文档") else: print(f" ⚠ {endpoint} ({description}) 可能缺少Swagger文档注释") else: print(f" ✗ {endpoint} ({description}) 不存在") issues.append(f'endpoint_missing_{endpoint}') # Swagger文档是动态生成的,不需要硬编码模板信息 print("\n[4.3] 模板信息检查:") print(" ✓ Swagger文档是动态生成的,不需要硬编码模板信息") print(" → 接口支持所有模板,通过templateCode参数指定") return issues def main(): """主函数""" print("\n" + "="*80) print("数据结构、接口、测试页面和Swagger同步状态检查") print("="*80) print(f"\n检查目标: {FILE_NAME} (模板编码: {TEMPLATE_CODE})") print(f"检查时间: {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") # 执行各项检查 db_result = check_database() service_issues = check_field_service() test_page_issues = check_test_page() swagger_issues = check_swagger() # 汇总 print("\n" + "="*80) print("检查结果汇总") print("="*80) all_issues = service_issues + test_page_issues + swagger_issues if db_result: print(f"\n数据库状态:") print(f" - 文件配置: {'✓ 存在' if db_result['file_config_exists'] else '✗ 不存在'}") print(f" - 字段数量: {db_result['fields_count']}/{db_result['expected_fields_count']}") if db_result['missing_fields']: print(f" - 缺失字段: {', '.join(db_result['missing_fields'])}") print(f" - 关联关系: {db_result['relations_count']}/{db_result['expected_fields_count']}") if all_issues: print(f"\n发现的问题 ({len(all_issues)} 个):") for issue in all_issues: print(f" - {issue}") else: print("\n✓ 未发现需要修复的问题") print("\n" + "="*80) print("检查完成") print("="*80) if __name__ == '__main__': main()