540 lines
22 KiB
Python
540 lines
22 KiB
Python
"""
|
||
检查数据库中的ID关系是否正确
|
||
|
||
功能:
|
||
1. 检查f_polic_file_config表中的数据
|
||
2. 检查f_polic_field表中的数据
|
||
3. 检查f_polic_file_field表中的关联关系
|
||
4. 验证ID关系是否正确匹配
|
||
5. 找出孤立数据和错误关联
|
||
|
||
使用方法:
|
||
python check_database_id_relations.py --host 10.100.31.21 --port 3306 --user finyx --password FknJYz3FA5WDYtsd --database finyx --tenant-id 1
|
||
"""
|
||
import os
|
||
import sys
|
||
import pymysql
|
||
import argparse
|
||
from typing import Dict, List, Set, Optional
|
||
from collections import defaultdict
|
||
|
||
# Force UTF-8 output on Windows, where the default console encoding often
# cannot represent the Chinese text this script prints.
if sys.platform == 'win32':
    import io

    for _name in ('stdout', 'stderr'):
        _stream = getattr(sys, _name)
        if _stream is None:
            # No stream attached (e.g. started without a console): nothing to fix.
            continue
        if hasattr(_stream, 'reconfigure'):
            # Python 3.7+: change the encoding in place; settings that are not
            # passed (such as line buffering) keep their current values.
            _stream.reconfigure(encoding='utf-8')
        elif hasattr(_stream, 'buffer'):
            # Older interpreters: wrap the underlying binary buffer instead.
            setattr(sys, _name, io.TextIOWrapper(_stream.buffer, encoding='utf-8'))
    del _name, _stream
|
||
|
||
|
||
def print_section(title, width=70):
    """Print a section banner: a blank line, a rule, the title, a rule.

    Args:
        title: Text shown between the two rules (prefixed with one space).
        width: Number of ``=`` characters in each rule. Defaults to 70,
            the width that was previously hard-coded.
    """
    rule = "=" * width
    print("\n" + rule)
    print(f" {title}")
    print(rule)
|
||
|
||
|
||
def print_result(success: bool, message: str) -> None:
    """Print one result line tagged ``[OK]`` or ``[FAIL]``.

    Args:
        success: Truthy selects the ``[OK]`` tag, falsy selects ``[FAIL]``.
        message: Text printed after the tag, separated by one space.
    """
    tag = "[OK]" if success else "[FAIL]"
    print(f"{tag} {message}")
|
||
|
||
|
||
def get_db_config_from_args(argv: Optional[List[str]] = None) -> Dict:
    """Parse command-line options into the configuration dictionary.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what the script did before this
            parameter existed; passing a list allows programmatic use.

    Returns:
        A dict with the keys ``host``, ``port``, ``user``, ``password``,
        ``database``, ``charset`` (always ``'utf8mb4'``), ``tenant_id`` and
        ``file_id`` (``None`` when ``--file-id`` is not given).

    Raises:
        SystemExit: Raised by argparse on invalid or missing arguments, and
            after printing ``--help``.
    """
    parser = argparse.ArgumentParser(
        description='检查数据库中的ID关系是否正确',
        # Raw formatter so the line breaks of the example below are kept.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # NOTE(review): the example embeds what looks like a real host and
        # password; consider replacing them with placeholders.
        epilog="""
示例:
  python check_database_id_relations.py --host 10.100.31.21 --port 3306 --user finyx --password FknJYz3FA5WDYtsd --database finyx --tenant-id 1
        """
    )

    parser.add_argument('--host', type=str, required=True, help='MySQL服务器地址')
    parser.add_argument('--port', type=int, required=True, help='MySQL服务器端口')
    parser.add_argument('--user', type=str, required=True, help='MySQL用户名')
    parser.add_argument('--password', type=str, required=True, help='MySQL密码')
    parser.add_argument('--database', type=str, required=True, help='数据库名称')
    parser.add_argument('--tenant-id', type=int, required=True, help='租户ID')
    parser.add_argument('--file-id', type=int, help='检查特定的文件ID')

    args = parser.parse_args(argv)

    return {
        'host': args.host,
        'port': args.port,
        'user': args.user,
        'password': args.password,
        'database': args.database,
        'charset': 'utf8mb4',
        'tenant_id': args.tenant_id,
        'file_id': args.file_id,
    }
