finyx_data_ai/app/services/parse_business_tables_service.py
2026-01-11 07:48:19 +08:00

280 lines
8.6 KiB
Python

"""
业务表解析服务
"""
import time
import os
from pathlib import Path
from typing import List
import pandas as pd
from app.schemas.parse_business_tables import (
TableInfo,
FieldInfo,
ProcessedFile,
FailedFile,
FileInfo,
)
from app.utils.logger import logger
from app.core.exceptions import ValidationException
# ==================== 字段类型推断 ====================
def infer_field_type(pd_type: str) -> str:
"""
根据 pandas 类型推断数据库字段类型
Args:
pd_type: pandas 数据类型
Returns:
数据库字段类型
"""
type_mapping = {
'object': 'varchar(255)',
'int64': 'bigint',
'int32': 'int',
'int16': 'smallint',
'int8': 'tinyint',
'float64': 'double',
'float32': 'float',
'bool': 'tinyint(1)',
'datetime64[ns]': 'datetime',
'timedelta[ns]': 'time',
}
return type_mapping.get(str(pd_type), 'varchar(255)')
# ==================== 文件解析函数 ====================
def parse_excel_file(file_path: str, file_name: str) -> List[TableInfo]:
"""
解析单个 Excel 文件
Args:
file_path: Excel 文件路径
file_name: 文件名
Returns:
解析出的表列表
"""
tables = []
try:
# 读取所有 Sheet
excel_file = pd.ExcelFile(file_path)
for sheet_name in excel_file.sheet_names:
df = pd.read_excel(file_path, sheet_name=sheet_name)
# 跳过空 Sheet
if df.empty:
continue
# 识别字段
fields = []
for col in df.columns:
# 推断字段类型
col_type = str(df[col].dtype)
inferred_type = infer_field_type(col_type)
field = FieldInfo(
raw_name=str(col).strip(),
display_name=str(col).strip(),
type=inferred_type,
comment=None,
inferred_type=inferred_type
)
fields.append(field)
if fields:
# 使用 Sheet 名称或文件名作为表名
table_name = sheet_name.lower().replace(' ', '_').replace('-', '_')
if not table_name:
table_name = Path(file_name).stem.lower().replace(' ', '_').replace('-', '_')
table = TableInfo(
raw_name=table_name,
display_name=sheet_name,
description=f"从文件 {file_name} 的 Sheet '{sheet_name}' 解析",
source_file=file_name,
fields=fields,
field_count=len(fields),
row_count=len(df)
)
tables.append(table)
except Exception as e:
logger.error(f"Excel 文件 {file_name} 解析失败: {str(e)}")
raise ValidationException(f"Excel 文件 {file_name} 解析失败: {str(e)}")
return tables
def parse_csv_file(file_path: str, file_name: str) -> List[TableInfo]:
"""
解析单个 CSV 文件
Args:
file_path: CSV 文件路径
file_name: 文件名
Returns:
解析出的表列表
"""
tables = []
try:
# 尝试多种编码
encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1']
df = None
for encoding in encodings:
try:
df = pd.read_csv(file_path, encoding=encoding)
break
except UnicodeDecodeError:
continue
if df is None:
raise ValidationException("无法解析 CSV 文件,请检查文件编码")
if df.empty:
return tables
# 识别字段
fields = []
for col in df.columns:
col_type = str(df[col].dtype)
inferred_type = infer_field_type(col_type)
field = FieldInfo(
raw_name=str(col).strip(),
display_name=str(col).strip(),
type=inferred_type,
comment=None,
inferred_type=inferred_type
)
fields.append(field)
if fields:
# 使用文件名作为表名
table_name = Path(file_name).stem.lower().replace(' ', '_').replace('-', '_')
table = TableInfo(
raw_name=table_name,
display_name=Path(file_name).stem,
description=f"从文件 {file_name} 解析",
source_file=file_name,
fields=fields,
field_count=len(fields),
row_count=len(df)
)
tables.append(table)
except ValidationException:
raise
except Exception as e:
logger.error(f"CSV 文件 {file_name} 解析失败: {str(e)}")
raise ValidationException(f"CSV 文件 {file_name} 解析失败: {str(e)}")
return tables
# ==================== 主要服务类 ====================
class ParseBusinessTablesService:
"""业务表解析服务"""
@staticmethod
async def parse(
file_paths: List[str],
project_id: str = None
) -> dict:
"""
批量解析业务表文件
Args:
file_paths: 文件路径列表
project_id: 项目ID
Returns:
解析结果
"""
start_time = time.time()
logger.info(
f"开始批量解析业务表 - 文件数: {len(file_paths)}, "
f"项目ID: {project_id}"
)
all_tables = []
processed_files = []
failed_files = []
try:
# 处理每个文件
for file_path in file_paths:
file_name = Path(file_path).name
file_size = os.path.getsize(file_path)
try:
# 根据文件扩展名选择解析方法
ext = Path(file_name).suffix.lower()
if ext in ['.xlsx', '.xls']:
tables = parse_excel_file(file_path, file_name)
elif ext == '.csv':
tables = parse_csv_file(file_path, file_name)
else:
failed_files.append({
"file_name": file_name,
"error": f"不支持的文件类型: {ext}"
})
continue
all_tables.extend(tables)
processed_files.append({
"file_name": file_name,
"file_size": file_size,
"tables_extracted": len(tables),
"status": "success"
})
# 清理临时文件(如果需要)
# 注意:这里不删除原始文件,因为文件路径是由调用方提供的
except Exception as e:
failed_files.append({
"file_name": file_name,
"error": str(e)
})
# 计算统计信息
total_fields = sum(table.field_count for table in all_tables)
parse_time = time.time() - start_time
# 构建响应数据
response_data = {
"tables": [table.dict() for table in all_tables],
"total_tables": len(all_tables),
"total_fields": total_fields,
"total_files": len(file_paths),
"success_files": len(processed_files),
"failed_files": failed_files,
"parse_time": round(parse_time, 2),
"file_info": {
"processed_files": processed_files
}
}
logger.info(
f"业务表解析完成 - 成功: {len(processed_files)}/{len(file_paths)}, "
f"表数: {len(all_tables)}, 字段数: {total_fields}, "
f"耗时: {parse_time:.2f}"
)
return response_data
except Exception as e:
logger.exception(f"业务表解析失败: {str(e)}")
raise ValidationException(f"业务表解析失败: {str(e)}")