""" 检查模板的 file_id 和相关关联关系是否正确 重点检查: 1. f_polic_file_config 表中的模板记录(file_id) 2. f_polic_file_field 表中的关联关系(file_id 和 filed_id 的对应关系) """ import sys import pymysql from pathlib import Path from typing import Dict, List, Set, Tuple from collections import defaultdict # 设置控制台编码为UTF-8(Windows兼容) if sys.platform == 'win32': try: sys.stdout.reconfigure(encoding='utf-8') sys.stderr.reconfigure(encoding='utf-8') except: pass # 数据库连接配置 DB_CONFIG = { 'host': '152.136.177.240', 'port': 5012, 'user': 'finyx', 'password': '6QsGK6MpePZDE57Z', 'database': 'finyx', 'charset': 'utf8mb4' } # 固定值 TENANT_ID = 615873064429507639 # 项目根目录 PROJECT_ROOT = Path(__file__).parent TEMPLATES_DIR = PROJECT_ROOT / "template_finish" # 文档类型映射(用于识别模板) DOCUMENT_TYPE_MAPPING = { "1.请示报告卡(XXX)": "REPORT_CARD", "2.初步核实审批表(XXX)": "PRELIMINARY_VERIFICATION_APPROVAL", "3.附件初核方案(XXX)": "INVESTIGATION_PLAN", "谈话通知书第一联": "NOTIFICATION_LETTER_1", "谈话通知书第二联": "NOTIFICATION_LETTER_2", "谈话通知书第三联": "NOTIFICATION_LETTER_3", "1.请示报告卡(初核谈话)": "REPORT_CARD_INTERVIEW", "2谈话审批表": "INTERVIEW_APPROVAL_FORM", "3.谈话前安全风险评估表": "PRE_INTERVIEW_RISK_ASSESSMENT", "4.谈话方案": "INTERVIEW_PLAN", "5.谈话后安全风险评估表": "POST_INTERVIEW_RISK_ASSESSMENT", "1.谈话笔录": "INTERVIEW_RECORD", "2.谈话询问对象情况摸底调查30问": "INVESTIGATION_30_QUESTIONS", "3.被谈话人权利义务告知书": "RIGHTS_OBLIGATIONS_NOTICE", "4.点对点交接单": "HANDOVER_FORM", "4.点对点交接单2": "HANDOVER_FORM_2", "5.陪送交接单(新)": "ESCORT_HANDOVER_FORM", "6.1保密承诺书(谈话对象使用-非中共党员用)": "CONFIDENTIALITY_COMMITMENT_NON_PARTY", "6.2保密承诺书(谈话对象使用-中共党员用)": "CONFIDENTIALITY_COMMITMENT_PARTY", "7.办案人员-办案安全保密承诺书": "INVESTIGATOR_CONFIDENTIALITY_COMMITMENT", "8-1请示报告卡(初核报告结论) ": "REPORT_CARD_CONCLUSION", "8.XXX初核情况报告": "INVESTIGATION_REPORT" } def get_template_files() -> Dict[str, Path]: """获取所有模板文件""" templates = {} if not TEMPLATES_DIR.exists(): return templates for root, dirs, files in os.walk(TEMPLATES_DIR): for file in files: if file.endswith('.docx') and not file.startswith('~$'): file_path = Path(root) / file base_name = Path(file).stem if base_name in DOCUMENT_TYPE_MAPPING: templates[base_name] = file_path return templates def check_file_configs(conn) -> Dict: """检查 f_polic_file_config 表中的模板记录""" print("\n" + "="*80) print("1. 检查 f_polic_file_config 表中的模板记录") print("="*80) cursor = conn.cursor(pymysql.cursors.DictCursor) # 查询所有模板记录 cursor.execute(""" SELECT id, name, template_code, file_path, state, parent_id FROM f_polic_file_config WHERE tenant_id = %s ORDER BY name """, (TENANT_ID,)) all_configs = cursor.fetchall() # 按 template_code 和 name 组织数据 configs_by_code = {} configs_by_name = {} for config in all_configs: config_id = config['id'] name = config['name'] template_code = config.get('template_code') if template_code: if template_code not in configs_by_code: configs_by_code[template_code] = [] configs_by_code[template_code].append(config) if name: if name not in configs_by_name: configs_by_name[name] = [] configs_by_name[name].append(config) print(f"\n总模板记录数: {len(all_configs)}") print(f"按 template_code 分组: {len(configs_by_code)} 个不同的 template_code") print(f"按 name 分组: {len(configs_by_name)} 个不同的 name") # 检查重复的 template_code duplicate_codes = {code: configs for code, configs in configs_by_code.items() if len(configs) > 1} if duplicate_codes: print(f"\n[WARN] 发现重复的 template_code ({len(duplicate_codes)} 个):") for code, configs in duplicate_codes.items(): print(f" - {code}: {len(configs)} 条记录") for cfg in configs: print(f" ID: {cfg['id']}, 名称: {cfg['name']}, 路径: {cfg.get('file_path', 'N/A')}") # 检查重复的 name duplicate_names = {name: configs for name, configs in configs_by_name.items() if len(configs) > 1} if duplicate_names: print(f"\n[WARN] 发现重复的 name ({len(duplicate_names)} 个):") for name, configs in duplicate_names.items(): print(f" - {name}: {len(configs)} 条记录") for cfg in configs: print(f" ID: {cfg['id']}, template_code: {cfg.get('template_code', 'N/A')}, 路径: {cfg.get('file_path', 'N/A')}") # 检查未启用的记录 disabled_configs = [cfg for cfg in all_configs if cfg.get('state') != 1] if disabled_configs: print(f"\n[WARN] 发现未启用的模板记录 ({len(disabled_configs)} 个):") for cfg in disabled_configs: print(f" - ID: {cfg['id']}, 名称: {cfg['name']}, 状态: {cfg.get('state')}") # 检查 file_path 为空的记录 empty_path_configs = [cfg for cfg in all_configs if not cfg.get('file_path')] if empty_path_configs: print(f"\n[WARN] 发现 file_path 为空的记录 ({len(empty_path_configs)} 个):") for cfg in empty_path_configs: print(f" - ID: {cfg['id']}, 名称: {cfg['name']}, template_code: {cfg.get('template_code', 'N/A')}") cursor.close() return { 'all_configs': all_configs, 'configs_by_code': configs_by_code, 'configs_by_name': configs_by_name, 'duplicate_codes': duplicate_codes, 'duplicate_names': duplicate_names, 'disabled_configs': disabled_configs, 'empty_path_configs': empty_path_configs } def check_file_field_relations(conn) -> Dict: """检查 f_polic_file_field 表中的关联关系""" print("\n" + "="*80) print("2. 检查 f_polic_file_field 表中的关联关系") print("="*80) cursor = conn.cursor(pymysql.cursors.DictCursor) # 查询所有关联关系 cursor.execute(""" SELECT fff.id, fff.file_id, fff.filed_id, fff.state, fff.tenant_id FROM f_polic_file_field fff WHERE fff.tenant_id = %s ORDER BY fff.file_id, fff.filed_id """, (TENANT_ID,)) all_relations = cursor.fetchall() print(f"\n总关联关系数: {len(all_relations)}") # 检查无效的 file_id(关联到不存在的文件配置) cursor.execute(""" SELECT fff.id, fff.file_id, fff.filed_id FROM f_polic_file_field fff LEFT JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id WHERE fff.tenant_id = %s AND fc.id IS NULL """, (TENANT_ID,)) invalid_file_relations = cursor.fetchall() # 检查无效的 filed_id(关联到不存在的字段) cursor.execute(""" SELECT fff.id, fff.file_id, fff.filed_id FROM f_polic_file_field fff LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id WHERE fff.tenant_id = %s AND f.id IS NULL """, (TENANT_ID,)) invalid_field_relations = cursor.fetchall() # 检查重复的关联关系(相同的 file_id 和 filed_id) cursor.execute(""" SELECT file_id, filed_id, COUNT(*) as count, GROUP_CONCAT(id ORDER BY id) as ids FROM f_polic_file_field WHERE tenant_id = %s GROUP BY file_id, filed_id HAVING COUNT(*) > 1 """, (TENANT_ID,)) duplicate_relations = cursor.fetchall() # 检查关联到未启用文件的记录 cursor.execute(""" SELECT fff.id, fff.file_id, fff.filed_id, fc.name as file_name, fc.state as file_state FROM f_polic_file_field fff INNER JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id WHERE fff.tenant_id = %s AND fc.state != 1 """, (TENANT_ID,)) disabled_file_relations = cursor.fetchall() # 检查关联到未启用字段的记录 cursor.execute(""" SELECT fff.id, fff.file_id, fff.filed_id, f.name as field_name, f.filed_code, f.state as field_state FROM f_polic_file_field fff INNER JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id WHERE fff.tenant_id = %s AND f.state != 1 """, (TENANT_ID,)) disabled_field_relations = cursor.fetchall() # 统计每个文件关联的字段数量 file_field_counts = defaultdict(int) for rel in all_relations: file_field_counts[rel['file_id']] += 1 print(f"\n文件关联字段统计:") print(f" 有关联关系的文件数: {len(file_field_counts)}") if file_field_counts: max_count = max(file_field_counts.values()) min_count = min(file_field_counts.values()) avg_count = sum(file_field_counts.values()) / len(file_field_counts) print(f" 每个文件关联字段数: 最少 {min_count}, 最多 {max_count}, 平均 {avg_count:.1f}") # 输出检查结果 if invalid_file_relations: print(f"\n[ERROR] 发现无效的 file_id 关联 ({len(invalid_file_relations)} 条):") for rel in invalid_file_relations[:10]: # 只显示前10条 print(f" - 关联ID: {rel['id']}, file_id: {rel['file_id']}, filed_id: {rel['filed_id']}") if len(invalid_file_relations) > 10: print(f" ... 还有 {len(invalid_file_relations) - 10} 条") else: print(f"\n[OK] 所有 file_id 关联都有效") if invalid_field_relations: print(f"\n[ERROR] 发现无效的 filed_id 关联 ({len(invalid_field_relations)} 条):") for rel in invalid_field_relations[:10]: # 只显示前10条 print(f" - 关联ID: {rel['id']}, file_id: {rel['file_id']}, filed_id: {rel['filed_id']}") if len(invalid_field_relations) > 10: print(f" ... 还有 {len(invalid_field_relations) - 10} 条") else: print(f"\n[OK] 所有 filed_id 关联都有效") if duplicate_relations: print(f"\n[WARN] 发现重复的关联关系 ({len(duplicate_relations)} 组):") for dup in duplicate_relations[:10]: # 只显示前10组 print(f" - file_id: {dup['file_id']}, filed_id: {dup['filed_id']}, 重复次数: {dup['count']}, 关联ID: {dup['ids']}") if len(duplicate_relations) > 10: print(f" ... 还有 {len(duplicate_relations) - 10} 组") else: print(f"\n[OK] 没有重复的关联关系") if disabled_file_relations: print(f"\n[WARN] 发现关联到未启用文件的记录 ({len(disabled_file_relations)} 条):") for rel in disabled_file_relations[:10]: print(f" - 文件: {rel['file_name']} (ID: {rel['file_id']}, 状态: {rel['file_state']})") if len(disabled_file_relations) > 10: print(f" ... 还有 {len(disabled_file_relations) - 10} 条") if disabled_field_relations: print(f"\n[WARN] 发现关联到未启用字段的记录 ({len(disabled_field_relations)} 条):") for rel in disabled_field_relations[:10]: print(f" - 字段: {rel['field_name']} ({rel['filed_code']}, ID: {rel['filed_id']}, 状态: {rel['field_state']})") if len(disabled_field_relations) > 10: print(f" ... 还有 {len(disabled_field_relations) - 10} 条") cursor.close() return { 'all_relations': all_relations, 'invalid_file_relations': invalid_file_relations, 'invalid_field_relations': invalid_field_relations, 'duplicate_relations': duplicate_relations, 'disabled_file_relations': disabled_file_relations, 'disabled_field_relations': disabled_field_relations, 'file_field_counts': dict(file_field_counts) } def check_template_file_mapping(conn, file_configs: Dict) -> Dict: """检查模板文件与数据库记录的映射关系""" print("\n" + "="*80) print("3. 检查模板文件与数据库记录的映射关系") print("="*80) import os templates = get_template_files() print(f"\n本地模板文件数: {len(templates)}") cursor = conn.cursor(pymysql.cursors.DictCursor) # 检查每个模板文件是否在数据库中有对应记录 missing_in_db = [] found_in_db = [] duplicate_mappings = [] for template_name, file_path in templates.items(): template_code = DOCUMENT_TYPE_MAPPING.get(template_name) if not template_code: continue # 通过 name 和 template_code 查找对应的数据库记录 # 优先通过 name 精确匹配,然后通过 template_code 匹配 matching_configs = [] # 1. 通过 name 精确匹配 if template_name in file_configs['configs_by_name']: for config in file_configs['configs_by_name'][template_name]: if config.get('file_path'): # 有文件路径的记录 matching_configs.append(config) # 2. 通过 template_code 匹配 if template_code in file_configs['configs_by_code']: for config in file_configs['configs_by_code'][template_code]: if config.get('file_path') and config not in matching_configs: matching_configs.append(config) if len(matching_configs) == 0: missing_in_db.append({ 'template_name': template_name, 'template_code': template_code, 'file_path': str(file_path) }) elif len(matching_configs) == 1: config = matching_configs[0] found_in_db.append({ 'template_name': template_name, 'template_code': template_code, 'file_id': config['id'], 'file_path': config.get('file_path'), 'name': config.get('name') }) else: # 多个匹配,选择 file_path 最新的(包含最新日期的) duplicate_mappings.append({ 'template_name': template_name, 'template_code': template_code, 'matching_configs': matching_configs }) # 仍然记录第一个作为找到的记录 config = matching_configs[0] found_in_db.append({ 'template_name': template_name, 'template_code': template_code, 'file_id': config['id'], 'file_path': config.get('file_path'), 'name': config.get('name'), 'is_duplicate': True }) print(f"\n找到数据库记录的模板: {len(found_in_db)}") print(f"未找到数据库记录的模板: {len(missing_in_db)}") print(f"有重复映射的模板: {len(duplicate_mappings)}") if duplicate_mappings: print(f"\n[WARN] 以下模板文件在数据库中有多个匹配记录:") for item in duplicate_mappings: print(f" - {item['template_name']} (template_code: {item['template_code']}):") for cfg in item['matching_configs']: print(f" * file_id: {cfg['id']}, name: {cfg.get('name')}, path: {cfg.get('file_path', 'N/A')}") if missing_in_db: print(f"\n[WARN] 以下模板文件在数据库中没有对应记录:") for item in missing_in_db: print(f" - {item['template_name']} (template_code: {item['template_code']})") cursor.close() return { 'found_in_db': found_in_db, 'missing_in_db': missing_in_db, 'duplicate_mappings': duplicate_mappings } def check_field_type_consistency(conn, relations: Dict) -> Dict: """检查关联关系的字段类型一致性""" print("\n" + "="*80) print("4. 检查关联关系的字段类型一致性") print("="*80) cursor = conn.cursor(pymysql.cursors.DictCursor) # 查询所有关联关系及其字段类型 cursor.execute(""" SELECT fff.id, fff.file_id, fff.filed_id, fc.name as file_name, f.name as field_name, f.filed_code, f.field_type, CASE WHEN f.field_type = 1 THEN '输入字段' WHEN f.field_type = 2 THEN '输出字段' ELSE '未知' END as field_type_name FROM f_polic_file_field fff INNER JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id INNER JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id WHERE fff.tenant_id = %s ORDER BY fff.file_id, f.field_type, f.name """, (TENANT_ID,)) all_relations_with_type = cursor.fetchall() # 统计字段类型分布 input_fields = [r for r in all_relations_with_type if r['field_type'] == 1] output_fields = [r for r in all_relations_with_type if r['field_type'] == 2] print(f"\n字段类型统计:") print(f" 输入字段 (field_type=1): {len(input_fields)} 条关联") print(f" 输出字段 (field_type=2): {len(output_fields)} 条关联") # 按文件统计 file_type_counts = defaultdict(lambda: {'input': 0, 'output': 0}) for rel in all_relations_with_type: file_id = rel['file_id'] if rel['field_type'] == 1: file_type_counts[file_id]['input'] += 1 elif rel['field_type'] == 2: file_type_counts[file_id]['output'] += 1 print(f"\n每个文件的字段类型分布:") for file_id, counts in sorted(file_type_counts.items())[:10]: # 只显示前10个 print(f" 文件ID {file_id}: 输入字段 {counts['input']} 个, 输出字段 {counts['output']} 个") if len(file_type_counts) > 10: print(f" ... 还有 {len(file_type_counts) - 10} 个文件") cursor.close() return { 'input_fields': input_fields, 'output_fields': output_fields, 'file_type_counts': dict(file_type_counts) } def main(): """主函数""" print("="*80) print("检查模板的 file_id 和相关关联关系") print("="*80) # 连接数据库 try: conn = pymysql.connect(**DB_CONFIG) print("\n[OK] 数据库连接成功") except Exception as e: print(f"\n[ERROR] 数据库连接失败: {e}") return try: # 1. 检查文件配置表 file_configs = check_file_configs(conn) # 2. 检查文件字段关联表 relations = check_file_field_relations(conn) # 3. 检查模板文件与数据库记录的映射 template_mapping = check_template_file_mapping(conn, file_configs) # 4. 检查字段类型一致性 field_type_info = check_field_type_consistency(conn, relations) # 汇总报告 print("\n" + "="*80) print("检查汇总") print("="*80) issues = [] if file_configs['duplicate_codes']: issues.append(f"发现 {len(file_configs['duplicate_codes'])} 个重复的 template_code") if file_configs['duplicate_names']: issues.append(f"发现 {len(file_configs['duplicate_names'])} 个重复的 name") if file_configs['empty_path_configs']: issues.append(f"发现 {len(file_configs['empty_path_configs'])} 个 file_path 为空的记录") if relations['invalid_file_relations']: issues.append(f"发现 {len(relations['invalid_file_relations'])} 条无效的 file_id 关联") if relations['invalid_field_relations']: issues.append(f"发现 {len(relations['invalid_field_relations'])} 条无效的 filed_id 关联") if relations['duplicate_relations']: issues.append(f"发现 {len(relations['duplicate_relations'])} 组重复的关联关系") if template_mapping['missing_in_db']: issues.append(f"发现 {len(template_mapping['missing_in_db'])} 个模板文件在数据库中没有对应记录") if issues: print("\n[WARN] 发现以下问题:") for issue in issues: print(f" - {issue}") else: print("\n[OK] 未发现严重问题") print(f"\n总模板记录数: {len(file_configs['all_configs'])}") print(f"总关联关系数: {len(relations['all_relations'])}") print(f"有关联关系的文件数: {len(relations['file_field_counts'])}") finally: conn.close() print("\n数据库连接已关闭") if __name__ == '__main__': import os main()