"""
Process the template "6.1保密承诺书(谈话对象使用-非中共党员用).docx".

Steps performed by this script:
- parse the ``{{field_code}}`` placeholders found in the document
- upload the document to MinIO
- create/update the template record and its field relations in the database
"""
import os
import sys
import re
import json
import random
import time
import traceback
import pymysql
from minio import Minio
from minio.error import S3Error
from datetime import datetime
from pathlib import Path
from docx import Document
from typing import Dict, List, Optional, Tuple

# Force UTF-8 console output (Windows compatibility)
if sys.platform == 'win32':
    import io
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')

# NOTE(review): the credentials below are committed in source. They are kept
# unchanged here so the script behaves as before, but they should be moved to
# environment variables / a secret store and rotated.

# MinIO connection settings
MINIO_CONFIG = {
    'endpoint': 'minio.datacubeworld.com:9000',
    'access_key': 'JOLXFXny3avFSzB0uRA5',
    'secret_key': 'G1BR8jStNfovkfH5ou39EmPl34E4l7dGrnd3Cz0I',
    'secure': True
}

# Database connection settings
DB_CONFIG = {
    'host': '152.136.177.240',
    'port': 5012,
    'user': 'finyx',
    'password': '6QsGK6MpePZDE57Z',
    'database': 'finyx',
    'charset': 'utf8mb4'
}

# Fixed values
TENANT_ID = 615873064429507639
CREATED_BY = 655162080928945152
UPDATED_BY = 655162080928945152
BUCKET_NAME = 'finyx'

# File location
TEMPLATE_FILE = 'template_finish/2-初核模版/2.谈话审批/走读式谈话流程/6.1保密承诺书(谈话对象使用-非中共党员用).docx'
PARENT_ID = 1765273962716807  # ID of the parent entry "走读式谈话流程"
TEMPLATE_NAME = '6.1保密承诺书(谈话对象使用-非中共党员用)'

# Matches placeholders written as {{field_code}}
_PLACEHOLDER_RE = re.compile(r'\{\{([^}]+)\}\}')


def generate_id():
    """Generate a numeric ID from the current time and a random component.

    Returns:
        The millisecond timestamp multiplied by 1000, plus a random integer
        in the range [100000, 999999].
    """
    timestamp = int(time.time() * 1000)
    random_part = random.randint(100000, 999999)
    # NOTE(review): the random part has six digits while the timestamp is only
    # shifted by three, so it carries into the low timestamp digits. Left as-is
    # because existing IDs were produced with this formula.
    return timestamp * 1000 + random_part


def _collect_placeholders(paragraphs, found: set) -> None:
    """Add every stripped ``{{...}}`` placeholder in *paragraphs* to *found*."""
    for paragraph in paragraphs:
        for match in _PLACEHOLDER_RE.findall(paragraph.text):
            found.add(match.strip())


def extract_placeholders_from_docx(file_path: str) -> List[str]:
    """Extract all placeholders from a docx file.

    Body paragraphs and the paragraphs of every top-level table cell are
    scanned.

    Args:
        file_path: path of the docx file.

    Returns:
        Sorted list of unique placeholder names, e.g.
        ``['field_code1', 'field_code2', ...]``. If the file cannot be read,
        the error is printed and an empty list is returned.
    """
    placeholders = set()

    try:
        doc = Document(file_path)

        # Placeholders in body paragraphs
        _collect_placeholders(doc.paragraphs, placeholders)

        # Placeholders in table cells
        for table in doc.tables:
            for row in table.rows:
                for cell in row.cells:
                    _collect_placeholders(cell.paragraphs, placeholders)
    except Exception as e:
        # Best-effort by design: report the failure and return nothing.
        print(f" 错误: 读取文件失败 - {str(e)}")
        return []

    return sorted(placeholders)


def upload_to_minio(file_path: str, minio_client: Minio) -> str:
    """Upload a file to MinIO.

    Args:
        file_path: local file path.
        minio_client: MinIO client instance.

    Returns:
        The object path inside the bucket, prefixed with ``/``.

    Raises:
        Exception: if the bucket does not exist or the upload fails. The
            original error is attached as ``__cause__``.
    """
    try:
        # The bucket must already exist; this script does not create it.
        found = minio_client.bucket_exists(BUCKET_NAME)
        if not found:
            raise Exception(f"存储桶 '{BUCKET_NAME}' 不存在,请先创建")

        # Object path is built from the tenant ID and the current year/month.
        now = datetime.now()
        file_name = Path(file_path).name
        object_name = f'{TENANT_ID}/TEMPLATE/{now.year}/{now.month:02d}/{file_name}'

        # Upload the file
        minio_client.fput_object(
            BUCKET_NAME,
            object_name,
            file_path,
            content_type='application/vnd.openxmlformats-officedocument.wordprocessingml.document'
        )

        # Return the relative path (starting with /)
        return f"/{object_name}"
    except S3Error as e:
        raise Exception(f"MinIO错误: {e}") from e
    except Exception as e:
        raise Exception(f"上传文件时发生错误: {e}") from e