|
||
|
||
|
||
def test_db_connection(config: Dict) -> Optional[pymysql.Connection]:
    """Open a PyMySQL connection described by ``config``.

    Args:
        config: Mapping that provides ``host``, ``port``, ``user``,
            ``password``, ``database`` and ``charset``.

    Returns:
        The open connection, or ``None`` if connecting failed. In the failure
        case a ``[FAIL]`` line with the error text is printed.
    """
    try:
        return pymysql.connect(
            host=config['host'],
            port=config['port'],
            user=config['user'],
            password=config['password'],
            database=config['database'],
            charset=config['charset'],
        )
    except Exception as exc:
        # Deliberately broad: any failure is reported to the user and
        # signalled to the caller through the None return value.
        print_result(False, f"数据库连接失败: {exc}")
        return None
|
||
|
||
|
||
def check_file_config(conn, tenant_id: int, file_id: Optional[int] = None):
    """Report on the f_polic_file_config rows of one tenant.

    When ``file_id`` is truthy, only that row is looked up and described, and
    the function returns without printing the tenant-wide report. Otherwise
    it prints aggregate counts and lists rows whose ``parent_id`` matches no
    row of the same tenant.

    Args:
        conn: Open PyMySQL connection.
        tenant_id: Tenant whose rows are inspected.
        file_id: Optional id of a single row to describe.
    """
    print_section("检查 f_polic_file_config 表")

    # The cursor is closed when the ``with`` block exits, including on the
    # early return and on exceptions.
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        if file_id:
            # Describe one specific row.
            cursor.execute("""
                SELECT id, tenant_id, parent_id, name, file_path, state
                FROM f_polic_file_config
                WHERE id = %s AND tenant_id = %s
            """, (file_id, tenant_id))
            row = cursor.fetchone()

            if not row:
                print_result(False, f"文件ID {file_id} 不存在或不属于租户 {tenant_id}")
                return

            print(f"\n 文件ID {file_id} 的信息:")
            print(f" - ID: {row['id']}")
            print(f" - 租户ID: {row['tenant_id']}")
            print(f" - 父级ID: {row['parent_id']}")
            print(f" - 名称: {row['name']}")
            print(f" - 文件路径: {row['file_path']}")

            # The driver may hand back ``state`` as bytes or as a number;
            # NULL is treated as 0 (disabled).
            state_raw = row['state']
            if state_raw is None:
                state_value = 0
            elif isinstance(state_raw, bytes):
                state_value = int.from_bytes(state_raw, byteorder='big')
            else:
                state_value = int(state_raw)
            print(f" - 状态: {state_value} ({'启用' if state_value == 1 else '禁用'})")

            if state_value == 1:
                print_result(True, f"文件ID {file_id} 存在且已启用")
            else:
                print_result(False, f"文件ID {file_id} 的状态为禁用(state={state_value})")
            return

        # Tenant-wide statistics.
        cursor.execute("""
            SELECT
                COUNT(*) as total,
                SUM(CASE WHEN state = 1 THEN 1 ELSE 0 END) as enabled,
                SUM(CASE WHEN state = 0 THEN 1 ELSE 0 END) as disabled,
                SUM(CASE WHEN file_path IS NOT NULL AND file_path != '' THEN 1 ELSE 0 END) as files,
                SUM(CASE WHEN file_path IS NULL OR file_path = '' THEN 1 ELSE 0 END) as directories
            FROM f_polic_file_config
            WHERE tenant_id = %s
        """, (tenant_id,))
        stats = cursor.fetchone()

        print(f"\n 统计信息:")
        # SUM() over zero rows is NULL, so a tenant without any rows would
        # otherwise be reported as "None"; show 0 instead.
        for label, key in (
            ("总记录数", 'total'),
            ("启用记录", 'enabled'),
            ("禁用记录", 'disabled'),
            ("文件节点", 'files'),
            ("目录节点", 'directories'),
        ):
            print(f" - {label}: {stats[key] or 0}")

        # Rows whose parent_id points at no row of the same tenant.
        # NOTE(review): only NULL counts as "no parent" here; if root rows
        # store parent_id = 0 they will be listed as broken - confirm.
        cursor.execute("""
            SELECT fc1.id, fc1.name, fc1.parent_id
            FROM f_polic_file_config fc1
            LEFT JOIN f_polic_file_config fc2 ON fc1.parent_id = fc2.id AND fc1.tenant_id = fc2.tenant_id
            WHERE fc1.tenant_id = %s
              AND fc1.parent_id IS NOT NULL
              AND fc2.id IS NULL
        """, (tenant_id,))
        broken_parents = cursor.fetchall()

        if not broken_parents:
            print_result(True, "所有parent_id引用正确")
            return

        print(f"\n [警告] 发现 {len(broken_parents)} 个parent_id引用错误:")
        # Only the first ten are listed to keep the report readable.
        for item in broken_parents[:10]:
            print(f" - ID: {item['id']}, 名称: {item['name']}, parent_id: {item['parent_id']} (不存在)")
        if len(broken_parents) > 10:
            print(f" ... 还有 {len(broken_parents) - 10} 个")
