"""
Synchronize templates, fields and their relations from one database to another.

Features:
1. Read the source database configuration from the .env file
2. Synchronize into the target database given on the command line
3. Handle ID mapping (the two databases use different IDs)
4. Match rows by business keys (name, filed_code, file_path)

Usage:
    python sync_templates_between_databases.py --target-host 10.100.31.21 --target-port 3306 --target-user finyx --target-password <PASSWORD> --target-database finyx --target-tenant-id 1
"""
import argparse
import io
import os
import sys
import time
from pathlib import Path
from typing import Dict, List, Set, Optional, Tuple

import pymysql
from dotenv import load_dotenv

# Force UTF-8 output (Windows compatibility)
if sys.platform == 'win32':
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')

# Load environment variables from .env
load_dotenv()

# Project root directory
PROJECT_ROOT = Path(__file__).parent
TEMPLATES_DIR = PROJECT_ROOT / "template_finish"

# User id written into the created_by / updated_by audit columns
CREATED_BY = 655162080928945152
UPDATED_BY = 655162080928945152

# Last value handed out by generate_id(); used to keep ids strictly increasing.
_last_generated_id = 0


def print_section(title):
    """Print a section banner with the given title."""
    print("\n" + "=" * 70)
    print(f" {title}")
    print("=" * 70)


def print_result(success, message):
    """Print *message* prefixed with [OK] or [FAIL] depending on *success*."""
    status = "[OK]" if success else "[FAIL]"
    print(f"{status} {message}")


def generate_id():
    """Return a new integer id derived from the current time in microseconds.

    The system clock is not guaranteed to advance between two consecutive
    calls, so the raw timestamp alone could be handed out twice and produce a
    duplicate primary key. The value is therefore bumped so that every id
    returned within this process is strictly greater than the previous one.
    """
    global _last_generated_id
    candidate = int(time.time() * 1000000)
    if candidate <= _last_generated_id:
        candidate = _last_generated_id + 1
    _last_generated_id = candidate
    return candidate


def get_source_db_config() -> Dict:
    """Build the source database configuration from environment variables.

    Returns:
        Dict with host, port (int), user, password, database and charset.

    Raises:
        ValueError: if any of DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_NAME
            is missing or empty.
    """
    db_host = os.getenv('DB_HOST')
    db_port = os.getenv('DB_PORT')
    db_user = os.getenv('DB_USER')
    db_password = os.getenv('DB_PASSWORD')
    db_name = os.getenv('DB_NAME')

    if not all([db_host, db_port, db_user, db_password, db_name]):
        raise ValueError(
            "源数据库配置不完整,请在.env文件中配置以下环境变量:\n"
            "DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_NAME"
        )

    return {
        'host': db_host,
        'port': int(db_port),
        'user': db_user,
        'password': db_password,
        'database': db_name,
        'charset': 'utf8mb4',
    }