def get_db_fields(conn) -> Dict[str, Dict]:
    """Load the tenant's fields with ``field_type = 2`` (output fields).

    Args:
        conn: open pymysql connection.

    Returns:
        Dict keyed by ``filed_code`` (spelling as in the table column); each
        value holds ``id``, ``name``, ``filed_code`` and ``field_type``.
    """
    sql = (
        " SELECT id, name, filed_code, field_type FROM f_polic_field "
        "WHERE tenant_id = %s AND field_type = 2 "
    )
    # The context manager closes the cursor even when the query raises.
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute(sql, (TENANT_ID,))
        fields = cursor.fetchall()

    return {
        field['filed_code']: {
            'id': field['id'],
            'name': field['name'],
            'filed_code': field['filed_code'],
            'field_type': field['field_type']
        }
        for field in fields
    }


def match_placeholders_to_fields(placeholders: List[str], fields: Dict[str, Dict]) -> Tuple[List[int], List[str]]:
    """Match placeholders against the database fields.

    Args:
        placeholders: placeholder names extracted from the template.
        fields: mapping of ``filed_code`` to field info, as returned by
            ``get_db_fields``.

    Returns:
        ``(matched_field_ids, unmatched_placeholders)``, both in the order of
        *placeholders*.
    """
    matched_field_ids = []
    unmatched_placeholders = []

    for placeholder in placeholders:
        field = fields.get(placeholder)
        if field is not None:
            matched_field_ids.append(field['id'])
        else:
            unmatched_placeholders.append(placeholder)

    return matched_field_ids, unmatched_placeholders


def create_or_update_template(conn, template_name: str, file_path: str, minio_path: str, parent_id: Optional[int]) -> int:
    """Create the template record, or update it when it already exists.

    An existing record is looked up by tenant, name and ``parent_id``.

    Args:
        conn: open pymysql connection.
        template_name: template name stored in ``name``.
        file_path: local file path (currently unused; kept for callers).
        minio_path: object path stored in ``file_path``.
        parent_id: ID of the parent entry, or ``None`` for no parent.

    Returns:
        The template ID.

    Raises:
        Exception: if any statement fails; the transaction is rolled back and
            the original error is attached as ``__cause__``.
    """
    cursor = conn.cursor(pymysql.cursors.DictCursor)

    try:
        # Look for an existing record (matched by name and parent_id).
        # "parent_id = NULL" never matches in SQL, so a missing parent must be
        # tested with IS NULL; otherwise every run would insert a duplicate.
        select_sql = (
            " SELECT id, name, file_path, parent_id FROM f_polic_file_config "
            "WHERE tenant_id = %s AND name = %s AND "
        )
        if parent_id is None:
            select_sql += "parent_id IS NULL "
            select_params = (TENANT_ID, template_name)
        else:
            select_sql += "parent_id = %s "
            select_params = (TENANT_ID, template_name, parent_id)
        cursor.execute(select_sql, select_params)
        existing = cursor.fetchone()

        if existing:
            # Update the existing record
            template_id = existing['id']
            update_sql = (
                " UPDATE f_polic_file_config SET file_path = %s, "
                "updated_time = NOW(), updated_by = %s, state = 1 "
                "WHERE id = %s AND tenant_id = %s "
            )
            cursor.execute(update_sql, (minio_path, UPDATED_BY, template_id, TENANT_ID))
            conn.commit()
            print(f" [UPDATE] 更新模板记录 (ID: {template_id})")
            return template_id

        # Create a new record
        template_id = generate_id()
        insert_sql = (
            " INSERT INTO f_polic_file_config "
            "(id, tenant_id, parent_id, name, input_data, file_path, "
            "created_time, created_by, updated_time, updated_by, state) "
            "VALUES (%s, %s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, %s) "
        )
        cursor.execute(insert_sql, (
            template_id,
            TENANT_ID,
            parent_id,
            template_name,
            None,  # input_data
            minio_path,
            CREATED_BY,
            CREATED_BY,
            1  # state: 1 means enabled
        ))
        conn.commit()
        print(f" [CREATE] 创建模板记录 (ID: {template_id})")
        return template_id
    except Exception as e:
        conn.rollback()
        raise Exception(f"创建或更新模板失败: {str(e)}") from e
    finally:
        cursor.close()


