"""Document generation service.

Fills ``{{field_code}}`` placeholders in Word (.docx) templates and uploads
the generated documents to MinIO.
"""
import os
import re
import shutil
import sys
import tempfile
import traceback
import zipfile
from typing import Dict, List, Optional, Set, Tuple
from datetime import datetime, timedelta
from pathlib import Path
from xml.sax.saxutils import escape as xml_escape

from docx import Document
from minio import Minio
from minio.error import S3Error
import pymysql
from xml.etree import ElementTree as ET

# Matches ``{{field_code}}`` placeholders; group 1 is the raw field code.
_PLACEHOLDER_PATTERN = re.compile(r'\{\{([^}]+)\}\}')
# Parenthesised annotations in template names, ASCII or full-width, e.g. "(XXX)".
_BRACKETED_PATTERN = re.compile(r'[((].*?[))]')
# Extensions that are stripped when deriving a document base name.
_DOCUMENT_EXTENSIONS = ('.doc', '.docx', '.txt', '.pdf')


def _placeholder_for(field_code: str) -> str:
    """Return the literal placeholder text (``{{code}}``) for a field code."""
    return f"{{{{{field_code}}}}}"


def _find_placeholders(paragraphs) -> Set[str]:
    """Collect the stripped field codes of all placeholders in *paragraphs*."""
    found = set()
    for paragraph in paragraphs:
        for match in _PLACEHOLDER_PATTERN.findall(paragraph.text):
            field_code = match.strip()
            if field_code:
                found.add(field_code)
    return found


def _safe_sequence(owner, attribute: str) -> list:
    """Return ``list(owner.<attribute>)``, or ``[]`` if python-docx fails.

    Some malformed table structures make python-docx raise IndexError or
    AttributeError while computing ``row.cells`` / ``cell.paragraphs``; such
    rows/cells are treated as empty. The sequence is fetched once instead of
    being recomputed for every index.
    """
    try:
        return list(getattr(owner, attribute))
    except (IndexError, AttributeError):
        return []


def _remove_quietly(path: Optional[str]) -> None:
    """Best-effort removal of a temporary file; missing files are ignored."""
    if path and os.path.exists(path):
        try:
            os.remove(path)
        except OSError:
            # Cleanup is best-effort: a leftover temp file must not mask the
            # real result or error of the calling operation.
            pass