|
||
|
||
|
||
def check_fields(conn, tenant_id: int):
    """Report on the f_polic_field rows of one tenant.

    Prints per-``field_type`` counts (total / enabled / disabled) and lists
    every ``(filed_code, field_type)`` pair that occurs more than once among
    enabled rows.

    Args:
        conn: Open PyMySQL connection.
        tenant_id: Tenant whose rows are inspected.
    """
    print_section("检查 f_polic_field 表")

    # field_type values the script gives a name to; anything else is "未知".
    type_labels = {1: "输入字段", 2: "输出字段"}

    # The cursor is closed when the ``with`` block exits.
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        # Counts grouped by field type.
        cursor.execute("""
            SELECT
                field_type,
                COUNT(*) as total,
                SUM(CASE WHEN state = 1 THEN 1 ELSE 0 END) as enabled,
                SUM(CASE WHEN state = 0 THEN 1 ELSE 0 END) as disabled
            FROM f_polic_field
            WHERE tenant_id = %s
            GROUP BY field_type
        """, (tenant_id,))
        stats = cursor.fetchall()

        print(f"\n 统计信息:")
        for stat in stats:
            field_type_name = type_labels.get(stat['field_type'], "未知")
            print(f" - {field_type_name} (field_type={stat['field_type']}):")
            print(f" 总记录数: {stat['total']}")
            print(f" 启用: {stat['enabled']}")
            print(f" 禁用: {stat['disabled']}")

        # Duplicate codes among enabled rows ("filed_code" is the column's
        # actual spelling in the queries).
        cursor.execute("""
            SELECT filed_code, field_type, COUNT(*) as count
            FROM f_polic_field
            WHERE tenant_id = %s
              AND state = 1
            GROUP BY filed_code, field_type
            HAVING count > 1
        """, (tenant_id,))
        duplicates = cursor.fetchall()

        if not duplicates:
            print_result(True, "没有重复的filed_code")
            return

        print(f"\n [警告] 发现重复的filed_code:")
        for dup in duplicates:
            print(f" - filed_code: {dup['filed_code']}, field_type: {dup['field_type']}, 重复数: {dup['count']}")
