ai-business-write/analyze_and_fix_field_code_issues.py

583 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
分析和修复字段编码问题
1. 分析f_polic_file_field表中的重复项
2. 检查f_polic_field表中的中文field_code
3. 根据占位符与字段对照表更新field_code
4. 合并重复项并更新关联表
"""
import os
import json
import pymysql
import re
from typing import Dict, List, Optional, Tuple
from datetime import datetime
from pathlib import Path
# 数据库连接配置
DB_CONFIG = {
'host': os.getenv('DB_HOST', '152.136.177.240'),
'port': int(os.getenv('DB_PORT', 5012)),
'user': os.getenv('DB_USER', 'finyx'),
'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
'database': os.getenv('DB_NAME', 'finyx'),
'charset': 'utf8mb4'
}
TENANT_ID = 615873064429507639
CREATED_BY = 655162080928945152
UPDATED_BY = 655162080928945152
CURRENT_TIME = datetime.now()
# 从占位符与字段对照表文档中提取的字段映射
# 格式: {字段名称: field_code}
FIELD_NAME_TO_CODE_MAPPING = {
# 基本信息字段
'被核查人姓名': 'target_name',
'被核查人员单位及职务': 'target_organization_and_position',
'被核查人员单位': 'target_organization',
'被核查人员职务': 'target_position',
'被核查人员性别': 'target_gender',
'被核查人员出生年月': 'target_date_of_birth',
'被核查人员出生年月日': 'target_date_of_birth_full',
'被核查人员年龄': 'target_age',
'被核查人员文化程度': 'target_education_level',
'被核查人员政治面貌': 'target_political_status',
'被核查人员职级': 'target_professional_rank',
'被核查人员身份证号': 'target_id_number',
'被核查人员身份证件及号码': 'target_id_number',
'被核查人员住址': 'target_address',
'被核查人员户籍住址': 'target_registered_address',
'被核查人员联系方式': 'target_contact',
'被核查人员籍贯': 'target_place_of_origin',
'被核查人员民族': 'target_ethnicity',
# 问题相关字段
'线索来源': 'clue_source',
'主要问题线索': 'target_issue_description',
'被核查人问题描述': 'target_problem_description',
# 审批相关字段
'初步核实审批表承办部门意见': 'department_opinion',
'初步核实审批表填表人': 'filler_name',
'批准时间': 'approval_time',
# 核查相关字段
'核查单位名称': 'investigation_unit_name',
'核查组代号': 'investigation_team_code',
'核查组组长姓名': 'investigation_team_leader_name',
'核查组成员姓名': 'investigation_team_member_names',
'核查地点': 'investigation_location',
# 风险评估相关字段
'被核查人员家庭情况': 'target_family_situation',
'被核查人员社会关系': 'target_social_relations',
'被核查人员健康状况': 'target_health_status',
'被核查人员性格特征': 'target_personality',
'被核查人员承受能力': 'target_tolerance',
'被核查人员涉及问题严重程度': 'target_issue_severity',
'被核查人员涉及其他问题的可能性': 'target_other_issues_possibility',
'被核查人员此前被审查情况': 'target_previous_investigation',
'被核查人员社会负面事件': 'target_negative_events',
'被核查人员其他情况': 'target_other_situation',
'风险等级': 'risk_level',
# 其他字段
'线索信息': 'clue_info',
'被核查人员工作基本情况线索': 'target_basic_info_clue',
'被核查人员工作基本情况': 'target_work_basic_info',
'请示报告卡请示时间': 'report_card_request_time',
'应到时间': 'appointment_time',
'应到地点': 'appointment_location',
'承办部门': 'handling_department',
'承办人': 'handler_name',
'谈话通知时间': 'notification_time',
'谈话通知地点': 'notification_location',
'被核查人员本人认识和态度': 'target_attitude',
'纪委名称': 'commission_name',
}
def is_chinese(text: str) -> bool:
"""判断字符串是否包含中文字符"""
if not text:
return False
return bool(re.search(r'[\u4e00-\u9fff]', text))
def analyze_f_polic_field(conn) -> Dict:
"""分析f_polic_field表找出中文field_code和重复项"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
print("\n" + "="*80)
print("1. 分析 f_polic_field 表")
print("="*80)
# 查询所有字段
cursor.execute("""
SELECT id, name, filed_code, field_type, state
FROM f_polic_field
WHERE tenant_id = %s
ORDER BY name, filed_code
""", (TENANT_ID,))
fields = cursor.fetchall()
print(f"\n总共找到 {len(fields)} 个字段记录")
# 找出中文field_code
chinese_field_codes = []
for field in fields:
if is_chinese(field['filed_code']):
chinese_field_codes.append(field)
print(f"\n发现 {len(chinese_field_codes)} 个中文field_code:")
for field in chinese_field_codes:
print(f" - ID: {field['id']}, 名称: {field['name']}, field_code: {field['filed_code']}")
# 找出重复的字段名称
name_to_fields = {}
for field in fields:
name = field['name']
if name not in name_to_fields:
name_to_fields[name] = []
name_to_fields[name].append(field)
duplicates = {name: fields_list for name, fields_list in name_to_fields.items()
if len(fields_list) > 1}
print(f"\n发现 {len(duplicates)} 个重复的字段名称:")
for name, fields_list in duplicates.items():
print(f"\n 字段名称: {name} (共 {len(fields_list)} 条记录)")
for field in fields_list:
print(f" - ID: {field['id']}, field_code: {field['filed_code']}, "
f"field_type: {field['field_type']}, state: {field['state']}")
# 找出重复的field_code
code_to_fields = {}
for field in fields:
code = field['filed_code']
if code not in code_to_fields:
code_to_fields[code] = []
code_to_fields[code].append(field)
duplicate_codes = {code: fields_list for code, fields_list in code_to_fields.items()
if len(fields_list) > 1}
print(f"\n发现 {len(duplicate_codes)} 个重复的field_code:")
for code, fields_list in duplicate_codes.items():
print(f"\n field_code: {code} (共 {len(fields_list)} 条记录)")
for field in fields_list:
print(f" - ID: {field['id']}, 名称: {field['name']}, "
f"field_type: {field['field_type']}, state: {field['state']}")
return {
'all_fields': fields,
'chinese_field_codes': chinese_field_codes,
'duplicate_names': duplicates,
'duplicate_codes': duplicate_codes
}
def analyze_f_polic_file_field(conn) -> Dict:
"""分析f_polic_file_field表找出重复项"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
print("\n" + "="*80)
print("2. 分析 f_polic_file_field 表")
print("="*80)
# 查询所有关联关系
cursor.execute("""
SELECT fff.id, fff.file_id, fff.filed_id,
fc.name as file_name, f.name as field_name, f.filed_code
FROM f_polic_file_field fff
LEFT JOIN f_polic_file_config fc ON fff.file_id = fc.id
LEFT JOIN f_polic_field f ON fff.filed_id = f.id
WHERE fff.tenant_id = %s
ORDER BY fff.file_id, fff.filed_id
""", (TENANT_ID,))
relations = cursor.fetchall()
print(f"\n总共找到 {len(relations)} 个关联关系")
# 找出重复的关联关系相同的file_id和filed_id
relation_key_to_records = {}
for rel in relations:
key = (rel['file_id'], rel['filed_id'])
if key not in relation_key_to_records:
relation_key_to_records[key] = []
relation_key_to_records[key].append(rel)
duplicates = {key: records for key, records in relation_key_to_records.items()
if len(records) > 1}
print(f"\n发现 {len(duplicates)} 个重复的关联关系:")
for (file_id, filed_id), records in duplicates.items():
print(f"\n 文件ID: {file_id}, 字段ID: {filed_id} (共 {len(records)} 条记录)")
for record in records:
print(f" - 关联ID: {record['id']}, 文件: {record['file_name']}, "
f"字段: {record['field_name']} ({record['filed_code']})")
# 统计使用中文field_code的关联关系
chinese_relations = [rel for rel in relations if rel['filed_code'] and is_chinese(rel['filed_code'])]
print(f"\n发现 {len(chinese_relations)} 个使用中文field_code的关联关系:")
for rel in chinese_relations[:10]: # 只显示前10个
print(f" - 文件: {rel['file_name']}, 字段: {rel['field_name']}, "
f"field_code: {rel['filed_code']}")
if len(chinese_relations) > 10:
print(f" ... 还有 {len(chinese_relations) - 10}")
return {
'all_relations': relations,
'duplicate_relations': duplicates,
'chinese_relations': chinese_relations
}
def get_correct_field_code(field_name: str, current_code: str) -> Optional[str]:
"""根据字段名称获取正确的field_code"""
# 首先从映射表中查找
if field_name in FIELD_NAME_TO_CODE_MAPPING:
return FIELD_NAME_TO_CODE_MAPPING[field_name]
# 如果当前code已经是英文且符合规范保留
if current_code and not is_chinese(current_code) and re.match(r'^[a-z_]+$', current_code):
return current_code
return None
def fix_f_polic_field(conn, dry_run: bool = True) -> Dict:
"""修复f_polic_field表中的问题"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
print("\n" + "="*80)
print("3. 修复 f_polic_field 表")
print("="*80)
if dry_run:
print("\n[DRY RUN模式 - 不会实际修改数据库]")
# 获取所有字段
cursor.execute("""
SELECT id, name, filed_code, field_type, state
FROM f_polic_field
WHERE tenant_id = %s
""", (TENANT_ID,))
fields = cursor.fetchall()
updates = []
merges = []
# 按字段名称分组,找出需要合并的重复项
name_to_fields = {}
for field in fields:
name = field['name']
if name not in name_to_fields:
name_to_fields[name] = []
name_to_fields[name].append(field)
# 处理每个字段名称
for field_name, field_list in name_to_fields.items():
if len(field_list) == 1:
# 单个字段检查是否需要更新field_code
field = field_list[0]
correct_code = get_correct_field_code(field['name'], field['filed_code'])
if correct_code and correct_code != field['filed_code']:
updates.append({
'id': field['id'],
'name': field['name'],
'old_code': field['filed_code'],
'new_code': correct_code,
'field_type': field['field_type']
})
else:
# 多个字段,需要合并
# 找出最佳的field_code
best_field = None
best_code = None
for field in field_list:
correct_code = get_correct_field_code(field['name'], field['filed_code'])
if correct_code:
if not best_field or (field['state'] == 1 and best_field['state'] == 0):
best_field = field
best_code = correct_code
# 如果没找到最佳字段,选择第一个启用的,或者第一个
if not best_field:
enabled_fields = [f for f in field_list if f['state'] == 1]
best_field = enabled_fields[0] if enabled_fields else field_list[0]
best_code = get_correct_field_code(best_field['name'], best_field['filed_code'])
if not best_code:
# 生成一个基于名称的code
best_code = field_name.lower().replace('被核查人员', 'target_').replace('被核查人', 'target_')
best_code = re.sub(r'[^\w]', '_', best_code)
best_code = re.sub(r'_+', '_', best_code).strip('_')
# 确定要保留的字段和要删除的字段
keep_field = best_field
remove_fields = [f for f in field_list if f['id'] != keep_field['id']]
# 更新保留字段的field_code
if best_code and best_code != keep_field['filed_code']:
updates.append({
'id': keep_field['id'],
'name': keep_field['name'],
'old_code': keep_field['filed_code'],
'new_code': best_code,
'field_type': keep_field['field_type']
})
merges.append({
'keep_field_id': keep_field['id'],
'keep_field_name': keep_field['name'],
'keep_field_code': best_code or keep_field['filed_code'],
'remove_field_ids': [f['id'] for f in remove_fields],
'remove_fields': remove_fields
})
# 显示更新计划
print(f"\n需要更新 {len(updates)} 个字段的field_code:")
for update in updates:
print(f" - ID: {update['id']}, 名称: {update['name']}, "
f"{update['old_code']} -> {update['new_code']}")
print(f"\n需要合并 {len(merges)} 组重复字段:")
for merge in merges:
print(f"\n 保留字段: ID={merge['keep_field_id']}, 名称={merge['keep_field_name']}, "
f"field_code={merge['keep_field_code']}")
print(f" 删除字段: {len(merge['remove_field_ids'])}")
for remove_field in merge['remove_fields']:
print(f" - ID: {remove_field['id']}, field_code: {remove_field['filed_code']}, "
f"field_type: {remove_field['field_type']}, state: {remove_field['state']}")
# 执行更新
if not dry_run:
print("\n开始执行更新...")
# 1. 先更新field_code
for update in updates:
cursor.execute("""
UPDATE f_polic_field
SET filed_code = %s, updated_time = %s, updated_by = %s
WHERE id = %s
""", (update['new_code'], CURRENT_TIME, UPDATED_BY, update['id']))
print(f" ✓ 更新字段 ID {update['id']}: {update['old_code']} -> {update['new_code']}")
# 2. 合并重复字段:先更新关联表,再删除重复字段
for merge in merges:
keep_id = merge['keep_field_id']
for remove_id in merge['remove_field_ids']:
# 更新f_polic_file_field表中的关联
cursor.execute("""
UPDATE f_polic_file_field
SET filed_id = %s, updated_time = %s, updated_by = %s
WHERE filed_id = %s AND tenant_id = %s
""", (keep_id, CURRENT_TIME, UPDATED_BY, remove_id, TENANT_ID))
# 删除重复的字段记录
cursor.execute("""
DELETE FROM f_polic_field
WHERE id = %s AND tenant_id = %s
""", (remove_id, TENANT_ID))
print(f" ✓ 合并字段: 保留 ID {keep_id}, 删除 {len(merge['remove_field_ids'])} 个重复字段")
conn.commit()
print("\n✓ 更新完成")
else:
print("\n[DRY RUN] 以上操作不会实际执行")
return {
'updates': updates,
'merges': merges
}
def fix_f_polic_file_field(conn, dry_run: bool = True) -> Dict:
"""修复f_polic_file_field表中的重复项"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
print("\n" + "="*80)
print("4. 修复 f_polic_file_field 表")
print("="*80)
if dry_run:
print("\n[DRY RUN模式 - 不会实际修改数据库]")
# 找出重复的关联关系
cursor.execute("""
SELECT file_id, filed_id, COUNT(*) as count, GROUP_CONCAT(id) as ids
FROM f_polic_file_field
WHERE tenant_id = %s
GROUP BY file_id, filed_id
HAVING count > 1
""", (TENANT_ID,))
duplicates = cursor.fetchall()
print(f"\n发现 {len(duplicates)} 组重复的关联关系")
deletes = []
for dup in duplicates:
file_id = dup['file_id']
filed_id = dup['filed_id']
ids = [int(id_str) for id_str in dup['ids'].split(',')]
# 保留第一个,删除其他的
keep_id = ids[0]
remove_ids = ids[1:]
deletes.append({
'file_id': file_id,
'filed_id': filed_id,
'keep_id': keep_id,
'remove_ids': remove_ids
})
print(f"\n 文件ID: {file_id}, 字段ID: {filed_id}")
print(f" 保留关联ID: {keep_id}")
print(f" 删除关联ID: {', '.join(map(str, remove_ids))}")
# 执行删除
if not dry_run:
print("\n开始删除重复的关联关系...")
for delete in deletes:
for remove_id in delete['remove_ids']:
cursor.execute("""
DELETE FROM f_polic_file_field
WHERE id = %s AND tenant_id = %s
""", (remove_id, TENANT_ID))
print(f" ✓ 删除文件ID {delete['file_id']} 和字段ID {delete['filed_id']} 的重复关联")
conn.commit()
print("\n✓ 删除完成")
else:
print("\n[DRY RUN] 以上操作不会实际执行")
return {
'deletes': deletes
}
def check_other_tables(conn):
"""检查其他可能受影响的表"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
print("\n" + "="*80)
print("5. 检查其他关联表")
print("="*80)
# 检查f_polic_task表
print("\n检查 f_polic_task 表...")
try:
cursor.execute("""
SELECT COUNT(*) as count
FROM f_polic_task
WHERE tenant_id = %s
""", (TENANT_ID,))
task_count = cursor.fetchone()['count']
print(f" 找到 {task_count} 个任务记录")
# 检查是否有引用字段ID的列
cursor.execute("DESCRIBE f_polic_task")
columns = [col['Field'] for col in cursor.fetchall()]
print(f" 表字段: {', '.join(columns)}")
# 检查是否有引用f_polic_field的字段
field_refs = [col for col in columns if 'field' in col.lower() or 'filed' in col.lower()]
if field_refs:
print(f" 可能引用字段的列: {', '.join(field_refs)}")
except Exception as e:
print(f" 检查f_polic_task表时出错: {e}")
# 检查f_polic_file表
print("\n检查 f_polic_file 表...")
try:
cursor.execute("""
SELECT COUNT(*) as count
FROM f_polic_file
WHERE tenant_id = %s
""", (TENANT_ID,))
file_count = cursor.fetchone()['count']
print(f" 找到 {file_count} 个文件记录")
cursor.execute("DESCRIBE f_polic_file")
columns = [col['Field'] for col in cursor.fetchall()]
print(f" 表字段: {', '.join(columns)}")
except Exception as e:
print(f" 检查f_polic_file表时出错: {e}")
def main():
"""主函数"""
print("="*80)
print("字段编码问题分析和修复工具")
print("="*80)
try:
conn = pymysql.connect(**DB_CONFIG)
# 1. 分析f_polic_field表
field_analysis = analyze_f_polic_field(conn)
# 2. 分析f_polic_file_field表
relation_analysis = analyze_f_polic_file_field(conn)
# 3. 检查其他表
check_other_tables(conn)
# 4. 询问是否执行修复
print("\n" + "="*80)
print("分析完成")
print("="*80)
print("\n是否执行修复?")
print("1. 先执行DRY RUN不实际修改数据库")
print("2. 直接执行修复(会修改数据库)")
print("3. 仅查看分析结果,不执行修复")
choice = input("\n请选择 (1/2/3默认1): ").strip() or "1"
if choice == "1":
# DRY RUN
print("\n" + "="*80)
print("执行DRY RUN...")
print("="*80)
fix_f_polic_field(conn, dry_run=True)
fix_f_polic_file_field(conn, dry_run=True)
print("\n" + "="*80)
confirm = input("DRY RUN完成。是否执行实际修复(y/n默认n): ").strip().lower()
if confirm == 'y':
print("\n执行实际修复...")
fix_f_polic_field(conn, dry_run=False)
fix_f_polic_file_field(conn, dry_run=False)
print("\n✓ 修复完成!")
elif choice == "2":
# 直接执行
print("\n" + "="*80)
print("执行修复...")
print("="*80)
fix_f_polic_field(conn, dry_run=False)
fix_f_polic_file_field(conn, dry_run=False)
print("\n✓ 修复完成!")
else:
print("\n仅查看分析结果,未执行修复")
conn.close()
except Exception as e:
print(f"\n✗ 执行失败: {e}")
import traceback
traceback.print_exc()
if __name__ == '__main__':
main()