ai-business-write/sync_tables_to_new_database.py
2025-12-12 09:50:22 +08:00

459 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
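Sync the data of three tables from the existing (source) database to the new
(target) database; the target database is backed up before syncing.
Tables synced: f_polic_field, f_polic_file_config, f_polic_file_field

从现有数据库同步三个表的数据到新数据库
同步的表: f_polic_field, f_polic_file_config, f_polic_file_field
同步前会先备份新数据库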
"""
import os
import sys
import subprocess
import pymysql
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any
from dotenv import load_dotenv
def reconfigure_stream_utf8(stream):
    """
    Return *stream* switched to UTF-8 output (unencodable chars replaced).

    Prefers ``TextIOWrapper.reconfigure`` (Python 3.7+), which changes the
    encoding in place and keeps the stream's existing line buffering.
    Wrapping ``stream.buffer`` in a brand-new ``io.TextIOWrapper`` defaults
    to ``line_buffering=False``, so progress messages printed before
    long-running steps could sit in the buffer instead of appearing. The
    wrapper is only used as a fallback, with line buffering enabled.

    Args:
        stream: A text stream such as ``sys.stdout``, or ``None``.

    Returns:
        The stream to install (the same object when reconfigured in place).
    """
    if stream is None:
        # No console stream attached (e.g. a windowless interpreter): nothing to do.
        return stream
    if hasattr(stream, 'reconfigure'):
        stream.reconfigure(encoding='utf-8', errors='replace')
        return stream
    import io
    return io.TextIOWrapper(stream.buffer, encoding='utf-8', errors='replace', line_buffering=True)


# Force UTF-8 console output on Windows so the Chinese messages print correctly.
if sys.platform == 'win32':
    sys.stdout = reconfigure_stream_utf8(sys.stdout)
    sys.stderr = reconfigure_stream_utf8(sys.stderr)
# Load variables from a local .env file (if any) into the process environment.
load_dotenv()

# Existing database (sync source). Each value can be overridden via the
# DB_* environment variables.
# NOTE(review): real credentials are committed here as fallback defaults;
# prefer supplying them only via .env / the environment, and rotate them if
# this file has been shared.
SOURCE_DB_CONFIG = {
    'host': os.getenv('DB_HOST', '152.136.177.240'),
    'port': int(os.getenv('DB_PORT', 5012)),
    'user': os.getenv('DB_USER', 'finyx'),
    'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
    'database': os.getenv('DB_NAME', 'finyx'),
    'charset': 'utf8mb4',
}

# New database (sync target). Previously fully hard-coded; it can now be
# overridden via the TARGET_DB_* environment variables. The defaults are the
# original values, so behaviour is unchanged when those variables are unset.
# NOTE(review): same committed-credentials concern as above.
TARGET_DB_CONFIG = {
    'host': os.getenv('TARGET_DB_HOST', '10.100.31.21'),
    'port': int(os.getenv('TARGET_DB_PORT', 3306)),
    'user': os.getenv('TARGET_DB_USER', 'finyx'),
    'password': os.getenv('TARGET_DB_PASSWORD', 'FknJYz3FA5WDYtsd'),
    'database': os.getenv('TARGET_DB_NAME', 'finyx'),
    'charset': 'utf8mb4',
}

# Tables copied from source to target, processed in this order.
TABLES_TO_SYNC = ['f_polic_field', 'f_polic_file_config', 'f_polic_file_field']

# Directory for backup files, relative to the current working directory;
# created at import time if it does not exist yet.
BACKUP_DIR = Path('backups')
BACKUP_DIR.mkdir(exist_ok=True)
def backup_target_database() -> str:
    """
    Back up the whole target database to a timestamped ``.sql`` file.

    Runs ``mysqldump`` against ``TARGET_DB_CONFIG`` and writes its output to
    ``BACKUP_DIR/backup_target_db_<timestamp>.sql``. If a
    ``FileNotFoundError`` occurs (e.g. ``mysqldump`` is not installed), it
    falls back to ``backup_target_database_with_python``.

    Returns:
        Path of the backup file, as a string.

    Raises:
        Exception: if ``mysqldump`` exits with a non-zero status, or on any
            other error. Before re-raising, the partially written backup file
            is deleted so an empty or truncated dump is never left behind
            looking like a valid backup.
    """
    print("=" * 60)
    print("步骤 1: 备份新数据库")
    print("=" * 60)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_file = BACKUP_DIR / f"backup_target_db_{timestamp}.sql"
    # NOTE(review): passing the password as an argument makes it visible in
    # the process list to other local users; consider --defaults-extra-file.
    cmd = [
        'mysqldump',
        f"--host={TARGET_DB_CONFIG['host']}",
        f"--port={TARGET_DB_CONFIG['port']}",
        f"--user={TARGET_DB_CONFIG['user']}",
        f"--password={TARGET_DB_CONFIG['password']}",
        '--single-transaction',
        '--routines',
        '--triggers',
        '--events',
        '--add-drop-table',
        '--default-character-set=utf8mb4',
        TARGET_DB_CONFIG['database'],
    ]
    try:
        print(f"开始备份数据库 {TARGET_DB_CONFIG['database']}...")
        print(f"备份文件: {backup_file}")
        # mysqldump writes straight to the file descriptor, so its bytes land
        # in the file verbatim; binary mode makes that explicit.
        with open(backup_file, 'wb') as f:
            result = subprocess.run(
                cmd,
                stdout=f,
                stderr=subprocess.PIPE,
                # Only stderr is captured as text. Decode it leniently so an
                # undecodable byte in an error message cannot raise
                # UnicodeDecodeError and hide the real failure.
                text=True,
                errors='replace',
            )
        if result.returncode != 0:
            error_msg = result.stderr if result.stderr else '未知错误'
            raise Exception(f"mysqldump执行失败: {error_msg}")
        file_size = backup_file.stat().st_size
        print(f"备份完成!文件大小: {file_size / 1024 / 1024:.2f} MB")
        print(f"备份文件路径: {backup_file}")
        return str(backup_file)
    except FileNotFoundError:
        # Typically raised by subprocess.run when mysqldump is not on PATH.
        print("警告: 未找到mysqldump命令尝试使用Python方式备份...")
        return backup_target_database_with_python(backup_file)
    except Exception as e:
        # Don't leave an empty/truncated dump that could be mistaken for a
        # usable backup.
        if backup_file.exists():
            backup_file.unlink()
        print(f"备份失败: {str(e)}")
        raise
def _sql_literal(connection, value) -> str:
    """
    Render one column value as a MySQL literal for the fallback dump.

    ``bytes``/``bytearray`` become hex literals (``X'...'``). These are
    binary-safe and pure ASCII, so they survive being written to a UTF-8
    text file; ``str(value)`` used to emit Python reprs such as ``b'...'``.
    Every other value goes through pymysql's ``Connection.literal`` (its own
    escaping: NULL for None, unquoted numbers, quoted and escaped strings
    and date/time values). The previous hand-rolled escaping only handled
    backslashes and single quotes.
    """
    if isinstance(value, (bytes, bytearray)):
        return f"X'{bytes(value).hex()}'"
    return connection.literal(value)


def _write_table_dump(connection, cursor, f, table: str, batch_size: int = 1000) -> None:
    """Write DROP/CREATE TABLE for ``table``, then its rows as batched INSERTs."""
    print(f"备份表: {table}")
    cursor.execute(f"SHOW CREATE TABLE `{table}`")
    create_table_sql = cursor.fetchone()[1]
    f.write("-- ----------------------------\n")
    f.write(f"-- 表结构: {table}\n")
    f.write("-- ----------------------------\n")
    f.write(f"DROP TABLE IF EXISTS `{table}`;\n")
    f.write(f"{create_table_sql};\n\n")

    cursor.execute(f"SELECT * FROM `{table}`")
    # Column names come from the SELECT's own result metadata, so they are
    # guaranteed to line up with the values in each row tuple.
    columns = [desc[0] for desc in cursor.description]
    rows = cursor.fetchall()
    if not rows:
        return
    f.write("-- ----------------------------\n")
    f.write(f"-- 表数据: {table}\n")
    f.write("-- ----------------------------\n")
    columns_str = ', '.join(f"`{col}`" for col in columns)
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        values_str = ',\n'.join(
            '(' + ', '.join(_sql_literal(connection, value) for value in row) + ')'
            for row in batch
        )
        f.write(f"INSERT INTO `{table}` ({columns_str}) VALUES\n")
        f.write(f"{values_str};\n\n")
    print(f" 完成: {len(rows)} 条记录")


def backup_target_database_with_python(backup_file: Path) -> str:
    """
    Back up the target database with plain SQL queries (fallback path).

    For every base table it writes ``DROP TABLE IF EXISTS`` plus the
    ``SHOW CREATE TABLE`` output, followed by batched ``INSERT`` statements.
    Views (previously dumped as if they were tables, including their rows)
    are written after all tables as ``DROP VIEW IF EXISTS`` plus the
    ``SHOW CREATE VIEW`` output, with no data.
    NOTE(review): views that select from other views may need reordering on
    restore; view definitions keep their DEFINER clause.

    Args:
        backup_file: Path of the ``.sql`` file to (over)write.

    Returns:
        Path of the backup file, as a string.

    Raises:
        Exception: any connection, query or I/O error is printed and
            re-raised after the partially written file has been removed.
    """
    try:
        print(f"开始使用Python方式备份数据库 {TARGET_DB_CONFIG['database']}...")
        connection = pymysql.connect(**TARGET_DB_CONFIG)
        try:
            with connection.cursor() as cursor, open(backup_file, 'w', encoding='utf-8') as f:
                f.write("-- MySQL数据库备份\n")
                f.write(f"-- 数据库: {TARGET_DB_CONFIG['database']}\n")
                f.write(f"-- 备份时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"-- 主机: {TARGET_DB_CONFIG['host']}:{TARGET_DB_CONFIG['port']}\n")
                f.write("--\n\n")
                f.write("SET NAMES utf8mb4;\n")
                f.write("SET FOREIGN_KEY_CHECKS=0;\n\n")

                # SHOW FULL TABLES also returns each object's type, so views
                # can be told apart from base tables.
                cursor.execute("SHOW FULL TABLES")
                entries = cursor.fetchall()
                print(f"找到 {len(entries)} 个表")
                views = []
                for name, table_type in entries:
                    if table_type == 'VIEW':
                        views.append(name)
                    else:
                        _write_table_dump(connection, cursor, f, name)

                # Views go last so the tables they select from already exist on restore.
                for view in views:
                    print(f"备份表: {view}")
                    cursor.execute(f"SHOW CREATE VIEW `{view}`")
                    create_view_sql = cursor.fetchone()[1]
                    f.write("-- ----------------------------\n")
                    f.write(f"-- 视图: {view}\n")
                    f.write("-- ----------------------------\n")
                    f.write(f"DROP VIEW IF EXISTS `{view}`;\n")
                    f.write(f"{create_view_sql};\n\n")

                f.write("SET FOREIGN_KEY_CHECKS=1;\n")
        finally:
            # Always release the connection, even if the dump fails part-way.
            connection.close()

        file_size = backup_file.stat().st_size
        print(f"备份完成!文件大小: {file_size / 1024 / 1024:.2f} MB")
        return str(backup_file)
    except Exception as e:
        # Don't leave a truncated dump that could be mistaken for a valid backup.
        if backup_file.exists():
            backup_file.unlink()
        print(f"备份失败: {str(e)}")
        raise
def get_table_data(conn, table_name: str) -> List[Dict[str, Any]]:
    """
    Fetch every row of ``table_name`` as column-name -> value dicts.

    Args:
        conn: Open pymysql connection (the source database in this script).
        table_name: Table to read; it is wrapped in backticks but not
            otherwise escaped.

    Returns:
        The rows returned by ``DictCursor.fetchall()``.
    """
    # The cursor's context manager closes it on exit, success or failure.
    with conn.cursor(pymysql.cursors.DictCursor) as dict_cursor:
        dict_cursor.execute(f"SELECT * FROM `{table_name}`")
        return dict_cursor.fetchall()
def get_table_columns(conn, table_name: str) -> List[str]:
    """
    List the column names of ``table_name``, in the order DESCRIBE reports them.

    Args:
        conn: Open pymysql connection.
        table_name: Table to describe; wrapped in backticks.

    Returns:
        The first field (column name) of each row of ``DESCRIBE`` output.
    """
    with conn.cursor() as meta_cursor:
        meta_cursor.execute(f"DESCRIBE `{table_name}`")
        return [field_info[0] for field_info in meta_cursor.fetchall()]
def clear_table(conn, table_name: str):
    """
    Empty ``table_name`` in the target database using ``TRUNCATE TABLE``.

    Foreign-key checks are switched off around the TRUNCATE (presumably so
    rows in other tables that reference this one do not block it). They are
    now always switched back on, including when the TRUNCATE fails;
    previously a failure left them disabled for the rest of the session.

    NOTE(review): MySQL treats TRUNCATE TABLE as DDL with an implicit commit,
    so the ``rollback()`` below cannot bring deleted rows back.

    Args:
        conn: Open pymysql connection to the target database.
        table_name: Table to empty.

    Raises:
        Exception: wrapping the underlying error, which is chained as
            ``__cause__``.
    """
    cursor = conn.cursor()
    try:
        cursor.execute("SET FOREIGN_KEY_CHECKS=0")
        try:
            cursor.execute(f"TRUNCATE TABLE `{table_name}`")
        finally:
            cursor.execute("SET FOREIGN_KEY_CHECKS=1")
        conn.commit()
        print(f" 已清空表: {table_name}")
    except Exception as e:
        conn.rollback()
        raise Exception(f"清空表 {table_name} 失败: {str(e)}") from e
    finally:
        cursor.close()
def insert_table_data(conn, table_name: str, columns: List[str], data: List[Dict[str, Any]]):
    """
    Insert ``data`` into ``table_name`` in batches of 1000 rows, then commit.

    Foreign-key checks are disabled during the insert and always re-enabled
    afterwards, even when an insert fails (previously a failure left them
    disabled for the session).

    Args:
        conn: Open pymysql connection to the target database.
        table_name: Destination table.
        columns: Column names to insert, in order. Keys missing from a row
            are inserted as NULL (``dict.get``).
        data: Rows as column-name -> value dicts. When empty, nothing is
            touched and the function returns immediately.

    Raises:
        Exception: wrapping the underlying error (chained as ``__cause__``)
            after rolling back the uncommitted inserts.
    """
    if not data:
        print(f"{table_name} 没有数据需要插入")
        return
    cursor = conn.cursor()
    try:
        cursor.execute("SET FOREIGN_KEY_CHECKS=0")
        try:
            columns_str = ', '.join(f"`{col}`" for col in columns)
            placeholders = ', '.join(['%s'] * len(columns))
            insert_sql = f"INSERT INTO `{table_name}` ({columns_str}) VALUES ({placeholders})"
            batch_size = 1000
            total_inserted = 0
            for start in range(0, len(data), batch_size):
                batch = data[start:start + batch_size]
                cursor.executemany(insert_sql, [[row.get(col) for col in columns] for row in batch])
                total_inserted += len(batch)
        finally:
            cursor.execute("SET FOREIGN_KEY_CHECKS=1")
        conn.commit()
        print(f" 已插入 {total_inserted} 条记录到表: {table_name}")
    except Exception as e:
        conn.rollback()
        raise Exception(f"插入数据到表 {table_name} 失败: {str(e)}") from e
    finally:
        cursor.close()
def _replace_table_rows(target_conn, table_name: str, columns: List[str],
                        rows: List[Dict[str, Any]], batch_size: int = 1000) -> None:
    """
    Replace every row of ``table_name`` with ``rows`` in a single transaction.

    ``DELETE`` is used instead of ``TRUNCATE`` because TRUNCATE commits
    implicitly in MySQL. Here the delete and all inserts are committed
    together, so any failure rolls the table back to its previous contents
    instead of leaving it empty.
    NOTE(review): unlike TRUNCATE, DELETE does not reset AUTO_INCREMENT and
    fires DELETE triggers; the rollback guarantee assumes a transactional
    engine such as InnoDB.
    """
    cursor = target_conn.cursor()
    try:
        cursor.execute("SET FOREIGN_KEY_CHECKS=0")
        try:
            # Explicit BEGIN so atomicity doesn't depend on the autocommit setting.
            target_conn.begin()
            cursor.execute(f"DELETE FROM `{table_name}`")
            if rows:
                columns_str = ', '.join(f"`{col}`" for col in columns)
                placeholders = ', '.join(['%s'] * len(columns))
                insert_sql = f"INSERT INTO `{table_name}` ({columns_str}) VALUES ({placeholders})"
                for start in range(0, len(rows), batch_size):
                    batch = rows[start:start + batch_size]
                    cursor.executemany(insert_sql, [[row.get(col) for col in columns] for row in batch])
            target_conn.commit()
        except Exception:
            target_conn.rollback()
            raise
        finally:
            # Session variable: restore it whether or not the sync succeeded.
            cursor.execute("SET FOREIGN_KEY_CHECKS=1")
    finally:
        cursor.close()


def sync_table(source_conn, target_conn, table_name: str):
    """
    Copy all rows of one table from the source database to the target.

    The source table's columns and rows are read first. The target table's
    contents are then replaced atomically: if anything fails, the target
    keeps its previous rows. Previously the table was truncated and
    refilled in separate commits, so a failed insert left it empty.

    Args:
        source_conn: Open pymysql connection to the source database.
        target_conn: Open pymysql connection to the target database.
        table_name: Table to sync; must exist in both databases.

    Raises:
        Exception: any error is printed and re-raised unchanged.
    """
    print(f"\n同步表: {table_name}")
    print("-" * 60)
    try:
        columns = get_table_columns(source_conn, table_name)
        print(f" 表列: {', '.join(columns)}")
        print(" 从源数据库读取数据...")
        source_data = get_table_data(source_conn, table_name)
        print(f" 读取到 {len(source_data)} 条记录")
        print(" 清空目标表...")
        if source_data:
            print(" 插入数据到目标表...")
        _replace_table_rows(target_conn, table_name, columns, source_data)
        # Confirmations are printed only after the transaction has committed.
        print(f" 已清空表: {table_name}")
        if source_data:
            print(f" 已插入 {len(source_data)} 条记录到表: {table_name}")
        else:
            print(f"{table_name} 没有数据需要同步")
        print(f"[OK] 表 {table_name} 同步完成")
    except Exception as e:
        print(f"[ERROR] 表 {table_name} 同步失败: {str(e)}")
        raise
def is_connection_unreachable(error: BaseException) -> bool:
    """
    Return True if ``error`` looks like "the database server could not be reached".

    Matches MySQL client error code 2003 when it is the exception's first
    argument (pymysql raises ``OperationalError(code, message)``), or any
    message mentioning a timeout. Comparing the code replaces the old
    ``"2003" in str(e)`` test, which also matched unrelated messages that
    merely contained those digits.
    """
    code = error.args[0] if error.args else None
    return code == 2003 or "timed out" in str(error)


def _confirm_continue_without_backup() -> bool:
    """Ask whether to continue without a backup; True only for an explicit "y"."""
    try:
        response = input("是否继续同步?(y/n): ")
    except EOFError:
        # Non-interactive run (stdin closed): take the safe default, don't crash.
        return False
    return response.strip().lower() == 'y'


def main():
    """
    Script entry point: back up the target DB, connect to both databases,
    then sync every table in ``TABLES_TO_SYNC`` in order.

    Exits with status 1 in three cases: the user declines to continue after
    a failed backup, a connection fails, or any table fails to sync. Both
    connections are closed on every exit path.
    """
    print("=" * 60)
    print("数据库表同步工具")
    print("=" * 60)
    print(f"源数据库: {SOURCE_DB_CONFIG['host']}:{SOURCE_DB_CONFIG['port']}/{SOURCE_DB_CONFIG['database']}")
    print(f"目标数据库: {TARGET_DB_CONFIG['host']}:{TARGET_DB_CONFIG['port']}/{TARGET_DB_CONFIG['database']}")
    print(f"同步表: {', '.join(TABLES_TO_SYNC)}")
    print("=" * 60)

    # Step 1: back up the target database; on failure, let the user decide.
    try:
        backup_file = backup_target_database()
        print(f"\n[OK] 备份完成: {backup_file}\n")
    except Exception as e:
        print(f"\n[ERROR] 备份失败: {str(e)}")
        if not _confirm_continue_without_backup():
            print("已取消同步")
            sys.exit(1)

    # Step 2: connect to both databases.
    print("=" * 60)
    print("步骤 2: 连接数据库")
    print("=" * 60)
    source_conn = None
    target_conn = None
    try:
        print("连接源数据库...")
        source_conn = pymysql.connect(**SOURCE_DB_CONFIG)
        print("[OK] 源数据库连接成功")
        print("连接目标数据库...")
        try:
            target_conn = pymysql.connect(**TARGET_DB_CONFIG, connect_timeout=10)
            print("[OK] 目标数据库连接成功\n")
        except pymysql.err.OperationalError as e:
            if is_connection_unreachable(e):
                print(f"[ERROR] 无法连接到目标数据库 {TARGET_DB_CONFIG['host']}:{TARGET_DB_CONFIG['port']}")
                print("请检查:")
                print(" 1. 网络连接是否正常")
                print(" 2. 是否需要VPN连接")
                print(" 3. 数据库服务器是否可访问")
                print(" 4. 防火墙设置是否正确")
            raise

        # Step 3: sync each table; stop at the first failure.
        print("=" * 60)
        print("步骤 3: 同步表数据")
        print("=" * 60)
        for table_name in TABLES_TO_SYNC:
            try:
                sync_table(source_conn, target_conn, table_name)
            except Exception as e:
                print(f"\n[ERROR] 同步表 {table_name} 时发生错误: {str(e)}")
                print("已停止同步")
                sys.exit(1)
        print("\n" + "=" * 60)
        print("[OK] 所有表同步完成!")
        print("=" * 60)
    except pymysql.Error as e:
        print(f"\n[ERROR] 数据库连接失败: {str(e)}")
        if is_connection_unreachable(e):
            print("\n提示如果无法连接到目标数据库请检查网络连接和VPN设置")
        sys.exit(1)
    except Exception as e:
        print(f"\n[ERROR] 发生错误: {str(e)}")
        sys.exit(1)
    finally:
        # SystemExit from sys.exit() above still passes through here.
        if source_conn:
            source_conn.close()
        if target_conn:
            target_conn.close()
if __name__ == "__main__":
    # Run the sync only when executed as a script, never on import.
    main()