"""SQL result parsing service."""
|
|
import time
|
|
import os
|
|
from pathlib import Path
|
|
from typing import List, Dict, Optional
|
|
import pandas as pd
|
|
|
|
from app.schemas.parse_sql_result import (
|
|
TableInfo,
|
|
FieldInfo,
|
|
FileInfo,
|
|
)
|
|
from app.utils.logger import logger
|
|
from app.core.exceptions import ValidationException
|
|
|
|
|
|
# ==================== Column name mapping ====================

# Maps raw spreadsheet headers — Chinese labels as well as upper- and
# lower-case English variants — to the canonical snake_case column names
# that the parsers below rely on after `DataFrame.rename`.
COLUMN_MAPPING = {
    # Table English name
    '表英文名': 'table_name',
    'TABLE_NAME': 'table_name',
    'table_name': 'table_name',
    # Table Chinese name / description
    '表中文名/描述': 'table_comment',
    'TABLE_COMMENT': 'table_comment',
    'table_comment': 'table_comment',
    # Column English name
    '字段英文名': 'column_name',
    'COLUMN_NAME': 'column_name',
    'column_name': 'column_name',
    # Column Chinese name
    '字段中文名': 'column_comment',
    'COLUMN_COMMENT': 'column_comment',
    'column_comment': 'column_comment',
    # Column type
    '字段类型': 'column_type',
    'COLUMN_TYPE': 'column_type',
    'column_type': 'column_type',
}
|
|
|
|
|
|
# ==================== 文件解析函数 ====================
|
|
|
|
def parse_sql_result_excel(file_path: str) -> List[TableInfo]:
    """Parse an Excel-formatted SQL result file into table metadata.

    Args:
        file_path: Path to the Excel file (.xlsx / .xls).

    Returns:
        List of TableInfo objects, one per distinct table name found.

    Raises:
        ValidationException: if required columns are missing or parsing fails.
    """
    tables = []
    try:
        df = pd.read_excel(file_path)

        # Normalize headers to canonical snake_case names.
        df.columns = df.columns.str.strip()
        df = df.rename(columns=COLUMN_MAPPING)

        # Validate that the required columns are present.
        required_columns = ['table_name', 'column_name', 'column_type']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            raise ValidationException(f"缺少必要列: {', '.join(missing_columns)}")

        # Drop rows missing a table or column name.
        df = df.dropna(subset=['table_name', 'column_name'])

        # Group fields by table name (dict preserves first-seen order).
        tables_dict: Dict[str, List[FieldInfo]] = {}
        for _, row in df.iterrows():
            table_name = str(row['table_name']).strip()
            column_name = str(row['column_name']).strip()

            if not table_name or not column_name:
                continue

            field = FieldInfo(
                raw_name=column_name,
                display_name=str(row.get('column_comment', '')).strip() if pd.notna(row.get('column_comment')) else None,
                type=str(row.get('column_type', 'varchar(255)')).strip() if pd.notna(row.get('column_type')) else 'varchar(255)',
                comment=str(row.get('column_comment', '')).strip() if pd.notna(row.get('column_comment')) else None
            )

            tables_dict.setdefault(table_name, []).append(field)

        # Build TableInfo objects; the table comment (if any) is taken from
        # the first row belonging to that table.
        for table_name, fields in tables_dict.items():
            table_comment = None
            if 'table_comment' in df.columns:
                table_rows = df[df['table_name'] == table_name]
                if not table_rows.empty:
                    first_row = table_rows.iloc[0]
                    if pd.notna(first_row.get('table_comment')):
                        table_comment = str(first_row['table_comment']).strip()

            tables.append(TableInfo(
                raw_name=table_name,
                display_name=table_comment if table_comment else table_name,
                description=table_comment,
                fields=fields,
                field_count=len(fields)
            ))

    except ValidationException:
        # FIX: propagate validation errors (e.g. missing-column message)
        # unchanged instead of re-wrapping them as "Excel 解析失败: ..." —
        # matches the behavior of parse_sql_result_csv.
        raise
    except Exception as e:
        logger.error(f"Excel 解析失败: {str(e)}")
        raise ValidationException(f"Excel 解析失败: {str(e)}")

    return tables
