finyx_data_ai/app/services/parse_document_service.py
2026-01-11 07:48:19 +08:00

325 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
文档解析服务
"""
import time
import os
from pathlib import Path
from typing import List, Optional
import pandas as pd
from docx import Document
import pdfplumber
from app.schemas.parse_document import (
TableInfo,
FieldInfo,
FileInfo,
)
from app.utils.logger import logger
from app.core.exceptions import ValidationException
# ==================== 文件解析函数 ====================
def parse_excel(file_path: str) -> List[TableInfo]:
    """
    Parse an Excel workbook into table definitions, one per sheet.

    The first row of every sheet is treated as the header row. Each column
    becomes one field whose type is inferred from the pandas dtype of that
    column via ``infer_field_type``.

    Args:
        file_path: Path of the Excel file.

    Returns:
        One ``TableInfo`` per sheet that has at least one column.

    Raises:
        ValidationException: If the workbook cannot be read or parsed. The
            original error is attached as ``__cause__``.
    """
    tables = []
    try:
        # sheet_name=None loads every sheet, keyed by sheet name.
        sheets = pd.read_excel(file_path, sheet_name=None)
        for sheet_name, df in sheets.items():
            # Skip sheets that have no columns at all. ``df.empty`` is not
            # used here on purpose: it is also True for a sheet that only
            # contains a header row, and such a sheet still defines fields.
            if len(df.columns) == 0:
                continue

            fields = []
            for col_name in df.columns:
                column_label = str(col_name).strip()
                fields.append(
                    FieldInfo(
                        raw_name=column_label,
                        display_name=column_label,
                        type=infer_field_type(str(df[col_name].dtype)),
                        comment=None,
                        is_primary_key=False,
                        is_nullable=True,
                        default_value=None,
                    )
                )

            tables.append(
                TableInfo(
                    raw_name=sheet_name,
                    display_name=sheet_name,
                    description=f"从 Excel Sheet '{sheet_name}' 解析",
                    fields=fields,
                    field_count=len(fields),
                )
            )
    except Exception as e:
        logger.error(f"Excel 解析失败: {str(e)}")
        # Chain the cause so the original traceback is not lost.
        raise ValidationException(f"Excel 解析失败: {str(e)}") from e
    return tables
def parse_word(file_path: str) -> List[TableInfo]:
    """
    Parse the tables of a Word document into table definitions.

    Every table is expected to have a header row followed by one row per
    field, with the columns: field name, field type, comment (optional).
    The header row is skipped. Rows with fewer than two cells or with an
    empty field name are ignored. An empty type falls back to
    ``varchar(255)``.

    Args:
        file_path: Path of the Word file.

    Returns:
        One ``TableInfo`` per document table that yielded at least one field.

    Raises:
        ValidationException: If the document cannot be opened or parsed. The
            original error is attached as ``__cause__``.
    """
    tables = []
    try:
        doc = Document(file_path)
        for table_idx, table in enumerate(doc.tables):
            rows = table.rows
            # A header row plus at least one field row is required.
            if len(rows) < 2:
                continue

            fields = []
            for row in rows[1:]:
                # Read the cells once per row instead of on every access.
                cells = row.cells
                if len(cells) < 2:
                    continue

                field_name = cells[0].text.strip()
                if not field_name:
                    continue

                field_type = cells[1].text.strip()
                field_comment = cells[2].text.strip() if len(cells) > 2 else None
                fields.append(
                    FieldInfo(
                        raw_name=field_name,
                        # Prefer the comment as the display name.
                        display_name=field_comment or field_name,
                        type=field_type or "varchar(255)",
                        comment=field_comment,
                        is_primary_key=False,
                        is_nullable=True,
                        default_value=None,
                    )
                )

            if fields:
                tables.append(
                    TableInfo(
                        raw_name=f"table_{table_idx + 1}",
                        display_name=f"{table_idx + 1}",
                        description=f"从 Word 文档第 {table_idx + 1} 个表格解析",
                        fields=fields,
                        field_count=len(fields),
                    )
                )
    except Exception as e:
        logger.error(f"Word 解析失败: {str(e)}")
        # Chain the cause so the original traceback is not lost.
        raise ValidationException(f"Word 解析失败: {str(e)}") from e
    return tables
def parse_pdf(file_path: str) -> List[TableInfo]:
    """
    Parse the tables of a PDF file into table definitions.

    Tables are extracted page by page. Every table is expected to have a
    header row followed by one row per field, with the columns: field name,
    field type, comment (optional). The header row is skipped. Rows with
    fewer than two cells or with an empty field name are ignored. An empty
    type falls back to ``varchar(255)``.

    Args:
        file_path: Path of the PDF file.

    Returns:
        One ``TableInfo`` per extracted table that yielded at least one field.

    Raises:
        ValidationException: If the PDF cannot be opened or parsed. The
            original error is attached as ``__cause__``.
    """
    tables = []
    try:
        with pdfplumber.open(file_path) as pdf:
            for page_idx, page in enumerate(pdf.pages):
                for table_idx, table in enumerate(page.extract_tables()):
                    # A header row plus at least one field row is required.
                    if not table or len(table) < 2:
                        continue

                    fields = []
                    for row in table[1:]:
                        if len(row) < 2:
                            continue

                        # Cells may be None, so guard before converting.
                        field_name = str(row[0]).strip() if row[0] else ""
                        if not field_name:
                            continue

                        field_type = str(row[1]).strip() if row[1] else ""
                        field_comment = (
                            str(row[2]).strip() if len(row) > 2 and row[2] else None
                        )
                        fields.append(
                            FieldInfo(
                                raw_name=field_name,
                                # Prefer the comment as the display name.
                                display_name=field_comment or field_name,
                                type=field_type or "varchar(255)",
                                comment=field_comment,
                                is_primary_key=False,
                                is_nullable=True,
                                default_value=None,
                            )
                        )

                    if fields:
                        tables.append(
                            TableInfo(
                                raw_name=f"table_{page_idx + 1}_{table_idx + 1}",
                                display_name=f"{page_idx + 1}-{table_idx + 1}",
                                description=f"从 PDF 第 {page_idx + 1} 页第 {table_idx + 1} 个表格解析",
                                fields=fields,
                                field_count=len(fields),
                            )
                        )
    except Exception as e:
        logger.error(f"PDF 解析失败: {str(e)}")
        # Chain the cause so the original traceback is not lost.
        raise ValidationException(f"PDF 解析失败: {str(e)}") from e
    return tables
def infer_field_type(pd_type: str) -> str:
    """
    Map a pandas dtype name to a database column type.

    Matching is case-insensitive, so nullable extension dtypes such as
    ``Int64`` or ``Float32`` map like their NumPy counterparts. Any
    ``datetime64`` dtype (whatever its unit or timezone) maps to
    ``datetime`` and any ``timedelta`` dtype maps to ``time``.

    Args:
        pd_type: pandas dtype name, e.g. ``str(series.dtype)``.

    Returns:
        The database column type, or ``varchar(255)`` for unknown dtypes.
    """
    type_mapping = {
        'object': 'varchar(255)',
        'int64': 'bigint',
        'int32': 'int',
        'int16': 'smallint',
        'int8': 'tinyint',
        'float64': 'double',
        'float32': 'float',
        'bool': 'tinyint(1)',
        'boolean': 'tinyint(1)',
    }
    dtype_name = str(pd_type).lower()

    # Prefix matching covers every unit and timezone variant, for example
    # 'datetime64[ns]', 'datetime64[us]' and 'datetime64[ns, UTC]'.
    if dtype_name.startswith('datetime64'):
        return 'datetime'
    # pandas reports timedeltas as 'timedelta64[ns]'. The plain 'timedelta'
    # prefix also keeps the previously accepted 'timedelta[ns]' working.
    if dtype_name.startswith('timedelta'):
        return 'time'

    return type_mapping.get(dtype_name, 'varchar(255)')
def detect_file_type(file_name: str) -> str:
    """
    Determine the document type from the extension of a file name.

    The extension is compared case-insensitively.

    Args:
        file_name: File name or path.

    Returns:
        ``'excel'``, ``'word'`` or ``'pdf'``.

    Raises:
        ValidationException: If the extension is not supported.
    """
    kinds_by_suffix = {
        '.xlsx': 'excel',
        '.xls': 'excel',
        '.docx': 'word',
        '.doc': 'word',
        '.pdf': 'pdf',
    }
    suffix = Path(file_name).suffix.lower()
    detected = kinds_by_suffix.get(suffix)
    if detected is None:
        raise ValidationException(f"不支持的文件类型: {suffix}")
    return detected
# ==================== 主要服务类 ====================
class ParseDocumentService:
    """Document parsing service."""

    @staticmethod
    async def parse(
        file_path: str,
        file_type: Optional[str] = None,
        project_id: Optional[str] = None
    ) -> dict:
        """
        Parse a document and return the tables and fields found in it.

        Args:
            file_path: Path of the file to parse.
            file_type: ``'excel'``, ``'word'`` or ``'pdf'``. When omitted, the
                type is detected from the file extension.
            project_id: Project ID. Not used by this method.

        Returns:
            A dict with the keys ``tables`` (list of table dicts),
            ``total_tables``, ``total_fields``, ``parse_time`` (seconds,
            rounded to two decimals) and ``file_info``.

        Raises:
            ValidationException: If the file does not exist, its type is not
                supported, or parsing fails.
        """
        start_time = time.time()
        try:
            # Make sure the file exists before touching it.
            if not os.path.exists(file_path):
                raise ValidationException(f"文件不存在: {file_path}")

            file_name = Path(file_path).name
            file_size = os.path.getsize(file_path)

            # Detect the file type from the extension when not given.
            if not file_type:
                file_type = detect_file_type(file_name)

            logger.info(
                f"开始解析文档 - 文件: {file_name}, 类型: {file_type}, "
                f"大小: {file_size} 字节"
            )

            # Pick the parser that matches the file type.
            parsers = {
                'excel': parse_excel,
                'word': parse_word,
                'pdf': parse_pdf,
            }
            parser = parsers.get(file_type)
            if parser is None:
                raise ValidationException(f"不支持的文件类型: {file_type}")
            tables = parser(file_path)

            # Gather statistics.
            total_fields = sum(table.field_count for table in tables)
            parse_time = time.time() - start_time

            # Build the response payload.
            response_data = {
                "tables": [table.dict() for table in tables],
                "total_tables": len(tables),
                "total_fields": total_fields,
                "parse_time": round(parse_time, 2),
                "file_info": FileInfo(
                    file_name=file_name,
                    file_size=file_size,
                    file_type=file_type
                ).dict()
            }
            logger.info(
                f"文档解析成功 - 表数: {len(tables)}, 字段数: {total_fields}, "
                f"耗时: {parse_time:.2f}"
            )
            return response_data
        except ValidationException:
            # Already a domain error, so pass it through unchanged.
            raise
        except Exception as e:
            logger.exception(f"文档解析失败: {str(e)}")
            # Chain the cause so the original traceback is not lost.
            raise ValidationException(f"文档解析失败: {str(e)}") from e