341 lines
12 KiB
Python
341 lines
12 KiB
Python
"""
|
||
数据库恢复脚本
|
||
从SQL备份文件恢复数据库
|
||
"""
|
||
import os
|
||
import sys
|
||
import subprocess
|
||
import pymysql
|
||
from pathlib import Path
|
||
from dotenv import load_dotenv
|
||
import gzip
|
||
|
||
# 加载环境变量
|
||
load_dotenv()
|
||
|
||
|
||
class DatabaseRestore:
|
||
"""数据库恢复类"""
|
||
|
||
def __init__(self):
|
||
"""初始化数据库配置"""
|
||
self.db_config = {
|
||
'host': os.getenv('DB_HOST', '152.136.177.240'),
|
||
'port': int(os.getenv('DB_PORT', 5012)),
|
||
'user': os.getenv('DB_USER', 'finyx'),
|
||
'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
|
||
'database': os.getenv('DB_NAME', 'finyx'),
|
||
'charset': 'utf8mb4'
|
||
}
|
||
|
||
def restore_with_mysql(self, backup_file, drop_database=False):
|
||
"""
|
||
使用mysql命令恢复数据库(推荐方式)
|
||
|
||
Args:
|
||
backup_file: 备份文件路径
|
||
drop_database: 是否先删除数据库(危险操作)
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
backup_file = Path(backup_file)
|
||
|
||
if not backup_file.exists():
|
||
raise FileNotFoundError(f"备份文件不存在: {backup_file}")
|
||
|
||
# 如果是压缩文件,先解压
|
||
sql_file = backup_file
|
||
temp_file = None
|
||
if backup_file.suffix == '.gz':
|
||
print(f"检测到压缩文件,正在解压...")
|
||
temp_file = backup_file.with_suffix('')
|
||
with gzip.open(backup_file, 'rb') as f_in:
|
||
with open(temp_file, 'wb') as f_out:
|
||
f_out.write(f_in.read())
|
||
sql_file = temp_file
|
||
print(f"解压完成: {sql_file}")
|
||
|
||
try:
|
||
print(f"开始恢复数据库 {self.db_config['database']}...")
|
||
print(f"备份文件: {backup_file}")
|
||
|
||
# 如果指定删除数据库
|
||
if drop_database:
|
||
print("警告: 将删除现有数据库!")
|
||
confirm = input("确认继续? (yes/no): ")
|
||
if confirm.lower() != 'yes':
|
||
print("已取消恢复操作")
|
||
return False
|
||
|
||
# 删除数据库
|
||
self._drop_database()
|
||
|
||
# 构建mysql命令
|
||
cmd = [
|
||
'mysql',
|
||
f"--host={self.db_config['host']}",
|
||
f"--port={self.db_config['port']}",
|
||
f"--user={self.db_config['user']}",
|
||
f"--password={self.db_config['password']}",
|
||
'--default-character-set=utf8mb4',
|
||
self.db_config['database']
|
||
]
|
||
|
||
# 执行恢复命令
|
||
with open(sql_file, 'r', encoding='utf-8') as f:
|
||
result = subprocess.run(
|
||
cmd,
|
||
stdin=f,
|
||
stderr=subprocess.PIPE,
|
||
text=True
|
||
)
|
||
|
||
if result.returncode != 0:
|
||
error_msg = result.stderr.decode('utf-8') if result.stderr else '未知错误'
|
||
raise Exception(f"mysql执行失败: {error_msg}")
|
||
|
||
print("恢复完成!")
|
||
return True
|
||
|
||
except FileNotFoundError:
|
||
print("错误: 未找到mysql命令,请确保MySQL客户端已安装并在PATH中")
|
||
print("尝试使用Python方式恢复...")
|
||
return self.restore_with_python(backup_file, drop_database)
|
||
except Exception as e:
|
||
print(f"恢复失败: {str(e)}")
|
||
raise
|
||
finally:
|
||
# 清理临时解压文件
|
||
if temp_file and temp_file.exists():
|
||
temp_file.unlink()
|
||
|
||
def restore_with_python(self, backup_file, drop_database=False):
|
||
"""
|
||
使用Python直接连接数据库恢复(备用方式)
|
||
|
||
Args:
|
||
backup_file: 备份文件路径
|
||
drop_database: 是否先删除数据库(危险操作)
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
backup_file = Path(backup_file)
|
||
|
||
if not backup_file.exists():
|
||
raise FileNotFoundError(f"备份文件不存在: {backup_file}")
|
||
|
||
# 如果是压缩文件,先解压
|
||
sql_file = backup_file
|
||
temp_file = None
|
||
if backup_file.suffix == '.gz':
|
||
print(f"检测到压缩文件,正在解压...")
|
||
temp_file = backup_file.with_suffix('')
|
||
with gzip.open(backup_file, 'rb') as f_in:
|
||
with open(temp_file, 'wb') as f_out:
|
||
f_out.write(f_in.read())
|
||
sql_file = temp_file
|
||
print(f"解压完成: {sql_file}")
|
||
|
||
try:
|
||
print(f"开始使用Python方式恢复数据库 {self.db_config['database']}...")
|
||
print(f"备份文件: {backup_file}")
|
||
|
||
# 如果指定删除数据库
|
||
if drop_database:
|
||
print("警告: 将删除现有数据库!")
|
||
confirm = input("确认继续? (yes/no): ")
|
||
if confirm.lower() != 'yes':
|
||
print("已取消恢复操作")
|
||
return False
|
||
|
||
# 删除数据库
|
||
self._drop_database()
|
||
|
||
# 连接数据库
|
||
connection = pymysql.connect(**self.db_config)
|
||
cursor = connection.cursor()
|
||
|
||
# 读取SQL文件
|
||
print("读取SQL文件...")
|
||
with open(sql_file, 'r', encoding='utf-8') as f:
|
||
sql_content = f.read()
|
||
|
||
# 分割SQL语句(按分号分割,但要注意字符串中的分号)
|
||
print("执行SQL语句...")
|
||
statements = self._split_sql_statements(sql_content)
|
||
|
||
total = len(statements)
|
||
print(f"共 {total} 条SQL语句")
|
||
|
||
# 执行每条SQL语句
|
||
for i, statement in enumerate(statements, 1):
|
||
statement = statement.strip()
|
||
if not statement or statement.startswith('--'):
|
||
continue
|
||
|
||
try:
|
||
cursor.execute(statement)
|
||
if i % 100 == 0:
|
||
print(f"进度: {i}/{total} ({i*100//total}%)")
|
||
except Exception as e:
|
||
# 某些错误可以忽略(如表已存在等)
|
||
error_msg = str(e).lower()
|
||
if 'already exists' in error_msg or 'duplicate' in error_msg:
|
||
continue
|
||
print(f"警告: 执行SQL语句时出错 (第{i}条): {str(e)}")
|
||
print(f"SQL: {statement[:100]}...")
|
||
|
||
# 提交事务
|
||
connection.commit()
|
||
|
||
cursor.close()
|
||
connection.close()
|
||
|
||
print("恢复完成!")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"恢复失败: {str(e)}")
|
||
raise
|
||
finally:
|
||
# 清理临时解压文件
|
||
if temp_file and temp_file.exists():
|
||
temp_file.unlink()
|
||
|
||
def _split_sql_statements(self, sql_content):
|
||
"""
|
||
分割SQL语句(处理字符串中的分号)
|
||
|
||
Args:
|
||
sql_content: SQL内容
|
||
|
||
Returns:
|
||
SQL语句列表
|
||
"""
|
||
statements = []
|
||
current_statement = []
|
||
in_string = False
|
||
string_char = None
|
||
i = 0
|
||
|
||
while i < len(sql_content):
|
||
char = sql_content[i]
|
||
|
||
# 检测字符串开始/结束
|
||
if char in ("'", '"', '`') and (i == 0 or sql_content[i-1] != '\\'):
|
||
if not in_string:
|
||
in_string = True
|
||
string_char = char
|
||
elif char == string_char:
|
||
in_string = False
|
||
string_char = None
|
||
|
||
current_statement.append(char)
|
||
|
||
# 如果不在字符串中且遇到分号,分割语句
|
||
if not in_string and char == ';':
|
||
statement = ''.join(current_statement).strip()
|
||
if statement:
|
||
statements.append(statement)
|
||
current_statement = []
|
||
|
||
i += 1
|
||
|
||
# 添加最后一条语句
|
||
if current_statement:
|
||
statement = ''.join(current_statement).strip()
|
||
if statement:
|
||
statements.append(statement)
|
||
|
||
return statements
|
||
|
||
def _drop_database(self):
|
||
"""删除数据库(危险操作)"""
|
||
try:
|
||
# 连接到MySQL服务器(不指定数据库)
|
||
config = self.db_config.copy()
|
||
config.pop('database')
|
||
connection = pymysql.connect(**config)
|
||
cursor = connection.cursor()
|
||
|
||
cursor.execute(f"DROP DATABASE IF EXISTS `{self.db_config['database']}`")
|
||
cursor.execute(f"CREATE DATABASE `{self.db_config['database']}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")
|
||
|
||
connection.commit()
|
||
cursor.close()
|
||
connection.close()
|
||
|
||
print(f"数据库 {self.db_config['database']} 已删除并重新创建")
|
||
|
||
except Exception as e:
|
||
raise Exception(f"删除数据库失败: {str(e)}")
|
||
|
||
def test_connection(self):
|
||
"""测试数据库连接"""
|
||
try:
|
||
connection = pymysql.connect(**self.db_config)
|
||
cursor = connection.cursor()
|
||
cursor.execute("SELECT VERSION()")
|
||
version = cursor.fetchone()[0]
|
||
cursor.close()
|
||
connection.close()
|
||
|
||
print(f"数据库连接成功!MySQL版本: {version}")
|
||
return True
|
||
except Exception as e:
|
||
print(f"数据库连接失败: {str(e)}")
|
||
return False
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
import argparse
|
||
|
||
parser = argparse.ArgumentParser(description='数据库恢复工具')
|
||
parser.add_argument('backup_file', help='备份文件路径')
|
||
parser.add_argument('--method', choices=['mysql', 'python', 'auto'],
|
||
default='auto', help='恢复方法 (默认: auto)')
|
||
parser.add_argument('--drop-db', action='store_true',
|
||
help='恢复前删除现有数据库(危险操作)')
|
||
parser.add_argument('--test', action='store_true',
|
||
help='仅测试数据库连接')
|
||
|
||
args = parser.parse_args()
|
||
|
||
restore = DatabaseRestore()
|
||
|
||
# 测试连接
|
||
if args.test:
|
||
restore.test_connection()
|
||
return
|
||
|
||
# 执行恢复
|
||
try:
|
||
if args.method == 'mysql':
|
||
success = restore.restore_with_mysql(args.backup_file, args.drop_db)
|
||
elif args.method == 'python':
|
||
success = restore.restore_with_python(args.backup_file, args.drop_db)
|
||
else: # auto
|
||
try:
|
||
success = restore.restore_with_mysql(args.backup_file, args.drop_db)
|
||
except:
|
||
print("\nmysql方式失败,切换到Python方式...")
|
||
success = restore.restore_with_python(args.backup_file, args.drop_db)
|
||
|
||
if success:
|
||
print("\n恢复成功!")
|
||
else:
|
||
print("\n恢复失败!")
|
||
sys.exit(1)
|
||
|
||
except Exception as e:
|
||
print(f"\n恢复失败: {str(e)}")
|
||
sys.exit(1)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|
||
|