ai-business-write/restore_database.py

341 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
数据库恢复脚本
从SQL备份文件恢复数据库
"""
import os
import sys
import subprocess
import pymysql
from pathlib import Path
from dotenv import load_dotenv
import gzip
# 加载环境变量
load_dotenv()
class DatabaseRestore:
"""数据库恢复类"""
def __init__(self):
"""初始化数据库配置"""
self.db_config = {
'host': os.getenv('DB_HOST', '152.136.177.240'),
'port': int(os.getenv('DB_PORT', 5012)),
'user': os.getenv('DB_USER', 'finyx'),
'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
'database': os.getenv('DB_NAME', 'finyx'),
'charset': 'utf8mb4'
}
def restore_with_mysql(self, backup_file, drop_database=False):
"""
使用mysql命令恢复数据库推荐方式
Args:
backup_file: 备份文件路径
drop_database: 是否先删除数据库(危险操作)
Returns:
是否成功
"""
backup_file = Path(backup_file)
if not backup_file.exists():
raise FileNotFoundError(f"备份文件不存在: {backup_file}")
# 如果是压缩文件,先解压
sql_file = backup_file
temp_file = None
if backup_file.suffix == '.gz':
print(f"检测到压缩文件,正在解压...")
temp_file = backup_file.with_suffix('')
with gzip.open(backup_file, 'rb') as f_in:
with open(temp_file, 'wb') as f_out:
f_out.write(f_in.read())
sql_file = temp_file
print(f"解压完成: {sql_file}")
try:
print(f"开始恢复数据库 {self.db_config['database']}...")
print(f"备份文件: {backup_file}")
# 如果指定删除数据库
if drop_database:
print("警告: 将删除现有数据库!")
confirm = input("确认继续? (yes/no): ")
if confirm.lower() != 'yes':
print("已取消恢复操作")
return False
# 删除数据库
self._drop_database()
# 构建mysql命令
cmd = [
'mysql',
f"--host={self.db_config['host']}",
f"--port={self.db_config['port']}",
f"--user={self.db_config['user']}",
f"--password={self.db_config['password']}",
'--default-character-set=utf8mb4',
self.db_config['database']
]
# 执行恢复命令
with open(sql_file, 'r', encoding='utf-8') as f:
result = subprocess.run(
cmd,
stdin=f,
stderr=subprocess.PIPE,
text=True
)
if result.returncode != 0:
error_msg = result.stderr.decode('utf-8') if result.stderr else '未知错误'
raise Exception(f"mysql执行失败: {error_msg}")
print("恢复完成!")
return True
except FileNotFoundError:
print("错误: 未找到mysql命令请确保MySQL客户端已安装并在PATH中")
print("尝试使用Python方式恢复...")
return self.restore_with_python(backup_file, drop_database)
except Exception as e:
print(f"恢复失败: {str(e)}")
raise
finally:
# 清理临时解压文件
if temp_file and temp_file.exists():
temp_file.unlink()
def restore_with_python(self, backup_file, drop_database=False):
"""
使用Python直接连接数据库恢复备用方式
Args:
backup_file: 备份文件路径
drop_database: 是否先删除数据库(危险操作)
Returns:
是否成功
"""
backup_file = Path(backup_file)
if not backup_file.exists():
raise FileNotFoundError(f"备份文件不存在: {backup_file}")
# 如果是压缩文件,先解压
sql_file = backup_file
temp_file = None
if backup_file.suffix == '.gz':
print(f"检测到压缩文件,正在解压...")
temp_file = backup_file.with_suffix('')
with gzip.open(backup_file, 'rb') as f_in:
with open(temp_file, 'wb') as f_out:
f_out.write(f_in.read())
sql_file = temp_file
print(f"解压完成: {sql_file}")
try:
print(f"开始使用Python方式恢复数据库 {self.db_config['database']}...")
print(f"备份文件: {backup_file}")
# 如果指定删除数据库
if drop_database:
print("警告: 将删除现有数据库!")
confirm = input("确认继续? (yes/no): ")
if confirm.lower() != 'yes':
print("已取消恢复操作")
return False
# 删除数据库
self._drop_database()
# 连接数据库
connection = pymysql.connect(**self.db_config)
cursor = connection.cursor()
# 读取SQL文件
print("读取SQL文件...")
with open(sql_file, 'r', encoding='utf-8') as f:
sql_content = f.read()
# 分割SQL语句按分号分割但要注意字符串中的分号
print("执行SQL语句...")
statements = self._split_sql_statements(sql_content)
total = len(statements)
print(f"{total} 条SQL语句")
# 执行每条SQL语句
for i, statement in enumerate(statements, 1):
statement = statement.strip()
if not statement or statement.startswith('--'):
continue
try:
cursor.execute(statement)
if i % 100 == 0:
print(f"进度: {i}/{total} ({i*100//total}%)")
except Exception as e:
# 某些错误可以忽略(如表已存在等)
error_msg = str(e).lower()
if 'already exists' in error_msg or 'duplicate' in error_msg:
continue
print(f"警告: 执行SQL语句时出错 (第{i}条): {str(e)}")
print(f"SQL: {statement[:100]}...")
# 提交事务
connection.commit()
cursor.close()
connection.close()
print("恢复完成!")
return True
except Exception as e:
print(f"恢复失败: {str(e)}")
raise
finally:
# 清理临时解压文件
if temp_file and temp_file.exists():
temp_file.unlink()
def _split_sql_statements(self, sql_content):
"""
分割SQL语句处理字符串中的分号
Args:
sql_content: SQL内容
Returns:
SQL语句列表
"""
statements = []
current_statement = []
in_string = False
string_char = None
i = 0
while i < len(sql_content):
char = sql_content[i]
# 检测字符串开始/结束
if char in ("'", '"', '`') and (i == 0 or sql_content[i-1] != '\\'):
if not in_string:
in_string = True
string_char = char
elif char == string_char:
in_string = False
string_char = None
current_statement.append(char)
# 如果不在字符串中且遇到分号,分割语句
if not in_string and char == ';':
statement = ''.join(current_statement).strip()
if statement:
statements.append(statement)
current_statement = []
i += 1
# 添加最后一条语句
if current_statement:
statement = ''.join(current_statement).strip()
if statement:
statements.append(statement)
return statements
def _drop_database(self):
"""删除数据库(危险操作)"""
try:
# 连接到MySQL服务器不指定数据库
config = self.db_config.copy()
config.pop('database')
connection = pymysql.connect(**config)
cursor = connection.cursor()
cursor.execute(f"DROP DATABASE IF EXISTS `{self.db_config['database']}`")
cursor.execute(f"CREATE DATABASE `{self.db_config['database']}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")
connection.commit()
cursor.close()
connection.close()
print(f"数据库 {self.db_config['database']} 已删除并重新创建")
except Exception as e:
raise Exception(f"删除数据库失败: {str(e)}")
def test_connection(self):
"""测试数据库连接"""
try:
connection = pymysql.connect(**self.db_config)
cursor = connection.cursor()
cursor.execute("SELECT VERSION()")
version = cursor.fetchone()[0]
cursor.close()
connection.close()
print(f"数据库连接成功MySQL版本: {version}")
return True
except Exception as e:
print(f"数据库连接失败: {str(e)}")
return False
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description='数据库恢复工具')
parser.add_argument('backup_file', help='备份文件路径')
parser.add_argument('--method', choices=['mysql', 'python', 'auto'],
default='auto', help='恢复方法 (默认: auto)')
parser.add_argument('--drop-db', action='store_true',
help='恢复前删除现有数据库(危险操作)')
parser.add_argument('--test', action='store_true',
help='仅测试数据库连接')
args = parser.parse_args()
restore = DatabaseRestore()
# 测试连接
if args.test:
restore.test_connection()
return
# 执行恢复
try:
if args.method == 'mysql':
success = restore.restore_with_mysql(args.backup_file, args.drop_db)
elif args.method == 'python':
success = restore.restore_with_python(args.backup_file, args.drop_db)
else: # auto
try:
success = restore.restore_with_mysql(args.backup_file, args.drop_db)
except:
print("\nmysql方式失败切换到Python方式...")
success = restore.restore_with_python(args.backup_file, args.drop_db)
if success:
print("\n恢复成功!")
else:
print("\n恢复失败!")
sys.exit(1)
except Exception as e:
print(f"\n恢复失败: {str(e)}")
sys.exit(1)
if __name__ == '__main__':
main()