""" 分析和修复字段编码问题 1. 分析f_polic_file_field表中的重复项 2. 检查f_polic_field表中的中文field_code 3. 根据占位符与字段对照表更新field_code 4. 合并重复项并更新关联表 """ import os import json import pymysql import re from typing import Dict, List, Optional, Tuple from datetime import datetime from pathlib import Path # 数据库连接配置 DB_CONFIG = { 'host': os.getenv('DB_HOST', '152.136.177.240'), 'port': int(os.getenv('DB_PORT', 5012)), 'user': os.getenv('DB_USER', 'finyx'), 'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'), 'database': os.getenv('DB_NAME', 'finyx'), 'charset': 'utf8mb4' } TENANT_ID = 615873064429507639 CREATED_BY = 655162080928945152 UPDATED_BY = 655162080928945152 CURRENT_TIME = datetime.now() # 从占位符与字段对照表文档中提取的字段映射 # 格式: {字段名称: field_code} FIELD_NAME_TO_CODE_MAPPING = { # 基本信息字段 '被核查人姓名': 'target_name', '被核查人员单位及职务': 'target_organization_and_position', '被核查人员单位': 'target_organization', '被核查人员职务': 'target_position', '被核查人员性别': 'target_gender', '被核查人员出生年月': 'target_date_of_birth', '被核查人员出生年月日': 'target_date_of_birth_full', '被核查人员年龄': 'target_age', '被核查人员文化程度': 'target_education_level', '被核查人员政治面貌': 'target_political_status', '被核查人员职级': 'target_professional_rank', '被核查人员身份证号': 'target_id_number', '被核查人员身份证件及号码': 'target_id_number', '被核查人员住址': 'target_address', '被核查人员户籍住址': 'target_registered_address', '被核查人员联系方式': 'target_contact', '被核查人员籍贯': 'target_place_of_origin', '被核查人员民族': 'target_ethnicity', # 问题相关字段 '线索来源': 'clue_source', '主要问题线索': 'target_issue_description', '被核查人问题描述': 'target_problem_description', # 审批相关字段 '初步核实审批表承办部门意见': 'department_opinion', '初步核实审批表填表人': 'filler_name', '批准时间': 'approval_time', # 核查相关字段 '核查单位名称': 'investigation_unit_name', '核查组代号': 'investigation_team_code', '核查组组长姓名': 'investigation_team_leader_name', '核查组成员姓名': 'investigation_team_member_names', '核查地点': 'investigation_location', # 风险评估相关字段 '被核查人员家庭情况': 'target_family_situation', '被核查人员社会关系': 'target_social_relations', '被核查人员健康状况': 'target_health_status', '被核查人员性格特征': 'target_personality', '被核查人员承受能力': 'target_tolerance', '被核查人员涉及问题严重程度': 'target_issue_severity', '被核查人员涉及其他问题的可能性': 'target_other_issues_possibility', '被核查人员此前被审查情况': 'target_previous_investigation', '被核查人员社会负面事件': 'target_negative_events', '被核查人员其他情况': 'target_other_situation', '风险等级': 'risk_level', # 其他字段 '线索信息': 'clue_info', '被核查人员工作基本情况线索': 'target_basic_info_clue', '被核查人员工作基本情况': 'target_work_basic_info', '请示报告卡请示时间': 'report_card_request_time', '应到时间': 'appointment_time', '应到地点': 'appointment_location', '承办部门': 'handling_department', '承办人': 'handler_name', '谈话通知时间': 'notification_time', '谈话通知地点': 'notification_location', '被核查人员本人认识和态度': 'target_attitude', '纪委名称': 'commission_name', } def is_chinese(text: str) -> bool: """判断字符串是否包含中文字符""" if not text: return False return bool(re.search(r'[\u4e00-\u9fff]', text)) def analyze_f_polic_field(conn) -> Dict: """分析f_polic_field表,找出中文field_code和重复项""" cursor = conn.cursor(pymysql.cursors.DictCursor) print("\n" + "="*80) print("1. 分析 f_polic_field 表") print("="*80) # 查询所有字段 cursor.execute(""" SELECT id, name, filed_code, field_type, state FROM f_polic_field WHERE tenant_id = %s ORDER BY name, filed_code """, (TENANT_ID,)) fields = cursor.fetchall() print(f"\n总共找到 {len(fields)} 个字段记录") # 找出中文field_code chinese_field_codes = [] for field in fields: if is_chinese(field['filed_code']): chinese_field_codes.append(field) print(f"\n发现 {len(chinese_field_codes)} 个中文field_code:") for field in chinese_field_codes: print(f" - ID: {field['id']}, 名称: {field['name']}, field_code: {field['filed_code']}") # 找出重复的字段名称 name_to_fields = {} for field in fields: name = field['name'] if name not in name_to_fields: name_to_fields[name] = [] name_to_fields[name].append(field) duplicates = {name: fields_list for name, fields_list in name_to_fields.items() if len(fields_list) > 1} print(f"\n发现 {len(duplicates)} 个重复的字段名称:") for name, fields_list in duplicates.items(): print(f"\n 字段名称: {name} (共 {len(fields_list)} 条记录)") for field in fields_list: print(f" - ID: {field['id']}, field_code: {field['filed_code']}, " f"field_type: {field['field_type']}, state: {field['state']}") # 找出重复的field_code code_to_fields = {} for field in fields: code = field['filed_code'] if code not in code_to_fields: code_to_fields[code] = [] code_to_fields[code].append(field) duplicate_codes = {code: fields_list for code, fields_list in code_to_fields.items() if len(fields_list) > 1} print(f"\n发现 {len(duplicate_codes)} 个重复的field_code:") for code, fields_list in duplicate_codes.items(): print(f"\n field_code: {code} (共 {len(fields_list)} 条记录)") for field in fields_list: print(f" - ID: {field['id']}, 名称: {field['name']}, " f"field_type: {field['field_type']}, state: {field['state']}") return { 'all_fields': fields, 'chinese_field_codes': chinese_field_codes, 'duplicate_names': duplicates, 'duplicate_codes': duplicate_codes } def analyze_f_polic_file_field(conn) -> Dict: """分析f_polic_file_field表,找出重复项""" cursor = conn.cursor(pymysql.cursors.DictCursor) print("\n" + "="*80) print("2. 分析 f_polic_file_field 表") print("="*80) # 查询所有关联关系 cursor.execute(""" SELECT fff.id, fff.file_id, fff.filed_id, fc.name as file_name, f.name as field_name, f.filed_code FROM f_polic_file_field fff LEFT JOIN f_polic_file_config fc ON fff.file_id = fc.id LEFT JOIN f_polic_field f ON fff.filed_id = f.id WHERE fff.tenant_id = %s ORDER BY fff.file_id, fff.filed_id """, (TENANT_ID,)) relations = cursor.fetchall() print(f"\n总共找到 {len(relations)} 个关联关系") # 找出重复的关联关系(相同的file_id和filed_id) relation_key_to_records = {} for rel in relations: key = (rel['file_id'], rel['filed_id']) if key not in relation_key_to_records: relation_key_to_records[key] = [] relation_key_to_records[key].append(rel) duplicates = {key: records for key, records in relation_key_to_records.items() if len(records) > 1} print(f"\n发现 {len(duplicates)} 个重复的关联关系:") for (file_id, filed_id), records in duplicates.items(): print(f"\n 文件ID: {file_id}, 字段ID: {filed_id} (共 {len(records)} 条记录)") for record in records: print(f" - 关联ID: {record['id']}, 文件: {record['file_name']}, " f"字段: {record['field_name']} ({record['filed_code']})") # 统计使用中文field_code的关联关系 chinese_relations = [rel for rel in relations if rel['filed_code'] and is_chinese(rel['filed_code'])] print(f"\n发现 {len(chinese_relations)} 个使用中文field_code的关联关系:") for rel in chinese_relations[:10]: # 只显示前10个 print(f" - 文件: {rel['file_name']}, 字段: {rel['field_name']}, " f"field_code: {rel['filed_code']}") if len(chinese_relations) > 10: print(f" ... 还有 {len(chinese_relations) - 10} 个") return { 'all_relations': relations, 'duplicate_relations': duplicates, 'chinese_relations': chinese_relations } def get_correct_field_code(field_name: str, current_code: str) -> Optional[str]: """根据字段名称获取正确的field_code""" # 首先从映射表中查找 if field_name in FIELD_NAME_TO_CODE_MAPPING: return FIELD_NAME_TO_CODE_MAPPING[field_name] # 如果当前code已经是英文且符合规范,保留 if current_code and not is_chinese(current_code) and re.match(r'^[a-z_]+$', current_code): return current_code return None def fix_f_polic_field(conn, dry_run: bool = True) -> Dict: """修复f_polic_field表中的问题""" cursor = conn.cursor(pymysql.cursors.DictCursor) print("\n" + "="*80) print("3. 修复 f_polic_field 表") print("="*80) if dry_run: print("\n[DRY RUN模式 - 不会实际修改数据库]") # 获取所有字段 cursor.execute(""" SELECT id, name, filed_code, field_type, state FROM f_polic_field WHERE tenant_id = %s """, (TENANT_ID,)) fields = cursor.fetchall() updates = [] merges = [] # 按字段名称分组,找出需要合并的重复项 name_to_fields = {} for field in fields: name = field['name'] if name not in name_to_fields: name_to_fields[name] = [] name_to_fields[name].append(field) # 处理每个字段名称 for field_name, field_list in name_to_fields.items(): if len(field_list) == 1: # 单个字段,检查是否需要更新field_code field = field_list[0] correct_code = get_correct_field_code(field['name'], field['filed_code']) if correct_code and correct_code != field['filed_code']: updates.append({ 'id': field['id'], 'name': field['name'], 'old_code': field['filed_code'], 'new_code': correct_code, 'field_type': field['field_type'] }) else: # 多个字段,需要合并 # 找出最佳的field_code best_field = None best_code = None for field in field_list: correct_code = get_correct_field_code(field['name'], field['filed_code']) if correct_code: if not best_field or (field['state'] == 1 and best_field['state'] == 0): best_field = field best_code = correct_code # 如果没找到最佳字段,选择第一个启用的,或者第一个 if not best_field: enabled_fields = [f for f in field_list if f['state'] == 1] best_field = enabled_fields[0] if enabled_fields else field_list[0] best_code = get_correct_field_code(best_field['name'], best_field['filed_code']) if not best_code: # 生成一个基于名称的code best_code = field_name.lower().replace('被核查人员', 'target_').replace('被核查人', 'target_') best_code = re.sub(r'[^\w]', '_', best_code) best_code = re.sub(r'_+', '_', best_code).strip('_') # 确定要保留的字段和要删除的字段 keep_field = best_field remove_fields = [f for f in field_list if f['id'] != keep_field['id']] # 更新保留字段的field_code if best_code and best_code != keep_field['filed_code']: updates.append({ 'id': keep_field['id'], 'name': keep_field['name'], 'old_code': keep_field['filed_code'], 'new_code': best_code, 'field_type': keep_field['field_type'] }) merges.append({ 'keep_field_id': keep_field['id'], 'keep_field_name': keep_field['name'], 'keep_field_code': best_code or keep_field['filed_code'], 'remove_field_ids': [f['id'] for f in remove_fields], 'remove_fields': remove_fields }) # 显示更新计划 print(f"\n需要更新 {len(updates)} 个字段的field_code:") for update in updates: print(f" - ID: {update['id']}, 名称: {update['name']}, " f"{update['old_code']} -> {update['new_code']}") print(f"\n需要合并 {len(merges)} 组重复字段:") for merge in merges: print(f"\n 保留字段: ID={merge['keep_field_id']}, 名称={merge['keep_field_name']}, " f"field_code={merge['keep_field_code']}") print(f" 删除字段: {len(merge['remove_field_ids'])} 个") for remove_field in merge['remove_fields']: print(f" - ID: {remove_field['id']}, field_code: {remove_field['filed_code']}, " f"field_type: {remove_field['field_type']}, state: {remove_field['state']}") # 执行更新 if not dry_run: print("\n开始执行更新...") # 1. 先更新field_code for update in updates: cursor.execute(""" UPDATE f_polic_field SET filed_code = %s, updated_time = %s, updated_by = %s WHERE id = %s """, (update['new_code'], CURRENT_TIME, UPDATED_BY, update['id'])) print(f" ✓ 更新字段 ID {update['id']}: {update['old_code']} -> {update['new_code']}") # 2. 合并重复字段:先更新关联表,再删除重复字段 for merge in merges: keep_id = merge['keep_field_id'] for remove_id in merge['remove_field_ids']: # 更新f_polic_file_field表中的关联 cursor.execute(""" UPDATE f_polic_file_field SET filed_id = %s, updated_time = %s, updated_by = %s WHERE filed_id = %s AND tenant_id = %s """, (keep_id, CURRENT_TIME, UPDATED_BY, remove_id, TENANT_ID)) # 删除重复的字段记录 cursor.execute(""" DELETE FROM f_polic_field WHERE id = %s AND tenant_id = %s """, (remove_id, TENANT_ID)) print(f" ✓ 合并字段: 保留 ID {keep_id}, 删除 {len(merge['remove_field_ids'])} 个重复字段") conn.commit() print("\n✓ 更新完成") else: print("\n[DRY RUN] 以上操作不会实际执行") return { 'updates': updates, 'merges': merges } def fix_f_polic_file_field(conn, dry_run: bool = True) -> Dict: """修复f_polic_file_field表中的重复项""" cursor = conn.cursor(pymysql.cursors.DictCursor) print("\n" + "="*80) print("4. 修复 f_polic_file_field 表") print("="*80) if dry_run: print("\n[DRY RUN模式 - 不会实际修改数据库]") # 找出重复的关联关系 cursor.execute(""" SELECT file_id, filed_id, COUNT(*) as count, GROUP_CONCAT(id) as ids FROM f_polic_file_field WHERE tenant_id = %s GROUP BY file_id, filed_id HAVING count > 1 """, (TENANT_ID,)) duplicates = cursor.fetchall() print(f"\n发现 {len(duplicates)} 组重复的关联关系") deletes = [] for dup in duplicates: file_id = dup['file_id'] filed_id = dup['filed_id'] ids = [int(id_str) for id_str in dup['ids'].split(',')] # 保留第一个,删除其他的 keep_id = ids[0] remove_ids = ids[1:] deletes.append({ 'file_id': file_id, 'filed_id': filed_id, 'keep_id': keep_id, 'remove_ids': remove_ids }) print(f"\n 文件ID: {file_id}, 字段ID: {filed_id}") print(f" 保留关联ID: {keep_id}") print(f" 删除关联ID: {', '.join(map(str, remove_ids))}") # 执行删除 if not dry_run: print("\n开始删除重复的关联关系...") for delete in deletes: for remove_id in delete['remove_ids']: cursor.execute(""" DELETE FROM f_polic_file_field WHERE id = %s AND tenant_id = %s """, (remove_id, TENANT_ID)) print(f" ✓ 删除文件ID {delete['file_id']} 和字段ID {delete['filed_id']} 的重复关联") conn.commit() print("\n✓ 删除完成") else: print("\n[DRY RUN] 以上操作不会实际执行") return { 'deletes': deletes } def check_other_tables(conn): """检查其他可能受影响的表""" cursor = conn.cursor(pymysql.cursors.DictCursor) print("\n" + "="*80) print("5. 检查其他关联表") print("="*80) # 检查f_polic_task表 print("\n检查 f_polic_task 表...") try: cursor.execute(""" SELECT COUNT(*) as count FROM f_polic_task WHERE tenant_id = %s """, (TENANT_ID,)) task_count = cursor.fetchone()['count'] print(f" 找到 {task_count} 个任务记录") # 检查是否有引用字段ID的列 cursor.execute("DESCRIBE f_polic_task") columns = [col['Field'] for col in cursor.fetchall()] print(f" 表字段: {', '.join(columns)}") # 检查是否有引用f_polic_field的字段 field_refs = [col for col in columns if 'field' in col.lower() or 'filed' in col.lower()] if field_refs: print(f" 可能引用字段的列: {', '.join(field_refs)}") except Exception as e: print(f" 检查f_polic_task表时出错: {e}") # 检查f_polic_file表 print("\n检查 f_polic_file 表...") try: cursor.execute(""" SELECT COUNT(*) as count FROM f_polic_file WHERE tenant_id = %s """, (TENANT_ID,)) file_count = cursor.fetchone()['count'] print(f" 找到 {file_count} 个文件记录") cursor.execute("DESCRIBE f_polic_file") columns = [col['Field'] for col in cursor.fetchall()] print(f" 表字段: {', '.join(columns)}") except Exception as e: print(f" 检查f_polic_file表时出错: {e}") def main(): """主函数""" print("="*80) print("字段编码问题分析和修复工具") print("="*80) try: conn = pymysql.connect(**DB_CONFIG) # 1. 分析f_polic_field表 field_analysis = analyze_f_polic_field(conn) # 2. 分析f_polic_file_field表 relation_analysis = analyze_f_polic_file_field(conn) # 3. 检查其他表 check_other_tables(conn) # 4. 询问是否执行修复 print("\n" + "="*80) print("分析完成") print("="*80) print("\n是否执行修复?") print("1. 先执行DRY RUN(不实际修改数据库)") print("2. 直接执行修复(会修改数据库)") print("3. 仅查看分析结果,不执行修复") choice = input("\n请选择 (1/2/3,默认1): ").strip() or "1" if choice == "1": # DRY RUN print("\n" + "="*80) print("执行DRY RUN...") print("="*80) fix_f_polic_field(conn, dry_run=True) fix_f_polic_file_field(conn, dry_run=True) print("\n" + "="*80) confirm = input("DRY RUN完成。是否执行实际修复?(y/n,默认n): ").strip().lower() if confirm == 'y': print("\n执行实际修复...") fix_f_polic_field(conn, dry_run=False) fix_f_polic_file_field(conn, dry_run=False) print("\n✓ 修复完成!") elif choice == "2": # 直接执行 print("\n" + "="*80) print("执行修复...") print("="*80) fix_f_polic_field(conn, dry_run=False) fix_f_polic_file_field(conn, dry_run=False) print("\n✓ 修复完成!") else: print("\n仅查看分析结果,未执行修复") conn.close() except Exception as e: print(f"\n✗ 执行失败: {e}") import traceback traceback.print_exc() if __name__ == '__main__': main()