ai-business-write/check_database_id_relations.py
2025-12-30 10:41:35 +08:00

540 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
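Check whether the ID relationships in the database are correct.

Features:
1. Inspect the data in the f_polic_file_config table
2. Inspect the data in the f_polic_field table
3. Inspect the associations in the f_polic_file_field table
4. Verify that the ID relationships match correctly
5. Find orphaned rows and broken associations

Usage (replace <password> with the real credential; never commit it):
python check_database_id_relations.py --host 10.100.31.21 --port 3306 --user finyx --password <password> --database finyx --tenant-id 1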
"""
import os
import sys
import pymysql
import argparse
from typing import Dict, List, Set, Optional
from collections import defaultdict
# Force UTF-8 on stdout/stderr when running on Windows (Windows compatibility).
if sys.platform == 'win32':
    import io

    # A stream may lack a binary ``buffer`` (e.g. sys.stdout is None under
    # pythonw, or has been replaced by a plain text object). Leave such
    # streams untouched instead of failing at import time with AttributeError.
    if hasattr(sys.stdout, 'buffer'):
        # line_buffering=True flushes on every newline; without it the new
        # wrapper holds text back, so progress appears late and stdout/stderr
        # lines can come out in the wrong order.
        sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', line_buffering=True)
    if hasattr(sys.stderr, 'buffer'):
        sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', line_buffering=True)
def print_section(title):
    """Print ``title`` as a section heading framed by two 70-character rules.

    A blank line is emitted before the first rule.
    """
    rule = "=" * 70
    for line in ("\n" + rule, f" {title}", rule):
        print(line)
def print_result(success, message):
    """Print ``message`` prefixed with ``[OK]`` when ``success`` is truthy, else ``[FAIL]``."""
    if success:
        tag = "[OK]"
    else:
        tag = "[FAIL]"
    print(f"{tag} {message}")
def get_db_config_from_args(argv: Optional[List[str]] = None) -> Dict:
    """Parse command-line options into the configuration dict used by the checks.

    Args:
        argv: Argument list to parse instead of the process command line.
            ``None`` (the default) lets argparse read ``sys.argv[1:]``, which
            is what the function did before this parameter existed.

    Returns:
        A dict with the keys ``host``, ``port``, ``user``, ``password``,
        ``database``, ``charset`` (always ``'utf8mb4'``), ``tenant_id`` and
        ``file_id`` (``None`` when ``--file-id`` is not supplied).

    Raises:
        SystemExit: raised by argparse when a required option is missing or a
            value cannot be converted to the declared type.
    """
    # The usage example deliberately shows a placeholder: a real credential
    # must never be embedded in help text or source code.
    parser = argparse.ArgumentParser(
        description='检查数据库中的ID关系是否正确',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
python check_database_id_relations.py --host 10.100.31.21 --port 3306 --user finyx --password <password> --database finyx --tenant-id 1
"""
    )
    parser.add_argument('--host', type=str, required=True, help='MySQL服务器地址')
    parser.add_argument('--port', type=int, required=True, help='MySQL服务器端口')
    parser.add_argument('--user', type=str, required=True, help='MySQL用户名')
    # NOTE(review): a password given on the command line is visible in shell
    # history and process listings; consider an environment variable or prompt.
    parser.add_argument('--password', type=str, required=True, help='MySQL密码')
    parser.add_argument('--database', type=str, required=True, help='数据库名称')
    parser.add_argument('--tenant-id', type=int, required=True, help='租户ID')
    parser.add_argument('--file-id', type=int, help='检查特定的文件ID')
    args = parser.parse_args(argv)
    return {
        'host': args.host,
        'port': args.port,
        'user': args.user,
        'password': args.password,
        'database': args.database,
        'charset': 'utf8mb4',
        'tenant_id': args.tenant_id,
        'file_id': args.file_id,
    }
def test_db_connection(config: Dict) -> Optional[pymysql.Connection]:
    """Try to open a MySQL connection described by ``config``.

    Args:
        config: Mapping that provides ``host``, ``port``, ``user``,
            ``password``, ``database`` and ``charset``.

    Returns:
        The open connection, or ``None`` after printing a ``[FAIL]`` line when
        anything goes wrong while connecting.
    """
    connect_keys = ('host', 'port', 'user', 'password', 'database', 'charset')
    try:
        # The key lookups stay inside the try so a missing key is reported
        # the same way as a connection error.
        return pymysql.connect(**{key: config[key] for key in connect_keys})
    except Exception as exc:
        print_result(False, f"数据库连接失败: {str(exc)}")
    return None
def check_file_config(conn, tenant_id: int, file_id: Optional[int] = None):
    """Report on the f_polic_file_config rows that belong to one tenant.

    Prints, in order: the details of ``file_id`` when one is given, row counts
    for the tenant, and every row whose ``parent_id`` does not match a row of
    the same tenant.

    Args:
        conn: Open database connection; only ``conn.cursor(...)`` is used.
        tenant_id: Tenant whose rows are inspected.
        file_id: Optional file ID to look up first. When it is not found for
            this tenant the function reports that and returns early.

    Returns:
        None. All findings are printed.
    """
    print_section("检查 f_polic_file_config 表")
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        # Compare against None rather than relying on truthiness, so that an
        # explicitly requested ID of 0 is still looked up.
        if file_id is not None:
            # Look up the requested file ID.
            cursor.execute("""
SELECT id, tenant_id, parent_id, name, file_path, state
FROM f_polic_file_config
WHERE id = %s AND tenant_id = %s
""", (file_id, tenant_id))
            result = cursor.fetchone()
            if result:
                print(f"\n 文件ID {file_id} 的信息:")
                print(f" - ID: {result['id']}")
                print(f" - 租户ID: {result['tenant_id']}")
                print(f" - 父级ID: {result['parent_id']}")
                print(f" - 名称: {result['name']}")
                print(f" - 文件路径: {result['file_path']}")
                # The state column may come back as bytes or as an int;
                # NULL is treated as 0 (disabled).
                state_raw = result['state']
                if isinstance(state_raw, bytes):
                    state_value = int.from_bytes(state_raw, byteorder='big')
                elif state_raw is not None:
                    state_value = int(state_raw)
                else:
                    state_value = 0
                print(f" - 状态: {state_value} ({'启用' if state_value == 1 else '禁用'})")
                if state_value != 1:
                    print_result(False, f"文件ID {file_id} 的状态为禁用state={state_value}")
                else:
                    print_result(True, f"文件ID {file_id} 存在且已启用")
            else:
                print_result(False, f"文件ID {file_id} 不存在或不属于租户 {tenant_id}")
                return

        # Row counts for the tenant.
        cursor.execute("""
SELECT
COUNT(*) as total,
SUM(CASE WHEN state = 1 THEN 1 ELSE 0 END) as enabled,
SUM(CASE WHEN state = 0 THEN 1 ELSE 0 END) as disabled,
SUM(CASE WHEN file_path IS NOT NULL AND file_path != '' THEN 1 ELSE 0 END) as files,
SUM(CASE WHEN file_path IS NULL OR file_path = '' THEN 1 ELSE 0 END) as directories
FROM f_polic_file_config
WHERE tenant_id = %s
""", (tenant_id,))
        stats = cursor.fetchone()
        # SUM() is NULL when the tenant has no rows; show 0 instead of "None".
        print(f"\n 统计信息:")
        print(f" - 总记录数: {stats['total']}")
        print(f" - 启用记录: {stats['enabled'] or 0}")
        print(f" - 禁用记录: {stats['disabled'] or 0}")
        print(f" - 文件节点: {stats['files'] or 0}")
        print(f" - 目录节点: {stats['directories'] or 0}")

        # Rows whose parent_id points at no row of the same tenant: the
        # self-join leaves fc2.id NULL for them.
        cursor.execute("""
SELECT fc1.id, fc1.name, fc1.parent_id
FROM f_polic_file_config fc1
LEFT JOIN f_polic_file_config fc2 ON fc1.parent_id = fc2.id AND fc1.tenant_id = fc2.tenant_id
WHERE fc1.tenant_id = %s
AND fc1.parent_id IS NOT NULL
AND fc2.id IS NULL
""", (tenant_id,))
        broken_parents = cursor.fetchall()
        if broken_parents:
            print(f"\n [警告] 发现 {len(broken_parents)} 个parent_id引用错误:")
            # Only the first 10 offenders are listed; the rest are summarised.
            for item in broken_parents[:10]:
                print(f" - ID: {item['id']}, 名称: {item['name']}, parent_id: {item['parent_id']} (不存在)")
            if len(broken_parents) > 10:
                print(f" ... 还有 {len(broken_parents) - 10}")
        else:
            print_result(True, "所有parent_id引用正确")
    finally:
        cursor.close()
def check_fields(conn, tenant_id: int):
    """Report on the f_polic_field rows that belong to one tenant.

    Prints enabled/disabled counts per ``field_type`` and then every
    ``(filed_code, field_type)`` pair that occurs more than once among the
    rows with ``state = 1``.

    Args:
        conn: Open database connection; only ``conn.cursor(...)`` is used.
        tenant_id: Tenant whose rows are inspected.

    Returns:
        None. All findings are printed.
    """
    print_section("检查 f_polic_field 表")
    # Display names for the known field_type values; anything else is "未知".
    type_labels = {1: "输入字段", 2: "输出字段"}
    cur = conn.cursor(pymysql.cursors.DictCursor)
    try:
        # Counts grouped by field type.
        cur.execute("""
SELECT
field_type,
COUNT(*) as total,
SUM(CASE WHEN state = 1 THEN 1 ELSE 0 END) as enabled,
SUM(CASE WHEN state = 0 THEN 1 ELSE 0 END) as disabled
FROM f_polic_field
WHERE tenant_id = %s
GROUP BY field_type
""", (tenant_id,))
        per_type = cur.fetchall()
        print("\n 统计信息:")
        for row in per_type:
            kind = row['field_type']
            label = type_labels.get(kind, "未知")
            print(f" - {label} (field_type={kind}):")
            print(f" 总记录数: {row['total']}")
            print(f" 启用: {row['enabled']}")
            print(f" 禁用: {row['disabled']}")

        # Duplicate filed_code values among enabled rows.
        cur.execute("""
SELECT filed_code, field_type, COUNT(*) as count
FROM f_polic_field
WHERE tenant_id = %s
AND state = 1
GROUP BY filed_code, field_type
HAVING count > 1
""", (tenant_id,))
        repeated = cur.fetchall()
        if not repeated:
            print_result(True, "没有重复的filed_code")
        else:
            print("\n [警告] 发现重复的filed_code:")
            for entry in repeated:
                print(f" - filed_code: {entry['filed_code']}, field_type: {entry['field_type']}, 重复数: {entry['count']}")
    finally:
        cur.close()
def check_file_field_relations(conn, tenant_id: int, file_id: Optional[int] = None):
    """Report on the f_polic_file_field association rows of one tenant.

    Prints the number of enabled associations, the associations of ``file_id``
    when one is given, associations whose file or field row is missing, and
    enabled associations that point at a file or field whose state is not 1.

    Args:
        conn: Open database connection; only ``conn.cursor(...)`` is used.
        tenant_id: Tenant whose rows are inspected.
        file_id: Optional file ID whose associations are listed in detail.

    Returns:
        None. All findings are printed.
    """

    def _state_as_int(raw, default=0):
        """Convert a state value (bytes, number or None) to int; None -> ``default``."""
        if isinstance(raw, bytes):
            return int.from_bytes(raw, byteorder='big')
        if raw is not None:
            return int(raw)
        return default

    def _field_type_label(field_type):
        """Return the display name for a field_type value."""
        if field_type == 1:
            return "输入字段"
        if field_type == 2:
            return "输出字段"
        return "未知"

    print_section("检查 f_polic_file_field 表(关联关系)")
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        # Number of enabled associations.
        cursor.execute("""
SELECT COUNT(*) as total
FROM f_polic_file_field
WHERE tenant_id = %s AND state = 1
""", (tenant_id,))
        total_relations = cursor.fetchone()['total']
        print(f"\n 总关联关系数: {total_relations}")

        # Compare against None so an explicitly requested ID of 0 is honoured.
        if file_id is not None:
            # Associations of the requested file, joined to both sides.
            cursor.execute("""
SELECT fff.id, fff.file_id, fff.filed_id, fff.state,
fc.name as file_name, fc.file_path, fc.state as file_state,
f.name as field_name, f.filed_code, f.field_type, f.state as field_state
FROM f_polic_file_field fff
LEFT JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id
LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
WHERE fff.tenant_id = %s AND fff.file_id = %s
""", (tenant_id, file_id))
            relations = cursor.fetchall()
            if relations:
                print(f"\n 文件ID {file_id} 的关联关系 ({len(relations)} 条):")
                for rel in relations:
                    print(f"\n 关联ID: {rel['id']}")
                    print(f" - file_id: {rel['file_id']}")
                    # An empty/NULL joined name is reported as a missing template.
                    if rel['file_name']:
                        print(f" 模板: {rel['file_name']} (路径: {rel['file_path']})")
                        file_state = _state_as_int(rel['file_state'])
                        print(f" 状态: {file_state} ({'启用' if file_state == 1 else '禁用'})")
                    else:
                        print(f" [错误] 模板不存在!")
                    print(f" - filed_id: {rel['filed_id']}")
                    if rel['field_name']:
                        field_type_name = _field_type_label(rel['field_type'])
                        field_state = _state_as_int(rel['field_state'])
                        print(f" 字段: {rel['field_name']} ({rel['filed_code']}, {field_type_name})")
                        print(f" 状态: {field_state} ({'启用' if field_state == 1 else '禁用'})")
                    else:
                        print(f" [错误] 字段不存在!")
            else:
                print(f"\n 文件ID {file_id} 没有关联关系")

        # Orphaned associations: file_id matches no file-config row.
        cursor.execute("""
SELECT fff.id, fff.file_id, fff.filed_id
FROM f_polic_file_field fff
LEFT JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id
WHERE fff.tenant_id = %s
AND fff.state = 1
AND fc.id IS NULL
""", (tenant_id,))
        orphaned_file_relations = cursor.fetchall()
        if orphaned_file_relations:
            print(f"\n [错误] 发现 {len(orphaned_file_relations)} 个孤立的关联关系file_id不存在:")
            # Only the first 10 are listed; the rest are summarised.
            for rel in orphaned_file_relations[:10]:
                print(f" - 关联ID: {rel['id']}, file_id: {rel['file_id']}, filed_id: {rel['filed_id']}")
            if len(orphaned_file_relations) > 10:
                print(f" ... 还有 {len(orphaned_file_relations) - 10}")
        else:
            print_result(True, "所有file_id引用正确")

        # Orphaned associations: filed_id matches no field row.
        cursor.execute("""
SELECT fff.id, fff.file_id, fff.filed_id
FROM f_polic_file_field fff
LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
WHERE fff.tenant_id = %s
AND fff.state = 1
AND f.id IS NULL
""", (tenant_id,))
        orphaned_field_relations = cursor.fetchall()
        if orphaned_field_relations:
            print(f"\n [错误] 发现 {len(orphaned_field_relations)} 个孤立的关联关系filed_id不存在:")
            for rel in orphaned_field_relations[:10]:
                print(f" - 关联ID: {rel['id']}, file_id: {rel['file_id']}, filed_id: {rel['filed_id']}")
            if len(orphaned_field_relations) > 10:
                print(f" ... 还有 {len(orphaned_field_relations) - 10}")
        else:
            print_result(True, "所有filed_id引用正确")

        # Enabled associations that point at a file or field whose state is not 1.
        cursor.execute("""
SELECT fff.id, fff.file_id, fff.filed_id,
fc.state as file_state, f.state as field_state
FROM f_polic_file_field fff
LEFT JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id
LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
WHERE fff.tenant_id = %s
AND fff.state = 1
AND (fc.state != 1 OR f.state != 1)
""", (tenant_id,))
        disabled_relations = cursor.fetchall()
        if disabled_relations:
            print(f"\n [警告] 发现 {len(disabled_relations)} 个关联到禁用模板或字段的关联关系:")
            for rel in disabled_relations[:10]:
                # Decode the states like everywhere else so a bytes value is
                # shown as a number; None is kept so a missing row stays visible.
                shown_file_state = _state_as_int(rel['file_state'], default=None)
                shown_field_state = _state_as_int(rel['field_state'], default=None)
                print(f" - 关联ID: {rel['id']}, file_id: {rel['file_id']}, filed_id: {rel['filed_id']}")
                print(f" 模板状态: {shown_file_state}, 字段状态: {shown_field_state}")
            if len(disabled_relations) > 10:
                print(f" ... 还有 {len(disabled_relations) - 10}")
        else:
            print_result(True, "所有关联关系都关联到启用的模板和字段")
    finally:
        cursor.close()
def check_specific_file(conn, tenant_id: int, file_id: int):
    """Print a detailed report for a single f_polic_file_config row.

    Covers the row itself, its parent row (when ``parent_id`` is truthy), the
    enabled field associations grouped by field type, and associations whose
    field row is missing.

    Args:
        conn: Open database connection; only ``conn.cursor(...)`` is used.
        tenant_id: Tenant the file must belong to.
        file_id: ID of the file-config row to inspect.

    Returns:
        None. All findings are printed; returns early when the file is not
        found for this tenant.
    """

    def _state_as_int(raw):
        """Convert a state value (bytes, number or None) to int; None -> 0."""
        if isinstance(raw, bytes):
            return int.from_bytes(raw, byteorder='big')
        if raw is not None:
            return int(raw)
        return 0

    print_section(f"详细检查文件ID {file_id}")
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        # 1. The file-config row itself.
        cursor.execute("""
SELECT id, tenant_id, parent_id, name, file_path, state, created_time, updated_time
FROM f_polic_file_config
WHERE id = %s AND tenant_id = %s
""", (file_id, tenant_id))
        file_config = cursor.fetchone()
        if not file_config:
            print_result(False, f"文件ID {file_id} 不存在或不属于租户 {tenant_id}")
            return
        print(f"\n 文件配置信息:")
        print(f" - ID: {file_config['id']}")
        print(f" - 租户ID: {file_config['tenant_id']}")
        print(f" - 父级ID: {file_config['parent_id']}")
        print(f" - 名称: {file_config['name']}")
        print(f" - 文件路径: {file_config['file_path']}")
        file_state = _state_as_int(file_config['state'])
        print(f" - 状态: {file_state} ({'启用' if file_state == 1 else '禁用'})")
        print(f" - 创建时间: {file_config['created_time']}")
        print(f" - 更新时间: {file_config['updated_time']}")

        # 2. The parent row; skipped when parent_id is NULL or 0.
        if file_config['parent_id']:
            cursor.execute("""
SELECT id, name, file_path, state
FROM f_polic_file_config
WHERE id = %s AND tenant_id = %s
""", (file_config['parent_id'], tenant_id))
            parent = cursor.fetchone()
            if parent:
                parent_state = _state_as_int(parent['state'])
                print(f"\n 父级信息:")
                print(f" - ID: {parent['id']}")
                print(f" - 名称: {parent['name']}")
                print(f" - 状态: {parent_state} ({'启用' if parent_state == 1 else '禁用'})")
            else:
                print(f"\n [错误] 父级ID {file_config['parent_id']} 不存在!")

        # 3. Enabled field associations of this file.
        cursor.execute("""
SELECT fff.id as relation_id, fff.filed_id,
f.name as field_name, f.filed_code, f.field_type, f.state as field_state
FROM f_polic_file_field fff
LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
WHERE fff.tenant_id = %s AND fff.file_id = %s AND fff.state = 1
ORDER BY f.field_type, f.filed_code
""", (tenant_id, file_id))
        relations = cursor.fetchall()
        print(f"\n 关联的字段 ({len(relations)} 个):")
        input_fields = []
        output_fields = []
        # Rows with an unknown field_type or a missing field get their own
        # bucket; previously they were listed (and counted) as output fields.
        other_fields = []
        for rel in relations:
            if rel['field_type'] == 1:
                field_type_name, bucket = "输入字段", input_fields
            elif rel['field_type'] == 2:
                field_type_name, bucket = "输出字段", output_fields
            else:
                field_type_name, bucket = "未知", other_fields
            field_state = _state_as_int(rel['field_state'])
            field_info = f" - {rel['field_name']} ({rel['filed_code']}, {field_type_name})"
            if field_state != 1:
                field_info += f" [状态: 禁用]"
            # An empty/NULL joined name is reported as a missing field.
            if not rel['field_name']:
                field_info += f" [错误: 字段不存在!]"
            bucket.append(field_info)
        if input_fields:
            print(f"\n 输入字段 ({len(input_fields)} 个):")
            for info in input_fields:
                print(info)
        if output_fields:
            print(f"\n 输出字段 ({len(output_fields)} 个):")
            for info in output_fields:
                print(info)
        if other_fields:
            print(f"\n 未知类型或缺失的字段 ({len(other_fields)} 个):")
            for info in other_fields:
                print(info)

        # 4. Associations whose field row does not exist.
        cursor.execute("""
SELECT fff.id, fff.filed_id
FROM f_polic_file_field fff
LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
WHERE fff.tenant_id = %s AND fff.file_id = %s AND fff.state = 1 AND f.id IS NULL
""", (tenant_id, file_id))
        orphaned = cursor.fetchall()
        if orphaned:
            print(f"\n [错误] 发现 {len(orphaned)} 个孤立的关联关系(字段不存在):")
            for rel in orphaned:
                print(f" - 关联ID: {rel['id']}, filed_id: {rel['filed_id']}")
    finally:
        cursor.close()
def main():
    """Run every check against the database named on the command line.

    Parses the arguments, prints the effective configuration (without the
    password), opens the connection, runs the table checks and, when a file ID
    was given, the detailed per-file check. The connection is always closed.

    Returns:
        None. Returns early, after a ``[FAIL]`` line has been printed, when
        the connection cannot be opened.
    """
    print_section("数据库ID关系检查工具")
    # Read the configuration.
    config = get_db_config_from_args()
    file_id = config.get('file_id')

    # Show the configuration.
    print_section("配置信息")
    print(f" 数据库服务器: {config['host']}:{config['port']}")
    print(f" 数据库名称: {config['database']}")
    print(f" 用户名: {config['user']}")
    print(f" 租户ID: {config['tenant_id']}")
    # Compare against None so an explicitly requested file ID of 0 is not
    # mistaken for "no file ID given".
    if file_id is not None:
        print(f" 检查文件ID: {file_id}")

    # Connect to the database.
    print_section("连接数据库")
    conn = test_db_connection(config)
    if not conn:
        return
    print_result(True, "数据库连接成功")
    try:
        tenant_id = config['tenant_id']
        # Check each table.
        check_file_config(conn, tenant_id, file_id)
        check_fields(conn, tenant_id)
        check_file_field_relations(conn, tenant_id, file_id)
        # Detailed check when a file ID was given.
        if file_id is not None:
            check_specific_file(conn, tenant_id, file_id)
        # Summary.
        print_section("检查完成")
        print("请查看上述检查结果,找出问题所在")
    finally:
        conn.close()
        print_result(True, "数据库连接已关闭")
if __name__ == "__main__":
    # Script entry point: run main() and turn failures into an exit status.
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C counts as a deliberate cancel, so the exit status is 0.
        print("\n\n[中断] 用户取消操作")
        raise SystemExit(0)
    except Exception as exc:
        import traceback
        # Any other error: show the message and full traceback, exit with 1.
        print(f"\n[错误] 发生异常: {exc!s}")
        traceback.print_exc()
        raise SystemExit(1)