def update_template_field_relations(conn, template_id: int, field_ids: List[int]):
    """Replace the template-to-field relations of one template.

    All existing relations of the template are deleted and one relation per
    entry of *field_ids* is inserted, in a single transaction.

    Args:
        conn: open pymysql connection.
        template_id: ID of the template (``file_id`` column).
        field_ids: IDs of the fields to relate (``filed_id`` column).

    Raises:
        Exception: if any statement fails; the transaction is rolled back and
            the original error is attached as ``__cause__``.
    """
    cursor = conn.cursor()

    try:
        # Remove the old relations
        delete_sql = " DELETE FROM f_polic_file_field WHERE tenant_id = %s AND file_id = %s "
        cursor.execute(delete_sql, (TENANT_ID, template_id))

        # Insert the new relations
        if field_ids:
            insert_sql = (
                " INSERT INTO f_polic_file_field "
                "(tenant_id, file_id, filed_id, created_time, created_by, updated_time, updated_by) "
                "VALUES (%s, %s, %s, NOW(), %s, NOW(), %s) "
            )
            rows = [
                (TENANT_ID, template_id, field_id, CREATED_BY, CREATED_BY)
                for field_id in field_ids
            ]
            cursor.executemany(insert_sql, rows)

        conn.commit()
        print(f" [UPDATE] 更新字段关联关系: {len(field_ids)} 个字段")
    except Exception as e:
        conn.rollback()
        raise Exception(f"更新字段关联关系失败: {str(e)}") from e
    finally:
        cursor.close()


def main():
    """Run the whole workflow: extract, match, upload, then update the database."""
    print("=" * 80)
    print("处理保密承诺书(非中共党员用)模板")
    print("=" * 80)
    print()

    # Make sure the template file exists
    if not os.path.exists(TEMPLATE_FILE):
        print(f"错误: 文件不存在 - {TEMPLATE_FILE}")
        return

    print(f"文件路径: {TEMPLATE_FILE}")
    print()

    # Stays None until the connection is opened, so cleanup knows whether
    # there is anything to roll back / close.
    conn = None

    try:
        # 1. Extract placeholders
        print("1. 提取占位符...")
        placeholders = extract_placeholders_from_docx(TEMPLATE_FILE)
        print(f" 找到 {len(placeholders)} 个占位符:")
        for i, placeholder in enumerate(placeholders, 1):
            print(f" {i}. {{{{ {placeholder} }}}}")
        print()

        # 2. Connect to the database and MinIO
        print("2. 连接数据库和MinIO...")
        conn = pymysql.connect(**DB_CONFIG)
        minio_client = Minio(
            MINIO_CONFIG['endpoint'],
            access_key=MINIO_CONFIG['access_key'],
            secret_key=MINIO_CONFIG['secret_key'],
            secure=MINIO_CONFIG['secure']
        )
        print(" [OK] 连接成功\n")

        # 3. Load the database fields
        print("3. 获取数据库字段...")
        db_fields = get_db_fields(conn)
        print(f" [OK] 找到 {len(db_fields)} 个输出字段\n")

        # 4. Match placeholders to fields
        print("4. 匹配占位符到字段...")
        matched_field_ids, unmatched_placeholders = match_placeholders_to_fields(placeholders, db_fields)
        print(f" 匹配成功: {len(matched_field_ids)} 个")
        print(f" 未匹配: {len(unmatched_placeholders)} 个")
        if unmatched_placeholders:
            print(f" 未匹配的占位符: {', '.join(unmatched_placeholders)}")
        print()

        # 5. Upload to MinIO
        print("5. 上传到MinIO...")
        minio_path = upload_to_minio(TEMPLATE_FILE, minio_client)
        print(f" [OK] MinIO路径: {minio_path}\n")

        # 6. Create or update the database record
        print("6. 创建或更新数据库记录...")
        template_id = create_or_update_template(conn, TEMPLATE_NAME, TEMPLATE_FILE, minio_path, PARENT_ID)
        print(f" [OK] 模板ID: {template_id}\n")

        # 7. Update the field relations
        print("7. 更新字段关联关系...")
        update_template_field_relations(conn, template_id, matched_field_ids)
        print()

        print("=" * 80)
        print("处理完成!")
        print("=" * 80)
        print(f"模板ID: {template_id}")
        print(f"MinIO路径: {minio_path}")
        print(f"关联字段数: {len(matched_field_ids)}")
    except Exception as e:
        print(f"\n[ERROR] 发生错误: {e}")
        traceback.print_exc()
        if conn is not None:
            conn.rollback()
    finally:
        if conn is not None:
            conn.close()
            print("\n数据库连接已关闭")


if __name__ == '__main__':
    main()