|
|
|
|
|
|
def parse_sql_result_csv(file_path: str) -> List[TableInfo]:
    """Parse a CSV-formatted SQL result file into table metadata.

    Args:
        file_path: Path to the CSV file.

    Returns:
        List of TableInfo objects, one per distinct table name found.

    Raises:
        ValidationException: if the file cannot be decoded, required
            columns are missing, or parsing fails.
    """
    parsed: List[TableInfo] = []
    try:
        # Try a sequence of encodings until one decodes successfully.
        frame = None
        for enc in ('utf-8', 'gbk', 'gb2312', 'latin-1'):
            try:
                frame = pd.read_csv(file_path, encoding=enc)
            except UnicodeDecodeError:
                continue
            else:
                break

        if frame is None:
            raise ValidationException("无法解析 CSV 文件,请检查文件编码")

        # Normalize headers to canonical snake_case names.
        frame.columns = frame.columns.str.strip()
        frame = frame.rename(columns=COLUMN_MAPPING)

        # Validate that the required columns are present.
        needed = ['table_name', 'column_name', 'column_type']
        absent = [c for c in needed if c not in frame.columns]
        if absent:
            raise ValidationException(f"缺少必要列: {', '.join(absent)}")

        # Drop rows missing a table or column name.
        frame = frame.dropna(subset=['table_name', 'column_name'])

        # Group fields by table name (dict preserves first-seen order).
        grouped: Dict[str, List[FieldInfo]] = {}
        for _, record in frame.iterrows():
            tbl = str(record['table_name']).strip()
            col = str(record['column_name']).strip()
            if not (tbl and col):
                continue

            has_comment = pd.notna(record.get('column_comment'))
            has_type = pd.notna(record.get('column_type'))
            grouped.setdefault(tbl, []).append(FieldInfo(
                raw_name=col,
                display_name=str(record.get('column_comment', '')).strip() if has_comment else None,
                type=str(record.get('column_type', 'varchar(255)')).strip() if has_type else 'varchar(255)',
                comment=str(record.get('column_comment', '')).strip() if has_comment else None,
            ))

        # Build TableInfo objects; the table comment (if any) is taken from
        # the first row belonging to that table.
        comment_available = 'table_comment' in frame.columns
        for tbl, cols in grouped.items():
            desc = None
            if comment_available:
                subset = frame[frame['table_name'] == tbl]
                if not subset.empty:
                    head = subset.iloc[0]
                    if pd.notna(head.get('table_comment')):
                        desc = str(head['table_comment']).strip()

            parsed.append(TableInfo(
                raw_name=tbl,
                display_name=desc or tbl,
                description=desc,
                fields=cols,
                field_count=len(cols),
            ))

    except ValidationException:
        raise
    except Exception as e:
        logger.error(f"CSV 解析失败: {str(e)}")
        raise ValidationException(f"CSV 解析失败: {str(e)}")

    return parsed
|
|
|
|
|
|
# ==================== 主要服务类 ====================
|
|
|
|
class ParseSQLResultService:
    """Service that parses SQL result files (Excel/CSV) into table metadata."""

    @staticmethod
    async def parse(
        file_path: str,
        file_type: Optional[str] = None,
        project_id: Optional[str] = None  # FIX: was `str = None` (invalid implicit Optional, PEP 484)
    ) -> dict:
        """Parse a SQL result file and return tables plus summary statistics.

        Args:
            file_path: Path to the file to parse.
            file_type: 'excel' or 'csv'; auto-detected from the file
                extension when omitted.
            project_id: Project identifier (currently unused here; kept for
                interface compatibility with callers).

        Returns:
            Dict with keys ``tables``, ``total_tables``, ``total_fields``,
            ``parse_time`` (seconds, rounded to 2 decimals) and ``file_info``.

        Raises:
            ValidationException: if the file does not exist, the type is
                unsupported, or parsing fails.
        """
        start_time = time.time()

        try:
            # Fail fast if the file is missing.
            if not os.path.exists(file_path):
                raise ValidationException(f"文件不存在: {file_path}")

            file_name = Path(file_path).name
            file_size = os.path.getsize(file_path)

            # Auto-detect the file type from the extension when not given.
            if not file_type:
                ext = Path(file_name).suffix.lower()
                if ext in ['.xlsx', '.xls']:
                    file_type = 'excel'
                elif ext == '.csv':
                    file_type = 'csv'
                else:
                    raise ValidationException(f"不支持的文件类型: {ext}")

            logger.info(
                f"开始解析 SQL 结果 - 文件: {file_name}, 类型: {file_type}, "
                f"大小: {file_size} 字节"
            )

            # Dispatch to the format-specific parser.
            if file_type == 'excel':
                tables = parse_sql_result_excel(file_path)
            elif file_type == 'csv':
                tables = parse_sql_result_csv(file_path)
            else:
                raise ValidationException(f"不支持的文件类型: {file_type}")

            # Summary statistics.
            total_fields = sum(table.field_count for table in tables)
            parse_time = time.time() - start_time

            response_data = {
                "tables": [table.dict() for table in tables],
                "total_tables": len(tables),
                "total_fields": total_fields,
                "parse_time": round(parse_time, 2),
                "file_info": FileInfo(
                    file_name=file_name,
                    file_size=file_size,
                    file_type=file_type
                ).dict()
            }

            logger.info(
                f"SQL 结果解析成功 - 表数: {len(tables)}, 字段数: {total_fields}, "
                f"耗时: {parse_time:.2f}秒"
            )

            return response_data

        except ValidationException:
            raise
        except Exception as e:
            logger.exception(f"SQL 结果解析失败: {str(e)}")
            raise ValidationException(f"SQL 结果解析失败: {str(e)}")
|