""" SQL 结果解析服务 """ import time import os from pathlib import Path from typing import List, Dict, Optional import pandas as pd from app.schemas.parse_sql_result import ( TableInfo, FieldInfo, FileInfo, ) from app.utils.logger import logger from app.core.exceptions import ValidationException # ==================== 列名映射 ==================== COLUMN_MAPPING = { # 表英文名 '表英文名': 'table_name', 'TABLE_NAME': 'table_name', 'table_name': 'table_name', # 表中文名/描述 '表中文名/描述': 'table_comment', 'TABLE_COMMENT': 'table_comment', 'table_comment': 'table_comment', # 字段英文名 '字段英文名': 'column_name', 'COLUMN_NAME': 'column_name', 'column_name': 'column_name', # 字段中文名 '字段中文名': 'column_comment', 'COLUMN_COMMENT': 'column_comment', 'column_comment': 'column_comment', # 字段类型 '字段类型': 'column_type', 'COLUMN_TYPE': 'column_type', 'column_type': 'column_type', } # ==================== 文件解析函数 ==================== def parse_sql_result_excel(file_path: str) -> List[TableInfo]: """ 解析 Excel 格式的 SQL 结果 Args: file_path: Excel 文件路径 Returns: 解析出的表列表 """ tables = [] try: # 读取 Excel 文件 df = pd.read_excel(file_path) # 标准化列名 df.columns = df.columns.str.strip() df = df.rename(columns=COLUMN_MAPPING) # 验证必要列是否存在 required_columns = ['table_name', 'column_name', 'column_type'] missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: raise ValidationException(f"缺少必要列: {', '.join(missing_columns)}") # 清理数据(去除空值) df = df.dropna(subset=['table_name', 'column_name']) # 按表名分组 tables_dict: Dict[str, List[FieldInfo]] = {} for _, row in df.iterrows(): table_name = str(row['table_name']).strip() column_name = str(row['column_name']).strip() if not table_name or not column_name: continue # 获取字段信息 field = FieldInfo( raw_name=column_name, display_name=str(row.get('column_comment', '')).strip() if pd.notna(row.get('column_comment')) else None, type=str(row.get('column_type', 'varchar(255)')).strip() if pd.notna(row.get('column_type')) else 'varchar(255)', comment=str(row.get('column_comment', '')).strip() if pd.notna(row.get('column_comment')) else None ) if table_name not in tables_dict: tables_dict[table_name] = [] tables_dict[table_name].append(field) # 构建表信息 for table_name, fields in tables_dict.items(): # 获取表的描述信息(取第一个字段的表描述,或使用表名) table_comment = None if 'table_comment' in df.columns: table_rows = df[df['table_name'] == table_name] if not table_rows.empty: first_row = table_rows.iloc[0] if pd.notna(first_row.get('table_comment')): table_comment = str(first_row['table_comment']).strip() table = TableInfo( raw_name=table_name, display_name=table_comment if table_comment else table_name, description=table_comment, fields=fields, field_count=len(fields) ) tables.append(table) except Exception as e: logger.error(f"Excel 解析失败: {str(e)}") raise ValidationException(f"Excel 解析失败: {str(e)}") return tables def parse_sql_result_csv(file_path: str) -> List[TableInfo]: """ 解析 CSV 格式的 SQL 结果 Args: file_path: CSV 文件路径 Returns: 解析出的表列表 """ tables = [] try: # 尝试多种编码 encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1'] df = None for encoding in encodings: try: df = pd.read_csv(file_path, encoding=encoding) break except UnicodeDecodeError: continue if df is None: raise ValidationException("无法解析 CSV 文件,请检查文件编码") # 标准化列名 df.columns = df.columns.str.strip() df = df.rename(columns=COLUMN_MAPPING) # 验证必要列 required_columns = ['table_name', 'column_name', 'column_type'] missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: raise ValidationException(f"缺少必要列: {', '.join(missing_columns)}") # 清理数据 df = df.dropna(subset=['table_name', 'column_name']) # 按表名分组 tables_dict: Dict[str, List[FieldInfo]] = {} for _, row in df.iterrows(): table_name = str(row['table_name']).strip() column_name = str(row['column_name']).strip() if not table_name or not column_name: continue field = FieldInfo( raw_name=column_name, display_name=str(row.get('column_comment', '')).strip() if pd.notna(row.get('column_comment')) else None, type=str(row.get('column_type', 'varchar(255)')).strip() if pd.notna(row.get('column_type')) else 'varchar(255)', comment=str(row.get('column_comment', '')).strip() if pd.notna(row.get('column_comment')) else None ) if table_name not in tables_dict: tables_dict[table_name] = [] tables_dict[table_name].append(field) # 构建表信息 for table_name, fields in tables_dict.items(): table_comment = None if 'table_comment' in df.columns: table_rows = df[df['table_name'] == table_name] if not table_rows.empty: first_row = table_rows.iloc[0] if pd.notna(first_row.get('table_comment')): table_comment = str(first_row['table_comment']).strip() table = TableInfo( raw_name=table_name, display_name=table_comment if table_comment else table_name, description=table_comment, fields=fields, field_count=len(fields) ) tables.append(table) except ValidationException: raise except Exception as e: logger.error(f"CSV 解析失败: {str(e)}") raise ValidationException(f"CSV 解析失败: {str(e)}") return tables # ==================== 主要服务类 ==================== class ParseSQLResultService: """SQL 结果解析服务""" @staticmethod async def parse( file_path: str, file_type: Optional[str] = None, project_id: str = None ) -> dict: """ 解析 SQL 结果文件 Args: file_path: 文件路径 file_type: 文件类型(可选) project_id: 项目ID Returns: 解析结果 """ start_time = time.time() try: # 验证文件存在 if not os.path.exists(file_path): raise ValidationException(f"文件不存在: {file_path}") file_name = Path(file_path).name file_size = os.path.getsize(file_path) # 自动检测文件类型 if not file_type: ext = Path(file_name).suffix.lower() if ext in ['.xlsx', '.xls']: file_type = 'excel' elif ext == '.csv': file_type = 'csv' else: raise ValidationException(f"不支持的文件类型: {ext}") logger.info( f"开始解析 SQL 结果 - 文件: {file_name}, 类型: {file_type}, " f"大小: {file_size} 字节" ) # 根据文件类型选择解析方法 if file_type == 'excel': tables = parse_sql_result_excel(file_path) elif file_type == 'csv': tables = parse_sql_result_csv(file_path) else: raise ValidationException(f"不支持的文件类型: {file_type}") # 计算统计信息 total_fields = sum(table.field_count for table in tables) parse_time = time.time() - start_time # 构建响应数据 response_data = { "tables": [table.dict() for table in tables], "total_tables": len(tables), "total_fields": total_fields, "parse_time": round(parse_time, 2), "file_info": FileInfo( file_name=file_name, file_size=file_size, file_type=file_type ).dict() } logger.info( f"SQL 结果解析成功 - 表数: {len(tables)}, 字段数: {total_fields}, " f"耗时: {parse_time:.2f}秒" ) return response_data except ValidationException: raise except Exception as e: logger.exception(f"SQL 结果解析失败: {str(e)}") raise ValidationException(f"SQL 结果解析失败: {str(e)}")