""" 文档解析服务 """ import time import os from pathlib import Path from typing import List, Optional import pandas as pd from docx import Document import pdfplumber from app.schemas.parse_document import ( TableInfo, FieldInfo, FileInfo, ) from app.utils.logger import logger from app.core.exceptions import ValidationException # ==================== 文件解析函数 ==================== def parse_excel(file_path: str) -> List[TableInfo]: """ 解析 Excel 文件 Args: file_path: Excel 文件路径 Returns: 解析出的表列表 """ tables = [] try: # 读取 Excel 文件 df_dict = pd.read_excel(file_path, sheet_name=None) for sheet_name, df in df_dict.items(): # 跳过空 Sheet if df.empty: continue fields = [] # 识别字段(假设第一行是表头) for col_name in df.columns: # 推断字段类型 col_type = str(df[col_name].dtype) inferred_type = infer_field_type(col_type) field = FieldInfo( raw_name=str(col_name).strip(), display_name=str(col_name).strip(), type=inferred_type, comment=None, is_primary_key=False, is_nullable=True, default_value=None ) fields.append(field) if fields: table = TableInfo( raw_name=sheet_name, display_name=sheet_name, description=f"从 Excel Sheet '{sheet_name}' 解析", fields=fields, field_count=len(fields) ) tables.append(table) except Exception as e: logger.error(f"Excel 解析失败: {str(e)}") raise ValidationException(f"Excel 解析失败: {str(e)}") return tables def parse_word(file_path: str) -> List[TableInfo]: """ 解析 Word 文件 Args: file_path: Word 文件路径 Returns: 解析出的表列表 """ tables = [] try: doc = Document(file_path) # 遍历文档中的表格 for table_idx, table in enumerate(doc.tables): fields = [] # 假设第一行是表头,后续行是字段信息 if len(table.rows) < 2: continue # 获取表头 header_cells = [cell.text.strip() for cell in table.rows[0].cells] # 识别字段(假设有三列:字段名、类型、注释) for row in table.rows[1:]: if len(row.cells) >= 2: field_name = row.cells[0].text.strip() field_type = row.cells[1].text.strip() if len(row.cells) > 1 else "varchar(255)" field_comment = row.cells[2].text.strip() if len(row.cells) > 2 else None if field_name: field = FieldInfo( raw_name=field_name, display_name=field_comment if field_comment else field_name, type=field_type if field_type else "varchar(255)", comment=field_comment, is_primary_key=False, is_nullable=True, default_value=None ) fields.append(field) if fields: table_info = TableInfo( raw_name=f"table_{table_idx + 1}", display_name=f"表{table_idx + 1}", description=f"从 Word 文档第 {table_idx + 1} 个表格解析", fields=fields, field_count=len(fields) ) tables.append(table_info) except Exception as e: logger.error(f"Word 解析失败: {str(e)}") raise ValidationException(f"Word 解析失败: {str(e)}") return tables def parse_pdf(file_path: str) -> List[TableInfo]: """ 解析 PDF 文件 Args: file_path: PDF 文件路径 Returns: 解析出的表列表 """ tables = [] try: with pdfplumber.open(file_path) as pdf: for page_idx, page in enumerate(pdf.pages): # 提取表格 page_tables = page.extract_tables() for table_idx, table in enumerate(page_tables): if table and len(table) > 1: fields = [] # 假设第一行是表头 header_cells = [str(cell).strip() if cell else "" for cell in table[0]] # 识别字段 for row in table[1:]: if len(row) >= 2: field_name = str(row[0]).strip() if row[0] else "" field_type = str(row[1]).strip() if len(row) > 1 and row[1] else "varchar(255)" field_comment = str(row[2]).strip() if len(row) > 2 and row[2] else None if field_name: field = FieldInfo( raw_name=field_name, display_name=field_comment if field_comment else field_name, type=field_type if field_type else "varchar(255)", comment=field_comment, is_primary_key=False, is_nullable=True, default_value=None ) fields.append(field) if fields: table_info = TableInfo( raw_name=f"table_{page_idx + 1}_{table_idx + 1}", display_name=f"表{page_idx + 1}-{table_idx + 1}", description=f"从 PDF 第 {page_idx + 1} 页第 {table_idx + 1} 个表格解析", fields=fields, field_count=len(fields) ) tables.append(table_info) except Exception as e: logger.error(f"PDF 解析失败: {str(e)}") raise ValidationException(f"PDF 解析失败: {str(e)}") return tables def infer_field_type(pd_type: str) -> str: """ 根据 pandas 类型推断数据库字段类型 Args: pd_type: pandas 数据类型 Returns: 数据库字段类型 """ type_mapping = { 'object': 'varchar(255)', 'int64': 'bigint', 'int32': 'int', 'int16': 'smallint', 'int8': 'tinyint', 'float64': 'double', 'float32': 'float', 'bool': 'tinyint(1)', 'datetime64[ns]': 'datetime', 'timedelta[ns]': 'time', } return type_mapping.get(str(pd_type), 'varchar(255)') def detect_file_type(file_name: str) -> str: """ 根据文件扩展名检测文件类型 Args: file_name: 文件名 Returns: 文件类型:excel/word/pdf """ ext = Path(file_name).suffix.lower() if ext in ['.xlsx', '.xls']: return 'excel' elif ext in ['.docx', '.doc']: return 'word' elif ext == '.pdf': return 'pdf' else: raise ValidationException(f"不支持的文件类型: {ext}") # ==================== 主要服务类 ==================== class ParseDocumentService: """文档解析服务""" @staticmethod async def parse( file_path: str, file_type: Optional[str] = None, project_id: str = None ) -> dict: """ 解析文档 Args: file_path: 文件路径 file_type: 文件类型(可选) project_id: 项目ID Returns: 解析结果 """ start_time = time.time() try: # 验证文件存在 if not os.path.exists(file_path): raise ValidationException(f"文件不存在: {file_path}") file_name = Path(file_path).name file_size = os.path.getsize(file_path) # 自动检测文件类型 if not file_type: file_type = detect_file_type(file_name) logger.info( f"开始解析文档 - 文件: {file_name}, 类型: {file_type}, " f"大小: {file_size} 字节" ) # 根据文件类型选择解析方法 if file_type == 'excel': tables = parse_excel(file_path) elif file_type == 'word': tables = parse_word(file_path) elif file_type == 'pdf': tables = parse_pdf(file_path) else: raise ValidationException(f"不支持的文件类型: {file_type}") # 计算统计信息 total_fields = sum(table.field_count for table in tables) parse_time = time.time() - start_time # 构建响应数据 response_data = { "tables": [table.dict() for table in tables], "total_tables": len(tables), "total_fields": total_fields, "parse_time": round(parse_time, 2), "file_info": FileInfo( file_name=file_name, file_size=file_size, file_type=file_type ).dict() } logger.info( f"文档解析成功 - 表数: {len(tables)}, 字段数: {total_fields}, " f"耗时: {parse_time:.2f}秒" ) return response_data except ValidationException: raise except Exception as e: logger.exception(f"文档解析失败: {str(e)}") raise ValidationException(f"文档解析失败: {str(e)}")