""" 数据库备份脚本 支持使用mysqldump命令或Python直接导出SQL文件 """ import os import sys import subprocess import pymysql from datetime import datetime from pathlib import Path from dotenv import load_dotenv # 加载环境变量 load_dotenv() class DatabaseBackup: """数据库备份类""" def __init__(self): """初始化数据库配置""" self.db_config = { 'host': os.getenv('DB_HOST', '152.136.177.240'), 'port': int(os.getenv('DB_PORT', 5012)), 'user': os.getenv('DB_USER', 'finyx'), 'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'), 'database': os.getenv('DB_NAME', 'finyx'), 'charset': 'utf8mb4' } # 备份文件存储目录 self.backup_dir = Path('backups') self.backup_dir.mkdir(exist_ok=True) def backup_with_mysqldump(self, output_file=None, compress=False): """ 使用mysqldump命令备份数据库(推荐方式) Args: output_file: 输出文件路径,如果为None则自动生成 compress: 是否压缩备份文件 Returns: 备份文件路径 """ # 生成备份文件名 if output_file is None: timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') output_file = self.backup_dir / f"backup_{self.db_config['database']}_{timestamp}.sql" output_file = Path(output_file) # 构建mysqldump命令 cmd = [ 'mysqldump', f"--host={self.db_config['host']}", f"--port={self.db_config['port']}", f"--user={self.db_config['user']}", f"--password={self.db_config['password']}", '--single-transaction', # 保证数据一致性 '--routines', # 包含存储过程和函数 '--triggers', # 包含触发器 '--events', # 包含事件 '--add-drop-table', # 添加DROP TABLE语句 '--default-character-set=utf8mb4', # 设置字符集 self.db_config['database'] ] try: print(f"开始备份数据库 {self.db_config['database']}...") print(f"备份文件: {output_file}") # 执行备份命令 with open(output_file, 'w', encoding='utf-8') as f: result = subprocess.run( cmd, stdout=f, stderr=subprocess.PIPE, text=True ) if result.returncode != 0: error_msg = result.stderr.decode('utf-8') if result.stderr else '未知错误' raise Exception(f"mysqldump执行失败: {error_msg}") # 检查文件大小 file_size = output_file.stat().st_size print(f"备份完成!文件大小: {file_size / 1024 / 1024:.2f} MB") # 如果需要压缩 if compress: compressed_file = self._compress_file(output_file) print(f"压缩完成: {compressed_file}") return str(compressed_file) return str(output_file) except FileNotFoundError: print("错误: 未找到mysqldump命令,请确保MySQL客户端已安装并在PATH中") print("尝试使用Python方式备份...") return self.backup_with_python(output_file) except Exception as e: print(f"备份失败: {str(e)}") raise def backup_with_python(self, output_file=None): """ 使用Python直接连接数据库备份(备用方式) Args: output_file: 输出文件路径,如果为None则自动生成 Returns: 备份文件路径 """ if output_file is None: timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') output_file = self.backup_dir / f"backup_{self.db_config['database']}_{timestamp}.sql" output_file = Path(output_file) try: print(f"开始使用Python方式备份数据库 {self.db_config['database']}...") print(f"备份文件: {output_file}") # 连接数据库 connection = pymysql.connect(**self.db_config) cursor = connection.cursor() with open(output_file, 'w', encoding='utf-8') as f: # 写入文件头 f.write(f"-- MySQL数据库备份\n") f.write(f"-- 数据库: {self.db_config['database']}\n") f.write(f"-- 备份时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") f.write(f"-- 主机: {self.db_config['host']}:{self.db_config['port']}\n") f.write("--\n\n") f.write(f"SET NAMES utf8mb4;\n") f.write(f"SET FOREIGN_KEY_CHECKS=0;\n\n") # 获取所有表 cursor.execute("SHOW TABLES") tables = [table[0] for table in cursor.fetchall()] print(f"找到 {len(tables)} 个表") # 备份每个表 for table in tables: print(f"备份表: {table}") # 获取表结构 cursor.execute(f"SHOW CREATE TABLE `{table}`") create_table_sql = cursor.fetchone()[1] f.write(f"-- ----------------------------\n") f.write(f"-- 表结构: {table}\n") f.write(f"-- ----------------------------\n") f.write(f"DROP TABLE IF EXISTS `{table}`;\n") f.write(f"{create_table_sql};\n\n") # 获取表数据 cursor.execute(f"SELECT * FROM `{table}`") rows = cursor.fetchall() if rows: # 获取列名 cursor.execute(f"DESCRIBE `{table}`") columns = [col[0] for col in cursor.fetchall()] f.write(f"-- ----------------------------\n") f.write(f"-- 表数据: {table}\n") f.write(f"-- ----------------------------\n") # 分批写入数据 batch_size = 1000 for i in range(0, len(rows), batch_size): batch = rows[i:i+batch_size] values_list = [] for row in batch: values = [] for value in row: if value is None: values.append('NULL') elif isinstance(value, (int, float)): values.append(str(value)) else: # 转义特殊字符 escaped_value = str(value).replace('\\', '\\\\').replace("'", "\\'") values.append(f"'{escaped_value}'") values_list.append(f"({', '.join(values)})") columns_str = ', '.join([f"`{col}`" for col in columns]) values_str = ',\n'.join(values_list) f.write(f"INSERT INTO `{table}` ({columns_str}) VALUES\n") f.write(f"{values_str};\n\n") print(f" 完成: {len(rows)} 条记录") f.write("SET FOREIGN_KEY_CHECKS=1;\n") cursor.close() connection.close() # 检查文件大小 file_size = output_file.stat().st_size print(f"备份完成!文件大小: {file_size / 1024 / 1024:.2f} MB") return str(output_file) except Exception as e: print(f"备份失败: {str(e)}") raise def _compress_file(self, file_path): """ 压缩备份文件 Args: file_path: 文件路径 Returns: 压缩后的文件路径 """ import gzip file_path = Path(file_path) compressed_path = file_path.with_suffix('.sql.gz') with open(file_path, 'rb') as f_in: with gzip.open(compressed_path, 'wb') as f_out: f_out.writelines(f_in) # 删除原文件 file_path.unlink() return compressed_path def list_backups(self): """ 列出所有备份文件 Returns: 备份文件列表 """ backups = [] for file in sorted(self.backup_dir.glob('backup_*.sql*'), reverse=True): file_info = { 'filename': file.name, 'path': str(file), 'size': file.stat().st_size, 'size_mb': file.stat().st_size / 1024 / 1024, 'modified': datetime.fromtimestamp(file.stat().st_mtime) } backups.append(file_info) return backups def main(): """主函数""" import argparse parser = argparse.ArgumentParser(description='数据库备份工具') parser.add_argument('--method', choices=['mysqldump', 'python', 'auto'], default='auto', help='备份方法 (默认: auto)') parser.add_argument('--output', '-o', help='输出文件路径') parser.add_argument('--compress', '-c', action='store_true', help='压缩备份文件') parser.add_argument('--list', '-l', action='store_true', help='列出所有备份文件') args = parser.parse_args() backup = DatabaseBackup() # 列出备份文件 if args.list: backups = backup.list_backups() if backups: print(f"\n找到 {len(backups)} 个备份文件:\n") print(f"{'文件名':<50} {'大小(MB)':<15} {'修改时间':<20}") print("-" * 85) for b in backups: print(f"{b['filename']:<50} {b['size_mb']:<15.2f} {b['modified'].strftime('%Y-%m-%d %H:%M:%S'):<20}") else: print("未找到备份文件") return # 执行备份 try: if args.method == 'mysqldump': backup_file = backup.backup_with_mysqldump(args.output, args.compress) elif args.method == 'python': backup_file = backup.backup_with_python(args.output) else: # auto try: backup_file = backup.backup_with_mysqldump(args.output, args.compress) except: print("\nmysqldump方式失败,切换到Python方式...") backup_file = backup.backup_with_python(args.output) print(f"\n备份成功!") print(f"备份文件: {backup_file}") except Exception as e: print(f"\n备份失败: {str(e)}") sys.exit(1) if __name__ == '__main__': main()