325 lines
10 KiB
Python
325 lines
10 KiB
Python
"""
|
||
文档解析服务
|
||
"""
|
||
import time
|
||
import os
|
||
from pathlib import Path
|
||
from typing import List, Optional
|
||
import pandas as pd
|
||
from docx import Document
|
||
import pdfplumber
|
||
|
||
from app.schemas.parse_document import (
|
||
TableInfo,
|
||
FieldInfo,
|
||
FileInfo,
|
||
)
|
||
from app.utils.logger import logger
|
||
from app.core.exceptions import ValidationException
|
||
|
||
|
||
# ==================== 文件解析函数 ====================
|
||
|
||
def parse_excel(file_path: str) -> List[TableInfo]:
    """
    Parse an Excel workbook into table definitions.

    Each sheet becomes one table and each column header becomes one field.
    The field type is inferred from the pandas dtype of the column.

    Args:
        file_path: Path of the Excel file.

    Returns:
        The parsed tables, one per sheet that has at least one column.

    Raises:
        ValidationException: If the workbook cannot be read or parsed.
    """
    tables = []
    try:
        # sheet_name=None loads every sheet as {sheet name: DataFrame}.
        df_dict = pd.read_excel(file_path, sheet_name=None)

        for sheet_name, df in df_dict.items():
            # Skip only sheets without any column. ``df.empty`` is also True
            # for a header-only sheet (zero data rows), whose header row
            # still describes the fields, so it must not be used here.
            if len(df.columns) == 0:
                continue

            fields = []
            # The first row of the sheet is assumed to be the header.
            for col_name in df.columns:
                field_name = str(col_name).strip()
                fields.append(
                    FieldInfo(
                        raw_name=field_name,
                        display_name=field_name,
                        # Map the pandas dtype name to a database type.
                        type=infer_field_type(str(df[col_name].dtype)),
                        comment=None,
                        is_primary_key=False,
                        is_nullable=True,
                        default_value=None,
                    )
                )

            tables.append(
                TableInfo(
                    raw_name=sheet_name,
                    display_name=sheet_name,
                    description=f"从 Excel Sheet '{sheet_name}' 解析",
                    fields=fields,
                    field_count=len(fields),
                )
            )

    except Exception as e:
        logger.error(f"Excel 解析失败: {str(e)}")
        # Chain the original error so the root cause stays in the traceback.
        raise ValidationException(f"Excel 解析失败: {str(e)}") from e

    return tables
|
||
|
||
|
||
def parse_word(file_path: str) -> List[TableInfo]:
    """
    Parse the tables of a Word document into table definitions.

    Every document table with at least two rows is read as a field list:
    the first row is treated as a header and skipped, and each following row
    is read as (field name, field type, optional comment).

    Args:
        file_path: Path of the Word file.

    Returns:
        The parsed tables, one per document table that yielded at least one
        field.

    Raises:
        ValidationException: If the document cannot be opened or parsed.
    """
    tables = []
    try:
        doc = Document(file_path)

        for table_idx, table in enumerate(doc.tables):
            rows = table.rows
            # A header row plus at least one field row is required.
            if len(rows) < 2:
                continue

            fields = []
            for row in rows[1:]:
                # Read the cells once per row instead of on every access.
                cells = row.cells
                if len(cells) < 2:
                    continue

                field_name = cells[0].text.strip()
                if not field_name:
                    # Rows without a field name carry no usable definition.
                    continue

                # An empty type cell falls back to the default column type.
                field_type = cells[1].text.strip() or "varchar(255)"
                field_comment = cells[2].text.strip() if len(cells) > 2 else None

                fields.append(
                    FieldInfo(
                        raw_name=field_name,
                        # Prefer the comment as display name when present.
                        display_name=field_comment if field_comment else field_name,
                        type=field_type,
                        comment=field_comment,
                        is_primary_key=False,
                        is_nullable=True,
                        default_value=None,
                    )
                )

            if fields:
                tables.append(
                    TableInfo(
                        raw_name=f"table_{table_idx + 1}",
                        display_name=f"表{table_idx + 1}",
                        description=f"从 Word 文档第 {table_idx + 1} 个表格解析",
                        fields=fields,
                        field_count=len(fields),
                    )
                )

    except Exception as e:
        logger.error(f"Word 解析失败: {str(e)}")
        # Chain the original error so the root cause stays in the traceback.
        raise ValidationException(f"Word 解析失败: {str(e)}") from e

    return tables
