280 lines
8.6 KiB
Python
280 lines
8.6 KiB
Python
"""
|
|
业务表解析服务
|
|
"""
|
|
import time
|
|
import os
|
|
from pathlib import Path
|
|
from typing import List
|
|
import pandas as pd
|
|
|
|
from app.schemas.parse_business_tables import (
|
|
TableInfo,
|
|
FieldInfo,
|
|
ProcessedFile,
|
|
FailedFile,
|
|
FileInfo,
|
|
)
|
|
from app.utils.logger import logger
|
|
from app.core.exceptions import ValidationException
|
|
|
|
|
|
# ==================== 字段类型推断 ====================
|
|
|
|
def infer_field_type(pd_type: str) -> str:
|
|
"""
|
|
根据 pandas 类型推断数据库字段类型
|
|
|
|
Args:
|
|
pd_type: pandas 数据类型
|
|
|
|
Returns:
|
|
数据库字段类型
|
|
"""
|
|
type_mapping = {
|
|
'object': 'varchar(255)',
|
|
'int64': 'bigint',
|
|
'int32': 'int',
|
|
'int16': 'smallint',
|
|
'int8': 'tinyint',
|
|
'float64': 'double',
|
|
'float32': 'float',
|
|
'bool': 'tinyint(1)',
|
|
'datetime64[ns]': 'datetime',
|
|
'timedelta[ns]': 'time',
|
|
}
|
|
return type_mapping.get(str(pd_type), 'varchar(255)')
|
|
|
|
|
|
# ==================== 文件解析函数 ====================
|
|
|
|
def parse_excel_file(file_path: str, file_name: str) -> List[TableInfo]:
|
|
"""
|
|
解析单个 Excel 文件
|
|
|
|
Args:
|
|
file_path: Excel 文件路径
|
|
file_name: 文件名
|
|
|
|
Returns:
|
|
解析出的表列表
|
|
"""
|
|
tables = []
|
|
try:
|
|
# 读取所有 Sheet
|
|
excel_file = pd.ExcelFile(file_path)
|
|
|
|
for sheet_name in excel_file.sheet_names:
|
|
df = pd.read_excel(file_path, sheet_name=sheet_name)
|
|
|
|
# 跳过空 Sheet
|
|
if df.empty:
|
|
continue
|
|
|
|
# 识别字段
|
|
fields = []
|
|
for col in df.columns:
|
|
# 推断字段类型
|
|
col_type = str(df[col].dtype)
|
|
inferred_type = infer_field_type(col_type)
|
|
|
|
field = FieldInfo(
|
|
raw_name=str(col).strip(),
|
|
display_name=str(col).strip(),
|
|
type=inferred_type,
|
|
comment=None,
|
|
inferred_type=inferred_type
|
|
)
|
|
fields.append(field)
|
|
|
|
if fields:
|
|
# 使用 Sheet 名称或文件名作为表名
|
|
table_name = sheet_name.lower().replace(' ', '_').replace('-', '_')
|
|
if not table_name:
|
|
table_name = Path(file_name).stem.lower().replace(' ', '_').replace('-', '_')
|
|
|
|
table = TableInfo(
|
|
raw_name=table_name,
|
|
display_name=sheet_name,
|
|
description=f"从文件 {file_name} 的 Sheet '{sheet_name}' 解析",
|
|
source_file=file_name,
|
|
fields=fields,
|
|
field_count=len(fields),
|
|
row_count=len(df)
|
|
)
|
|
tables.append(table)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Excel 文件 {file_name} 解析失败: {str(e)}")
|
|
raise ValidationException(f"Excel 文件 {file_name} 解析失败: {str(e)}")
|
|
|
|
return tables
|
|
|
|
|
|
def parse_csv_file(file_path: str, file_name: str) -> List[TableInfo]:
|
|
"""
|
|
解析单个 CSV 文件
|
|
|
|
Args:
|
|
file_path: CSV 文件路径
|
|
file_name: 文件名
|
|
|
|
Returns:
|
|
解析出的表列表
|
|
"""
|
|
tables = []
|
|
try:
|
|
# 尝试多种编码
|
|
encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1']
|
|
df = None
|
|
|
|
for encoding in encodings:
|
|
try:
|
|
df = pd.read_csv(file_path, encoding=encoding)
|
|
break
|
|
except UnicodeDecodeError:
|
|
continue
|
|
|
|
if df is None:
|
|
raise ValidationException("无法解析 CSV 文件,请检查文件编码")
|
|
|
|
if df.empty:
|
|
return tables
|
|
|
|
# 识别字段
|
|
fields = []
|
|
for col in df.columns:
|
|
col_type = str(df[col].dtype)
|
|
inferred_type = infer_field_type(col_type)
|
|
|
|
field = FieldInfo(
|
|
raw_name=str(col).strip(),
|
|
display_name=str(col).strip(),
|
|
type=inferred_type,
|
|
comment=None,
|
|
inferred_type=inferred_type
|
|
)
|
|
fields.append(field)
|
|
|
|
if fields:
|
|
# 使用文件名作为表名
|
|
table_name = Path(file_name).stem.lower().replace(' ', '_').replace('-', '_')
|
|
|
|
table = TableInfo(
|
|
raw_name=table_name,
|
|
display_name=Path(file_name).stem,
|
|
description=f"从文件 {file_name} 解析",
|
|
source_file=file_name,
|
|
fields=fields,
|
|
field_count=len(fields),
|
|
row_count=len(df)
|
|
)
|
|
tables.append(table)
|
|
|
|
except ValidationException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"CSV 文件 {file_name} 解析失败: {str(e)}")
|
|
raise ValidationException(f"CSV 文件 {file_name} 解析失败: {str(e)}")
|
|
|
|
return tables
|
|
|
|
|
|
# ==================== 主要服务类 ====================
|
|
|
|
class ParseBusinessTablesService:
|
|
"""业务表解析服务"""
|
|
|
|
@staticmethod
|
|
async def parse(
|
|
file_paths: List[str],
|
|
project_id: str = None
|
|
) -> dict:
|
|
"""
|
|
批量解析业务表文件
|
|
|
|
Args:
|
|
file_paths: 文件路径列表
|
|
project_id: 项目ID
|
|
|
|
Returns:
|
|
解析结果
|
|
"""
|
|
start_time = time.time()
|
|
|
|
logger.info(
|
|
f"开始批量解析业务表 - 文件数: {len(file_paths)}, "
|
|
f"项目ID: {project_id}"
|
|
)
|
|
|
|
all_tables = []
|
|
processed_files = []
|
|
failed_files = []
|
|
|
|
try:
|
|
# 处理每个文件
|
|
for file_path in file_paths:
|
|
file_name = Path(file_path).name
|
|
file_size = os.path.getsize(file_path)
|
|
|
|
try:
|
|
# 根据文件扩展名选择解析方法
|
|
ext = Path(file_name).suffix.lower()
|
|
|
|
if ext in ['.xlsx', '.xls']:
|
|
tables = parse_excel_file(file_path, file_name)
|
|
elif ext == '.csv':
|
|
tables = parse_csv_file(file_path, file_name)
|
|
else:
|
|
failed_files.append({
|
|
"file_name": file_name,
|
|
"error": f"不支持的文件类型: {ext}"
|
|
})
|
|
continue
|
|
|
|
all_tables.extend(tables)
|
|
processed_files.append({
|
|
"file_name": file_name,
|
|
"file_size": file_size,
|
|
"tables_extracted": len(tables),
|
|
"status": "success"
|
|
})
|
|
|
|
# 清理临时文件(如果需要)
|
|
# 注意:这里不删除原始文件,因为文件路径是由调用方提供的
|
|
|
|
except Exception as e:
|
|
failed_files.append({
|
|
"file_name": file_name,
|
|
"error": str(e)
|
|
})
|
|
|
|
# 计算统计信息
|
|
total_fields = sum(table.field_count for table in all_tables)
|
|
parse_time = time.time() - start_time
|
|
|
|
# 构建响应数据
|
|
response_data = {
|
|
"tables": [table.dict() for table in all_tables],
|
|
"total_tables": len(all_tables),
|
|
"total_fields": total_fields,
|
|
"total_files": len(file_paths),
|
|
"success_files": len(processed_files),
|
|
"failed_files": failed_files,
|
|
"parse_time": round(parse_time, 2),
|
|
"file_info": {
|
|
"processed_files": processed_files
|
|
}
|
|
}
|
|
|
|
logger.info(
|
|
f"业务表解析完成 - 成功: {len(processed_files)}/{len(file_paths)}, "
|
|
f"表数: {len(all_tables)}, 字段数: {total_fields}, "
|
|
f"耗时: {parse_time:.2f}秒"
|
|
)
|
|
|
|
return response_data
|
|
|
|
except Exception as e:
|
|
logger.exception(f"业务表解析失败: {str(e)}")
|
|
raise ValidationException(f"业务表解析失败: {str(e)}")
|