finyx_data_ai/app/services/parse_sql_result_service.py
2026-01-11 07:48:19 +08:00

294 lines
9.9 KiB
Python

"""
SQL 结果解析服务
"""
import time
import os
from pathlib import Path
from typing import List, Dict, Optional
import pandas as pd
from app.schemas.parse_sql_result import (
TableInfo,
FieldInfo,
FileInfo,
)
from app.utils.logger import logger
from app.core.exceptions import ValidationException
# ==================== Column-name mapping ====================
# Maps raw spreadsheet/CSV headers (Chinese labels, upper-case SQL metadata
# names, and already-normalized names) to canonical snake_case column keys
# used throughout the parsers below. Keys are matched after header strip().
COLUMN_MAPPING = {
    # Table English name
    '表英文名': 'table_name',
    'TABLE_NAME': 'table_name',
    'table_name': 'table_name',
    # Table Chinese name / description
    '表中文名/描述': 'table_comment',
    'TABLE_COMMENT': 'table_comment',
    'table_comment': 'table_comment',
    # Column English name
    '字段英文名': 'column_name',
    'COLUMN_NAME': 'column_name',
    'column_name': 'column_name',
    # Column Chinese name
    '字段中文名': 'column_comment',
    'COLUMN_COMMENT': 'column_comment',
    'column_comment': 'column_comment',
    # Column data type
    '字段类型': 'column_type',
    'COLUMN_TYPE': 'column_type',
    'column_type': 'column_type',
}
# ==================== 文件解析函数 ====================
def parse_sql_result_excel(file_path: str) -> List[TableInfo]:
    """
    Parse an Excel export of SQL schema metadata into a list of tables.

    Args:
        file_path: Path to the Excel (.xlsx/.xls) file.

    Returns:
        Parsed tables, each carrying its fields and field count.

    Raises:
        ValidationException: If required columns are missing or parsing fails.
    """
    tables = []
    try:
        # Read the Excel file
        df = pd.read_excel(file_path)
        # Normalize headers: strip whitespace, then map known aliases
        # (Chinese labels / upper-case names) to canonical snake_case keys.
        df.columns = df.columns.str.strip()
        df = df.rename(columns=COLUMN_MAPPING)
        # Verify the required columns are present
        required_columns = ['table_name', 'column_name', 'column_type']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            raise ValidationException(f"缺少必要列: {', '.join(missing_columns)}")
        # Drop rows that lack a table name or a column name
        df = df.dropna(subset=['table_name', 'column_name'])
        # Group field rows by table name (insertion order preserved)
        tables_dict: Dict[str, List[FieldInfo]] = {}
        for _, row in df.iterrows():
            table_name = str(row['table_name']).strip()
            column_name = str(row['column_name']).strip()
            # Skip rows whose names are empty after stripping whitespace
            if not table_name or not column_name:
                continue
            field = FieldInfo(
                raw_name=column_name,
                display_name=str(row.get('column_comment', '')).strip() if pd.notna(row.get('column_comment')) else None,
                type=str(row.get('column_type', 'varchar(255)')).strip() if pd.notna(row.get('column_type')) else 'varchar(255)',
                comment=str(row.get('column_comment', '')).strip() if pd.notna(row.get('column_comment')) else None
            )
            tables_dict.setdefault(table_name, []).append(field)
        # Build one TableInfo per table
        for table_name, fields in tables_dict.items():
            # Table description: taken from the first row of the table
            # (falls back to None when the column is absent or NaN).
            table_comment = None
            if 'table_comment' in df.columns:
                table_rows = df[df['table_name'] == table_name]
                if not table_rows.empty:
                    first_row = table_rows.iloc[0]
                    if pd.notna(first_row.get('table_comment')):
                        table_comment = str(first_row['table_comment']).strip()
            table = TableInfo(
                raw_name=table_name,
                display_name=table_comment if table_comment else table_name,
                description=table_comment,
                fields=fields,
                field_count=len(fields)
            )
            tables.append(table)
    except ValidationException:
        # BUG FIX: propagate validation errors (e.g. "缺少必要列") unchanged.
        # Previously the generic handler below re-wrapped them as
        # "Excel 解析失败: …", obscuring the original message — the CSV
        # parser already had this guard; the Excel parser did not.
        raise
    except Exception as e:
        logger.error(f"Excel 解析失败: {str(e)}")
        raise ValidationException(f"Excel 解析失败: {str(e)}")
    return tables
