552 lines
19 KiB
Python
552 lines
19 KiB
Python
"""
|
||
检查并修复 f_polic_file_field 表的关联关系
|
||
1. 检查无效的关联(关联到不存在的 file_id 或 filed_id)
|
||
2. 检查重复的关联关系
|
||
3. 检查关联到已删除或未启用的字段/文件
|
||
4. 根据其他表的数据更新关联关系
|
||
"""
|
||
import pymysql
|
||
import os
|
||
from typing import Dict, List, Tuple
|
||
from collections import defaultdict
|
||
|
||
# 数据库连接配置
|
||
DB_CONFIG = {
|
||
'host': os.getenv('DB_HOST', '152.136.177.240'),
|
||
'port': int(os.getenv('DB_PORT', 5012)),
|
||
'user': os.getenv('DB_USER', 'finyx'),
|
||
'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
|
||
'database': os.getenv('DB_NAME', 'finyx'),
|
||
'charset': 'utf8mb4'
|
||
}
|
||
|
||
TENANT_ID = 615873064429507639
|
||
|
||
|
||
def check_invalid_relations(conn) -> Dict:
|
||
"""检查无效的关联关系(关联到不存在的 file_id 或 filed_id)"""
|
||
cursor = conn.cursor(pymysql.cursors.DictCursor)
|
||
|
||
print("\n" + "="*80)
|
||
print("1. 检查无效的关联关系")
|
||
print("="*80)
|
||
|
||
# 检查关联到不存在的 file_id
|
||
cursor.execute("""
|
||
SELECT fff.id, fff.file_id, fff.filed_id, fff.tenant_id
|
||
FROM f_polic_file_field fff
|
||
LEFT JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id
|
||
WHERE fff.tenant_id = %s AND fc.id IS NULL
|
||
""", (TENANT_ID,))
|
||
invalid_file_relations = cursor.fetchall()
|
||
|
||
# 检查关联到不存在的 filed_id
|
||
cursor.execute("""
|
||
SELECT fff.id, fff.file_id, fff.filed_id, fff.tenant_id
|
||
FROM f_polic_file_field fff
|
||
LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
|
||
WHERE fff.tenant_id = %s AND f.id IS NULL
|
||
""", (TENANT_ID,))
|
||
invalid_field_relations = cursor.fetchall()
|
||
|
||
print(f"\n关联到不存在的 file_id: {len(invalid_file_relations)} 条")
|
||
if invalid_file_relations:
|
||
print(" 详情:")
|
||
for rel in invalid_file_relations[:10]:
|
||
print(f" - 关联ID: {rel['id']}, file_id: {rel['file_id']}, filed_id: {rel['filed_id']}")
|
||
if len(invalid_file_relations) > 10:
|
||
print(f" ... 还有 {len(invalid_file_relations) - 10} 条")
|
||
|
||
print(f"\n关联到不存在的 filed_id: {len(invalid_field_relations)} 条")
|
||
if invalid_field_relations:
|
||
print(" 详情:")
|
||
for rel in invalid_field_relations[:10]:
|
||
print(f" - 关联ID: {rel['id']}, file_id: {rel['file_id']}, filed_id: {rel['filed_id']}")
|
||
if len(invalid_field_relations) > 10:
|
||
print(f" ... 还有 {len(invalid_field_relations) - 10} 条")
|
||
|
||
return {
|
||
'invalid_file_relations': invalid_file_relations,
|
||
'invalid_field_relations': invalid_field_relations
|
||
}
|
||
|
||
|
||
def check_duplicate_relations(conn) -> Dict:
|
||
"""检查重复的关联关系(相同的 file_id 和 filed_id)"""
|
||
cursor = conn.cursor(pymysql.cursors.DictCursor)
|
||
|
||
print("\n" + "="*80)
|
||
print("2. 检查重复的关联关系")
|
||
print("="*80)
|
||
|
||
# 查找重复的关联关系
|
||
cursor.execute("""
|
||
SELECT file_id, filed_id, COUNT(*) as count, GROUP_CONCAT(id ORDER BY id) as ids
|
||
FROM f_polic_file_field
|
||
WHERE tenant_id = %s
|
||
GROUP BY file_id, filed_id
|
||
HAVING COUNT(*) > 1
|
||
ORDER BY count DESC
|
||
""", (TENANT_ID,))
|
||
duplicates = cursor.fetchall()
|
||
|
||
print(f"\n发现 {len(duplicates)} 个重复的关联关系:")
|
||
|
||
duplicate_details = []
|
||
for dup in duplicates:
|
||
ids = [int(id_str) for id_str in dup['ids'].split(',')]
|
||
duplicate_details.append({
|
||
'file_id': dup['file_id'],
|
||
'filed_id': dup['filed_id'],
|
||
'count': dup['count'],
|
||
'ids': ids
|
||
})
|
||
print(f"\n 文件ID: {dup['file_id']}, 字段ID: {dup['filed_id']} (共 {dup['count']} 条)")
|
||
print(f" 关联ID列表: {ids}")
|
||
|
||
return {
|
||
'duplicates': duplicate_details
|
||
}
|
||
|
||
|
||
def check_disabled_relations(conn) -> Dict:
|
||
"""检查关联到已删除或未启用的字段/文件"""
|
||
cursor = conn.cursor(pymysql.cursors.DictCursor)
|
||
|
||
print("\n" + "="*80)
|
||
print("3. 检查关联到已删除或未启用的字段/文件")
|
||
print("="*80)
|
||
|
||
# 检查关联到未启用的文件
|
||
cursor.execute("""
|
||
SELECT fff.id, fff.file_id, fff.filed_id, fc.name as file_name, fc.state as file_state
|
||
FROM f_polic_file_field fff
|
||
INNER JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id
|
||
WHERE fff.tenant_id = %s AND fc.state = 0
|
||
""", (TENANT_ID,))
|
||
disabled_file_relations = cursor.fetchall()
|
||
|
||
# 检查关联到未启用的字段
|
||
cursor.execute("""
|
||
SELECT fff.id, fff.file_id, fff.filed_id, f.name as field_name, f.filed_code, f.state as field_state
|
||
FROM f_polic_file_field fff
|
||
INNER JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
|
||
WHERE fff.tenant_id = %s AND f.state = 0
|
||
""", (TENANT_ID,))
|
||
disabled_field_relations = cursor.fetchall()
|
||
|
||
print(f"\n关联到未启用的文件: {len(disabled_file_relations)} 条")
|
||
if disabled_file_relations:
|
||
print(" 详情:")
|
||
for rel in disabled_file_relations[:10]:
|
||
print(f" - 关联ID: {rel['id']}, 文件: {rel['file_name']} (ID: {rel['file_id']})")
|
||
if len(disabled_file_relations) > 10:
|
||
print(f" ... 还有 {len(disabled_file_relations) - 10} 条")
|
||
|
||
print(f"\n关联到未启用的字段: {len(disabled_field_relations)} 条")
|
||
if disabled_field_relations:
|
||
print(" 详情:")
|
||
for rel in disabled_field_relations[:10]:
|
||
print(f" - 关联ID: {rel['id']}, 字段: {rel['field_name']} ({rel['filed_code']}, ID: {rel['filed_id']})")
|
||
if len(disabled_field_relations) > 10:
|
||
print(f" ... 还有 {len(disabled_field_relations) - 10} 条")
|
||
|
||
return {
|
||
'disabled_file_relations': disabled_file_relations,
|
||
'disabled_field_relations': disabled_field_relations
|
||
}
|
||
|
||
|
||
def check_missing_relations(conn) -> Dict:
|
||
"""检查应该存在但缺失的关联关系(文件节点应该有输出字段关联)"""
|
||
cursor = conn.cursor(pymysql.cursors.DictCursor)
|
||
|
||
print("\n" + "="*80)
|
||
print("4. 检查缺失的关联关系")
|
||
print("="*80)
|
||
|
||
# 获取所有有 template_code 的文件节点(这些应该是文件,不是目录)
|
||
cursor.execute("""
|
||
SELECT fc.id, fc.name, fc.template_code
|
||
FROM f_polic_file_config fc
|
||
WHERE fc.tenant_id = %s AND fc.template_code IS NOT NULL AND fc.state = 1
|
||
""", (TENANT_ID,))
|
||
file_configs = cursor.fetchall()
|
||
|
||
# 获取所有启用的输出字段
|
||
cursor.execute("""
|
||
SELECT id, name, filed_code
|
||
FROM f_polic_field
|
||
WHERE tenant_id = %s AND field_type = 2 AND state = 1
|
||
""", (TENANT_ID,))
|
||
output_fields = cursor.fetchall()
|
||
|
||
# 获取现有的关联关系
|
||
cursor.execute("""
|
||
SELECT file_id, filed_id
|
||
FROM f_polic_file_field
|
||
WHERE tenant_id = %s
|
||
""", (TENANT_ID,))
|
||
existing_relations = {(rel['file_id'], rel['filed_id']) for rel in cursor.fetchall()}
|
||
|
||
print(f"\n文件节点总数: {len(file_configs)}")
|
||
print(f"输出字段总数: {len(output_fields)}")
|
||
print(f"现有关联关系总数: {len(existing_relations)}")
|
||
|
||
# 这里不自动创建缺失的关联,因为不是所有文件都需要所有字段
|
||
# 只显示统计信息
|
||
print("\n注意: 缺失的关联关系需要根据业务逻辑手动创建")
|
||
|
||
return {
|
||
'file_configs': file_configs,
|
||
'output_fields': output_fields,
|
||
'existing_relations': existing_relations
|
||
}
|
||
|
||
|
||
def check_field_type_consistency(conn) -> Dict:
|
||
"""检查关联关系的字段类型一致性(f_polic_file_field 应该只关联输出字段)"""
|
||
cursor = conn.cursor(pymysql.cursors.DictCursor)
|
||
|
||
print("\n" + "="*80)
|
||
print("5. 检查字段类型一致性")
|
||
print("="*80)
|
||
|
||
# 检查是否关联了输入字段(field_type=1)
|
||
cursor.execute("""
|
||
SELECT fff.id, fff.file_id, fff.filed_id,
|
||
fc.name as file_name, fc.template_code, f.name as field_name, f.filed_code, f.field_type
|
||
FROM f_polic_file_field fff
|
||
INNER JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id
|
||
INNER JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
|
||
WHERE fff.tenant_id = %s AND f.field_type = 1
|
||
ORDER BY fc.name, f.name
|
||
""", (TENANT_ID,))
|
||
input_field_relations = cursor.fetchall()
|
||
|
||
print(f"\n关联到输入字段 (field_type=1) 的记录: {len(input_field_relations)} 条")
|
||
if input_field_relations:
|
||
print(" 注意: f_polic_file_field 表通常只应该关联输出字段 (field_type=2)")
|
||
print(" 根据业务逻辑,输入字段不需要通过此表关联")
|
||
print(" 详情:")
|
||
for rel in input_field_relations:
|
||
print(f" - 关联ID: {rel['id']}, 文件: {rel['file_name']} (code: {rel['template_code']}), "
|
||
f"字段: {rel['field_name']} ({rel['filed_code']}, type={rel['field_type']})")
|
||
else:
|
||
print(" ✓ 所有关联都是输出字段")
|
||
|
||
return {
|
||
'input_field_relations': input_field_relations
|
||
}
|
||
|
||
|
||
def fix_invalid_relations(conn, dry_run: bool = True) -> Dict:
|
||
"""修复无效的关联关系"""
|
||
cursor = conn.cursor()
|
||
|
||
print("\n" + "="*80)
|
||
print("修复无效的关联关系")
|
||
print("="*80)
|
||
|
||
if dry_run:
|
||
print("\n[DRY RUN模式 - 不会实际修改数据库]")
|
||
|
||
# 获取无效的关联
|
||
invalid_file_relations = check_invalid_relations(conn)['invalid_file_relations']
|
||
invalid_field_relations = check_invalid_relations(conn)['invalid_field_relations']
|
||
|
||
all_invalid_ids = set()
|
||
for rel in invalid_file_relations:
|
||
all_invalid_ids.add(rel['id'])
|
||
for rel in invalid_field_relations:
|
||
all_invalid_ids.add(rel['id'])
|
||
|
||
if not all_invalid_ids:
|
||
print("\n✓ 没有无效的关联关系需要删除")
|
||
return {'deleted': 0}
|
||
|
||
print(f"\n准备删除 {len(all_invalid_ids)} 条无效的关联关系")
|
||
|
||
if not dry_run:
|
||
placeholders = ','.join(['%s'] * len(all_invalid_ids))
|
||
cursor.execute(f"""
|
||
DELETE FROM f_polic_file_field
|
||
WHERE id IN ({placeholders})
|
||
""", list(all_invalid_ids))
|
||
conn.commit()
|
||
print(f"✓ 已删除 {cursor.rowcount} 条无效的关联关系")
|
||
else:
|
||
print(f"[DRY RUN] 将删除以下关联ID: {sorted(all_invalid_ids)}")
|
||
|
||
return {'deleted': len(all_invalid_ids) if not dry_run else 0}
|
||
|
||
|
||
def fix_input_field_relations(conn, dry_run: bool = True) -> Dict:
|
||
"""删除关联到输入字段的记录(f_polic_file_field 应该只关联输出字段)"""
|
||
cursor = conn.cursor()
|
||
|
||
print("\n" + "="*80)
|
||
print("删除关联到输入字段的记录")
|
||
print("="*80)
|
||
|
||
if dry_run:
|
||
print("\n[DRY RUN模式 - 不会实际修改数据库]")
|
||
|
||
# 获取关联到输入字段的记录
|
||
input_field_relations = check_field_type_consistency(conn)['input_field_relations']
|
||
|
||
if not input_field_relations:
|
||
print("\n✓ 没有关联到输入字段的记录需要删除")
|
||
return {'deleted': 0}
|
||
|
||
ids_to_delete = [rel['id'] for rel in input_field_relations]
|
||
|
||
print(f"\n准备删除 {len(ids_to_delete)} 条关联到输入字段的记录")
|
||
|
||
if not dry_run:
|
||
placeholders = ','.join(['%s'] * len(ids_to_delete))
|
||
cursor.execute(f"""
|
||
DELETE FROM f_polic_file_field
|
||
WHERE id IN ({placeholders})
|
||
""", ids_to_delete)
|
||
conn.commit()
|
||
print(f"✓ 已删除 {cursor.rowcount} 条关联到输入字段的记录")
|
||
else:
|
||
print(f"[DRY RUN] 将删除以下关联ID: {sorted(ids_to_delete)}")
|
||
|
||
return {'deleted': len(ids_to_delete) if not dry_run else 0}
|
||
|
||
|
||
def fix_duplicate_relations(conn, dry_run: bool = True) -> Dict:
|
||
"""修复重复的关联关系(保留第一条,删除其他)"""
|
||
cursor = conn.cursor()
|
||
|
||
print("\n" + "="*80)
|
||
print("修复重复的关联关系")
|
||
print("="*80)
|
||
|
||
if dry_run:
|
||
print("\n[DRY RUN模式 - 不会实际修改数据库]")
|
||
|
||
duplicates = check_duplicate_relations(conn)['duplicates']
|
||
|
||
if not duplicates:
|
||
print("\n✓ 没有重复的关联关系需要修复")
|
||
return {'deleted': 0}
|
||
|
||
ids_to_delete = []
|
||
for dup in duplicates:
|
||
# 保留第一条(ID最小的),删除其他的
|
||
ids_to_delete.extend(dup['ids'][1:])
|
||
|
||
print(f"\n准备删除 {len(ids_to_delete)} 条重复的关联关系")
|
||
|
||
if not dry_run:
|
||
placeholders = ','.join(['%s'] * len(ids_to_delete))
|
||
cursor.execute(f"""
|
||
DELETE FROM f_polic_file_field
|
||
WHERE id IN ({placeholders})
|
||
""", ids_to_delete)
|
||
conn.commit()
|
||
print(f"✓ 已删除 {cursor.rowcount} 条重复的关联关系")
|
||
else:
|
||
print(f"[DRY RUN] 将删除以下关联ID: {sorted(ids_to_delete)}")
|
||
|
||
return {'deleted': len(ids_to_delete) if not dry_run else 0}
|
||
|
||
|
||
def get_statistics(conn) -> Dict:
|
||
"""获取统计信息"""
|
||
cursor = conn.cursor(pymysql.cursors.DictCursor)
|
||
|
||
print("\n" + "="*80)
|
||
print("统计信息")
|
||
print("="*80)
|
||
|
||
# 总关联数
|
||
cursor.execute("""
|
||
SELECT COUNT(*) as total
|
||
FROM f_polic_file_field
|
||
WHERE tenant_id = %s
|
||
""", (TENANT_ID,))
|
||
total_relations = cursor.fetchone()['total']
|
||
|
||
# 有效的关联数(关联到存在的、启用的文件和字段)
|
||
cursor.execute("""
|
||
SELECT COUNT(*) as total
|
||
FROM f_polic_file_field fff
|
||
INNER JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id AND fc.state = 1
|
||
INNER JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id AND f.state = 1
|
||
WHERE fff.tenant_id = %s
|
||
""", (TENANT_ID,))
|
||
valid_relations = cursor.fetchone()['total']
|
||
|
||
# 关联的文件数
|
||
cursor.execute("""
|
||
SELECT COUNT(DISTINCT file_id) as total
|
||
FROM f_polic_file_field
|
||
WHERE tenant_id = %s
|
||
""", (TENANT_ID,))
|
||
related_files = cursor.fetchone()['total']
|
||
|
||
# 关联的字段数
|
||
cursor.execute("""
|
||
SELECT COUNT(DISTINCT filed_id) as total
|
||
FROM f_polic_file_field
|
||
WHERE tenant_id = %s
|
||
""", (TENANT_ID,))
|
||
related_fields = cursor.fetchone()['total']
|
||
|
||
print(f"\n总关联数: {total_relations}")
|
||
print(f"有效关联数: {valid_relations}")
|
||
print(f"关联的文件数: {related_files}")
|
||
print(f"关联的字段数: {related_fields}")
|
||
|
||
return {
|
||
'total_relations': total_relations,
|
||
'valid_relations': valid_relations,
|
||
'related_files': related_files,
|
||
'related_fields': related_fields
|
||
}
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
print("="*80)
|
||
print("检查并修复 f_polic_file_field 表的关联关系")
|
||
print("="*80)
|
||
|
||
try:
|
||
conn = pymysql.connect(**DB_CONFIG)
|
||
print("✓ 数据库连接成功\n")
|
||
except Exception as e:
|
||
print(f"✗ 数据库连接失败: {e}")
|
||
return
|
||
|
||
try:
|
||
# 1. 检查无效的关联关系
|
||
invalid_result = check_invalid_relations(conn)
|
||
|
||
# 2. 检查重复的关联关系
|
||
duplicate_result = check_duplicate_relations(conn)
|
||
|
||
# 3. 检查关联到已删除或未启用的字段/文件
|
||
disabled_result = check_disabled_relations(conn)
|
||
|
||
# 4. 检查缺失的关联关系
|
||
missing_result = check_missing_relations(conn)
|
||
|
||
# 5. 检查字段类型一致性
|
||
type_result = check_field_type_consistency(conn)
|
||
|
||
# 6. 获取统计信息
|
||
stats = get_statistics(conn)
|
||
|
||
# 总结
|
||
print("\n" + "="*80)
|
||
print("检查总结")
|
||
print("="*80)
|
||
|
||
has_issues = (
|
||
len(invalid_result['invalid_file_relations']) > 0 or
|
||
len(invalid_result['invalid_field_relations']) > 0 or
|
||
len(duplicate_result['duplicates']) > 0
|
||
)
|
||
|
||
has_issues = (
|
||
len(invalid_result['invalid_file_relations']) > 0 or
|
||
len(invalid_result['invalid_field_relations']) > 0 or
|
||
len(duplicate_result['duplicates']) > 0 or
|
||
len(type_result['input_field_relations']) > 0
|
||
)
|
||
|
||
if has_issues:
|
||
print("\n⚠ 发现以下问题:")
|
||
print(f" - 无效的 file_id 关联: {len(invalid_result['invalid_file_relations'])} 条")
|
||
print(f" - 无效的 filed_id 关联: {len(invalid_result['invalid_field_relations'])} 条")
|
||
print(f" - 重复的关联关系: {len(duplicate_result['duplicates'])} 组")
|
||
print(f" - 关联到未启用的文件: {len(disabled_result['disabled_file_relations'])} 条")
|
||
print(f" - 关联到未启用的字段: {len(disabled_result['disabled_field_relations'])} 条")
|
||
print(f" - 关联到输入字段: {len(type_result['input_field_relations'])} 条")
|
||
|
||
print("\n是否要修复这些问题?")
|
||
print("运行以下命令进行修复:")
|
||
print(" python check_and_fix_file_field_relations.py --fix")
|
||
else:
|
||
print("\n✓ 未发现需要修复的问题")
|
||
|
||
print("\n" + "="*80)
|
||
|
||
except Exception as e:
|
||
print(f"\n✗ 检查过程中发生错误: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
finally:
|
||
conn.close()
|
||
print("\n数据库连接已关闭")
|
||
|
||
|
||
def fix_main():
|
||
"""修复主函数"""
|
||
print("="*80)
|
||
print("修复 f_polic_file_field 表的关联关系")
|
||
print("="*80)
|
||
|
||
try:
|
||
conn = pymysql.connect(**DB_CONFIG)
|
||
print("✓ 数据库连接成功\n")
|
||
except Exception as e:
|
||
print(f"✗ 数据库连接失败: {e}")
|
||
return
|
||
|
||
try:
|
||
# 先进行干运行
|
||
print("\n[第一步] 干运行检查...")
|
||
invalid_result = check_invalid_relations(conn)
|
||
duplicate_result = check_duplicate_relations(conn)
|
||
|
||
# 修复无效的关联关系
|
||
print("\n[第二步] 修复无效的关联关系...")
|
||
fix_invalid_relations(conn, dry_run=False)
|
||
|
||
# 修复重复的关联关系
|
||
print("\n[第三步] 修复重复的关联关系...")
|
||
fix_duplicate_relations(conn, dry_run=False)
|
||
|
||
# 删除关联到输入字段的记录
|
||
print("\n[第四步] 删除关联到输入字段的记录...")
|
||
fix_input_field_relations(conn, dry_run=False)
|
||
|
||
# 重新获取统计信息
|
||
print("\n[第五步] 修复后的统计信息...")
|
||
stats = get_statistics(conn)
|
||
|
||
print("\n" + "="*80)
|
||
print("修复完成")
|
||
print("="*80)
|
||
|
||
except Exception as e:
|
||
print(f"\n✗ 修复过程中发生错误: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
conn.rollback()
|
||
finally:
|
||
conn.close()
|
||
print("\n数据库连接已关闭")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
import sys
|
||
|
||
if '--fix' in sys.argv:
|
||
# 确认操作
|
||
print("\n⚠ 警告: 这将修改数据库!")
|
||
response = input("确认要继续吗? (yes/no): ")
|
||
if response.lower() == 'yes':
|
||
fix_main()
|
||
else:
|
||
print("操作已取消")
|
||
else:
|
||
main()
|
||
|