class DocumentService:
    """Generate documents from Word templates and store them in MinIO."""

    def __init__(self):
        """Load MinIO and database settings from environment variables.

        Raises:
            ValueError: If a required MinIO or database variable is missing.
        """
        # MinIO settings (read from the environment; no defaults except
        # MINIO_SECURE, which defaults to 'true').
        minio_endpoint = os.getenv('MINIO_ENDPOINT')
        minio_access_key = os.getenv('MINIO_ACCESS_KEY')
        minio_secret_key = os.getenv('MINIO_SECRET_KEY')
        minio_secure = os.getenv('MINIO_SECURE', 'true').lower() == 'true'
        minio_bucket = os.getenv('MINIO_BUCKET')

        if not all([minio_endpoint, minio_access_key, minio_secret_key, minio_bucket]):
            raise ValueError(
                "MinIO配置不完整,请在.env文件中配置以下环境变量:\n"
                "MINIO_ENDPOINT, MINIO_ACCESS_KEY, MINIO_SECRET_KEY, MINIO_BUCKET"
            )

        self.minio_config = {
            'endpoint': minio_endpoint,
            'access_key': minio_access_key,
            'secret_key': minio_secret_key,
            'secure': minio_secure
        }
        self.bucket_name = minio_bucket

        # Database settings (read from the environment; no defaults).
        db_host = os.getenv('DB_HOST')
        db_port = os.getenv('DB_PORT')
        db_user = os.getenv('DB_USER')
        db_password = os.getenv('DB_PASSWORD')
        db_name = os.getenv('DB_NAME')

        if not all([db_host, db_port, db_user, db_password, db_name]):
            raise ValueError(
                "数据库配置不完整,请在.env文件中配置以下环境变量:\n"
                "DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_NAME"
            )

        self.db_config = {
            'host': db_host,
            'port': int(db_port),
            'user': db_user,
            'password': db_password,
            'database': db_name,
            'charset': 'utf8mb4'
        }

        # Optional tenant id used as the first segment of MinIO object paths;
        # an empty string means the segment is omitted.
        self.tenant_id = os.getenv('TENANT_ID', '')

    def get_connection(self):
        """Open and return a new database connection."""
        return pymysql.connect(**self.db_config)

    def get_minio_client(self):
        """Create and return a MinIO client from the loaded configuration."""
        return Minio(
            self.minio_config['endpoint'],
            access_key=self.minio_config['access_key'],
            secret_key=self.minio_config['secret_key'],
            secure=self.minio_config['secure']
        )

    def get_file_config_by_id(self, file_id: int, tenant_id: Optional[int] = None) -> Optional[Dict]:
        """Look up an enabled file configuration by id.

        Args:
            file_id: File configuration id.
            tenant_id: Tenant id. When None, the TENANT_ID environment
                variable is used, falling back to 1 if unset or not an int.

        Returns:
            A dict with ``id``, ``name`` and ``file_path``, or None when no
            row with ``state = 1`` matches.
        """
        if tenant_id is None:
            try:
                tenant_id = int(os.getenv('TENANT_ID', '1'))
            except (ValueError, TypeError):
                tenant_id = 1  # default tenant

        sql = """
            SELECT id, name, file_path
            FROM f_polic_file_config
            WHERE id = %s AND tenant_id = %s AND state = 1
        """

        conn = self.get_connection()
        try:
            cursor = conn.cursor(pymysql.cursors.DictCursor)
            try:
                cursor.execute(sql, (file_id, tenant_id))
                config = cursor.fetchone()
            finally:
                cursor.close()
        finally:
            # Closed in an outer finally so the connection is released even
            # when creating the cursor fails.
            conn.close()

        if not config:
            return None
        return {
            'id': config['id'],
            'name': config['name'],
            'file_path': config['file_path']
        }

    def download_template_from_minio(self, file_path: str) -> str:
        """Copy a local template file into a unique temporary file.

        Despite its name, this method reads the template from the local
        project tree, not from MinIO.

        Args:
            file_path: Path relative to the project root (the parent of this
                file's directory), e.g.
                'template_finish/2-初核模版/1.初核请示/1.请示报告卡(XXX).docx'.

        Returns:
            Path of the temporary copy; the caller is responsible for
            deleting it.

        Raises:
            Exception: If the path is empty, does not exist, is not a file,
                or cannot be copied.
        """
        if not file_path:
            raise Exception("模板文件路径不能为空,请检查数据库中模板配置的file_path字段")

        project_root = Path(__file__).parent.parent
        local_template_path = project_root / file_path

        if not local_template_path.exists():
            raise Exception(f"模板文件不存在: {local_template_path}。请检查数据库中的file_path配置是否正确。")
        if not local_template_path.is_file():
            raise Exception(f"路径不是文件: {local_template_path}")

        # mkstemp guarantees a unique name; a second-resolution timestamp
        # would collide when two requests use the same template concurrently.
        fd, temp_file = tempfile.mkstemp(
            prefix=f"template_{local_template_path.stem}_",
            suffix=local_template_path.suffix
        )
        os.close(fd)
        try:
            shutil.copy2(local_template_path, temp_file)
        except Exception as e:
            _remove_quietly(temp_file)
            raise Exception(f"从本地读取模板文件失败: {str(e)}") from e
        return temp_file

    def replace_placeholders_via_xml(self, docx_path: str, field_data: Dict[str, str]) -> bool:
        """Replace placeholders directly in ``word/document.xml`` (fallback).

        Used when table access through python-docx fails. The .docx archive
        is rewritten to a sibling ``.tmp`` file which then replaces the
        original. Only placeholders that are contiguous in the raw XML can be
        matched; a placeholder split across runs is not found by this method.

        Args:
            docx_path: Path of the .docx file to modify in place.
            field_data: Mapping of field code to value.

        Returns:
            True if the archive was rewritten, False on any failure.
        """
        temp_zip_path = docx_path + '.tmp'
        try:
            with zipfile.ZipFile(docx_path, 'r') as zip_read:
                with zipfile.ZipFile(temp_zip_path, 'w', zipfile.ZIP_DEFLATED) as zip_write:
                    for item in zip_read.infolist():
                        if item.filename != 'word/document.xml':
                            # Copy every other archive member unchanged.
                            zip_write.writestr(item, zip_read.read(item.filename))
                            continue

                        xml_content = zip_read.read(item.filename).decode('utf-8')
                        modified = False
                        for field_code, field_value in field_data.items():
                            placeholder = _placeholder_for(field_code)
                            replacement_value = str(field_value) if field_value else ''
                            if placeholder in xml_content:
                                # Values are inserted into raw XML, so '&',
                                # '<' and '>' must be escaped or the document
                                # becomes unreadable.
                                xml_content = xml_content.replace(
                                    placeholder, xml_escape(replacement_value)
                                )
                                modified = True
                                print(f"[DEBUG] XML替换占位符: {placeholder} -> '{replacement_value}'")

                        zip_write.writestr(item.filename, xml_content.encode('utf-8'))
                        if modified:
                            print("[DEBUG] XML成功替换占位符")

            if os.path.exists(temp_zip_path):
                os.replace(temp_zip_path, docx_path)
                return True
            return False
        except Exception as e:
            print(f"[WARN] XML替换占位符失败: {str(e)}")
            print(traceback.format_exc())
            _remove_quietly(temp_zip_path)
            return False

    @staticmethod
    def _capture_run_format(run) -> dict:
        """Snapshot the explicitly-set character formatting of a run."""
        format_info = {}
        try:
            if run.font.name:
                format_info['font_name'] = run.font.name
            if run.font.size:
                format_info['font_size'] = run.font.size
            if run.bold is not None:
                format_info['bold'] = run.bold
            if run.italic is not None:
                format_info['italic'] = run.italic
            if run.underline is not None:
                format_info['underline'] = run.underline
            if run.font.color and run.font.color.rgb:
                format_info['color'] = run.font.color.rgb
        except Exception:
            # Formatting is best-effort: keep whatever was captured so far.
            pass
        return format_info

    @staticmethod
    def _apply_run_format(run, format_info: dict) -> None:
        """Apply a formatting snapshot from ``_capture_run_format`` to a run."""
        try:
            if 'font_name' in format_info:
                run.font.name = format_info['font_name']
            if 'font_size' in format_info:
                run.font.size = format_info['font_size']
            if 'bold' in format_info:
                run.bold = format_info['bold']
            if 'italic' in format_info:
                run.italic = format_info['italic']
            if 'underline' in format_info:
                run.underline = format_info['underline']
            if 'color' in format_info:
                run.font.color.rgb = format_info['color']
        except Exception as fmt_error:
            print(f"[WARN] 应用格式时出错: {str(fmt_error)}")

    def _replace_placeholders_in_paragraph(self, paragraph, field_data: Dict[str, str]) -> None:
        """Replace placeholders in one paragraph, including ones split across runs.

        A single-run paragraph has its text replaced in place. A multi-run
        paragraph is merged into its first run, which receives the formatting
        of the first run that contained a whole placeholder (or of the first
        run when none did); the remaining runs are removed. Errors are logged
        and swallowed so one bad paragraph does not abort the document.
        """
        try:
            full_text = paragraph.text
            if not full_text:
                return

            placeholders = {code: _placeholder_for(code) for code in field_data}
            found_placeholders = [ph for ph in placeholders.values() if ph in full_text]
            if not found_placeholders:
                return
            print(f"[DEBUG] 段落中发现占位符: {found_placeholders}, 段落文本前50字符: '{full_text[:50]}'")

            runs = list(paragraph.runs)
            if not runs:
                # The text lives outside direct runs (e.g. inside a hyperlink),
                # so there is no run to write the replaced text into.
                print(f"[WARN] 处理段落时出错: 段落没有可写入的run")
                return

            # Pick the formatting source before any run text is modified.
            source_format = None
            for run in runs:
                run_text = run.text
                if any(ph in run_text for ph in placeholders.values()):
                    source_format = self._capture_run_format(run)
                    if source_format:
                        break
            if not source_format:
                source_format = self._capture_run_format(runs[0])

            # Build the final paragraph text.
            final_text = full_text
            replacement_count = 0
            for field_code, field_value in field_data.items():
                placeholder = placeholders[field_code]
                count = final_text.count(placeholder)
                if count > 0:
                    replacement_value = str(field_value) if field_value else ''
                    final_text = final_text.replace(placeholder, replacement_value)
                    replacement_count += count
                    print(f"[DEBUG] 替换占位符: {placeholder} -> '{replacement_value}' (共 {count} 次)")

            first_run = runs[0]
            if len(runs) == 1:
                # Assigning run.text keeps the run's own formatting.
                first_run.text = final_text
                actual_text = first_run.text
                if actual_text != final_text:
                    print(f"[WARN] 单run替换后验证失败:期望 '{final_text[:50]}...',实际 '{actual_text[:50]}...'")
            else:
                for run in runs:
                    run.text = ''
                first_run.text = final_text

                actual_text = first_run.text
                if actual_text != final_text:
                    print(f"[WARN] 多run替换后验证失败:期望 '{final_text[:50]}...',实际 '{actual_text[:50]}...'")
                    first_run.text = final_text
                    if first_run.text != final_text:
                        print(f"[ERROR] 多次尝试写入失败,可能存在严重问题")

                if source_format:
                    self._apply_run_format(first_run, source_format)

                # Drop the now-empty trailing runs, last to first.
                for run in reversed(runs[1:]):
                    try:
                        paragraph._element.remove(run._element)
                    except Exception as remove_error:
                        print(f"[WARN] 删除run时出错: {str(remove_error)}")

            # Final check: report placeholders that survived the replacement.
            final_paragraph_text = paragraph.text
            remaining_in_paragraph = [
                ph for ph in placeholders.values() if ph in final_paragraph_text
            ]
            if remaining_in_paragraph:
                print(f"[WARN] 段落替换后仍有占位符: {remaining_in_paragraph}")
                print(f"[WARN] 原始文本: '{full_text[:100]}'")
                print(f"[WARN] 替换后文本: '{final_text[:100]}'")
                print(f"[WARN] 段落实际文本: '{final_paragraph_text[:100]}'")
            else:
                print(f"[DEBUG] 段落替换了 {replacement_count} 个占位符(保持格式): '{full_text[:50]}...' -> '{final_text[:50]}...'")
        except Exception as e:
            print(f"[WARN] 处理段落时出错: {str(e)}")
            print(traceback.format_exc())

    @staticmethod
    def _tally_replacements(before_text: str, after_text: str, field_data: Dict[str, str],
                            replaced_placeholders: Set[str]) -> int:
        """Record which placeholders vanished from a paragraph; return their count."""
        total = 0
        for field_code in field_data:
            placeholder = _placeholder_for(field_code)
            if placeholder in before_text and placeholder not in after_text:
                replaced_placeholders.add(field_code)
                total += before_text.count(placeholder)
        return total

    @staticmethod
    def _scan_table_placeholders(doc) -> Set[str]:
        """Collect placeholder field codes from all table cells of *doc*.

        Rows are accessed by index and every level is wrapped defensively so
        that a malformed table, row or cell is skipped with a warning.
        """
        found = set()
        for table_idx, table in enumerate(doc.tables):
            try:
                if not table.rows:
                    continue
                try:
                    row_count = len(table.rows)
                except Exception as e:
                    print(f"[WARN] 扫描表格 {table_idx} 时无法获取行数,跳过该表格: {str(e)}")
                    continue

                for row_idx in range(row_count):
                    try:
                        row = table.rows[row_idx]
                        for cell_idx, cell in enumerate(_safe_sequence(row, 'cells')):
                            try:
                                for paragraph in _safe_sequence(cell, 'paragraphs'):
                                    try:
                                        found |= _find_placeholders([paragraph])
                                    except Exception as e:
                                        print(f"[WARN] 扫描表格 {table_idx} 行 {row_idx} 单元格 {cell_idx} 段落时出错,跳过: {str(e)}")
                                        continue
                            except Exception as e:
                                print(f"[WARN] 扫描表格 {table_idx} 行 {row_idx} 单元格 {cell_idx} 时出错,跳过: {str(e)}")
                                continue
                    except Exception as e:
                        print(f"[WARN] 扫描表格 {table_idx} 行 {row_idx} 时出错,跳过: {str(e)}")
                        continue
            except Exception as e:
                print(f"[WARN] 扫描表格 {table_idx} 时出错,跳过该表格: {str(e)}")
                continue
        return found

    def _replace_in_tables(self, doc, field_data: Dict[str, str],
                           replaced_placeholders: Set[str]) -> Tuple[int, int, int]:
        """Replace placeholders in every table cell paragraph of *doc*.

        Returns:
            ``(table_replacements, table_errors, total_replacements)``: the
            number of paragraphs changed, the number of errors encountered,
            and the number of placeholder occurrences replaced.
        """
        table_replacements = 0
        table_errors = 0
        total_replacements = 0
        placeholders = [_placeholder_for(code) for code in field_data]

        for table_idx, table in enumerate(doc.tables):
            try:
                if not table.rows:
                    continue
                try:
                    row_count = len(table.rows)
                    print(f"[DEBUG] 表格 {table_idx} 有 {row_count} 行")
                except Exception as e:
                    print(f"[WARN] 无法获取表格 {table_idx} 的行数,跳过该表格: {str(e)}")
                    table_errors += 1
                    continue

                # Rows are accessed by index rather than by iterating
                # table.rows, so a failure is confined to a single row.
                for row_idx in range(row_count):
                    try:
                        row = table.rows[row_idx]
                        for cell_idx, cell in enumerate(_safe_sequence(row, 'cells')):
                            try:
                                for para_idx, paragraph in enumerate(_safe_sequence(cell, 'paragraphs')):
                                    try:
                                        before_text = paragraph.text
                                        if not before_text:
                                            continue
                                        if not any(ph in before_text for ph in placeholders):
                                            continue

                                        print(f"[DEBUG] 表格 {table_idx} 行 {row_idx} 单元格 {cell_idx} 段落 {para_idx} 发现占位符: '{before_text[:50]}...'")
                                        self._replace_placeholders_in_paragraph(paragraph, field_data)
                                        after_text = paragraph.text

                                        if before_text != after_text:
                                            print(f"[DEBUG] 表格替换成功: '{before_text[:50]}...' -> '{after_text[:50]}...'")
                                            table_replacements += 1
                                            total_replacements += self._tally_replacements(
                                                before_text, after_text, field_data, replaced_placeholders
                                            )
                                        else:
                                            print(f"[WARN] 表格 {table_idx} 行 {row_idx} 单元格 {cell_idx} 段落 {para_idx} 替换后文本未改变")
                                    except Exception as e:
                                        print(f"[WARN] 表格 {table_idx} 行 {row_idx} 单元格 {cell_idx} 段落 {para_idx} 处理出错: {str(e)}")
                                        table_errors += 1
                                        print(traceback.format_exc())
                                        continue
                            except Exception as e:
                                print(f"[WARN] 表格 {table_idx} 行 {row_idx} 单元格 {cell_idx} 处理出错: {str(e)}")
                                table_errors += 1
                                continue
                    except Exception as e:
                        print(f"[WARN] 表格 {table_idx} 行 {row_idx} 处理出错: {str(e)}")
                        table_errors += 1
                        print(traceback.format_exc())
                        continue
            except Exception as e:
                print(f"[WARN] 表格 {table_idx} 处理出错: {str(e)}")
                table_errors += 1
                print(traceback.format_exc())
                continue

        return table_replacements, table_errors, total_replacements

    def fill_template(self, template_path: str, field_data: Dict[str, str]) -> str:
        """Fill the ``{{field_code}}`` placeholders of a Word template.

        Placeholders found in the template but missing from *field_data* are
        replaced with an empty string. Body paragraphs and table cells are
        processed through python-docx; if table processing reported errors,
        the saved file is additionally patched via
        ``replace_placeholders_via_xml``.

        Args:
            template_path: Path of the .docx template.
            field_data: Mapping ``{'field_code': 'field_value'}``.

        Returns:
            Path of the filled document (a unique temporary file that the
            caller is responsible for deleting).

        Raises:
            Exception: If the template cannot be opened, filled or saved.
        """
        try:
            print(f"[DEBUG] 开始填充模板: {template_path}")
            print(f"[DEBUG] 传入的字段数据: {field_data}")

            doc = Document(template_path)
            print(f"[DEBUG] 文档包含 {len(doc.paragraphs)} 个段落, {len(doc.tables)} 个表格")

            # Step 1: discover every placeholder used by the template.
            # NOTE: codes are stripped here, but replacement matches the
            # literal "{{code}}", so "{{ code }}" is detected yet not replaced.
            all_placeholders_in_template = _find_placeholders(doc.paragraphs)
            all_placeholders_in_template |= self._scan_table_placeholders(doc)
            print(f"[DEBUG] 模板中发现 {len(all_placeholders_in_template)} 个不同的占位符: {sorted(all_placeholders_in_template)}")

            # Step 2: default every template placeholder to '' and keep any
            # extra fields that were passed in (unused, kept for compatibility).
            complete_field_data = {
                field_code: field_data.get(field_code, '')
                for field_code in all_placeholders_in_template
            }
            for field_code, field_value in field_data.items():
                if field_code not in complete_field_data:
                    complete_field_data[field_code] = field_value or ''

            print(f"[DEBUG] 完整的字段数据(包含默认空值): {complete_field_data}")
            print(f"[DEBUG] 补充的空值字段: {sorted(set(complete_field_data.keys()) - set(field_data.keys()))}")
            field_data = complete_field_data

            total_replacements = 0
            replaced_placeholders = set()

            # Replace placeholders in body paragraphs.
            for paragraph in doc.paragraphs:
                before_text = paragraph.text
                self._replace_placeholders_in_paragraph(paragraph, field_data)
                after_text = paragraph.text
                if before_text != after_text:
                    total_replacements += self._tally_replacements(
                        before_text, after_text, field_data, replaced_placeholders
                    )

            # Replace placeholders in tables.
            table_errors = 0
            try:
                table_replacements, table_errors, table_total = self._replace_in_tables(
                    doc, field_data, replaced_placeholders
                )
                total_replacements += table_total
                print(f"[DEBUG] 表格替换统计: 成功 {table_replacements} 次, 错误 {table_errors} 次")
                if table_errors > 0:
                    print(f"[DEBUG] 检测到表格处理错误 {table_errors} 次,将在保存后使用XML方法作为备用方案")
            except Exception as e:
                # Table failures must not prevent the document from being saved.
                print(f"[WARN] 处理表格时出错: {str(e)}")
                print(traceback.format_exc())
                table_errors = 999  # marks a severe failure

            # The XML fallback runs after saving whenever table processing
            # reported errors (previously this flag was never enabled).
            use_xml_fallback = table_errors > 0

            # Step 3: report placeholders still present in body paragraphs.
            # Tables are not re-verified: some table structures make
            # python-docx raise, and they were already handled above.
            remaining_placeholders = _find_placeholders(doc.paragraphs)
            print(f"[DEBUG] 跳过表格验证(表格中的占位符已在替换阶段处理,某些表格结构会导致验证错误)")

            print(f"[DEBUG] 占位符替换统计:")
            print(f" - 模板中的占位符总数: {len(all_placeholders_in_template)}")
            print(f" - 已替换的占位符: {sorted(replaced_placeholders)}")
            print(f" - 总替换次数: {total_replacements}")
            if remaining_placeholders:
                print(f" - ⚠️ 仍有未替换的占位符: {sorted(remaining_placeholders)}")
                print(f" - ⚠️ 警告:文档中仍存在占位符,可能格式不正确或替换逻辑有问题")
            else:
                print(f" - [OK] 所有占位符已成功替换")

            print(f"[DEBUG] 保存前验证:检查文档中是否还有占位符...")
            print(f"[DEBUG] 保存前验证:跳过表格验证(表格中的占位符已在替换阶段处理)")
            if remaining_placeholders:
                print(f"[WARN] 保存前验证发现仍有占位符: {sorted(remaining_placeholders)}")
            else:
                print(f"[DEBUG] 保存前验证通过:所有占位符已替换")

            output_file = None
            try:
                # Unique temp file: a second-resolution timestamp would let
                # concurrent requests overwrite each other's output.
                fd, output_file = tempfile.mkstemp(prefix="filled_", suffix=".docx")
                os.close(fd)

                doc.save(output_file)
                print(f"[DEBUG] 文档已保存到: {output_file}")

                if not os.path.exists(output_file):
                    raise Exception(f"文件保存失败:文件不存在 {output_file}")
                file_size = os.path.getsize(output_file)
                if file_size == 0:
                    raise Exception(f"文件保存失败:文件大小为0 {output_file}")
                print(f"[DEBUG] 文件保存验证通过:文件大小 {file_size} 字节")

                if use_xml_fallback:
                    print(f"[DEBUG] 使用XML方法作为备用方案替换占位符...")
                    if self.replace_placeholders_via_xml(output_file, field_data):
                        print(f"[DEBUG] XML备用方案成功替换占位符")
                    else:
                        print(f"[WARN] XML备用方案替换失败")

                # Re-open the saved file and check body paragraphs again.
                try:
                    verify_doc = Document(output_file)
                    verify_placeholders_in_saved = _find_placeholders(verify_doc.paragraphs)
                    print(f"[DEBUG] 保存后验证:跳过表格验证(表格中的占位符已在替换阶段处理)")
                    if verify_placeholders_in_saved:
                        print(f"[WARN] 保存后验证:文件中仍有占位符: {sorted(verify_placeholders_in_saved)}")
                        print(f"[WARN] 这可能是导致Ubuntu上占位符未替换的原因")
                    else:
                        print(f"[DEBUG] 保存后验证通过:文件中所有占位符已替换")
                except Exception as verify_error:
                    print(f"[WARN] 保存后验证失败(不影响功能): {str(verify_error)}")

                # Flush filesystem buffers on non-Windows platforms.
                try:
                    if sys.platform != 'win32':
                        os.sync()
                        print(f"[DEBUG] 已同步文件系统(非Windows系统)")
                except Exception as sync_error:
                    print(f"[WARN] 文件系统同步失败(不影响功能): {str(sync_error)}")
            except Exception as save_error:
                error_msg = f"保存文档失败: {str(save_error)}"
                print(f"[ERROR] {error_msg}")
                print(traceback.format_exc())
                _remove_quietly(output_file)
                raise Exception(error_msg) from save_error

            return output_file

        except IndexError as e:
            error_detail = traceback.format_exc()
            raise Exception(f"填充模板失败: list index out of range. 详细信息: {str(e)}\n{error_detail}") from e
        except Exception as e:
            error_detail = traceback.format_exc()
            raise Exception(f"填充模板失败: {str(e)}\n{error_detail}") from e

    def upload_to_minio(self, file_path: str, file_name: str) -> str:
        """Upload a local file to MinIO.

        The object name is ``[<tenant_id>/]<timestamp>/<file_name>``, where
        the timestamp includes microseconds and the tenant segment is present
        only when ``self.tenant_id`` is non-empty.

        Args:
            file_path: Local file path.
            file_name: File name to use inside the bucket.

        Returns:
            The object path relative to the bucket, prefixed with '/'.

        Raises:
            Exception: If MinIO reports an S3 error.
        """
        client = self.get_minio_client()

        try:
            now = datetime.now()
            timestamp = f"{now.strftime('%Y%m%d%H%M%S')}{now.microsecond:06d}"

            if self.tenant_id:
                object_name = f"{self.tenant_id}/{timestamp}/{file_name}"
            else:
                object_name = f"{timestamp}/{file_name}"

            client.fput_object(
                self.bucket_name,
                object_name,
                file_path,
                content_type='application/vnd.openxmlformats-officedocument.wordprocessingml.document'
            )

            return f"/{object_name}"

        except S3Error as e:
            raise Exception(f"上传文件到MinIO失败: {str(e)}") from e

    def generate_document(self, file_id: int, input_data: List[Dict], file_info: Dict,
                          tenant_id: Optional[int] = None) -> Dict:
        """Generate a document from a template and upload it to MinIO.

        Args:
            file_id: File configuration id.
            input_data: Items of the form
                ``{'fieldCode': 'xxx', 'fieldValue': 'xxx'}``.
            file_info: File info, e.g. ``{'fileId': 1, 'fileName': 'xxx.doc'}``.
            tenant_id: Tenant id; see ``get_file_config_by_id`` for the
                fallback used when None.

        Returns:
            A dict with ``filePath`` (MinIO relative path), ``fileName``
            (generated document name) and ``downloadUrl`` (presigned URL, or
            None if it could not be generated).

        Raises:
            Exception: If the configuration is missing, has no file path, or
                any generation step fails.
        """
        file_config = self.get_file_config_by_id(file_id, tenant_id)
        if not file_config:
            raise Exception(
                f"文件ID {file_id} 对应的模板不存在或未启用。"
                f"请通过查询 f_polic_file_config 表获取有效的文件ID,"
                f"或访问 /api/file-configs 接口查看可用的文件配置列表。"
            )

        file_path = file_config.get('file_path')
        if not file_path:
            raise Exception(f"文件ID {file_id} ({file_config.get('name', '')}) 的文件路径(file_path)为空,请检查数据库配置")

        # Tolerate callers passing None instead of an empty list/dict.
        file_info = file_info or {}
        field_data = {}
        for item in input_data or []:
            field_code = item.get('fieldCode', '')
            field_value = item.get('fieldValue', '')
            if field_code:
                field_data[field_code] = field_value or ''

        template_path = None
        filled_doc_path = None
        try:
            template_path = self.download_template_from_minio(file_path)
            filled_doc_path = self.fill_template(template_path, field_data)

            # Prefer the name supplied in file_info, then the configured name.
            original_file_name = (
                file_info.get('fileName')
                or file_info.get('name')
                or file_config.get('name', 'generated.doc')
            )
            print(f"[DEBUG] 文件ID: {file_id}, 原始文件名: {original_file_name}")
            print(f"[DEBUG] file_info内容: {file_info}")
            print(f"[DEBUG] file_config内容: {file_config}")
            print(f"[DEBUG] 字段数据用于生成文档名: {field_data}")
            generated_file_name = self.generate_document_name(original_file_name, field_data)
            print(f"[DEBUG] 文件ID: {file_id}, 生成的文档名: {generated_file_name}")

            file_path = self.upload_to_minio(filled_doc_path, generated_file_name)
            download_url = self.generate_presigned_download_url(file_path)

            return {
                'filePath': file_path,
                'fileName': generated_file_name,
                'downloadUrl': download_url
            }
        finally:
            # Always remove the temporary template copy and filled document.
            _remove_quietly(template_path)
            _remove_quietly(filled_doc_path)

    def generate_document_id(self) -> str:
        """Return a document id of the form ``DOC<YYYYmmddHHMMSS><mmm>``.

        ``<mmm>`` is the zero-padded millisecond part, so the id always has
        the same length.
        """
        now = datetime.now()
        return f"DOC{now.strftime('%Y%m%d%H%M%S')}{now.microsecond // 1000:03d}"

    def generate_document_name(self, original_file_name: str, field_data: Dict[str, str]) -> str:
        """Derive the generated document's file name.

        The base name is the original file name without directory, known
        document extension, leading "<digits>." prefix and parenthesised
        annotations (ASCII or full-width). If ``field_data['target_name']``
        is non-blank it is appended as ``_<name>``. The result always ends in
        ``.docx``.

        Args:
            original_file_name: Original file name (may include a path).
            field_data: Field data; only ``target_name`` is read.

        Returns:
            The generated name, e.g. "请示报告卡_张三.docx".
        """
        file_name_only = Path(original_file_name).name

        if file_name_only.lower().endswith(_DOCUMENT_EXTENSIONS):
            base_name = Path(file_name_only).stem
        else:
            base_name = file_name_only

        print(f"[DEBUG] 原始文件名: '{original_file_name}'")
        print(f"[DEBUG] 提取的基础名称(清理前): '{base_name}'")

        # 1. Drop a leading "<digits>." ordinal such as "1." or "2.".
        base_name = re.sub(r'^\d+\.\s*', '', base_name)
        # 2. Drop parenthesised annotations such as "(XXX)" or "(XXX)".
        base_name = _BRACKETED_PATTERN.sub('', base_name)
        # 3. Trim surrounding whitespace and dots.
        base_name = base_name.strip().strip('.')

        # 4. If nothing meaningful is left, retry keeping the numeric prefix.
        if not base_name or base_name.isdigit():
            print(f"[DEBUG] 清理后为空或只有数字,重新处理原始文件名")
            temp_name = _BRACKETED_PATTERN.sub('', file_name_only)
            if temp_name.lower().endswith(_DOCUMENT_EXTENSIONS):
                temp_name = Path(temp_name).stem
            temp_name = temp_name.strip().strip('.')
            base_name = temp_name or "文档"  # last-resort name

        print(f"[DEBUG] 清理后的基础名称: '{base_name}'")

        suffix = ''
        target_name = field_data.get('target_name', '')
        if target_name and target_name.strip():
            suffix = f"_{target_name.strip()}"

        generated_name = f"{base_name}{suffix}.docx"
        print(f"[DEBUG] 文档名称生成: '{original_file_name}' -> '{generated_name}' (base_name='{base_name}', suffix='{suffix}')")

        return generated_name

    def generate_presigned_download_url(self, file_path: str, expires_days: int = 7) -> Optional[str]:
        """Create a presigned MinIO download URL.

        Args:
            file_path: Object path relative to the bucket, e.g.
                '/615873064429507639/20251205090700/初步核实审批表_张三.docx'.
            expires_days: URL validity in days (default 7).

        Returns:
            The presigned URL, or None if *file_path* is empty or URL
            generation fails (the failure is logged, not raised).
        """
        try:
            if not file_path:
                return None

            client = self.get_minio_client()
            # Object names have no leading slash.
            object_name = file_path.lstrip('/')

            return client.presigned_get_object(
                self.bucket_name,
                object_name,
                expires=timedelta(days=expires_days)
            )
        except Exception as e:
            # URL generation is optional; failure must not break the caller.
            print(f"生成预签名URL失败: {str(e)}")
            return None