ai-business-write/services/document_service.py

873 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
文档生成服务 - 处理Word模板填充和MinIO文件上传
"""
import os
import re
import tempfile
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from pathlib import Path
from docx import Document
from minio import Minio
from minio.error import S3Error
import pymysql
class DocumentService:
"""文档生成服务类"""
def __init__(self):
# MinIO配置
self.minio_config = {
'endpoint': os.getenv('MINIO_ENDPOINT', 'minio.datacubeworld.com:9000'),
'access_key': os.getenv('MINIO_ACCESS_KEY', 'JOLXFXny3avFSzB0uRA5'),
'secret_key': os.getenv('MINIO_SECRET_KEY', 'G1BR8jStNfovkfH5ou39EmPl34E4l7dGrnd3Cz0I'),
'secure': os.getenv('MINIO_SECURE', 'true').lower() == 'true'
}
self.bucket_name = os.getenv('MINIO_BUCKET', 'finyx')
# 数据库配置
self.db_config = {
'host': os.getenv('DB_HOST', '152.136.177.240'),
'port': int(os.getenv('DB_PORT', 5012)),
'user': os.getenv('DB_USER', 'finyx'),
'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
'database': os.getenv('DB_NAME', 'finyx'),
'charset': 'utf8mb4'
}
self.tenant_id = 615873064429507639
def get_connection(self):
"""获取数据库连接"""
return pymysql.connect(**self.db_config)
def get_minio_client(self):
"""获取MinIO客户端"""
return Minio(
self.minio_config['endpoint'],
access_key=self.minio_config['access_key'],
secret_key=self.minio_config['secret_key'],
secure=self.minio_config['secure']
)
def get_file_config_by_id(self, file_id: int) -> Optional[Dict]:
"""
根据文件ID获取文件配置
Args:
file_id: 文件配置ID
Returns:
文件配置信息,包含: id, name, file_path
"""
conn = self.get_connection()
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
sql = """
SELECT id, name, file_path
FROM f_polic_file_config
WHERE id = %s
AND tenant_id = %s
AND state = 1
"""
cursor.execute(sql, (file_id, self.tenant_id))
config = cursor.fetchone()
if config:
return {
'id': config['id'],
'name': config['name'],
'file_path': config['file_path']
}
return None
finally:
cursor.close()
conn.close()
def download_template_from_minio(self, file_path: str) -> str:
"""
从MinIO下载模板文件到临时目录
Args:
file_path: MinIO中的相对路径'/615873064429507639/TEMPLATE/2024/11/初步核实审批表模板.docx'
Returns:
本地临时文件路径
"""
# 检查file_path是否为None或空
if not file_path:
raise Exception("模板文件路径不能为空请检查数据库中模板配置的file_path字段")
client = self.get_minio_client()
# 创建临时文件
temp_dir = tempfile.gettempdir()
temp_file = os.path.join(temp_dir, f"template_{datetime.now().strftime('%Y%m%d%H%M%S')}.docx")
try:
# 从相对路径中提取对象名称(去掉开头的/
object_name = file_path.lstrip('/')
# 下载文件
client.fget_object(self.bucket_name, object_name, temp_file)
return temp_file
except S3Error as e:
raise Exception(f"从MinIO下载模板文件失败: {str(e)}")
def fill_template(self, template_path: str, field_data: Dict[str, str]) -> str:
"""
填充Word模板中的占位符
Args:
template_path: 模板文件路径
field_data: 字段数据字典,格式: {'field_code': 'field_value'}
Returns:
填充后的文档路径
"""
try:
print(f"[DEBUG] 开始填充模板: {template_path}")
print(f"[DEBUG] 传入的字段数据: {field_data}")
# 打开模板文档
doc = Document(template_path)
print(f"[DEBUG] 文档包含 {len(doc.paragraphs)} 个段落, {len(doc.tables)} 个表格")
# 第一步:扫描模板文档,找出所有占位符(格式:{{field_code}}
# 使用正则表达式匹配所有占位符
placeholder_pattern = re.compile(r'\{\{([^}]+)\}\}')
all_placeholders_in_template = set()
# 扫描段落中的占位符
for paragraph in doc.paragraphs:
text = paragraph.text
matches = placeholder_pattern.findall(text)
for match in matches:
field_code = match.strip()
if field_code:
all_placeholders_in_template.add(field_code)
# 扫描表格中的占位符
for table in doc.tables:
try:
if not table.rows:
continue
for row in table.rows:
try:
# 安全地访问 row.cells避免 docx 库在处理异常表格结构时的 bug
if not hasattr(row, 'cells'):
continue
# 使用 try-except 包裹,防止 IndexError
try:
cells = row.cells
except (IndexError, AttributeError) as e:
print(f"[WARN] 无法访问表格行的单元格,跳过该行: {str(e)}")
continue
for cell in cells:
try:
if hasattr(cell, 'paragraphs'):
for paragraph in cell.paragraphs:
text = paragraph.text
matches = placeholder_pattern.findall(text)
for match in matches:
field_code = match.strip()
if field_code:
all_placeholders_in_template.add(field_code)
except Exception as e:
print(f"[WARN] 处理表格单元格时出错,跳过: {str(e)}")
continue
except Exception as e:
print(f"[WARN] 处理表格行时出错,跳过: {str(e)}")
continue
except Exception as e:
print(f"[WARN] 处理表格时出错,跳过该表格: {str(e)}")
continue
print(f"[DEBUG] 模板中发现 {len(all_placeholders_in_template)} 个不同的占位符: {sorted(all_placeholders_in_template)}")
# 第二步对于模板中存在的占位符如果field_data中没有对应的值则使用空字符串
# 创建一个完整的字段数据字典,包含所有需要的字段
complete_field_data = {}
for field_code in all_placeholders_in_template:
# 如果传入的数据中有该字段,使用传入的值;否则使用空字符串
complete_field_data[field_code] = field_data.get(field_code, '')
# 同时保留传入的字段数据(可能包含模板中没有的字段,虽然不会使用,但保留以兼容)
for field_code, field_value in field_data.items():
if field_code not in complete_field_data:
complete_field_data[field_code] = field_value or ''
print(f"[DEBUG] 完整的字段数据(包含默认空值): {complete_field_data}")
print(f"[DEBUG] 补充的空值字段: {sorted(set(complete_field_data.keys()) - set(field_data.keys()))}")
# 使用完整的字段数据进行替换
field_data = complete_field_data
def replace_placeholder_in_paragraph(paragraph):
"""在段落中替换占位符保持原有格式处理跨run的情况"""
try:
# 获取段落完整文本
full_text = paragraph.text
if not full_text:
return
# 检查是否有占位符需要替换(使用多种方式检查)
has_placeholder = False
found_placeholders = []
for field_code in field_data.keys():
placeholder = f"{{{{{field_code}}}}}"
if placeholder in full_text:
has_placeholder = True
found_placeholders.append(placeholder)
else:
# 尝试使用正则表达式检查(处理可能的编码问题)
import re
if re.search(re.escape(placeholder), full_text):
has_placeholder = True
found_placeholders.append(placeholder)
if not has_placeholder:
return
# 调试信息:记录找到的占位符
if found_placeholders:
print(f"[DEBUG] 段落中发现占位符: {found_placeholders}, 段落文本前50字符: '{full_text[:50]}'")
# 收集所有runs及其位置和格式信息
runs_info = []
current_pos = 0
for run in paragraph.runs:
run_text = run.text
run_start = current_pos
run_end = current_pos + len(run_text)
# 保存run的格式信息
format_info = {}
try:
if run.font.name:
format_info['font_name'] = run.font.name
if run.font.size:
format_info['font_size'] = run.font.size
if run.bold is not None:
format_info['bold'] = run.bold
if run.italic is not None:
format_info['italic'] = run.italic
if run.underline is not None:
format_info['underline'] = run.underline
if run.font.color and run.font.color.rgb:
format_info['color'] = run.font.color.rgb
except:
pass
runs_info.append({
'run': run,
'text': run_text,
'start': run_start,
'end': run_end,
'format': format_info
})
current_pos = run_end
# 执行所有替换,构建最终文本
final_text = full_text
replacement_count = 0
for field_code, field_value in field_data.items():
placeholder = f"{{{{{field_code}}}}}"
replacement_value = str(field_value) if field_value else ''
# 检查占位符是否在文本中(使用多种方式检查,确保兼容性)
if placeholder in final_text:
# 替换所有出现的占位符
before_replace = final_text
final_text = final_text.replace(placeholder, replacement_value)
count = before_replace.count(placeholder)
replacement_count += count
if count > 0:
print(f"[DEBUG] 替换占位符: {placeholder} -> '{replacement_value}' (共 {count} 次)")
else:
# 尝试使用正则表达式匹配(处理可能的编码或格式问题)
import re
escaped_placeholder = re.escape(placeholder)
if re.search(escaped_placeholder, final_text):
before_replace = final_text
final_text = re.sub(escaped_placeholder, replacement_value, final_text)
count = len(re.findall(escaped_placeholder, before_replace))
replacement_count += count
if count > 0:
print(f"[DEBUG] 使用正则表达式替换占位符: {placeholder} -> '{replacement_value}' (共 {count} 次)")
# 找到包含占位符的第一个run使用它的格式
placeholder_run_format = None
for run_info in runs_info:
run_text = run_info['text']
# 检查这个run是否包含任何占位符
for field_code in field_data.keys():
placeholder = f"{{{{{field_code}}}}}"
if placeholder in run_text:
placeholder_run_format = run_info['format']
break
if placeholder_run_format:
break
# 如果没有找到包含占位符的run使用第一个run的格式
if not placeholder_run_format and runs_info:
placeholder_run_format = runs_info[0]['format']
# 如果只有一个run直接替换文本会自动保持格式
if len(runs_info) == 1:
runs_info[0]['run'].text = final_text
# 验证替换是否成功
if runs_info[0]['run'].text != final_text:
print(f"[WARN] 单run替换后验证失败期望 '{final_text[:50]}...',实际 '{runs_info[0]['run'].text[:50]}...'")
else:
# 多个run的情况合并为一个run保持格式
# 先清空所有runs
for run_info in runs_info:
run_info['run'].text = ''
# 在第一个run中添加替换后的文本
first_run = runs_info[0]['run']
first_run.text = final_text
# 验证文本是否被正确写入
if first_run.text != final_text:
print(f"[WARN] 多run替换后验证失败期望 '{final_text[:50]}...',实际 '{first_run.text[:50]}...'")
# 尝试再次写入
first_run.text = final_text
if first_run.text != final_text:
print(f"[ERROR] 多次尝试写入失败,可能存在严重问题")
# 应用格式使用包含占位符的run的格式或第一个run的格式
if placeholder_run_format:
try:
if 'font_name' in placeholder_run_format:
first_run.font.name = placeholder_run_format['font_name']
if 'font_size' in placeholder_run_format:
first_run.font.size = placeholder_run_format['font_size']
if 'bold' in placeholder_run_format:
first_run.bold = placeholder_run_format['bold']
if 'italic' in placeholder_run_format:
first_run.italic = placeholder_run_format['italic']
if 'underline' in placeholder_run_format:
first_run.underline = placeholder_run_format['underline']
if 'color' in placeholder_run_format:
first_run.font.color.rgb = placeholder_run_format['color']
except Exception as fmt_error:
print(f"[WARN] 应用格式时出错: {str(fmt_error)}")
# 删除其他空的runs从后往前删除避免索引问题
for i in range(len(runs_info) - 1, 0, -1):
run_element = runs_info[i]['run']._element
try:
paragraph._element.remove(run_element)
except Exception as remove_error:
print(f"[WARN] 删除run时出错: {str(remove_error)}")
# 最终验证:检查段落文本是否包含占位符
final_paragraph_text = paragraph.text
remaining_in_paragraph = []
for field_code in field_data.keys():
placeholder = f"{{{{{field_code}}}}}"
if placeholder in final_paragraph_text:
remaining_in_paragraph.append(placeholder)
if remaining_in_paragraph:
print(f"[WARN] 段落替换后仍有占位符: {remaining_in_paragraph}")
print(f"[WARN] 原始文本: '{full_text[:100]}'")
print(f"[WARN] 替换后文本: '{final_text[:100]}'")
print(f"[WARN] 段落实际文本: '{final_paragraph_text[:100]}'")
else:
print(f"[DEBUG] 段落替换了 {replacement_count} 个占位符(保持格式): '{full_text[:50]}...' -> '{final_text[:50]}...'")
except Exception as e:
# 如果单个段落处理失败,记录错误但继续处理其他段落
print(f"[WARN] 处理段落时出错: {str(e)}")
import traceback
print(traceback.format_exc())
pass
# 统计替换信息
total_replacements = 0
replaced_placeholders = set()
# 替换段落中的占位符
for para_idx, paragraph in enumerate(doc.paragraphs):
before_text = paragraph.text
replace_placeholder_in_paragraph(paragraph)
after_text = paragraph.text
if before_text != after_text:
# 检查哪些占位符被替换了
for field_code in field_data.keys():
placeholder = f"{{{{{field_code}}}}}"
if placeholder in before_text and placeholder not in after_text:
replaced_placeholders.add(field_code)
total_replacements += before_text.count(placeholder)
# 替换表格中的占位符
try:
for table in doc.tables:
if not table.rows:
continue
for row in table.rows:
if not row.cells:
continue
for cell in row.cells:
try:
# 检查cell是否有paragraphs属性且不为空
if hasattr(cell, 'paragraphs'):
# 安全地获取paragraphs列表
paragraphs = list(cell.paragraphs) if cell.paragraphs else []
for paragraph in paragraphs:
before_text = paragraph.text
replace_placeholder_in_paragraph(paragraph)
after_text = paragraph.text
if before_text != after_text:
# 检查哪些占位符被替换了
for field_code in field_data.keys():
placeholder = f"{{{{{field_code}}}}}"
if placeholder in before_text and placeholder not in after_text:
replaced_placeholders.add(field_code)
total_replacements += before_text.count(placeholder)
except Exception as e:
# 如果单个单元格处理失败,记录错误但继续处理其他单元格
print(f"[WARN] 处理表格单元格时出错: {str(e)}")
pass
except Exception as e:
# 如果表格处理失败,记录错误但继续保存文档
print(f"[WARN] 处理表格时出错: {str(e)}")
pass
# 第三步:验证是否还有未替换的占位符(使用正则表达式匹配所有可能的占位符)
remaining_placeholders = set()
for paragraph in doc.paragraphs:
text = paragraph.text
matches = placeholder_pattern.findall(text)
for match in matches:
field_code = match.strip()
if field_code:
remaining_placeholders.add(field_code)
# 检查表格中的占位符
for table in doc.tables:
try:
if not table.rows:
continue
for row in table.rows:
try:
# 安全地访问 row.cells避免 docx 库在处理异常表格结构时的 bug
if not hasattr(row, 'cells'):
continue
# 使用 try-except 包裹,防止 IndexError
try:
cells = row.cells
except (IndexError, AttributeError) as e:
print(f"[WARN] 无法访问表格行的单元格,跳过该行: {str(e)}")
continue
for cell in cells:
try:
if hasattr(cell, 'paragraphs'):
for paragraph in cell.paragraphs:
text = paragraph.text
matches = placeholder_pattern.findall(text)
for match in matches:
field_code = match.strip()
if field_code:
remaining_placeholders.add(field_code)
except Exception as e:
print(f"[WARN] 处理表格单元格时出错,跳过: {str(e)}")
continue
except Exception as e:
print(f"[WARN] 处理表格行时出错,跳过: {str(e)}")
continue
except Exception as e:
print(f"[WARN] 处理表格时出错,跳过该表格: {str(e)}")
continue
# 输出统计信息
print(f"[DEBUG] 占位符替换统计:")
print(f" - 模板中的占位符总数: {len(all_placeholders_in_template)}")
print(f" - 已替换的占位符: {sorted(replaced_placeholders)}")
print(f" - 总替换次数: {total_replacements}")
if remaining_placeholders:
print(f" - ⚠️ 仍有未替换的占位符: {sorted(remaining_placeholders)}")
print(f" - ⚠️ 警告:文档中仍存在占位符,可能格式不正确或替换逻辑有问题")
else:
print(f" - ✓ 所有占位符已成功替换")
# 保存到临时文件
temp_dir = tempfile.gettempdir()
output_file = os.path.join(temp_dir, f"filled_{datetime.now().strftime('%Y%m%d%H%M%S')}.docx")
# 保存文档前,再次验证替换结果(用于调试)
print(f"[DEBUG] 保存前验证:检查文档中是否还有占位符...")
verification_placeholders = set()
for paragraph in doc.paragraphs:
text = paragraph.text
matches = placeholder_pattern.findall(text)
for match in matches:
field_code = match.strip()
if field_code:
verification_placeholders.add(field_code)
for table in doc.tables:
try:
if not table.rows:
continue
for row in table.rows:
try:
if not hasattr(row, 'cells'):
continue
try:
cells = row.cells
except (IndexError, AttributeError):
continue
for cell in cells:
try:
if hasattr(cell, 'paragraphs'):
for paragraph in cell.paragraphs:
text = paragraph.text
matches = placeholder_pattern.findall(text)
for match in matches:
field_code = match.strip()
if field_code:
verification_placeholders.add(field_code)
except Exception:
continue
except Exception:
continue
except Exception:
continue
if verification_placeholders:
print(f"[WARN] 保存前验证发现仍有占位符: {sorted(verification_placeholders)}")
else:
print(f"[DEBUG] 保存前验证通过:所有占位符已替换")
# 保存文档
try:
doc.save(output_file)
print(f"[DEBUG] 文档已保存到: {output_file}")
# 验证文件是否真的存在且大小大于0
import time
time.sleep(0.1) # 等待文件系统同步Ubuntu上可能需要
if not os.path.exists(output_file):
raise Exception(f"文件保存失败:文件不存在 {output_file}")
file_size = os.path.getsize(output_file)
if file_size == 0:
raise Exception(f"文件保存失败文件大小为0 {output_file}")
print(f"[DEBUG] 文件保存验证通过:文件大小 {file_size} 字节")
# 验证保存的文件内容是否正确(重新打开文件检查)
try:
verify_doc = Document(output_file)
verify_placeholders_in_saved = set()
for paragraph in verify_doc.paragraphs:
text = paragraph.text
matches = placeholder_pattern.findall(text)
for match in matches:
field_code = match.strip()
if field_code:
verify_placeholders_in_saved.add(field_code)
for table in verify_doc.tables:
try:
if not table.rows:
continue
for row in table.rows:
try:
if not hasattr(row, 'cells'):
continue
try:
cells = row.cells
except (IndexError, AttributeError):
continue
for cell in cells:
try:
if hasattr(cell, 'paragraphs'):
for paragraph in cell.paragraphs:
text = paragraph.text
matches = placeholder_pattern.findall(text)
for match in matches:
field_code = match.strip()
if field_code:
verify_placeholders_in_saved.add(field_code)
except Exception:
continue
except Exception:
continue
except Exception:
continue
if verify_placeholders_in_saved:
print(f"[WARN] 保存后验证:文件中仍有占位符: {sorted(verify_placeholders_in_saved)}")
print(f"[WARN] 这可能是导致Ubuntu上占位符未替换的原因")
else:
print(f"[DEBUG] 保存后验证通过:文件中所有占位符已替换")
except Exception as verify_error:
print(f"[WARN] 保存后验证失败(不影响功能): {str(verify_error)}")
# 在Ubuntu上可能需要显式同步文件系统
try:
import sys
if sys.platform != 'win32':
# 在非Windows系统上尝试同步文件系统
os.sync()
print(f"[DEBUG] 已同步文件系统非Windows系统")
except Exception as sync_error:
print(f"[WARN] 文件系统同步失败(不影响功能): {str(sync_error)}")
except Exception as save_error:
error_msg = f"保存文档失败: {str(save_error)}"
print(f"[ERROR] {error_msg}")
import traceback
print(traceback.format_exc())
raise Exception(error_msg)
return output_file
except IndexError as e:
# 索引越界错误,提供更详细的错误信息
import traceback
error_detail = traceback.format_exc()
raise Exception(f"填充模板失败: list index out of range. 详细信息: {str(e)}\n{error_detail}")
except Exception as e:
# 其他错误,提供详细的错误信息
import traceback
error_detail = traceback.format_exc()
raise Exception(f"填充模板失败: {str(e)}\n{error_detail}")
def upload_to_minio(self, file_path: str, file_name: str) -> str:
"""
上传文件到MinIO
Args:
file_path: 本地文件路径
file_name: 文件名称
Returns:
MinIO中的相对路径
"""
client = self.get_minio_client()
try:
# 生成MinIO对象路径相对路径
now = datetime.now()
# 使用日期路径组织文件,添加微秒确保唯一性
timestamp = f"{now.strftime('%Y%m%d%H%M%S')}{now.microsecond:06d}"
object_name = f"{self.tenant_id}/{timestamp}/{file_name}"
# 上传文件
client.fput_object(
self.bucket_name,
object_name,
file_path,
content_type='application/vnd.openxmlformats-officedocument.wordprocessingml.document'
)
# 返回相对路径(以/开头)
return f"/{object_name}"
except S3Error as e:
raise Exception(f"上传文件到MinIO失败: {str(e)}")
def generate_document(self, file_id: int, input_data: List[Dict], file_info: Dict) -> Dict:
"""
生成文档
Args:
file_id: 文件配置ID
input_data: 输入数据列表,格式: [{'fieldCode': 'xxx', 'fieldValue': 'xxx'}]
file_info: 文件信息,格式: {'fileId': 1, 'fileName': 'xxx.doc'}
Returns:
生成结果,包含: filePath
"""
# 获取文件配置
file_config = self.get_file_config_by_id(file_id)
if not file_config:
# 提供更详细的错误信息
raise Exception(
f"文件ID {file_id} 对应的模板不存在或未启用。"
f"请通过查询 f_polic_file_config 表获取有效的文件ID"
f"或访问 /api/file-configs 接口查看可用的文件配置列表。"
)
# 检查file_path是否存在
file_path = file_config.get('file_path')
if not file_path:
raise Exception(f"文件ID {file_id} ({file_config.get('name', '')}) 的文件路径(file_path)为空,请检查数据库配置")
# 将input_data转换为字典格式
field_data = {}
for item in input_data:
field_code = item.get('fieldCode', '')
field_value = item.get('fieldValue', '')
if field_code:
field_data[field_code] = field_value or ''
# 下载模板
template_path = None
filled_doc_path = None
try:
template_path = self.download_template_from_minio(file_path)
# 填充模板
filled_doc_path = self.fill_template(template_path, field_data)
# 生成文档名称(.docx格式
# 优先使用file_info中的fileName如果没有则使用数据库中的name
# 确保每个文件都使用自己的文件名
original_file_name = file_info.get('fileName') or file_info.get('name') or file_config.get('name', 'generated.doc')
print(f"[DEBUG] 文件ID: {file_id}, 原始文件名: {original_file_name}")
print(f"[DEBUG] file_info内容: {file_info}")
print(f"[DEBUG] file_config内容: {file_config}")
print(f"[DEBUG] 字段数据用于生成文档名: {field_data}")
generated_file_name = self.generate_document_name(original_file_name, field_data)
print(f"[DEBUG] 文件ID: {file_id}, 生成的文档名: {generated_file_name}")
# 上传到MinIO使用生成的文档名
file_path = self.upload_to_minio(filled_doc_path, generated_file_name)
# 生成预签名下载URL
download_url = self.generate_presigned_download_url(file_path)
return {
'filePath': file_path,
'fileName': generated_file_name, # 返回生成的文档名
'downloadUrl': download_url # 返回预签名下载URL
}
finally:
# 清理临时文件
if template_path and os.path.exists(template_path):
try:
os.remove(template_path)
except:
pass
if filled_doc_path and os.path.exists(filled_doc_path):
try:
os.remove(filled_doc_path)
except:
pass
def generate_document_id(self) -> str:
"""生成文档ID"""
now = datetime.now()
return f"DOC{now.strftime('%Y%m%d%H%M%S')}{str(now.microsecond)[:3]}"
def generate_document_name(self, original_file_name: str, field_data: Dict[str, str]) -> str:
"""
生成文档名称
Args:
original_file_name: 原始文件名称
field_data: 字段数据
Returns:
生成的文档名称,如 "请示报告卡_张三.docx"
"""
import re
# 提取文件基础名称(不含扩展名)
# 处理可能包含路径的情况
# 先移除路径,只保留文件名
file_name_only = Path(original_file_name).name
# 判断是否有扩展名(.doc, .docx等
# 如果最后有常见的文档扩展名则提取stem
if file_name_only.lower().endswith(('.doc', '.docx', '.txt', '.pdf')):
base_name = Path(file_name_only).stem
else:
# 如果没有扩展名,直接使用文件名
base_name = file_name_only
print(f"[DEBUG] 原始文件名: '{original_file_name}'")
print(f"[DEBUG] 提取的基础名称(清理前): '{base_name}'")
# 清理文件名中的特殊标记
# 1. 移除开头的数字和点(如 "1."、"2." 等),但保留后面的内容
# 使用非贪婪匹配,只匹配开头的数字和点
base_name = re.sub(r'^\d+\.\s*', '', base_name)
# 2. 移除括号及其内容(如 "XXX"、"(初核谈话)" 等)
base_name = re.sub(r'[(].*?[)]', '', base_name)
# 3. 清理首尾空白字符和多余的点
base_name = base_name.strip().strip('.')
# 4. 如果清理后为空或只有数字,使用原始文件名重新处理
if not base_name or base_name.isdigit():
print(f"[DEBUG] 清理后为空或只有数字,重新处理原始文件名")
# 从原始文件名中提取,但保留更多内容
temp_name = file_name_only
# 只移除括号,保留数字前缀(但格式化为更友好的形式)
temp_name = re.sub(r'[(].*?[)]', '', temp_name)
# 移除扩展名(如果存在)
if temp_name.lower().endswith(('.doc', '.docx', '.txt', '.pdf')):
temp_name = Path(temp_name).stem
temp_name = temp_name.strip().strip('.')
if temp_name:
base_name = temp_name
else:
base_name = "文档" # 最后的备选方案
print(f"[DEBUG] 清理后的基础名称: '{base_name}'")
# 尝试从字段数据中提取被核查人姓名作为后缀
suffix = ''
target_name = field_data.get('target_name', '')
if target_name and target_name.strip():
suffix = f"_{target_name.strip()}"
# 生成新文件名(确保是.docx格式
generated_name = f"{base_name}{suffix}.docx"
print(f"[DEBUG] 文档名称生成: '{original_file_name}' -> '{generated_name}' (base_name='{base_name}', suffix='{suffix}')")
return generated_name
def generate_presigned_download_url(self, file_path: str, expires_days: int = 7) -> Optional[str]:
"""
生成MinIO预签名下载URL
Args:
file_path: MinIO中的相对路径'/615873064429507639/20251205090700/初步核实审批表_张三.docx'
expires_days: URL有效期天数默认7天
Returns:
预签名下载URL如果生成失败则返回None
"""
try:
if not file_path:
return None
client = self.get_minio_client()
# 从相对路径中提取对象名称(去掉开头的/
object_name = file_path.lstrip('/')
# 生成预签名URL
url = client.presigned_get_object(
self.bucket_name,
object_name,
expires=timedelta(days=expires_days)
)
return url
except Exception as e:
# 如果生成URL失败记录错误但不影响主流程
print(f"生成预签名URL失败: {str(e)}")
return None