|
||
|
||
|
||
def parse_pdf(file_path: str) -> List[TableInfo]:
    """
    Parse the tables of a PDF file into table definitions.

    Every table extracted from a page that has at least two rows is read as
    a field list: the first row is treated as a header and skipped, and each
    following row is read as (field name, field type, optional comment).

    Args:
        file_path: Path of the PDF file.

    Returns:
        The parsed tables, one per extracted table that yielded at least one
        field.

    Raises:
        ValidationException: If the PDF cannot be opened or parsed.
    """
    tables = []
    try:
        with pdfplumber.open(file_path) as pdf:
            for page_idx, page in enumerate(pdf.pages):
                for table_idx, table in enumerate(page.extract_tables()):
                    # A header row plus at least one field row is required.
                    if not table or len(table) < 2:
                        continue

                    fields = []
                    for row in table[1:]:
                        if len(row) < 2:
                            continue

                        # Empty or missing cells are treated as blank text.
                        field_name = str(row[0]).strip() if row[0] else ""
                        if not field_name:
                            continue

                        field_type = str(row[1]).strip() if row[1] else ""
                        field_comment = (
                            str(row[2]).strip() if len(row) > 2 and row[2] else None
                        )

                        fields.append(
                            FieldInfo(
                                raw_name=field_name,
                                # Prefer the comment as display name when present.
                                display_name=field_comment if field_comment else field_name,
                                # A blank type cell falls back to the default type.
                                type=field_type or "varchar(255)",
                                comment=field_comment,
                                is_primary_key=False,
                                is_nullable=True,
                                default_value=None,
                            )
                        )

                    if fields:
                        tables.append(
                            TableInfo(
                                raw_name=f"table_{page_idx + 1}_{table_idx + 1}",
                                display_name=f"表{page_idx + 1}-{table_idx + 1}",
                                description=f"从 PDF 第 {page_idx + 1} 页第 {table_idx + 1} 个表格解析",
                                fields=fields,
                                field_count=len(fields),
                            )
                        )

    except Exception as e:
        logger.error(f"PDF 解析失败: {str(e)}")
        # Chain the original error so the root cause stays in the traceback.
        raise ValidationException(f"PDF 解析失败: {str(e)}") from e

    return tables
|
||
|
||
|
||
# Lower-cased pandas dtype names mapped to database column types.
_PANDAS_TYPE_MAPPING = {
    'object': 'varchar(255)',
    'int64': 'bigint',
    'int32': 'int',
    'int16': 'smallint',
    'int8': 'tinyint',
    'float64': 'double',
    'float32': 'float',
    'bool': 'tinyint(1)',
    'boolean': 'tinyint(1)',
    # Legacy spelling accepted by earlier versions of this function.
    'timedelta[ns]': 'time',
}


def infer_field_type(pd_type: str) -> str:
    """
    Infer a database column type from a pandas dtype name.

    Args:
        pd_type: Name of the pandas dtype, e.g. ``str(series.dtype)``.

    Returns:
        The matching database column type. ``varchar(255)`` is returned for
        any dtype that is not recognised.
    """
    # Lower-casing lets the capitalised nullable dtype names (``Int64``,
    # ``Float64``) share the entries of their NumPy counterparts.
    normalized = str(pd_type).strip().lower()

    # Prefix checks cover every resolution and timezone variant, such as
    # ``datetime64[ns]``, ``datetime64[us]`` or ``datetime64[ns, UTC]``.
    if normalized.startswith('datetime64'):
        return 'datetime'
    if normalized.startswith('timedelta64'):
        return 'time'

    return _PANDAS_TYPE_MAPPING.get(normalized, 'varchar(255)')
