ai-business-write/services/document_service.py

498 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
文档生成服务 - 处理Word模板填充和MinIO文件上传
"""
import os
import re
import tempfile
from typing import Dict, List, Optional
from datetime import datetime
from pathlib import Path
from docx import Document
from minio import Minio
from minio.error import S3Error
import pymysql
class DocumentService:
"""文档生成服务类"""
def __init__(self):
# MinIO配置
self.minio_config = {
'endpoint': os.getenv('MINIO_ENDPOINT', 'minio.datacubeworld.com:9000'),
'access_key': os.getenv('MINIO_ACCESS_KEY', 'JOLXFXny3avFSzB0uRA5'),
'secret_key': os.getenv('MINIO_SECRET_KEY', 'G1BR8jStNfovkfH5ou39EmPl34E4l7dGrnd3Cz0I'),
'secure': os.getenv('MINIO_SECURE', 'true').lower() == 'true'
}
self.bucket_name = os.getenv('MINIO_BUCKET', 'finyx')
# 数据库配置
self.db_config = {
'host': os.getenv('DB_HOST', '152.136.177.240'),
'port': int(os.getenv('DB_PORT', 5012)),
'user': os.getenv('DB_USER', 'finyx'),
'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
'database': os.getenv('DB_NAME', 'finyx'),
'charset': 'utf8mb4'
}
self.tenant_id = 615873064429507639
def get_connection(self):
"""获取数据库连接"""
return pymysql.connect(**self.db_config)
def get_minio_client(self):
"""获取MinIO客户端"""
return Minio(
self.minio_config['endpoint'],
access_key=self.minio_config['access_key'],
secret_key=self.minio_config['secret_key'],
secure=self.minio_config['secure']
)
def get_file_config_by_id(self, file_id: int) -> Optional[Dict]:
"""
根据文件ID获取文件配置
Args:
file_id: 文件配置ID
Returns:
文件配置信息,包含: id, name, file_path
"""
conn = self.get_connection()
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
sql = """
SELECT id, name, file_path
FROM f_polic_file_config
WHERE id = %s
AND tenant_id = %s
AND state = 1
"""
cursor.execute(sql, (file_id, self.tenant_id))
config = cursor.fetchone()
if config:
return {
'id': config['id'],
'name': config['name'],
'file_path': config['file_path']
}
return None
finally:
cursor.close()
conn.close()
def download_template_from_minio(self, file_path: str) -> str:
"""
从MinIO下载模板文件到临时目录
Args:
file_path: MinIO中的相对路径'/615873064429507639/TEMPLATE/2024/11/初步核实审批表模板.docx'
Returns:
本地临时文件路径
"""
# 检查file_path是否为None或空
if not file_path:
raise Exception("模板文件路径不能为空请检查数据库中模板配置的file_path字段")
client = self.get_minio_client()
# 创建临时文件
temp_dir = tempfile.gettempdir()
temp_file = os.path.join(temp_dir, f"template_{datetime.now().strftime('%Y%m%d%H%M%S')}.docx")
try:
# 从相对路径中提取对象名称(去掉开头的/
object_name = file_path.lstrip('/')
# 下载文件
client.fget_object(self.bucket_name, object_name, temp_file)
return temp_file
except S3Error as e:
raise Exception(f"从MinIO下载模板文件失败: {str(e)}")
def fill_template(self, template_path: str, field_data: Dict[str, str]) -> str:
"""
填充Word模板中的占位符
Args:
template_path: 模板文件路径
field_data: 字段数据字典,格式: {'field_code': 'field_value'}
Returns:
填充后的文档路径
"""
try:
print(f"[DEBUG] 开始填充模板: {template_path}")
print(f"[DEBUG] 字段数据: {field_data}")
# 打开模板文档
doc = Document(template_path)
print(f"[DEBUG] 文档包含 {len(doc.paragraphs)} 个段落, {len(doc.tables)} 个表格")
<<<<<<< HEAD
def replace_placeholder_in_paragraph(paragraph):
"""在段落中替换占位符处理跨run的情况"""
try:
# 获取段落完整文本
full_text = paragraph.text
if not full_text:
return
# 检查是否有占位符需要替换
has_placeholder = False
replaced_text = full_text
replacement_count = 0
# 遍历所有字段,替换所有匹配的占位符(包括重复的)
for field_code, field_value in field_data.items():
placeholder = f"{{{{{field_code}}}}}"
# 使用循环替换所有匹配项(不仅仅是第一个)
while placeholder in replaced_text:
has_placeholder = True
replacement_count += 1
# 替换占位符,如果值为空则替换为空字符串
replaced_text = replaced_text.replace(placeholder, str(field_value) if field_value else '', 1)
print(f"[DEBUG] 替换占位符: {placeholder} -> '{field_value}' (在段落中)")
# 如果有替换,使用安全的方式更新段落文本
if has_placeholder:
print(f"[DEBUG] 段落替换了 {replacement_count} 个占位符: '{full_text[:50]}...' -> '{replaced_text[:50]}...'")
try:
# 方法1直接设置text推荐会自动处理run
paragraph.text = replaced_text
except Exception as e1:
# 如果方法1失败尝试方法2手动处理run
try:
# 清空所有run
paragraph.clear()
# 添加新的run
if replaced_text:
paragraph.add_run(replaced_text)
except Exception as e2:
# 如果两种方法都失败,记录错误但继续
print(f"[WARN] 无法更新段落文本方法1错误: {str(e1)}, 方法2错误: {str(e2)}")
pass
except Exception as e:
# 如果单个段落处理失败,记录错误但继续处理其他段落
print(f"[WARN] 处理段落时出错: {str(e)}")
import traceback
print(traceback.format_exc())
pass
# 统计替换信息
total_replacements = 0
replaced_placeholders = set()
# 替换段落中的占位符
for para_idx, paragraph in enumerate(doc.paragraphs):
before_text = paragraph.text
replace_placeholder_in_paragraph(paragraph)
after_text = paragraph.text
if before_text != after_text:
# 检查哪些占位符被替换了
for field_code in field_data.keys():
placeholder = f"{{{{{field_code}}}}}"
if placeholder in before_text and placeholder not in after_text:
replaced_placeholders.add(field_code)
total_replacements += before_text.count(placeholder)
=======
# 替换占位符 {{field_code}} 为实际值
for paragraph in doc.paragraphs:
# 替换段落文本中的占位符
for field_code, field_value in field_data.items():
placeholder = f"{{{{{field_code}}}}}"
if placeholder in paragraph.text:
# 替换占位符
for run in paragraph.runs:
if placeholder in run.text:
run.text = run.text.replace(placeholder, field_value or '')
>>>>>>> parent of 4897c96 (添加通过taskId获取文档的接口支持文件列表查询和参数验证增强错误处理能力同时优化文档生成逻辑确保生成的文档名称和路径的准确性)
# 替换表格中的占位符
try:
for table in doc.tables:
if not table.rows:
continue
for row in table.rows:
if not row.cells:
continue
for cell in row.cells:
try:
# 检查cell是否有paragraphs属性且不为空
if hasattr(cell, 'paragraphs'):
# 安全地获取paragraphs列表
paragraphs = list(cell.paragraphs) if cell.paragraphs else []
for paragraph in paragraphs:
before_text = paragraph.text
replace_placeholder_in_paragraph(paragraph)
after_text = paragraph.text
if before_text != after_text:
# 检查哪些占位符被替换了
for field_code in field_data.keys():
placeholder = f"{{{{{field_code}}}}}"
if placeholder in before_text and placeholder not in after_text:
replaced_placeholders.add(field_code)
total_replacements += before_text.count(placeholder)
except Exception as e:
# 如果单个单元格处理失败,记录错误但继续处理其他单元格
print(f"[WARN] 处理表格单元格时出错: {str(e)}")
pass
except Exception as e:
# 如果表格处理失败,记录错误但继续保存文档
print(f"[WARN] 处理表格时出错: {str(e)}")
pass
# 验证是否还有未替换的占位符
remaining_placeholders = set()
for paragraph in doc.paragraphs:
text = paragraph.text
for field_code in field_data.keys():
placeholder = f"{{{{{field_code}}}}}"
if placeholder in text:
remaining_placeholders.add(field_code)
# 检查表格中的占位符
for table in doc.tables:
for row in table.rows:
for cell in row.cells:
<<<<<<< HEAD
if hasattr(cell, 'paragraphs'):
for paragraph in cell.paragraphs:
text = paragraph.text
for field_code in field_data.keys():
placeholder = f"{{{{{field_code}}}}}"
if placeholder in text:
remaining_placeholders.add(field_code)
# 输出统计信息
print(f"[DEBUG] 占位符替换统计:")
print(f" - 已替换的占位符: {sorted(replaced_placeholders)}")
print(f" - 总替换次数: {total_replacements}")
if remaining_placeholders:
print(f" - ⚠️ 仍有未替换的占位符: {sorted(remaining_placeholders)}")
else:
print(f" - ✓ 所有占位符已成功替换")
=======
for paragraph in cell.paragraphs:
for field_code, field_value in field_data.items():
placeholder = f"{{{{{field_code}}}}}"
if placeholder in paragraph.text:
for run in paragraph.runs:
if placeholder in run.text:
run.text = run.text.replace(placeholder, field_value or '')
>>>>>>> parent of 4897c96 (添加通过taskId获取文档的接口支持文件列表查询和参数验证增强错误处理能力同时优化文档生成逻辑确保生成的文档名称和路径的准确性)
# 保存到临时文件
temp_dir = tempfile.gettempdir()
output_file = os.path.join(temp_dir, f"filled_{datetime.now().strftime('%Y%m%d%H%M%S')}.docx")
doc.save(output_file)
print(f"[DEBUG] 文档已保存到: {output_file}")
return output_file
except IndexError as e:
# 索引越界错误,提供更详细的错误信息
import traceback
error_detail = traceback.format_exc()
raise Exception(f"填充模板失败: list index out of range. 详细信息: {str(e)}\n{error_detail}")
except Exception as e:
# 其他错误,提供详细的错误信息
import traceback
error_detail = traceback.format_exc()
raise Exception(f"填充模板失败: {str(e)}\n{error_detail}")
def upload_to_minio(self, file_path: str, file_name: str) -> str:
"""
上传文件到MinIO
Args:
file_path: 本地文件路径
file_name: 文件名称
Returns:
MinIO中的相对路径
"""
client = self.get_minio_client()
try:
# 生成MinIO对象路径相对路径
now = datetime.now()
# 使用日期路径组织文件,添加微秒确保唯一性
timestamp = f"{now.strftime('%Y%m%d%H%M%S')}{now.microsecond:06d}"
object_name = f"{self.tenant_id}/{timestamp}/{file_name}"
# 上传文件
client.fput_object(
self.bucket_name,
object_name,
file_path,
content_type='application/vnd.openxmlformats-officedocument.wordprocessingml.document'
)
# 返回相对路径(以/开头)
return f"/{object_name}"
except S3Error as e:
raise Exception(f"上传文件到MinIO失败: {str(e)}")
def generate_document(self, file_id: int, input_data: List[Dict], file_info: Dict) -> Dict:
"""
生成文档
Args:
file_id: 文件配置ID
input_data: 输入数据列表,格式: [{'fieldCode': 'xxx', 'fieldValue': 'xxx'}]
file_info: 文件信息,格式: {'fileId': 1, 'fileName': 'xxx.doc'}
Returns:
生成结果,包含: filePath
"""
# 获取文件配置
file_config = self.get_file_config_by_id(file_id)
if not file_config:
# 提供更详细的错误信息
raise Exception(
f"文件ID {file_id} 对应的模板不存在或未启用。"
f"请通过查询 f_polic_file_config 表获取有效的文件ID"
f"或访问 /api/file-configs 接口查看可用的文件配置列表。"
)
# 检查file_path是否存在
file_path = file_config.get('file_path')
if not file_path:
raise Exception(f"文件ID {file_id} ({file_config.get('name', '')}) 的文件路径(file_path)为空,请检查数据库配置")
# 将input_data转换为字典格式
field_data = {}
for item in input_data:
field_code = item.get('fieldCode', '')
field_value = item.get('fieldValue', '')
if field_code:
field_data[field_code] = field_value or ''
# 下载模板
template_path = None
filled_doc_path = None
try:
template_path = self.download_template_from_minio(file_path)
# 填充模板
filled_doc_path = self.fill_template(template_path, field_data)
# 生成文档名称(.docx格式
# 优先使用file_info中的fileName如果没有则使用数据库中的name
# 确保每个文件都使用自己的文件名
original_file_name = file_info.get('fileName') or file_info.get('name') or file_config.get('name', 'generated.doc')
print(f"[DEBUG] 文件ID: {file_id}, 原始文件名: {original_file_name}")
print(f"[DEBUG] file_info内容: {file_info}")
print(f"[DEBUG] file_config内容: {file_config}")
print(f"[DEBUG] 字段数据用于生成文档名: {field_data}")
generated_file_name = self.generate_document_name(original_file_name, field_data)
print(f"[DEBUG] 文件ID: {file_id}, 生成的文档名: {generated_file_name}")
# 上传到MinIO使用生成的文档名
file_path = self.upload_to_minio(filled_doc_path, generated_file_name)
return {
'filePath': file_path,
'fileName': generated_file_name # 返回生成的文档名
}
finally:
# 清理临时文件
if template_path and os.path.exists(template_path):
try:
os.remove(template_path)
except:
pass
if filled_doc_path and os.path.exists(filled_doc_path):
try:
os.remove(filled_doc_path)
except:
pass
def generate_document_id(self) -> str:
"""生成文档ID"""
now = datetime.now()
return f"DOC{now.strftime('%Y%m%d%H%M%S')}{str(now.microsecond)[:3]}"
def generate_document_name(self, original_file_name: str, field_data: Dict[str, str]) -> str:
"""
生成文档名称
Args:
original_file_name: 原始文件名称
field_data: 字段数据
Returns:
生成的文档名称,如 "请示报告卡_张三.docx"
"""
import re
# 提取文件基础名称(不含扩展名)
# 处理可能包含路径的情况
# 先移除路径,只保留文件名
file_name_only = Path(original_file_name).name
# 判断是否有扩展名(.doc, .docx等
# 如果最后有常见的文档扩展名则提取stem
if file_name_only.lower().endswith(('.doc', '.docx', '.txt', '.pdf')):
base_name = Path(file_name_only).stem
else:
# 如果没有扩展名,直接使用文件名
base_name = file_name_only
print(f"[DEBUG] 原始文件名: '{original_file_name}'")
print(f"[DEBUG] 提取的基础名称(清理前): '{base_name}'")
# 清理文件名中的特殊标记
# 1. 移除开头的数字和点(如 "1."、"2." 等),但保留后面的内容
# 使用非贪婪匹配,只匹配开头的数字和点
base_name = re.sub(r'^\d+\.\s*', '', base_name)
# 2. 移除括号及其内容(如 "XXX"、"(初核谈话)" 等)
base_name = re.sub(r'[(].*?[)]', '', base_name)
# 3. 清理首尾空白字符和多余的点
base_name = base_name.strip().strip('.')
# 4. 如果清理后为空或只有数字,使用原始文件名重新处理
if not base_name or base_name.isdigit():
print(f"[DEBUG] 清理后为空或只有数字,重新处理原始文件名")
# 从原始文件名中提取,但保留更多内容
temp_name = file_name_only
# 只移除括号,保留数字前缀(但格式化为更友好的形式)
temp_name = re.sub(r'[(].*?[)]', '', temp_name)
# 移除扩展名(如果存在)
if temp_name.lower().endswith(('.doc', '.docx', '.txt', '.pdf')):
temp_name = Path(temp_name).stem
temp_name = temp_name.strip().strip('.')
if temp_name:
base_name = temp_name
else:
base_name = "文档" # 最后的备选方案
print(f"[DEBUG] 清理后的基础名称: '{base_name}'")
# 尝试从字段数据中提取被核查人姓名作为后缀
suffix = ''
target_name = field_data.get('target_name', '')
if target_name and target_name.strip():
suffix = f"_{target_name.strip()}"
# 生成新文件名(确保是.docx格式
generated_name = f"{base_name}{suffix}.docx"
print(f"[DEBUG] 文档名称生成: '{original_file_name}' -> '{generated_name}' (base_name='{base_name}', suffix='{suffix}')")
return generated_name