|
||
|
||
|
||
def check_file_field_relations(conn, tenant_id: int, file_id: Optional[int] = None):
    """Report on the f_polic_file_field association rows of one tenant.

    Prints the number of enabled associations, optionally every association
    of ``file_id`` in detail, then three tenant-wide checks on enabled
    associations: missing template, missing field, and associations that
    point at a template or field whose state is not 1.

    Args:
        conn: Open PyMySQL connection.
        tenant_id: Tenant whose rows are inspected.
        file_id: Optional template id whose associations are listed in
            detail (skipped when falsy).
    """
    print_section("检查 f_polic_file_field 表(关联关系)")

    # field_type values the script gives a name to; anything else is "未知".
    type_labels = {1: "输入字段", 2: "输出字段"}

    def state_of(raw, default=0):
        """Turn a ``state`` value (bytes, number or NULL) into an int."""
        if raw is None:
            return default
        if isinstance(raw, bytes):
            return int.from_bytes(raw, byteorder='big')
        return int(raw)

    # The cursor is closed when the ``with`` block exits.
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        # Number of enabled associations.
        cursor.execute("""
            SELECT COUNT(*) as total
            FROM f_polic_file_field
            WHERE tenant_id = %s AND state = 1
        """, (tenant_id,))
        total_relations = cursor.fetchone()['total']

        print(f"\n 总关联关系数: {total_relations}")

        if file_id:
            # Detailed listing for one template. The joined ids are selected
            # so that "row missing" is decided by the join itself and not by
            # whether the name column happens to be empty.
            cursor.execute("""
                SELECT fff.id, fff.file_id, fff.filed_id, fff.state,
                       fc.id as matched_file_id,
                       fc.name as file_name, fc.file_path, fc.state as file_state,
                       f.id as matched_field_id,
                       f.name as field_name, f.filed_code, f.field_type, f.state as field_state
                FROM f_polic_file_field fff
                LEFT JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id
                LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
                WHERE fff.tenant_id = %s AND fff.file_id = %s
            """, (tenant_id, file_id))
            relations = cursor.fetchall()

            if relations:
                print(f"\n 文件ID {file_id} 的关联关系 ({len(relations)} 条):")
                for rel in relations:
                    print(f"\n 关联ID: {rel['id']}")

                    print(f" - file_id: {rel['file_id']}")
                    if rel['matched_file_id'] is not None:
                        file_state = state_of(rel['file_state'])
                        print(f" 模板: {rel['file_name']} (路径: {rel['file_path']})")
                        print(f" 状态: {file_state} ({'启用' if file_state == 1 else '禁用'})")
                    else:
                        print(f" [错误] 模板不存在!")

                    print(f" - filed_id: {rel['filed_id']}")
                    if rel['matched_field_id'] is not None:
                        field_type_name = type_labels.get(rel['field_type'], "未知")
                        field_state = state_of(rel['field_state'])
                        print(f" 字段: {rel['field_name']} ({rel['filed_code']}, {field_type_name})")
                        print(f" 状态: {field_state} ({'启用' if field_state == 1 else '禁用'})")
                    else:
                        print(f" [错误] 字段不存在!")
            else:
                print(f"\n 文件ID {file_id} 没有关联关系")

        # Enabled associations whose template row does not exist.
        cursor.execute("""
            SELECT fff.id, fff.file_id, fff.filed_id
            FROM f_polic_file_field fff
            LEFT JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id
            WHERE fff.tenant_id = %s
              AND fff.state = 1
              AND fc.id IS NULL
        """, (tenant_id,))
        orphaned_file_relations = cursor.fetchall()

        if orphaned_file_relations:
            print(f"\n [错误] 发现 {len(orphaned_file_relations)} 个孤立的关联关系(file_id不存在):")
            # Only the first ten are listed to keep the report readable.
            for rel in orphaned_file_relations[:10]:
                print(f" - 关联ID: {rel['id']}, file_id: {rel['file_id']}, filed_id: {rel['filed_id']}")
            if len(orphaned_file_relations) > 10:
                print(f" ... 还有 {len(orphaned_file_relations) - 10} 个")
        else:
            print_result(True, "所有file_id引用正确")

        # Enabled associations whose field row does not exist.
        cursor.execute("""
            SELECT fff.id, fff.file_id, fff.filed_id
            FROM f_polic_file_field fff
            LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
            WHERE fff.tenant_id = %s
              AND fff.state = 1
              AND f.id IS NULL
        """, (tenant_id,))
        orphaned_field_relations = cursor.fetchall()

        if orphaned_field_relations:
            print(f"\n [错误] 发现 {len(orphaned_field_relations)} 个孤立的关联关系(filed_id不存在):")
            for rel in orphaned_field_relations[:10]:
                print(f" - 关联ID: {rel['id']}, file_id: {rel['file_id']}, filed_id: {rel['filed_id']}")
            if len(orphaned_field_relations) > 10:
                print(f" ... 还有 {len(orphaned_field_relations) - 10} 个")
        else:
            print_result(True, "所有filed_id引用正确")

        # Enabled associations that point at a template or field whose state
        # is not 1. A side that is missing altogether compares as NULL and is
        # covered by the two orphan checks above.
        cursor.execute("""
            SELECT fff.id, fff.file_id, fff.filed_id,
                   fc.state as file_state, f.state as field_state
            FROM f_polic_file_field fff
            LEFT JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id
            LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
            WHERE fff.tenant_id = %s
              AND fff.state = 1
              AND (fc.state != 1 OR f.state != 1)
        """, (tenant_id,))
        disabled_relations = cursor.fetchall()

        if disabled_relations:
            print(f"\n [警告] 发现 {len(disabled_relations)} 个关联到禁用模板或字段的关联关系:")
            for rel in disabled_relations[:10]:
                print(f" - 关联ID: {rel['id']}, file_id: {rel['file_id']}, filed_id: {rel['filed_id']}")
                # Show states as integers like the rest of the report does;
                # a NULL (row missing on that side) stays visible as None.
                shown_file_state = state_of(rel['file_state'], default=None)
                shown_field_state = state_of(rel['field_state'], default=None)
                print(f" 模板状态: {shown_file_state}, 字段状态: {shown_field_state}")
            if len(disabled_relations) > 10:
                print(f" ... 还有 {len(disabled_relations) - 10} 个")
        else:
            print_result(True, "所有关联关系都关联到启用的模板和字段")