|
||
|
||
|
||
def detect_file_type(file_name: str) -> str:
    """
    Determine the document category from a file name's extension.

    The extension is compared case-insensitively.

    Args:
        file_name: Name (or path) of the file.

    Returns:
        One of ``excel``, ``word`` or ``pdf``.

    Raises:
        ValidationException: If the extension is not a supported one.
    """
    kinds_by_suffix = {
        '.xlsx': 'excel',
        '.xls': 'excel',
        '.docx': 'word',
        '.doc': 'word',
        '.pdf': 'pdf',
    }
    ext = Path(file_name).suffix.lower()
    kind = kinds_by_suffix.get(ext)
    if kind is None:
        raise ValidationException(f"不支持的文件类型: {ext}")
    return kind
|
||
|
||
|
||
# ==================== 主要服务类 ====================
|
||
|
||
class ParseDocumentService:
    """Service entry point for parsing uploaded documents."""

    @staticmethod
    async def parse(
        file_path: str,
        file_type: Optional[str] = None,
        project_id: str = None
    ) -> dict:
        """
        Parse a document and summarise the tables found in it.

        Args:
            file_path: Path of the file to parse.
            file_type: One of ``excel``, ``word`` or ``pdf``. When omitted or
                empty it is detected from the file extension.
            project_id: Project identifier. It is accepted but not used by
                this method.

        Returns:
            A dict with the keys ``tables``, ``total_tables``,
            ``total_fields``, ``parse_time`` (seconds, rounded to two
            decimals) and ``file_info``.

        Raises:
            ValidationException: If the file does not exist, the file type
                is unsupported, or parsing fails for any other reason.
        """
        start_time = time.time()

        try:
            # Reject missing files before touching anything else.
            if not os.path.exists(file_path):
                raise ValidationException(f"文件不存在: {file_path}")

            file_name = Path(file_path).name
            file_size = os.path.getsize(file_path)

            # Fall back to extension-based detection when no type is given.
            if not file_type:
                file_type = detect_file_type(file_name)

            logger.info(
                f"开始解析文档 - 文件: {file_name}, 类型: {file_type}, "
                f"大小: {file_size} 字节"
            )

            # Pick the parser for the file type, then run it.
            if file_type == 'excel':
                parser = parse_excel
            elif file_type == 'word':
                parser = parse_word
            elif file_type == 'pdf':
                parser = parse_pdf
            else:
                raise ValidationException(f"不支持的文件类型: {file_type}")
            tables = parser(file_path)

            # Aggregate statistics over the parsed tables.
            total_fields = 0
            for parsed_table in tables:
                total_fields += parsed_table.field_count
            parse_time = time.time() - start_time

            # Assemble the response payload.
            serialized_tables = [parsed_table.dict() for parsed_table in tables]
            file_info = FileInfo(
                file_name=file_name,
                file_size=file_size,
                file_type=file_type
            )
            response_data = {
                "tables": serialized_tables,
                "total_tables": len(tables),
                "total_fields": total_fields,
                "parse_time": round(parse_time, 2),
                "file_info": file_info.dict(),
            }

            logger.info(
                f"文档解析成功 - 表数: {len(tables)}, 字段数: {total_fields}, "
                f"耗时: {parse_time:.2f}秒"
            )

            return response_data

        except ValidationException:
            # Validation errors already carry a user-facing message.
            raise
        except Exception as e:
            logger.exception(f"文档解析失败: {str(e)}")
            raise ValidationException(f"文档解析失败: {str(e)}")
|