def parse_sql_result_csv(file_path: str) -> List[TableInfo]:
    """
    Parse a CSV export of SQL schema metadata into a list of tables.

    Tries several encodings (utf-8, gbk, gb2312, latin-1) before giving up.

    Args:
        file_path: Path to the CSV file.

    Returns:
        Parsed tables, each carrying its fields and field count.

    Raises:
        ValidationException: On undecodable files, missing required
            columns, or any other parsing failure.
    """
    parsed_tables = []
    try:
        # Attempt each candidate encoding until one decodes cleanly.
        frame = None
        for enc in ('utf-8', 'gbk', 'gb2312', 'latin-1'):
            try:
                frame = pd.read_csv(file_path, encoding=enc)
                break
            except UnicodeDecodeError:
                continue
        if frame is None:
            raise ValidationException("无法解析 CSV 文件,请检查文件编码")
        # Canonicalize headers via the shared alias map.
        frame.columns = frame.columns.str.strip()
        frame = frame.rename(columns=COLUMN_MAPPING)
        # Fail fast when any mandatory column is absent.
        absent = [c for c in ('table_name', 'column_name', 'column_type') if c not in frame.columns]
        if absent:
            raise ValidationException(f"缺少必要列: {', '.join(absent)}")
        # Discard rows with no table or column name.
        frame = frame.dropna(subset=['table_name', 'column_name'])
        # Bucket field rows under their (stripped) table name.
        grouped: Dict[str, List[FieldInfo]] = {}
        for _, record in frame.iterrows():
            tbl = str(record['table_name']).strip()
            col = str(record['column_name']).strip()
            if not tbl or not col:
                continue
            raw_comment = record.get('column_comment')
            comment_text = str(raw_comment).strip() if pd.notna(raw_comment) else None
            raw_type = record.get('column_type')
            type_text = str(raw_type).strip() if pd.notna(raw_type) else 'varchar(255)'
            grouped.setdefault(tbl, []).append(
                FieldInfo(
                    raw_name=col,
                    display_name=comment_text,
                    type=type_text,
                    comment=comment_text,
                )
            )
        # Assemble a TableInfo per bucket; the table description comes from
        # the first matching row, when a table_comment column exists.
        for tbl, field_list in grouped.items():
            description = None
            if 'table_comment' in frame.columns:
                matching = frame[frame['table_name'] == tbl]
                if not matching.empty:
                    head = matching.iloc[0]
                    if pd.notna(head.get('table_comment')):
                        description = str(head['table_comment']).strip()
            parsed_tables.append(
                TableInfo(
                    raw_name=tbl,
                    display_name=description if description else tbl,
                    description=description,
                    fields=field_list,
                    field_count=len(field_list),
                )
            )
    except ValidationException:
        # Validation errors carry their own message; re-raise untouched.
        raise
    except Exception as e:
        logger.error(f"CSV 解析失败: {str(e)}")
        raise ValidationException(f"CSV 解析失败: {str(e)}")
    return parsed_tables
# ==================== 主要服务类 ====================
class ParseSQLResultService:
    """Service entry point for parsing SQL-result schema files (Excel / CSV)."""

    @staticmethod
    async def parse(
        file_path: str,
        file_type: Optional[str] = None,
        project_id: Optional[str] = None
    ) -> dict:
        """
        Parse a SQL result file and return table/field metadata plus stats.

        Args:
            file_path: Path to the file on disk.
            file_type: Explicit type ('excel' or 'csv'); now matched
                case-insensitively. Auto-detected from the file extension
                when omitted.
            project_id: Project identifier (not used by the parser itself;
                kept for interface compatibility with callers).

        Returns:
            Dict with keys: tables, total_tables, total_fields,
            parse_time (seconds, rounded to 2 dp), file_info.

        Raises:
            ValidationException: If the file does not exist, the type is
                unsupported, or parsing fails.
        """
        start_time = time.time()
        try:
            # Verify the file exists before doing any work
            if not os.path.exists(file_path):
                raise ValidationException(f"文件不存在: {file_path}")
            file_name = Path(file_path).name
            file_size = os.path.getsize(file_path)
            if file_type:
                # FIX: normalize caller-supplied type so 'Excel' / 'CSV'
                # are accepted; previously only exact lowercase matched.
                file_type = file_type.lower()
            else:
                # Auto-detect from the extension
                ext = Path(file_name).suffix.lower()
                if ext in ['.xlsx', '.xls']:
                    file_type = 'excel'
                elif ext == '.csv':
                    file_type = 'csv'
                else:
                    raise ValidationException(f"不支持的文件类型: {ext}")
            logger.info(
                f"开始解析 SQL 结果 - 文件: {file_name}, 类型: {file_type}, "
                f"大小: {file_size} 字节"
            )
            # Dispatch to the matching parser
            if file_type == 'excel':
                tables = parse_sql_result_excel(file_path)
            elif file_type == 'csv':
                tables = parse_sql_result_csv(file_path)
            else:
                raise ValidationException(f"不支持的文件类型: {file_type}")
            # Aggregate statistics
            total_fields = sum(table.field_count for table in tables)
            parse_time = time.time() - start_time
            # Build the response payload
            response_data = {
                "tables": [table.dict() for table in tables],
                "total_tables": len(tables),
                "total_fields": total_fields,
                "parse_time": round(parse_time, 2),
                "file_info": FileInfo(
                    file_name=file_name,
                    file_size=file_size,
                    file_type=file_type
                ).dict()
            }
            logger.info(
                f"SQL 结果解析成功 - 表数: {len(tables)}, 字段数: {total_fields}, "
                f"耗时: {parse_time:.2f}"
            )
            return response_data
        except ValidationException:
            # Validation errors already carry a user-facing message
            raise
        except Exception as e:
            logger.exception(f"SQL 结果解析失败: {str(e)}")
            raise ValidationException(f"SQL 结果解析失败: {str(e)}")