def get_target_db_config_from_args() -> Dict:
    """Parse the command line and return the target database configuration.

    Returns:
        Dict with the connection settings (host, port, user, password,
        database, charset) plus tenant_id, source_tenant_id (None when the
        option is not given) and dry_run.
    """
    # The example deliberately uses a placeholder instead of a real password:
    # help text must not leak credentials.
    parser = argparse.ArgumentParser(
        description='跨数据库同步模板、字段和关联关系',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  python sync_templates_between_databases.py --target-host 10.100.31.21 --target-port 3306 --target-user finyx --target-password <PASSWORD> --target-database finyx --target-tenant-id 1
        """
    )

    parser.add_argument('--target-host', type=str, required=True, help='目标MySQL服务器地址')
    parser.add_argument('--target-port', type=int, required=True, help='目标MySQL服务器端口')
    parser.add_argument('--target-user', type=str, required=True, help='目标MySQL用户名')
    parser.add_argument('--target-password', type=str, required=True, help='目标MySQL密码')
    parser.add_argument('--target-database', type=str, required=True, help='目标数据库名称')
    parser.add_argument('--target-tenant-id', type=int, required=True, help='目标租户ID')
    parser.add_argument('--source-tenant-id', type=int, help='源租户ID(如果不指定,将使用数据库中的第一个tenant_id)')
    parser.add_argument('--dry-run', action='store_true', help='预览模式(不实际更新数据库)')

    args = parser.parse_args()

    return {
        'host': args.target_host,
        'port': args.target_port,
        'user': args.target_user,
        'password': args.target_password,
        'database': args.target_database,
        'charset': 'utf8mb4',
        'tenant_id': args.target_tenant_id,
        'source_tenant_id': args.source_tenant_id,
        'dry_run': args.dry_run,
    }


def test_db_connection(config: Dict, label: str) -> Optional[pymysql.Connection]:
    """Open a connection described by *config*.

    Args:
        config: dict with host, port, user, password, database and charset
            (extra keys are ignored).
        label: name of the database used in the failure message.

    Returns:
        The open connection, or None if connecting failed (the failure is
        printed, not raised).
    """
    try:
        return pymysql.connect(
            host=config['host'],
            port=config['port'],
            user=config['user'],
            password=config['password'],
            database=config['database'],
            charset=config['charset'],
        )
    except Exception as e:
        print_result(False, f"{label}数据库连接失败: {str(e)}")
        return None


def get_source_tenant_id(conn) -> int:
    """Return one tenant_id found in f_polic_file_config, or 1 if the table is empty."""
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute("SELECT DISTINCT tenant_id FROM f_polic_file_config LIMIT 1")
        row = cursor.fetchone()
    return row['tenant_id'] if row else 1


def read_source_fields(conn, tenant_id: int) -> Tuple[Dict[str, Dict], Dict[str, Dict]]:
    """Read the active (state = 1) fields of a tenant from the source database.

    Returns:
        (input_fields_dict, output_fields_dict), each keyed by filed_code with
        the field row as value. field_type 1 goes to the first dict,
        field_type 2 to the second; any other field_type is ignored.
    """
    sql = """
        SELECT id, tenant_id, name, filed_code, field_type, state
        FROM f_polic_field
        WHERE tenant_id = %s AND state = 1
        ORDER BY field_type, filed_code
    """
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute(sql, (tenant_id,))
        fields = cursor.fetchall()

    input_fields = {}
    output_fields = {}
    for field in fields:
        field_info = {
            'id': field['id'],
            'tenant_id': field['tenant_id'],
            'name': field['name'],
            'filed_code': field['filed_code'],
            'field_type': field['field_type'],
            'state': field['state'],
        }
        if field['field_type'] == 1:
            input_fields[field['filed_code']] = field_info
        elif field['field_type'] == 2:
            output_fields[field['filed_code']] = field_info

    return input_fields, output_fields


def read_source_templates(conn, tenant_id: int) -> Dict[str, Dict]:
    """Read the active (state = 1) templates of a tenant from the source database.

    Returns:
        Dict of template rows. Rows with a file_path are keyed by that path.
        Rows without one (directory nodes) are keyed by "DIR:<name>"; when
        that key is already taken by another directory with the same name,
        "#<id>" is appended so that same-named directories under different
        parents are all kept instead of overwriting each other.
    """
    sql = """
        SELECT id, tenant_id, parent_id, name, file_path, state
        FROM f_polic_file_config
        WHERE tenant_id = %s AND state = 1
        ORDER BY file_path, name
    """
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute(sql, (tenant_id,))
        templates = cursor.fetchall()

    result = {}
    for template in templates:
        if template['file_path']:
            key = template['file_path']
        else:
            key = f"DIR:{template['name']}"
            if key in result:
                # Same directory name seen before: disambiguate with the row id.
                key = f"{key}#{template['id']}"

        result[key] = {
            'id': template['id'],
            'tenant_id': template['tenant_id'],
            'parent_id': template['parent_id'],
            'name': template['name'],
            'file_path': template['file_path'],
            'state': template['state'],
        }

    return result


def read_source_relations(conn, tenant_id: int) -> Dict[int, List[int]]:
    """Read the active (state = 1) template/field relations of a tenant.

    Returns:
        Dict mapping file_id to the list of filed_id values related to it.
    """
    sql = """
        SELECT file_id, filed_id
        FROM f_polic_file_field
        WHERE tenant_id = %s AND state = 1
    """
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute(sql, (tenant_id,))
        relations = cursor.fetchall()

    result = {}
    for rel in relations:
        result.setdefault(rel['file_id'], []).append(rel['filed_id'])
    return result


def _sync_field_group(conn, tenant_id: int, source_fields: Dict, field_type: int,
                      existing_by_code: Dict, dry_run: bool) -> Tuple[Dict[int, int], int, int]:
    """Match or create the target rows for one field_type.

    Returns:
        (id_map, matched_count, created_count) where id_map maps source field
        id to target field id.
    """
    id_map = {}
    matched = 0
    created = 0

    for code, source_field in source_fields.items():
        existing_id = existing_by_code.get((code, field_type))
        if existing_id is not None:
            # Field already exists in the target: reuse its id.
            id_map[source_field['id']] = existing_id
            matched += 1
            continue

        # Field is missing in the target: create it (or pretend to in dry-run).
        target_id = generate_id()
        id_map[source_field['id']] = target_id

        if not dry_run:
            with conn.cursor() as insert_cursor:
                insert_cursor.execute("""
                    INSERT INTO f_polic_field
                    (id, tenant_id, name, filed_code, field_type,
                     created_time, created_by, updated_time, updated_by, state)
                    VALUES (%s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1)
                """, (
                    target_id,
                    tenant_id,
                    source_field['name'],
                    source_field['filed_code'],
                    field_type,
                    CREATED_BY,
                    UPDATED_BY,
                ))
                conn.commit()
        created += 1

    return id_map, matched, created


def sync_fields_to_target(conn, tenant_id: int, source_input_fields: Dict, source_output_fields: Dict,
                          dry_run: bool = False) -> Tuple[Dict[int, int], Dict[int, int]]:
    """Synchronize fields into the target database.

    A source field is matched to an active target field with the same
    (filed_code, field_type); unmatched fields are inserted unless *dry_run*
    is set, in which case an id is still generated for the mapping.

    Returns:
        (input_field_id_map, output_field_id_map), each mapping source field
        id to target field id.
    """
    print_section("同步字段到目标数据库")

    # 1. Load the fields that already exist in the target database
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute("""
            SELECT id, filed_code, field_type
            FROM f_polic_field
            WHERE tenant_id = %s AND state = 1
        """, (tenant_id,))
        existing_fields = cursor.fetchall()

    existing_by_code = {
        (field['filed_code'], field['field_type']): field['id']
        for field in existing_fields
    }

    print(f" 目标数据库现有字段: {len(existing_fields)} 个")

    # 2. Input fields (field_type 1)
    print("\n 同步输入字段...")
    input_field_id_map, input_matched, input_created = _sync_field_group(
        conn, tenant_id, source_input_fields, 1, existing_by_code, dry_run
    )
    print(f" 匹配: {input_matched} 个,创建: {input_created} 个")

    # 3. Output fields (field_type 2)
    print("\n 同步输出字段...")
    output_field_id_map, output_matched, output_created = _sync_field_group(
        conn, tenant_id, source_output_fields, 2, existing_by_code, dry_run
    )
    print(f" 匹配: {output_matched} 个,创建: {output_created} 个")

    return input_field_id_map, output_field_id_map


def _group_directories_by_depth(dir_templates: Dict) -> Dict[int, List[Tuple[str, Dict]]]:
    """Group directory templates by their depth below the root.

    Depth is the number of parent_id hops followed inside *dir_templates*; a
    parent that is not in *dir_templates* still counts as one hop and ends the
    walk. A visited set stops the walk on cyclic parent chains so bad data
    cannot hang the script.
    """
    by_id = {template['id']: template for template in dir_templates.values()}
    dirs_by_level = {}

    for key, source_template in dir_templates.items():
        level = 0
        current = source_template
        visited = {current['id']}
        while current.get('parent_id'):
            level += 1
            parent = by_id.get(current['parent_id'])
            if parent is None or parent['id'] in visited:
                break
            visited.add(parent['id'])
            current = parent
        dirs_by_level.setdefault(level, []).append((key, source_template))

    return dirs_by_level


def sync_templates_to_target(conn, tenant_id: int, source_templates: Dict,
                             dry_run: bool = False) -> Dict[int, int]:
    """Synchronize templates into the target database.

    Directory nodes (no file_path) are processed first, parents before
    children, and matched by (name, mapped parent id). File nodes are matched
    by file_path; a matched file whose parent or name differs is updated.
    Unmatched nodes are inserted unless *dry_run* is set.

    Returns:
        template_id_map mapping source template id to target template id.
    """
    print_section("同步模板到目标数据库")

    # 1. Load the templates that already exist in the target database
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute("""
            SELECT id, name, file_path, parent_id
            FROM f_polic_file_config
            WHERE tenant_id = %s AND state = 1
        """, (tenant_id,))
        existing_templates = cursor.fetchall()

    existing_by_path = {}
    existing_by_name = {}
    for template in existing_templates:
        if template['file_path']:
            existing_by_path[template['file_path']] = template
        else:
            # Directory node
            existing_by_name.setdefault(template['name'], []).append(template)

    print(f" 目标数据库现有模板: {len(existing_templates)} 个")

    # 2. Directory nodes first, in depth order
    print("\n 同步目录节点...")
    template_id_map = {}
    dir_created = 0
    dir_matched = 0

    # Split directories from files
    dir_templates = {}
    file_templates = {}
    for key, source_template in source_templates.items():
        if source_template['file_path']:
            file_templates[key] = source_template
        else:
            dir_templates[key] = source_template

    # Parents must be mapped before their children so the child can look up
    # its target parent id in template_id_map.
    dirs_by_level = _group_directories_by_depth(dir_templates)

    for level in sorted(dirs_by_level):
        for _key, source_template in dirs_by_level[level]:
            source_id = source_template['id']
            name = source_template['name']

            target_parent_id = None
            if source_template['parent_id']:
                target_parent_id = template_id_map.get(source_template['parent_id'])

            # Look for an existing directory with the same name and parent
            matched = None
            for existing in existing_by_name.get(name, []):
                if not existing['file_path'] and existing['parent_id'] == target_parent_id:
                    matched = existing
                    break

            if matched:
                template_id_map[source_id] = matched['id']
                dir_matched += 1
                continue

            target_id = generate_id()
            template_id_map[source_id] = target_id

            if not dry_run:
                with conn.cursor() as insert_cursor:
                    insert_cursor.execute("""
                        INSERT INTO f_polic_file_config
                        (id, tenant_id, parent_id, name, file_path,
                         created_time, created_by, updated_time, updated_by, state)
                        VALUES (%s, %s, %s, %s, NULL, NOW(), %s, NOW(), %s, 1)
                    """, (
                        target_id,
                        tenant_id,
                        target_parent_id,
                        name,
                        CREATED_BY,
                        UPDATED_BY,
                    ))
                    conn.commit()
            dir_created += 1

    print(f" 匹配: {dir_matched} 个,创建: {dir_created} 个")

    # 3. File nodes
    print("\n 同步文件节点...")
    file_created = 0
    file_matched = 0
    file_updated = 0

    for _key, source_template in file_templates.items():
        source_id = source_template['id']
        file_path = source_template['file_path']
        name = source_template['name']

        # Map the source parent id to the target parent id
        target_parent_id = None
        if source_template['parent_id']:
            target_parent_id = template_id_map.get(source_template['parent_id'])

        # Files are matched by file_path
        matched = existing_by_path.get(file_path)

        if matched:
            target_id = matched['id']
            template_id_map[source_id] = target_id
            file_matched += 1

            # Update only when parent or name actually differs
            if matched['parent_id'] != target_parent_id or matched['name'] != name:
                file_updated += 1
                if not dry_run:
                    with conn.cursor() as update_cursor:
                        update_cursor.execute("""
                            UPDATE f_polic_file_config
                            SET parent_id = %s, name = %s, updated_time = NOW(), updated_by = %s
                            WHERE id = %s AND tenant_id = %s
                        """, (target_parent_id, name, UPDATED_BY, target_id, tenant_id))
                        conn.commit()
            continue

        target_id = generate_id()
        template_id_map[source_id] = target_id

        if not dry_run:
            with conn.cursor() as insert_cursor:
                insert_cursor.execute("""
                    INSERT INTO f_polic_file_config
                    (id, tenant_id, parent_id, name, file_path,
                     created_time, created_by, updated_time, updated_by, state)
                    VALUES (%s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1)
                """, (
                    target_id,
                    tenant_id,
                    target_parent_id,
                    name,
                    file_path,
                    CREATED_BY,
                    UPDATED_BY,
                ))
                conn.commit()
        file_created += 1

    print(f" 匹配: {file_matched} 个,创建: {file_created} 个,更新: {file_updated} 个")

    return template_id_map


def sync_relations_to_target(conn, tenant_id: int, source_relations: Dict[int, List[int]],
                             template_id_map: Dict[int, int],
                             input_field_id_map: Dict[int, int],
                             output_field_id_map: Dict[int, int],
                             dry_run: bool = False):
    """Replace the target tenant's template/field relations with the source's.

    All existing relations of the tenant are deleted, then one relation row is
    inserted per (mapped template, mapped field) pair. Source templates with
    no mapping are counted as skipped; source fields with no mapping are
    dropped silently. An insert failure rolls back that template's rows, is
    printed, and processing continues with the next template.

    Returns:
        Dict with 'created' (number of relation rows) and 'skipped' (number of
        unmapped templates).
    """
    print_section("同步字段关联关系到目标数据库")

    # 1. Remove the existing relations
    # NOTE(review): the delete is committed before any insert runs, so a
    # failure during step 2 leaves the tenant with fewer relations than
    # before. Consider a single transaction if that is not acceptable.
    print("1. 清理现有关联关系...")
    if not dry_run:
        with conn.cursor() as cursor:
            cursor.execute("""
                DELETE FROM f_polic_file_field
                WHERE tenant_id = %s
            """, (tenant_id,))
            deleted_count = cursor.rowcount
            conn.commit()
        print_result(True, f"删除了 {deleted_count} 条旧关联关系")
    else:
        print(" [预览模式] 将清理所有现有关联关系")

    # 2. Create the new relations
    print("\n2. 创建新的关联关系...")
    all_field_id_map = {**input_field_id_map, **output_field_id_map}

    relations_created = 0
    relations_skipped = 0

    for source_file_id, source_field_ids in source_relations.items():
        # Translate the source template id
        target_file_id = template_id_map.get(source_file_id)
        if not target_file_id:
            relations_skipped += 1
            continue

        # Translate the source field ids, dropping unmapped ones
        mapped_ids = (all_field_id_map.get(field_id) for field_id in source_field_ids)
        target_field_ids = [field_id for field_id in mapped_ids if field_id]
        if not target_field_ids:
            continue

        if dry_run:
            relations_created += len(target_field_ids)
            continue

        cursor = conn.cursor()
        try:
            for target_field_id in target_field_ids:
                cursor.execute("""
                    INSERT INTO f_polic_file_field
                    (id, tenant_id, file_id, filed_id,
                     created_time, created_by, updated_time, updated_by, state)
                    VALUES (%s, %s, %s, %s, NOW(), %s, NOW(), %s, 1)
                """, (
                    generate_id(),
                    tenant_id,
                    target_file_id,
                    target_field_id,
                    CREATED_BY,
                    UPDATED_BY,
                ))
            conn.commit()
            relations_created += len(target_field_ids)
        except Exception as e:
            # Best effort: undo this template's rows and keep going.
            conn.rollback()
            print(f" [错误] 创建关联关系失败: {str(e)}")
        finally:
            cursor.close()

    print_result(True, f"创建了 {relations_created} 条关联关系,跳过 {relations_skipped} 个模板")

    return {
        'created': relations_created,
        'skipped': relations_skipped,
    }


def main():
    """Run the full synchronization: read config, connect, read source, write target."""
    print_section("跨数据库同步模板、字段和关联关系")

    # 1. Source database configuration (from .env)
    print_section("读取源数据库配置")
    try:
        source_config = get_source_db_config()
        print_result(True, f"源数据库: {source_config['host']}:{source_config['port']}/{source_config['database']}")
    except Exception as e:
        print_result(False, str(e))
        return

    # 2. Target database configuration (from the command line)
    print_section("读取目标数据库配置")
    target_config = get_target_db_config_from_args()
    print_result(True, f"目标数据库: {target_config['host']}:{target_config['port']}/{target_config['database']}")
    print(f" 目标租户ID: {target_config['tenant_id']}")

    if target_config['dry_run']:
        print("\n[注意] 当前为预览模式,不会实际更新数据库")

    # 3. Connect to both databases
    print_section("连接数据库")
    source_conn = test_db_connection(source_config, "源")
    if not source_conn:
        return

    target_conn = test_db_connection(target_config, "目标")
    if not target_conn:
        source_conn.close()
        return

    print_result(True, "数据库连接成功")

    try:
        # 4. Source tenant id: only auto-detect when the option was not given,
        #    so an explicit 0 is honoured.
        source_tenant_id = target_config.get('source_tenant_id')
        if source_tenant_id is None:
            source_tenant_id = get_source_tenant_id(source_conn)
        print(f"\n源租户ID: {source_tenant_id}")

        # 5. Read the source data
        print_section("读取源数据库数据")

        print(" 读取字段...")
        source_input_fields, source_output_fields = read_source_fields(source_conn, source_tenant_id)
        print_result(True, f"输入字段: {len(source_input_fields)} 个,输出字段: {len(source_output_fields)} 个")

        print("\n 读取模板...")
        source_templates = read_source_templates(source_conn, source_tenant_id)
        print_result(True, f"模板总数: {len(source_templates)} 个")

        print("\n 读取关联关系...")
        source_relations = read_source_relations(source_conn, source_tenant_id)
        print_result(True, f"关联关系: {len(source_relations)} 个模板有字段关联")

        # 6. Write to the target database
        target_tenant_id = target_config['tenant_id']
        dry_run = target_config['dry_run']

        # 6.1 Fields
        input_field_id_map, output_field_id_map = sync_fields_to_target(
            target_conn, target_tenant_id,
            source_input_fields, source_output_fields,
            dry_run
        )

        # 6.2 Templates
        template_id_map = sync_templates_to_target(
            target_conn, target_tenant_id,
            source_templates,
            dry_run
        )

        # 6.3 Relations
        relations_result = sync_relations_to_target(
            target_conn, target_tenant_id,
            source_relations,
            template_id_map,
            input_field_id_map,
            output_field_id_map,
            dry_run
        )

        # 7. Summary
        print_section("同步完成")
        if dry_run:
            print(" 本次为预览模式,未实际更新数据库")
        else:
            print(" 数据库已更新")

        print(f"\n 同步统计:")
        print(f" - 输入字段: {len(input_field_id_map)} 个")
        print(f" - 输出字段: {len(output_field_id_map)} 个")
        print(f" - 模板: {len(template_id_map)} 个")
        print(f" - 关联关系: {relations_result['created']} 条")

    finally:
        # Close the target even if closing the source raises.
        try:
            source_conn.close()
        finally:
            target_conn.close()
        print_result(True, "数据库连接已关闭")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n[中断] 用户取消操作")
        sys.exit(0)
    except Exception as e:
        print(f"\n[错误] 发生异常: {str(e)}")
        import traceback
        traceback.print_exc()
        sys.exit(1)