780 lines
28 KiB
Python
780 lines
28 KiB
Python
"""
|
||
跨数据库同步模板、字段和关联关系
|
||
|
||
功能:
|
||
1. 从.env文件读取源数据库配置
|
||
2. 同步到目标数据库(10.100.31.21)
|
||
3. 处理ID映射关系(两个数据库的ID不同)
|
||
4. 根据业务逻辑(name, filed_code, file_path)匹配数据
|
||
|
||
使用方法:
|
||
python sync_templates_between_databases.py --target-host 10.100.31.21 --target-port 3306 --target-user finyx --target-password FknJYz3FA5WDYtsd --target-database finyx --target-tenant-id 1
|
||
"""
|
||
import os
|
||
import sys
|
||
import pymysql
|
||
import argparse
|
||
from pathlib import Path
|
||
from typing import Dict, List, Set, Optional, Tuple
|
||
from dotenv import load_dotenv
|
||
|
||
# 设置输出编码为UTF-8(Windows兼容)
|
||
if sys.platform == 'win32':
|
||
import io
|
||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
|
||
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
|
||
|
||
# 加载环境变量
|
||
load_dotenv()
|
||
|
||
# 项目根目录
|
||
PROJECT_ROOT = Path(__file__).parent
|
||
TEMPLATES_DIR = PROJECT_ROOT / "template_finish"
|
||
|
||
CREATED_BY = 655162080928945152
|
||
UPDATED_BY = 655162080928945152
|
||
|
||
|
||
def print_section(title):
|
||
"""打印章节标题"""
|
||
print("\n" + "="*70)
|
||
print(f" {title}")
|
||
print("="*70)
|
||
|
||
|
||
def print_result(success, message):
|
||
"""打印结果"""
|
||
status = "[OK]" if success else "[FAIL]"
|
||
print(f"{status} {message}")
|
||
|
||
|
||
def generate_id():
|
||
"""生成ID"""
|
||
import time
|
||
return int(time.time() * 1000000)
|
||
|
||
|
||
def get_source_db_config() -> Dict:
|
||
"""从.env文件读取源数据库配置"""
|
||
db_host = os.getenv('DB_HOST')
|
||
db_port = os.getenv('DB_PORT')
|
||
db_user = os.getenv('DB_USER')
|
||
db_password = os.getenv('DB_PASSWORD')
|
||
db_name = os.getenv('DB_NAME')
|
||
|
||
if not all([db_host, db_port, db_user, db_password, db_name]):
|
||
raise ValueError(
|
||
"源数据库配置不完整,请在.env文件中配置以下环境变量:\n"
|
||
"DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_NAME"
|
||
)
|
||
|
||
return {
|
||
'host': db_host,
|
||
'port': int(db_port),
|
||
'user': db_user,
|
||
'password': db_password,
|
||
'database': db_name,
|
||
'charset': 'utf8mb4'
|
||
}
|
||
|
||
|
||
def get_target_db_config_from_args() -> Dict:
|
||
"""从命令行参数获取目标数据库配置"""
|
||
parser = argparse.ArgumentParser(
|
||
description='跨数据库同步模板、字段和关联关系',
|
||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
epilog="""
|
||
示例:
|
||
python sync_templates_between_databases.py --target-host 10.100.31.21 --target-port 3306 --target-user finyx --target-password FknJYz3FA5WDYtsd --target-database finyx --target-tenant-id 1
|
||
"""
|
||
)
|
||
|
||
parser.add_argument('--target-host', type=str, required=True, help='目标MySQL服务器地址')
|
||
parser.add_argument('--target-port', type=int, required=True, help='目标MySQL服务器端口')
|
||
parser.add_argument('--target-user', type=str, required=True, help='目标MySQL用户名')
|
||
parser.add_argument('--target-password', type=str, required=True, help='目标MySQL密码')
|
||
parser.add_argument('--target-database', type=str, required=True, help='目标数据库名称')
|
||
parser.add_argument('--target-tenant-id', type=int, required=True, help='目标租户ID')
|
||
parser.add_argument('--source-tenant-id', type=int, help='源租户ID(如果不指定,将使用数据库中的第一个tenant_id)')
|
||
parser.add_argument('--dry-run', action='store_true', help='预览模式(不实际更新数据库)')
|
||
|
||
args = parser.parse_args()
|
||
|
||
return {
|
||
'host': args.target_host,
|
||
'port': args.target_port,
|
||
'user': args.target_user,
|
||
'password': args.target_password,
|
||
'database': args.target_database,
|
||
'charset': 'utf8mb4',
|
||
'tenant_id': args.target_tenant_id,
|
||
'source_tenant_id': args.source_tenant_id,
|
||
'dry_run': args.dry_run
|
||
}
|
||
|
||
|
||
def test_db_connection(config: Dict, label: str) -> Optional[pymysql.Connection]:
|
||
"""测试数据库连接"""
|
||
try:
|
||
conn = pymysql.connect(
|
||
host=config['host'],
|
||
port=config['port'],
|
||
user=config['user'],
|
||
password=config['password'],
|
||
database=config['database'],
|
||
charset=config['charset']
|
||
)
|
||
return conn
|
||
except Exception as e:
|
||
print_result(False, f"{label}数据库连接失败: {str(e)}")
|
||
return None
|
||
|
||
|
||
def get_source_tenant_id(conn) -> int:
|
||
"""获取源数据库中的tenant_id"""
|
||
cursor = conn.cursor(pymysql.cursors.DictCursor)
|
||
try:
|
||
cursor.execute("SELECT DISTINCT tenant_id FROM f_polic_file_config LIMIT 1")
|
||
result = cursor.fetchone()
|
||
if result:
|
||
return result['tenant_id']
|
||
return 1
|
||
finally:
|
||
cursor.close()
|
||
|
||
|
||
def read_source_fields(conn, tenant_id: int) -> Tuple[Dict[str, Dict], Dict[str, Dict]]:
|
||
"""
|
||
从源数据库读取字段数据
|
||
|
||
Returns:
|
||
(input_fields_dict, output_fields_dict)
|
||
key: filed_code, value: 字段信息
|
||
"""
|
||
cursor = conn.cursor(pymysql.cursors.DictCursor)
|
||
try:
|
||
sql = """
|
||
SELECT id, tenant_id, name, filed_code, field_type, state
|
||
FROM f_polic_field
|
||
WHERE tenant_id = %s
|
||
AND state = 1
|
||
ORDER BY field_type, filed_code
|
||
"""
|
||
cursor.execute(sql, (tenant_id,))
|
||
fields = cursor.fetchall()
|
||
|
||
input_fields = {}
|
||
output_fields = {}
|
||
|
||
for field in fields:
|
||
field_info = {
|
||
'id': field['id'],
|
||
'tenant_id': field['tenant_id'],
|
||
'name': field['name'],
|
||
'filed_code': field['filed_code'],
|
||
'field_type': field['field_type'],
|
||
'state': field['state']
|
||
}
|
||
|
||
if field['field_type'] == 1:
|
||
input_fields[field['filed_code']] = field_info
|
||
elif field['field_type'] == 2:
|
||
output_fields[field['filed_code']] = field_info
|
||
|
||
return input_fields, output_fields
|
||
finally:
|
||
cursor.close()
|
||
|
||
|
||
def read_source_templates(conn, tenant_id: int) -> Dict[str, Dict]:
|
||
"""
|
||
从源数据库读取模板数据
|
||
|
||
Returns:
|
||
key: file_path (如果为空则使用name), value: 模板信息
|
||
"""
|
||
cursor = conn.cursor(pymysql.cursors.DictCursor)
|
||
try:
|
||
sql = """
|
||
SELECT id, tenant_id, parent_id, name, file_path, state
|
||
FROM f_polic_file_config
|
||
WHERE tenant_id = %s
|
||
AND state = 1
|
||
ORDER BY file_path, name
|
||
"""
|
||
cursor.execute(sql, (tenant_id,))
|
||
templates = cursor.fetchall()
|
||
|
||
result = {}
|
||
for template in templates:
|
||
# 使用file_path作为key,如果没有file_path则使用name
|
||
key = template['file_path'] if template['file_path'] else f"DIR:{template['name']}"
|
||
result[key] = {
|
||
'id': template['id'],
|
||
'tenant_id': template['tenant_id'],
|
||
'parent_id': template['parent_id'],
|
||
'name': template['name'],
|
||
'file_path': template['file_path'],
|
||
'state': template['state']
|
||
}
|
||
|
||
return result
|
||
finally:
|
||
cursor.close()
|
||
|
||
|
||
def read_source_relations(conn, tenant_id: int) -> Dict[int, List[int]]:
|
||
"""
|
||
从源数据库读取字段关联关系
|
||
|
||
Returns:
|
||
key: file_id, value: [filed_id列表]
|
||
"""
|
||
cursor = conn.cursor(pymysql.cursors.DictCursor)
|
||
try:
|
||
sql = """
|
||
SELECT file_id, filed_id
|
||
FROM f_polic_file_field
|
||
WHERE tenant_id = %s
|
||
AND state = 1
|
||
"""
|
||
cursor.execute(sql, (tenant_id,))
|
||
relations = cursor.fetchall()
|
||
|
||
result = {}
|
||
for rel in relations:
|
||
file_id = rel['file_id']
|
||
filed_id = rel['filed_id']
|
||
if file_id not in result:
|
||
result[file_id] = []
|
||
result[file_id].append(filed_id)
|
||
|
||
return result
|
||
finally:
|
||
cursor.close()
|
||
|
||
|
||
def sync_fields_to_target(conn, tenant_id: int, source_input_fields: Dict, source_output_fields: Dict,
|
||
dry_run: bool = False) -> Tuple[Dict[int, int], Dict[int, int]]:
|
||
"""
|
||
同步字段到目标数据库
|
||
|
||
Returns:
|
||
(input_field_id_map, output_field_id_map)
|
||
key: 源字段ID, value: 目标字段ID
|
||
"""
|
||
print_section("同步字段到目标数据库")
|
||
|
||
cursor = conn.cursor(pymysql.cursors.DictCursor)
|
||
|
||
try:
|
||
# 1. 获取目标数据库中的现有字段
|
||
cursor.execute("""
|
||
SELECT id, filed_code, field_type
|
||
FROM f_polic_field
|
||
WHERE tenant_id = %s
|
||
AND state = 1
|
||
""", (tenant_id,))
|
||
existing_fields = cursor.fetchall()
|
||
|
||
existing_by_code = {}
|
||
for field in existing_fields:
|
||
key = (field['filed_code'], field['field_type'])
|
||
existing_by_code[key] = field['id']
|
||
|
||
print(f" 目标数据库现有字段: {len(existing_fields)} 个")
|
||
|
||
# 2. 同步输入字段
|
||
print("\n 同步输入字段...")
|
||
input_field_id_map = {}
|
||
input_created = 0
|
||
input_matched = 0
|
||
|
||
for code, source_field in source_input_fields.items():
|
||
key = (code, 1)
|
||
if key in existing_by_code:
|
||
# 字段已存在,使用现有ID
|
||
target_id = existing_by_code[key]
|
||
input_field_id_map[source_field['id']] = target_id
|
||
input_matched += 1
|
||
else:
|
||
# 创建新字段
|
||
target_id = generate_id()
|
||
input_field_id_map[source_field['id']] = target_id
|
||
|
||
if not dry_run:
|
||
insert_cursor = conn.cursor()
|
||
try:
|
||
insert_cursor.execute("""
|
||
INSERT INTO f_polic_field
|
||
(id, tenant_id, name, filed_code, field_type, created_time, created_by, updated_time, updated_by, state)
|
||
VALUES (%s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1)
|
||
""", (
|
||
target_id,
|
||
tenant_id,
|
||
source_field['name'],
|
||
source_field['filed_code'],
|
||
1,
|
||
CREATED_BY,
|
||
UPDATED_BY
|
||
))
|
||
conn.commit()
|
||
input_created += 1
|
||
finally:
|
||
insert_cursor.close()
|
||
else:
|
||
input_created += 1
|
||
|
||
print(f" 匹配: {input_matched} 个,创建: {input_created} 个")
|
||
|
||
# 3. 同步输出字段
|
||
print("\n 同步输出字段...")
|
||
output_field_id_map = {}
|
||
output_created = 0
|
||
output_matched = 0
|
||
|
||
for code, source_field in source_output_fields.items():
|
||
key = (code, 2)
|
||
if key in existing_by_code:
|
||
# 字段已存在,使用现有ID
|
||
target_id = existing_by_code[key]
|
||
output_field_id_map[source_field['id']] = target_id
|
||
output_matched += 1
|
||
else:
|
||
# 创建新字段
|
||
target_id = generate_id()
|
||
output_field_id_map[source_field['id']] = target_id
|
||
|
||
if not dry_run:
|
||
insert_cursor = conn.cursor()
|
||
try:
|
||
insert_cursor.execute("""
|
||
INSERT INTO f_polic_field
|
||
(id, tenant_id, name, filed_code, field_type, created_time, created_by, updated_time, updated_by, state)
|
||
VALUES (%s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1)
|
||
""", (
|
||
target_id,
|
||
tenant_id,
|
||
source_field['name'],
|
||
source_field['filed_code'],
|
||
2,
|
||
CREATED_BY,
|
||
UPDATED_BY
|
||
))
|
||
conn.commit()
|
||
output_created += 1
|
||
finally:
|
||
insert_cursor.close()
|
||
else:
|
||
output_created += 1
|
||
|
||
print(f" 匹配: {output_matched} 个,创建: {output_created} 个")
|
||
|
||
return input_field_id_map, output_field_id_map
|
||
|
||
finally:
|
||
cursor.close()
|
||
|
||
|
||
def sync_templates_to_target(conn, tenant_id: int, source_templates: Dict,
|
||
dry_run: bool = False) -> Dict[int, int]:
|
||
"""
|
||
同步模板到目标数据库
|
||
|
||
Returns:
|
||
template_id_map: key: 源模板ID, value: 目标模板ID
|
||
"""
|
||
print_section("同步模板到目标数据库")
|
||
|
||
cursor = conn.cursor(pymysql.cursors.DictCursor)
|
||
|
||
try:
|
||
# 1. 获取目标数据库中的现有模板
|
||
cursor.execute("""
|
||
SELECT id, name, file_path, parent_id
|
||
FROM f_polic_file_config
|
||
WHERE tenant_id = %s
|
||
AND state = 1
|
||
""", (tenant_id,))
|
||
existing_templates = cursor.fetchall()
|
||
|
||
existing_by_path = {}
|
||
existing_by_name = {}
|
||
for template in existing_templates:
|
||
if template['file_path']:
|
||
existing_by_path[template['file_path']] = template
|
||
else:
|
||
# 目录节点
|
||
name = template['name']
|
||
if name not in existing_by_name:
|
||
existing_by_name[name] = []
|
||
existing_by_name[name].append(template)
|
||
|
||
print(f" 目标数据库现有模板: {len(existing_templates)} 个")
|
||
|
||
# 2. 先处理目录节点(按层级顺序)
|
||
print("\n 同步目录节点...")
|
||
template_id_map = {}
|
||
dir_created = 0
|
||
dir_matched = 0
|
||
|
||
# 分离目录和文件
|
||
dir_templates = {}
|
||
file_templates = {}
|
||
for key, source_template in source_templates.items():
|
||
if source_template['file_path']:
|
||
file_templates[key] = source_template
|
||
else:
|
||
dir_templates[key] = source_template
|
||
|
||
# 构建目录层级关系(需要先处理父目录)
|
||
# 按parent_id分组,先处理没有parent_id的,再处理有parent_id的
|
||
dirs_by_level = {}
|
||
for key, source_template in dir_templates.items():
|
||
level = 0
|
||
current = source_template
|
||
while current.get('parent_id'):
|
||
level += 1
|
||
# 查找父目录
|
||
parent_found = False
|
||
for t in dir_templates.values():
|
||
if t['id'] == current['parent_id']:
|
||
current = t
|
||
parent_found = True
|
||
break
|
||
if not parent_found:
|
||
break
|
||
|
||
if level not in dirs_by_level:
|
||
dirs_by_level[level] = []
|
||
dirs_by_level[level].append((key, source_template))
|
||
|
||
# 按层级顺序处理目录
|
||
for level in sorted(dirs_by_level.keys()):
|
||
for key, source_template in dirs_by_level[level]:
|
||
source_id = source_template['id']
|
||
name = source_template['name']
|
||
|
||
# 查找匹配的目录(通过名称和parent_id)
|
||
matched = None
|
||
target_parent_id = None
|
||
if source_template['parent_id']:
|
||
target_parent_id = template_id_map.get(source_template['parent_id'])
|
||
|
||
for existing in existing_by_name.get(name, []):
|
||
if not existing['file_path']: # 确保是目录节点
|
||
# 检查parent_id是否匹配
|
||
if existing['parent_id'] == target_parent_id:
|
||
matched = existing
|
||
break
|
||
|
||
if matched:
|
||
target_id = matched['id']
|
||
template_id_map[source_id] = target_id
|
||
dir_matched += 1
|
||
else:
|
||
target_id = generate_id()
|
||
template_id_map[source_id] = target_id
|
||
|
||
if not dry_run:
|
||
insert_cursor = conn.cursor()
|
||
try:
|
||
insert_cursor.execute("""
|
||
INSERT INTO f_polic_file_config
|
||
(id, tenant_id, parent_id, name, file_path, created_time, created_by, updated_time, updated_by, state)
|
||
VALUES (%s, %s, %s, %s, NULL, NOW(), %s, NOW(), %s, 1)
|
||
""", (
|
||
target_id,
|
||
tenant_id,
|
||
target_parent_id,
|
||
name,
|
||
CREATED_BY,
|
||
UPDATED_BY
|
||
))
|
||
conn.commit()
|
||
dir_created += 1
|
||
finally:
|
||
insert_cursor.close()
|
||
else:
|
||
dir_created += 1
|
||
|
||
print(f" 匹配: {dir_matched} 个,创建: {dir_created} 个")
|
||
|
||
# 3. 处理文件节点
|
||
print("\n 同步文件节点...")
|
||
file_created = 0
|
||
file_matched = 0
|
||
file_updated = 0
|
||
|
||
for key, source_template in file_templates.items():
|
||
source_id = source_template['id']
|
||
file_path = source_template['file_path']
|
||
name = source_template['name']
|
||
|
||
# 通过file_path匹配
|
||
matched = existing_by_path.get(file_path)
|
||
|
||
if matched:
|
||
target_id = matched['id']
|
||
template_id_map[source_id] = target_id
|
||
file_matched += 1
|
||
|
||
# 检查是否需要更新
|
||
target_parent_id = None
|
||
if source_template['parent_id']:
|
||
target_parent_id = template_id_map.get(source_template['parent_id'])
|
||
|
||
if matched['parent_id'] != target_parent_id or matched['name'] != name:
|
||
file_updated += 1
|
||
if not dry_run:
|
||
update_cursor = conn.cursor()
|
||
try:
|
||
update_cursor.execute("""
|
||
UPDATE f_polic_file_config
|
||
SET parent_id = %s, name = %s, updated_time = NOW(), updated_by = %s
|
||
WHERE id = %s AND tenant_id = %s
|
||
""", (target_parent_id, name, UPDATED_BY, target_id, tenant_id))
|
||
conn.commit()
|
||
finally:
|
||
update_cursor.close()
|
||
else:
|
||
target_id = generate_id()
|
||
template_id_map[source_id] = target_id
|
||
|
||
if not dry_run:
|
||
insert_cursor = conn.cursor()
|
||
try:
|
||
# 处理parent_id映射
|
||
target_parent_id = None
|
||
if source_template['parent_id']:
|
||
target_parent_id = template_id_map.get(source_template['parent_id'])
|
||
|
||
insert_cursor.execute("""
|
||
INSERT INTO f_polic_file_config
|
||
(id, tenant_id, parent_id, name, file_path, created_time, created_by, updated_time, updated_by, state)
|
||
VALUES (%s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1)
|
||
""", (
|
||
target_id,
|
||
tenant_id,
|
||
target_parent_id,
|
||
name,
|
||
file_path,
|
||
CREATED_BY,
|
||
UPDATED_BY
|
||
))
|
||
conn.commit()
|
||
file_created += 1
|
||
finally:
|
||
insert_cursor.close()
|
||
else:
|
||
file_created += 1
|
||
|
||
print(f" 匹配: {file_matched} 个,创建: {file_created} 个,更新: {file_updated} 个")
|
||
|
||
return template_id_map
|
||
|
||
finally:
|
||
cursor.close()
|
||
|
||
|
||
def sync_relations_to_target(conn, tenant_id: int, source_relations: Dict[int, List[int]],
|
||
template_id_map: Dict[int, int],
|
||
input_field_id_map: Dict[int, int],
|
||
output_field_id_map: Dict[int, int],
|
||
dry_run: bool = False):
|
||
"""同步字段关联关系到目标数据库"""
|
||
print_section("同步字段关联关系到目标数据库")
|
||
|
||
# 1. 清理现有关联关系
|
||
print("1. 清理现有关联关系...")
|
||
if not dry_run:
|
||
cursor = conn.cursor()
|
||
try:
|
||
cursor.execute("""
|
||
DELETE FROM f_polic_file_field
|
||
WHERE tenant_id = %s
|
||
""", (tenant_id,))
|
||
deleted_count = cursor.rowcount
|
||
conn.commit()
|
||
print_result(True, f"删除了 {deleted_count} 条旧关联关系")
|
||
finally:
|
||
cursor.close()
|
||
else:
|
||
print(" [预览模式] 将清理所有现有关联关系")
|
||
|
||
# 2. 创建新的关联关系
|
||
print("\n2. 创建新的关联关系...")
|
||
all_field_id_map = {**input_field_id_map, **output_field_id_map}
|
||
|
||
relations_created = 0
|
||
relations_skipped = 0
|
||
|
||
for source_file_id, source_field_ids in source_relations.items():
|
||
# 获取目标file_id
|
||
target_file_id = template_id_map.get(source_file_id)
|
||
if not target_file_id:
|
||
relations_skipped += 1
|
||
continue
|
||
|
||
# 转换field_id
|
||
target_field_ids = []
|
||
for source_field_id in source_field_ids:
|
||
target_field_id = all_field_id_map.get(source_field_id)
|
||
if target_field_id:
|
||
target_field_ids.append(target_field_id)
|
||
|
||
if not target_field_ids:
|
||
continue
|
||
|
||
# 创建关联关系
|
||
if not dry_run:
|
||
cursor = conn.cursor()
|
||
try:
|
||
for target_field_id in target_field_ids:
|
||
relation_id = generate_id()
|
||
cursor.execute("""
|
||
INSERT INTO f_polic_file_field
|
||
(id, tenant_id, file_id, filed_id, created_time, created_by, updated_time, updated_by, state)
|
||
VALUES (%s, %s, %s, %s, NOW(), %s, NOW(), %s, 1)
|
||
""", (
|
||
relation_id,
|
||
tenant_id,
|
||
target_file_id,
|
||
target_field_id,
|
||
CREATED_BY,
|
||
UPDATED_BY
|
||
))
|
||
conn.commit()
|
||
relations_created += len(target_field_ids)
|
||
except Exception as e:
|
||
conn.rollback()
|
||
print(f" [错误] 创建关联关系失败: {str(e)}")
|
||
finally:
|
||
cursor.close()
|
||
else:
|
||
relations_created += len(target_field_ids)
|
||
|
||
print_result(True, f"创建了 {relations_created} 条关联关系,跳过 {relations_skipped} 个模板")
|
||
|
||
return {
|
||
'created': relations_created,
|
||
'skipped': relations_skipped
|
||
}
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
print_section("跨数据库同步模板、字段和关联关系")
|
||
|
||
# 1. 获取源数据库配置(从.env)
|
||
print_section("读取源数据库配置")
|
||
try:
|
||
source_config = get_source_db_config()
|
||
print_result(True, f"源数据库: {source_config['host']}:{source_config['port']}/{source_config['database']}")
|
||
except Exception as e:
|
||
print_result(False, str(e))
|
||
return
|
||
|
||
# 2. 获取目标数据库配置(从命令行参数)
|
||
print_section("读取目标数据库配置")
|
||
target_config = get_target_db_config_from_args()
|
||
print_result(True, f"目标数据库: {target_config['host']}:{target_config['port']}/{target_config['database']}")
|
||
print(f" 目标租户ID: {target_config['tenant_id']}")
|
||
|
||
if target_config['dry_run']:
|
||
print("\n[注意] 当前为预览模式,不会实际更新数据库")
|
||
|
||
# 3. 连接数据库
|
||
print_section("连接数据库")
|
||
source_conn = test_db_connection(source_config, "源")
|
||
if not source_conn:
|
||
return
|
||
|
||
target_conn = test_db_connection(target_config, "目标")
|
||
if not target_conn:
|
||
source_conn.close()
|
||
return
|
||
|
||
print_result(True, "数据库连接成功")
|
||
|
||
try:
|
||
# 4. 获取源租户ID
|
||
source_tenant_id = target_config.get('source_tenant_id')
|
||
if not source_tenant_id:
|
||
source_tenant_id = get_source_tenant_id(source_conn)
|
||
print(f"\n源租户ID: {source_tenant_id}")
|
||
|
||
# 5. 读取源数据
|
||
print_section("读取源数据库数据")
|
||
|
||
print(" 读取字段...")
|
||
source_input_fields, source_output_fields = read_source_fields(source_conn, source_tenant_id)
|
||
print_result(True, f"输入字段: {len(source_input_fields)} 个,输出字段: {len(source_output_fields)} 个")
|
||
|
||
print("\n 读取模板...")
|
||
source_templates = read_source_templates(source_conn, source_tenant_id)
|
||
print_result(True, f"模板总数: {len(source_templates)} 个")
|
||
|
||
print("\n 读取关联关系...")
|
||
source_relations = read_source_relations(source_conn, source_tenant_id)
|
||
print_result(True, f"关联关系: {len(source_relations)} 个模板有字段关联")
|
||
|
||
# 6. 同步到目标数据库
|
||
target_tenant_id = target_config['tenant_id']
|
||
dry_run = target_config['dry_run']
|
||
|
||
# 6.1 同步字段
|
||
input_field_id_map, output_field_id_map = sync_fields_to_target(
|
||
target_conn, target_tenant_id,
|
||
source_input_fields, source_output_fields,
|
||
dry_run
|
||
)
|
||
|
||
# 6.2 同步模板
|
||
template_id_map = sync_templates_to_target(
|
||
target_conn, target_tenant_id,
|
||
source_templates,
|
||
dry_run
|
||
)
|
||
|
||
# 6.3 同步关联关系
|
||
relations_result = sync_relations_to_target(
|
||
target_conn, target_tenant_id,
|
||
source_relations,
|
||
template_id_map,
|
||
input_field_id_map,
|
||
output_field_id_map,
|
||
dry_run
|
||
)
|
||
|
||
# 7. 总结
|
||
print_section("同步完成")
|
||
if dry_run:
|
||
print(" 本次为预览模式,未实际更新数据库")
|
||
else:
|
||
print(" 数据库已更新")
|
||
|
||
print(f"\n 同步统计:")
|
||
print(f" - 输入字段: {len(input_field_id_map)} 个")
|
||
print(f" - 输出字段: {len(output_field_id_map)} 个")
|
||
print(f" - 模板: {len(template_id_map)} 个")
|
||
print(f" - 关联关系: {relations_result['created']} 条")
|
||
|
||
finally:
|
||
source_conn.close()
|
||
target_conn.close()
|
||
print_result(True, "数据库连接已关闭")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
try:
|
||
main()
|
||
except KeyboardInterrupt:
|
||
print("\n\n[中断] 用户取消操作")
|
||
sys.exit(0)
|
||
except Exception as e:
|
||
print(f"\n[错误] 发生异常: {str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
sys.exit(1)
|