""" 初始化所有模板到数据库和MinIO 处理模板文件夹下的所有模板文件,上传到MinIO并更新数据库 """ import os import json import pymysql from minio import Minio from minio.error import S3Error from datetime import datetime from pathlib import Path from typing import Dict, List, Optional # MinIO连接配置 MINIO_CONFIG = { 'endpoint': 'minio.datacubeworld.com:9000', 'access_key': 'JOLXFXny3avFSzB0uRA5', 'secret_key': 'G1BR8jStNfovkfH5ou39EmPl34E4l7dGrnd3Cz0I', 'secure': True # 使用HTTPS } # 数据库连接配置 DB_CONFIG = { 'host': '152.136.177.240', 'port': 5012, 'user': 'finyx', 'password': '6QsGK6MpePZDE57Z', 'database': 'finyx', 'charset': 'utf8mb4' } # 固定值 TENANT_ID = 615873064429507639 CREATED_BY = 655162080928945152 UPDATED_BY = 655162080928945152 CURRENT_TIME = datetime.now() # 项目根目录 PROJECT_ROOT = Path(__file__).parent TEMPLATES_DIR = PROJECT_ROOT / "template_finish" BUCKET_NAME = 'finyx' # 文档类型映射(根据完整文件名识别,保持原文件名不变) # 每个文件名都是独立的模板,使用完整文件名作为key DOCUMENT_TYPE_MAPPING = { "1.请示报告卡(XXX)": { "template_code": "REPORT_CARD", "name": "1.请示报告卡(XXX)", "business_type": "INVESTIGATION" }, "2.初步核实审批表(XXX)": { "template_code": "PRELIMINARY_VERIFICATION_APPROVAL", "name": "2.初步核实审批表(XXX)", "business_type": "INVESTIGATION" }, "3.附件初核方案(XXX)": { "template_code": "INVESTIGATION_PLAN", "name": "3.附件初核方案(XXX)", "business_type": "INVESTIGATION" }, "谈话通知书第一联": { "template_code": "NOTIFICATION_LETTER_1", "name": "谈话通知书第一联", "business_type": "INVESTIGATION" }, "谈话通知书第二联": { "template_code": "NOTIFICATION_LETTER_2", "name": "谈话通知书第二联", "business_type": "INVESTIGATION" }, "谈话通知书第三联": { "template_code": "NOTIFICATION_LETTER_3", "name": "谈话通知书第三联", "business_type": "INVESTIGATION" }, "1.请示报告卡(初核谈话)": { "template_code": "REPORT_CARD_INTERVIEW", "name": "1.请示报告卡(初核谈话)", "business_type": "INVESTIGATION" }, "2谈话审批表": { "template_code": "INTERVIEW_APPROVAL_FORM", "name": "2谈话审批表", "business_type": "INVESTIGATION" }, "3.谈话前安全风险评估表": { "template_code": "PRE_INTERVIEW_RISK_ASSESSMENT", "name": "3.谈话前安全风险评估表", "business_type": "INVESTIGATION" }, "4.谈话方案": { "template_code": "INTERVIEW_PLAN", "name": "4.谈话方案", "business_type": "INVESTIGATION" }, "5.谈话后安全风险评估表": { "template_code": "POST_INTERVIEW_RISK_ASSESSMENT", "name": "5.谈话后安全风险评估表", "business_type": "INVESTIGATION" }, "1.谈话笔录": { "template_code": "INTERVIEW_RECORD", "name": "1.谈话笔录", "business_type": "INVESTIGATION" }, "2.谈话询问对象情况摸底调查30问": { "template_code": "INVESTIGATION_30_QUESTIONS", "name": "2.谈话询问对象情况摸底调查30问", "business_type": "INVESTIGATION" }, "3.被谈话人权利义务告知书": { "template_code": "RIGHTS_OBLIGATIONS_NOTICE", "name": "3.被谈话人权利义务告知书", "business_type": "INVESTIGATION" }, "4.点对点交接单": { "template_code": "HANDOVER_FORM", "name": "4.点对点交接单", "business_type": "INVESTIGATION" }, "4.点对点交接单2": { "template_code": "HANDOVER_FORM_2", "name": "4.点对点交接单2", "business_type": "INVESTIGATION" }, "5.陪送交接单(新)": { "template_code": "ESCORT_HANDOVER_FORM", "name": "5.陪送交接单(新)", "business_type": "INVESTIGATION" }, "6.1保密承诺书(谈话对象使用-非中共党员用)": { "template_code": "CONFIDENTIALITY_COMMITMENT_NON_PARTY", "name": "6.1保密承诺书(谈话对象使用-非中共党员用)", "business_type": "INVESTIGATION" }, "6.2保密承诺书(谈话对象使用-中共党员用)": { "template_code": "CONFIDENTIALITY_COMMITMENT_PARTY", "name": "6.2保密承诺书(谈话对象使用-中共党员用)", "business_type": "INVESTIGATION" }, "7.办案人员-办案安全保密承诺书": { "template_code": "INVESTIGATOR_CONFIDENTIALITY_COMMITMENT", "name": "7.办案人员-办案安全保密承诺书", "business_type": "INVESTIGATION" }, "8-1请示报告卡(初核报告结论) ": { "template_code": "REPORT_CARD_CONCLUSION", "name": "8-1请示报告卡(初核报告结论) ", "business_type": "INVESTIGATION" }, "8.XXX初核情况报告": { "template_code": "INVESTIGATION_REPORT", "name": "8.XXX初核情况报告", "business_type": "INVESTIGATION" } } def generate_id(): """生成ID(使用时间戳+随机数的方式,模拟雪花算法)""" import time import random timestamp = int(time.time() * 1000) random_part = random.randint(100000, 999999) return timestamp * 1000 + random_part def identify_document_type(file_name: str) -> Optional[Dict]: """ 根据完整文件名识别文档类型(保持原文件名不变) Args: file_name: 文件名(不含扩展名) Returns: 文档类型配置,如果无法识别返回None """ # 获取文件名(不含扩展名),保持原样 base_name = Path(file_name).stem # 直接使用完整文件名进行精确匹配 if base_name in DOCUMENT_TYPE_MAPPING: return DOCUMENT_TYPE_MAPPING[base_name] # 如果精确匹配失败,返回None(不进行任何修改或模糊匹配) return None def upload_to_minio(file_path: Path) -> str: """ 上传文件到MinIO Args: file_path: 本地文件路径 Returns: MinIO中的相对路径 """ try: # 创建MinIO客户端 client = Minio( MINIO_CONFIG['endpoint'], access_key=MINIO_CONFIG['access_key'], secret_key=MINIO_CONFIG['secret_key'], secure=MINIO_CONFIG['secure'] ) # 检查存储桶是否存在 found = client.bucket_exists(BUCKET_NAME) if not found: raise Exception(f"存储桶 '{BUCKET_NAME}' 不存在,请先创建") # 生成MinIO对象路径 now = datetime.now() object_name = f'{TENANT_ID}/TEMPLATE/{now.year}/{now.month:02d}/{file_path.name}' # 上传文件 client.fput_object( BUCKET_NAME, object_name, str(file_path), content_type='application/vnd.openxmlformats-officedocument.wordprocessingml.document' ) # 返回相对路径(以/开头) return f"/{object_name}" except S3Error as e: raise Exception(f"MinIO错误: {e}") except Exception as e: raise Exception(f"上传文件时发生错误: {e}") def get_or_create_file_config(conn, doc_config: Dict, file_path: str) -> int: """ 获取或创建文件配置记录 Args: conn: 数据库连接 doc_config: 文档配置 file_path: MinIO文件路径 Returns: 文件配置ID """ cursor = conn.cursor() try: # 检查是否已存在 select_sql = """ SELECT id FROM f_polic_file_config WHERE tenant_id = %s AND template_code = %s """ cursor.execute(select_sql, (TENANT_ID, doc_config['template_code'])) existing = cursor.fetchone() if existing: file_config_id = existing[0] # 更新文件路径 update_sql = """ UPDATE f_polic_file_config SET file_path = %s, updated_time = %s, updated_by = %s WHERE id = %s """ cursor.execute(update_sql, (file_path, CURRENT_TIME, UPDATED_BY, file_config_id)) conn.commit() return file_config_id else: # 创建新记录 file_config_id = generate_id() input_data = json.dumps({ 'template_code': doc_config['template_code'], 'business_type': doc_config['business_type'] }, ensure_ascii=False) insert_sql = """ INSERT INTO f_polic_file_config (id, tenant_id, parent_id, name, input_data, file_path, template_code, created_time, created_by, updated_time, updated_by, state) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ cursor.execute(insert_sql, ( file_config_id, TENANT_ID, None, # parent_id doc_config['name'], input_data, file_path, doc_config['template_code'], CURRENT_TIME, CREATED_BY, CURRENT_TIME, CREATED_BY, 1 # state: 1表示启用 )) conn.commit() return file_config_id finally: cursor.close() def process_all_templates(): """ 处理所有模板文件,上传到MinIO并更新数据库 """ print("="*80) print("开始初始化所有模板") print("="*80) if not TEMPLATES_DIR.exists(): print(f"错误: 模板目录不存在: {TEMPLATES_DIR}") return # 连接数据库 try: conn = pymysql.connect(**DB_CONFIG) print("✓ 数据库连接成功\n") except Exception as e: print(f"✗ 数据库连接失败: {e}") return # 统计信息 processed_count = 0 skipped_count = 0 failed_count = 0 # 遍历所有.docx文件 for root, dirs, files in os.walk(TEMPLATES_DIR): for file in files: # 只处理.docx文件 if not file.endswith('.docx'): continue file_path = Path(root) / file # 识别文档类型 doc_config = identify_document_type(file) if not doc_config: print(f"\n⚠ 无法识别文档类型: {file}") print(f" 路径: {file_path}") skipped_count += 1 continue print(f"\n处理: {file}") print(f" 类型: {doc_config.get('template_code', 'UNKNOWN')}") print(f" 名称: {doc_config.get('name', 'UNKNOWN')}") try: # 上传到MinIO print(f" 上传到MinIO...") minio_path = upload_to_minio(file_path) print(f" ✓ MinIO路径: {minio_path}") # 更新数据库 print(f" 更新数据库...") file_config_id = get_or_create_file_config(conn, doc_config, minio_path) print(f" ✓ 文件配置ID: {file_config_id}") processed_count += 1 print(f" ✓ 处理成功") except Exception as e: failed_count += 1 print(f" ✗ 处理失败: {e}") import traceback traceback.print_exc() # 关闭数据库连接 conn.close() # 输出统计信息 print("\n" + "="*80) print("初始化完成") print("="*80) print(f"成功处理: {processed_count} 个文件") print(f"跳过: {skipped_count} 个文件") print(f"失败: {failed_count} 个文件") if __name__ == '__main__': process_all_templates()