|
||
|
||
|
||
def check_specific_file(conn, tenant_id: int, file_id: int):
    """Print a detailed report for one f_polic_file_config row.

    Covers the row itself, its parent row (if ``parent_id`` is truthy), the
    enabled field associations grouped by field type, and the enabled
    associations whose field row does not exist.

    Args:
        conn: Open PyMySQL connection.
        tenant_id: Tenant the row must belong to.
        file_id: Id of the row to describe.
    """
    print_section(f"详细检查文件ID {file_id}")

    # field_type values the script gives a name to; anything else is "未知".
    type_labels = {1: "输入字段", 2: "输出字段"}

    def state_of(raw):
        """Turn a ``state`` value (bytes, number or NULL) into an int."""
        if raw is None:
            return 0
        if isinstance(raw, bytes):
            return int.from_bytes(raw, byteorder='big')
        return int(raw)

    # The cursor is closed when the ``with`` block exits, including on the
    # early return.
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        # 1. The row itself.
        cursor.execute("""
            SELECT id, tenant_id, parent_id, name, file_path, state, created_time, updated_time
            FROM f_polic_file_config
            WHERE id = %s AND tenant_id = %s
        """, (file_id, tenant_id))
        file_config = cursor.fetchone()

        if not file_config:
            print_result(False, f"文件ID {file_id} 不存在或不属于租户 {tenant_id}")
            return

        file_state = state_of(file_config['state'])
        print(f"\n 文件配置信息:")
        print(f" - ID: {file_config['id']}")
        print(f" - 租户ID: {file_config['tenant_id']}")
        print(f" - 父级ID: {file_config['parent_id']}")
        print(f" - 名称: {file_config['name']}")
        print(f" - 文件路径: {file_config['file_path']}")
        print(f" - 状态: {file_state} ({'启用' if file_state == 1 else '禁用'})")
        print(f" - 创建时间: {file_config['created_time']}")
        print(f" - 更新时间: {file_config['updated_time']}")

        # 2. The parent row, when one is referenced.
        if file_config['parent_id']:
            cursor.execute("""
                SELECT id, name, file_path, state
                FROM f_polic_file_config
                WHERE id = %s AND tenant_id = %s
            """, (file_config['parent_id'], tenant_id))
            parent = cursor.fetchone()
            if parent:
                parent_state = state_of(parent['state'])
                print(f"\n 父级信息:")
                print(f" - ID: {parent['id']}")
                print(f" - 名称: {parent['name']}")
                print(f" - 状态: {parent_state} ({'启用' if parent_state == 1 else '禁用'})")
            else:
                print(f"\n [错误] 父级ID {file_config['parent_id']} 不存在!")

        # 3. Enabled associations with their field rows. The joined id is
        # selected so a missing field row can be told apart from a field
        # whose name is empty.
        cursor.execute("""
            SELECT fff.id as relation_id, fff.filed_id,
                   f.id as matched_field_id,
                   f.name as field_name, f.filed_code, f.field_type, f.state as field_state
            FROM f_polic_file_field fff
            LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
            WHERE fff.tenant_id = %s AND fff.file_id = %s AND fff.state = 1
            ORDER BY f.field_type, f.filed_code
        """, (tenant_id, file_id))
        relations = cursor.fetchall()

        print(f"\n 关联的字段 ({len(relations)} 个):")
        input_fields = []
        output_fields = []
        # Missing fields and unrecognised field types get their own bucket
        # instead of being reported as output fields.
        unknown_fields = []
        for rel in relations:
            if rel['matched_field_id'] is None:
                unknown_fields.append(f" - filed_id: {rel['filed_id']} [错误: 字段不存在!]")
                continue

            field_type_name = type_labels.get(rel['field_type'], "未知")
            field_info = f" - {rel['field_name']} ({rel['filed_code']}, {field_type_name})"
            if state_of(rel['field_state']) != 1:
                field_info += f" [状态: 禁用]"

            if rel['field_type'] == 1:
                input_fields.append(field_info)
            elif rel['field_type'] == 2:
                output_fields.append(field_info)
            else:
                unknown_fields.append(field_info)

        if input_fields:
            print(f"\n 输入字段 ({len(input_fields)} 个):")
            for info in input_fields:
                print(info)

        if output_fields:
            print(f"\n 输出字段 ({len(output_fields)} 个):")
            for info in output_fields:
                print(info)

        if unknown_fields:
            print(f"\n 未知类型字段 ({len(unknown_fields)} 个):")
            for info in unknown_fields:
                print(info)

        # 4. Enabled associations whose field row does not exist.
        cursor.execute("""
            SELECT fff.id, fff.filed_id
            FROM f_polic_file_field fff
            LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
            WHERE fff.tenant_id = %s AND fff.file_id = %s AND fff.state = 1 AND f.id IS NULL
        """, (tenant_id, file_id))
        orphaned = cursor.fetchall()

        if orphaned:
            print(f"\n [错误] 发现 {len(orphaned)} 个孤立的关联关系(字段不存在):")
            for rel in orphaned:
                print(f" - 关联ID: {rel['id']}, filed_id: {rel['filed_id']}")
