"""Document generation service: fills Word templates and moves files through MinIO."""

import os
import re
import tempfile
import traceback
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import pymysql
from docx import Document
from minio import Minio
from minio.error import S3Error


class DocumentService:
    """Generate .docx documents from templates stored in MinIO.

    The workflow implemented by :meth:`generate_document` is: look up the
    template configuration in MySQL, download the template from MinIO,
    replace ``{{field_code}}`` placeholders, and upload the result to MinIO.
    """

    def __init__(self):
        # MinIO connection settings; every value can be overridden through
        # the environment.
        # NOTE(security): the fallback credentials below are committed to
        # source control. They are kept so existing deployments keep working,
        # but they should be rotated and supplied via environment variables.
        self.minio_config = {
            'endpoint': os.getenv('MINIO_ENDPOINT', 'minio.datacubeworld.com:9000'),
            'access_key': os.getenv('MINIO_ACCESS_KEY', 'JOLXFXny3avFSzB0uRA5'),
            'secret_key': os.getenv('MINIO_SECRET_KEY', 'G1BR8jStNfovkfH5ou39EmPl34E4l7dGrnd3Cz0I'),
            'secure': os.getenv('MINIO_SECURE', 'true').lower() == 'true'
        }
        self.bucket_name = os.getenv('MINIO_BUCKET', 'finyx')

        # Database connection settings (same override mechanism and the same
        # security caveat as above).
        self.db_config = {
            'host': os.getenv('DB_HOST', '152.136.177.240'),
            'port': int(os.getenv('DB_PORT', 5012)),
            'user': os.getenv('DB_USER', 'finyx'),
            'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
            'database': os.getenv('DB_NAME', 'finyx'),
            'charset': 'utf8mb4'
        }
        # Tenant used to scope the template lookup and the upload path.
        self.tenant_id = 615873064429507639

    def get_connection(self):
        """Open and return a new pymysql connection built from ``self.db_config``."""
        return pymysql.connect(**self.db_config)

    def get_minio_client(self):
        """Create and return a MinIO client built from ``self.minio_config``."""
        return Minio(
            self.minio_config['endpoint'],
            access_key=self.minio_config['access_key'],
            secret_key=self.minio_config['secret_key'],
            secure=self.minio_config['secure']
        )

    @staticmethod
    def _placeholder(field_code) -> str:
        """Return the template placeholder text for a field code: ``{{code}}``."""
        return f"{{{{{field_code}}}}}"

    @staticmethod
    def _remove_quietly(path: Optional[str]) -> None:
        """Delete a temporary file, ignoring a missing path or OS-level failures."""
        if not path:
            return
        try:
            os.remove(path)
        except OSError:
            # Best-effort cleanup: a leftover temp file must not mask the
            # real outcome of the surrounding operation.
            pass

    def get_file_config_by_id(self, file_id: int) -> Optional[Dict]:
        """Fetch an enabled template configuration row for the current tenant.

        Args:
            file_id: Primary key of the row in ``f_polic_file_config``.

        Returns:
            A dict with the keys ``id``, ``name`` and ``file_path``, or
            ``None`` when no row with ``state = 1`` matches.
        """
        sql = """
            SELECT id, name, file_path
            FROM f_polic_file_config
            WHERE id = %s AND tenant_id = %s AND state = 1
        """
        conn = self.get_connection()
        try:
            # The connection is closed even if creating the cursor fails.
            with conn.cursor(pymysql.cursors.DictCursor) as cursor:
                cursor.execute(sql, (file_id, self.tenant_id))
                config = cursor.fetchone()
        finally:
            conn.close()

        if not config:
            return None
        return {
            'id': config['id'],
            'name': config['name'],
            'file_path': config['file_path']
        }

    def download_template_from_minio(self, file_path: str) -> str:
        """Download a template object from MinIO into a unique temporary file.

        Args:
            file_path: Object path relative to the bucket, e.g.
                ``/615873064429507639/TEMPLATE/2024/11/<name>.docx``. A
                leading ``/`` is stripped to form the object name.

        Returns:
            Path of the local temporary file. The caller owns it and is
            responsible for deleting it.

        Raises:
            Exception: If ``file_path`` is empty, or if MinIO reports an
                ``S3Error`` (chained as the cause).
        """
        if not file_path:
            raise Exception("模板文件路径不能为空,请检查数据库中模板配置的file_path字段")

        client = self.get_minio_client()

        # mkstemp guarantees a unique name; a second-resolution timestamp
        # would collide for requests handled within the same second.
        fd, temp_file = tempfile.mkstemp(prefix="template_", suffix=".docx")
        os.close(fd)

        try:
            object_name = file_path.lstrip('/')
            client.fget_object(self.bucket_name, object_name, temp_file)
        except S3Error as e:
            self._remove_quietly(temp_file)
            raise Exception(f"从MinIO下载模板文件失败: {str(e)}") from e
        except Exception:
            self._remove_quietly(temp_file)
            raise
        return temp_file

    def _replace_placeholders_in_paragraph(self, paragraph, field_data: Dict[str, str]) -> Dict[str, int]:
        """Replace every known placeholder in one paragraph.

        Works on the paragraph's full text so placeholders split across runs
        are still found. Any failure is logged and swallowed so that one bad
        paragraph does not abort the whole document.

        Returns:
            Mapping of field code to the number of occurrences replaced;
            empty when nothing was replaced or the paragraph could not be
            updated.
        """
        try:
            full_text = paragraph.text
            if not full_text:
                return {}

            replaced_text = full_text
            counts: Dict[str, int] = {}
            for field_code, field_value in field_data.items():
                placeholder = self._placeholder(field_code)
                occurrences = replaced_text.count(placeholder)
                if not occurrences:
                    continue
                # Replace all occurrences in one call. Looping "while the
                # placeholder is present" never terminates when the value
                # itself contains the placeholder text.
                value = str(field_value) if field_value else ''
                replaced_text = replaced_text.replace(placeholder, value)
                counts[field_code] = counts.get(field_code, 0) + occurrences
                print(f"[DEBUG] 替换占位符: {placeholder} -> '{field_value}' (在段落中)")

            if not counts:
                return {}

            replacement_count = sum(counts.values())
            print(f"[DEBUG] 段落替换了 {replacement_count} 个占位符: '{full_text[:50]}...' -> '{replaced_text[:50]}...'")

            try:
                # Preferred: let python-docx rebuild the runs from the text.
                paragraph.text = replaced_text
            except Exception as e1:
                try:
                    # Fallback: clear the paragraph and add a single run.
                    paragraph.clear()
                    if replaced_text:
                        paragraph.add_run(replaced_text)
                except Exception as e2:
                    print(f"[WARN] 无法更新段落文本,方法1错误: {str(e1)}, 方法2错误: {str(e2)}")
                    return {}
            return counts
        except Exception as e:
            print(f"[WARN] 处理段落时出错: {str(e)}")
            print(traceback.format_exc())
            return {}

    def _find_remaining_placeholders(self, doc, field_data: Dict[str, str]) -> set:
        """Return the field codes whose placeholder still appears in the document.

        This is diagnostic only, so a failure while walking the tables is
        logged rather than raised.
        """
        placeholders = {code: self._placeholder(code) for code in field_data}
        remaining = set()

        def scan(paragraphs):
            for paragraph in paragraphs:
                text = paragraph.text
                remaining.update(code for code, ph in placeholders.items() if ph in text)

        scan(doc.paragraphs)
        try:
            for table in doc.tables:
                for row in table.rows:
                    for cell in row.cells:
                        scan(cell.paragraphs)
        except Exception as e:
            print(f"[WARN] 处理表格时出错: {str(e)}")
        return remaining

    def fill_template(self, template_path: str, field_data: Dict[str, str]) -> str:
        """Fill the ``{{field_code}}`` placeholders of a Word template.

        Body paragraphs and paragraphs inside top-level table cells are
        processed. Falsy values are rendered as an empty string.

        Args:
            template_path: Path of the local .docx template.
            field_data: Mapping of field code to replacement value.

        Returns:
            Path of a new temporary .docx file holding the filled document.
            The caller owns it and is responsible for deleting it.

        Raises:
            Exception: If the template cannot be opened, processed or saved;
                the message embeds the traceback and the original error is
                chained as the cause.
        """
        output_file = None
        try:
            print(f"[DEBUG] 开始填充模板: {template_path}")
            print(f"[DEBUG] 字段数据: {field_data}")

            doc = Document(template_path)
            print(f"[DEBUG] 文档包含 {len(doc.paragraphs)} 个段落, {len(doc.tables)} 个表格")

            # field code -> total number of occurrences replaced
            replaced_counts: Dict[str, int] = {}

            def record(counts: Dict[str, int]) -> None:
                for code, count in counts.items():
                    replaced_counts[code] = replaced_counts.get(code, 0) + count

            for paragraph in doc.paragraphs:
                record(self._replace_placeholders_in_paragraph(paragraph, field_data))

            # Table handling is best-effort: a broken cell or table is logged
            # and skipped so the rest of the document is still produced.
            try:
                for table in doc.tables:
                    for row in table.rows:
                        for cell in row.cells:
                            try:
                                for paragraph in list(cell.paragraphs):
                                    record(self._replace_placeholders_in_paragraph(paragraph, field_data))
                            except Exception as e:
                                print(f"[WARN] 处理表格单元格时出错: {str(e)}")
            except Exception as e:
                print(f"[WARN] 处理表格时出错: {str(e)}")

            remaining_placeholders = self._find_remaining_placeholders(doc, field_data)

            print("[DEBUG] 占位符替换统计:")
            print(f"  - 已替换的占位符: {sorted(replaced_counts)}")
            print(f"  - 总替换次数: {sum(replaced_counts.values())}")
            if remaining_placeholders:
                print(f"  - ⚠️ 仍有未替换的占位符: {sorted(remaining_placeholders)}")
            else:
                print("  - ✓ 所有占位符已成功替换")

            # Unique output name; see download_template_from_minio for why a
            # timestamp-based name is not safe.
            fd, output_file = tempfile.mkstemp(prefix="filled_", suffix=".docx")
            os.close(fd)
            doc.save(output_file)
            print(f"[DEBUG] 文档已保存到: {output_file}")

            return output_file
        except IndexError as e:
            self._remove_quietly(output_file)
            error_detail = traceback.format_exc()
            raise Exception(f"填充模板失败: list index out of range. 详细信息: {str(e)}\n{error_detail}") from e
        except Exception as e:
            self._remove_quietly(output_file)
            error_detail = traceback.format_exc()
            raise Exception(f"填充模板失败: {str(e)}\n{error_detail}") from e

    def upload_to_minio(self, file_path: str, file_name: str) -> str:
        """Upload a local .docx file to MinIO.

        The object is stored as ``<tenant_id>/<YYYYmmddHHMMSS>/<file_name>``
        in ``self.bucket_name``.

        Args:
            file_path: Path of the local file to upload.
            file_name: File name to use as the last segment of the object name.

        Returns:
            The object path prefixed with ``/``.

        Raises:
            Exception: If MinIO reports an ``S3Error`` (chained as the cause).
        """
        client = self.get_minio_client()

        try:
            now = datetime.now()
            object_name = f"{self.tenant_id}/{now.strftime('%Y%m%d%H%M%S')}/{file_name}"

            client.fput_object(
                self.bucket_name,
                object_name,
                file_path,
                content_type='application/vnd.openxmlformats-officedocument.wordprocessingml.document'
            )

            return f"/{object_name}"
        except S3Error as e:
            raise Exception(f"上传文件到MinIO失败: {str(e)}") from e

    def generate_document(self, file_id: int, input_data: List[Dict], file_info: Dict) -> Dict:
        """Generate a document from a stored template and upload it.

        Args:
            file_id: ID of the template configuration.
            input_data: List of ``{'fieldCode': ..., 'fieldValue': ...}``
                dicts; entries without a field code are ignored. ``None`` is
                treated as an empty list.
            file_info: Dict that may carry ``fileName`` (or ``name``) for the
                output document. ``None`` is treated as an empty dict.

        Returns:
            ``{'filePath': <MinIO path>, 'fileName': <generated name>}``.

        Raises:
            Exception: If the template configuration is missing or disabled,
                has no ``file_path``, or any download/fill/upload step fails.
        """
        file_config = self.get_file_config_by_id(file_id)
        if not file_config:
            raise Exception(
                f"文件ID {file_id} 对应的模板不存在或未启用。"
                f"请通过查询 f_polic_file_config 表获取有效的文件ID,"
                f"或访问 /api/file-configs 接口查看可用的文件配置列表。"
            )

        template_object_path = file_config.get('file_path')
        if not template_object_path:
            raise Exception(f"文件ID {file_id} ({file_config.get('name', '')}) 的文件路径(file_path)为空,请检查数据库配置")

        # Flatten the list of field entries into {code: value}.
        field_data = {}
        for item in input_data or []:
            field_code = item.get('fieldCode', '')
            if field_code:
                field_data[field_code] = item.get('fieldValue', '') or ''

        file_info = file_info or {}

        template_path = None
        filled_doc_path = None
        try:
            template_path = self.download_template_from_minio(template_object_path)
            filled_doc_path = self.fill_template(template_path, field_data)

            # Prefer the caller-supplied name, then the configured template
            # name, then a fixed fallback.
            original_file_name = (
                file_info.get('fileName')
                or file_info.get('name')
                or file_config.get('name')
                or 'generated.doc'
            )
            print(f"[DEBUG] 原始文件名: {original_file_name}")
            print(f"[DEBUG] 字段数据用于生成文档名: {field_data}")
            generated_file_name = self.generate_document_name(original_file_name, field_data)
            print(f"[DEBUG] 生成的文档名: {generated_file_name}")

            uploaded_path = self.upload_to_minio(filled_doc_path, generated_file_name)

            return {
                'filePath': uploaded_path,
                'fileName': generated_file_name
            }
        finally:
            # Temporary files are removed whether or not generation succeeded.
            self._remove_quietly(template_path)
            self._remove_quietly(filled_doc_path)

    def generate_document_id(self) -> str:
        """Return an ID of the form ``DOC<YYYYmmddHHMMSS><mmm>``.

        ``mmm`` is the current millisecond, zero-padded to three digits so
        the ID always has the same width.
        """
        now = datetime.now()
        return f"DOC{now.strftime('%Y%m%d%H%M%S')}{now.microsecond // 1000:03d}"

    def generate_document_name(self, original_file_name: str, field_data: Dict[str, str]) -> str:
        """Build the output file name for a generated document.

        The directory part and extension of ``original_file_name`` are
        dropped, bracketed segments (ASCII or full-width brackets) are
        removed, and a non-blank ``target_name`` field is appended as
        ``_<target_name>``. The result always ends in ``.docx``.

        Args:
            original_file_name: Original file name, possibly with a path.
            field_data: Field values; only ``target_name`` is read.

        Returns:
            The generated name, e.g. ``"<base>_<target_name>.docx"``.
        """
        base_name = Path(original_file_name).stem

        # Strip bracketed template markers. Both ASCII "()" and full-width
        # brackets (U+FF08 / U+FF09) are handled.
        base_name = re.sub(r'[(\uFF08].*?[)\uFF09]', '', base_name).strip()
        if not base_name:
            # Avoid producing a name that is just an extension.
            base_name = 'generated'

        suffix = ''
        target_name = field_data.get('target_name', '')
        if target_name and target_name.strip():
            # The value is caller-supplied and ends up in an object key, so
            # path separators must not introduce extra path segments.
            safe_target = re.sub(r'[\\/]', '_', target_name.strip())
            suffix = f"_{safe_target}"

        generated_name = f"{base_name}{suffix}.docx"

        print(f"[DEBUG] 文档名称生成: '{original_file_name}' -> '{generated_name}' (base_name='{base_name}', suffix='{suffix}')")

        return generated_name