|
||
|
||
|
||
def main():
    """Run every check against the database named on the command line.

    Parses the arguments, echoes the configuration (the password is not
    printed), opens a connection and runs the three table checks. When a
    file id was given, the detailed single-file report follows. The
    connection is closed even if a check raises.
    """
    print_section("数据库ID关系检查工具")

    # Read the configuration from the command line.
    config = get_db_config_from_args()

    # Echo the configuration.
    print_section("配置信息")
    print(f" 数据库服务器: {config['host']}:{config['port']}")
    print(f" 数据库名称: {config['database']}")
    print(f" 用户名: {config['user']}")
    print(f" 租户ID: {config['tenant_id']}")
    if config.get('file_id'):
        print(f" 检查文件ID: {config['file_id']}")

    # Connect; test_db_connection reports the failure itself.
    print_section("连接数据库")
    conn = test_db_connection(config)
    if not conn:
        # NOTE(review): returning here ends the script with exit status 0
        # even though nothing was checked - confirm that this is intended.
        return

    print_result(True, "数据库连接成功")

    try:
        tenant_id = config['tenant_id']
        file_id = config.get('file_id')

        # Table-level checks.
        check_file_config(conn, tenant_id, file_id)
        check_fields(conn, tenant_id)
        check_file_field_relations(conn, tenant_id, file_id)

        # Detailed report when a file id was given.
        if file_id:
            check_specific_file(conn, tenant_id, file_id)

        # Summary.
        print_section("检查完成")
        print("请查看上述检查结果,找出问题所在")
    finally:
        conn.close()
        print_result(True, "数据库连接已关闭")
|
||
|
||
|
||
# Script entry point: run main() and translate failures into exit codes.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl+C is treated as a normal, user-requested stop (status 0).
        print("\n\n[中断] 用户取消操作")
        sys.exit(0)
    except Exception as exc:
        # Last-resort handler: print the message and the traceback, then
        # signal failure through the exit status.
        print(f"\n[错误] 发生异常: {exc}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
|