""" 更新模板树状结构 根据 template_finish 目录结构更新数据库中的 parent_id 字段 """ import os import json import pymysql from pathlib import Path from typing import Dict, List, Optional, Tuple from datetime import datetime # 数据库连接配置 DB_CONFIG = { 'host': os.getenv('DB_HOST', '152.136.177.240'), 'port': int(os.getenv('DB_PORT', 5012)), 'user': os.getenv('DB_USER', 'finyx'), 'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'), 'database': os.getenv('DB_NAME', 'finyx'), 'charset': 'utf8mb4' } TENANT_ID = 615873064429507639 CREATED_BY = 655162080928945152 UPDATED_BY = 655162080928945152 # 项目根目录 PROJECT_ROOT = Path(__file__).parent TEMPLATES_DIR = PROJECT_ROOT / "template_finish" # 从 init_all_templates.py 复制的文档类型映射 DOCUMENT_TYPE_MAPPING = { "1.请示报告卡(XXX)": { "template_code": "REPORT_CARD", "name": "1.请示报告卡(XXX)", "business_type": "INVESTIGATION" }, "2.初步核实审批表(XXX)": { "template_code": "PRELIMINARY_VERIFICATION_APPROVAL", "name": "2.初步核实审批表(XXX)", "business_type": "INVESTIGATION" }, "3.附件初核方案(XXX)": { "template_code": "INVESTIGATION_PLAN", "name": "3.附件初核方案(XXX)", "business_type": "INVESTIGATION" }, "谈话通知书第一联": { "template_code": "NOTIFICATION_LETTER_1", "name": "谈话通知书第一联", "business_type": "INVESTIGATION" }, "谈话通知书第二联": { "template_code": "NOTIFICATION_LETTER_2", "name": "谈话通知书第二联", "business_type": "INVESTIGATION" }, "谈话通知书第三联": { "template_code": "NOTIFICATION_LETTER_3", "name": "谈话通知书第三联", "business_type": "INVESTIGATION" }, "1.请示报告卡(初核谈话)": { "template_code": "REPORT_CARD_INTERVIEW", "name": "1.请示报告卡(初核谈话)", "business_type": "INVESTIGATION" }, "2谈话审批表": { "template_code": "INTERVIEW_APPROVAL_FORM", "name": "2谈话审批表", "business_type": "INVESTIGATION" }, "3.谈话前安全风险评估表": { "template_code": "PRE_INTERVIEW_RISK_ASSESSMENT", "name": "3.谈话前安全风险评估表", "business_type": "INVESTIGATION" }, "4.谈话方案": { "template_code": "INTERVIEW_PLAN", "name": "4.谈话方案", "business_type": "INVESTIGATION" }, "5.谈话后安全风险评估表": { "template_code": "POST_INTERVIEW_RISK_ASSESSMENT", "name": "5.谈话后安全风险评估表", "business_type": "INVESTIGATION" }, "1.谈话笔录": { "template_code": "INTERVIEW_RECORD", "name": "1.谈话笔录", "business_type": "INVESTIGATION" }, "2.谈话询问对象情况摸底调查30问": { "template_code": "INVESTIGATION_30_QUESTIONS", "name": "2.谈话询问对象情况摸底调查30问", "business_type": "INVESTIGATION" }, "3.被谈话人权利义务告知书": { "template_code": "RIGHTS_OBLIGATIONS_NOTICE", "name": "3.被谈话人权利义务告知书", "business_type": "INVESTIGATION" }, "4.点对点交接单": { "template_code": "HANDOVER_FORM", "name": "4.点对点交接单", "business_type": "INVESTIGATION" }, "4.点对点交接单2": { "template_code": "HANDOVER_FORM_2", "name": "4.点对点交接单2", "business_type": "INVESTIGATION" }, "5.陪送交接单(新)": { "template_code": "ESCORT_HANDOVER_FORM", "name": "5.陪送交接单(新)", "business_type": "INVESTIGATION" }, "6.1保密承诺书(谈话对象使用-非中共党员用)": { "template_code": "CONFIDENTIALITY_COMMITMENT_NON_PARTY", "name": "6.1保密承诺书(谈话对象使用-非中共党员用)", "business_type": "INVESTIGATION" }, "6.2保密承诺书(谈话对象使用-中共党员用)": { "template_code": "CONFIDENTIALITY_COMMITMENT_PARTY", "name": "6.2保密承诺书(谈话对象使用-中共党员用)", "business_type": "INVESTIGATION" }, "7.办案人员-办案安全保密承诺书": { "template_code": "INVESTIGATOR_CONFIDENTIALITY_COMMITMENT", "name": "7.办案人员-办案安全保密承诺书", "business_type": "INVESTIGATION" }, "8-1请示报告卡(初核报告结论) ": { "template_code": "REPORT_CARD_CONCLUSION", "name": "8-1请示报告卡(初核报告结论) ", "business_type": "INVESTIGATION" }, "8.XXX初核情况报告": { "template_code": "INVESTIGATION_REPORT", "name": "8.XXX初核情况报告", "business_type": "INVESTIGATION" } } def generate_id(): """生成ID(使用时间戳+随机数的方式,模拟雪花算法)""" import time import random timestamp = int(time.time() * 1000) random_part = random.randint(100000, 999999) return timestamp * 1000 + random_part def normalize_name(name: str) -> str: """标准化名称,用于模糊匹配""" import re # 去掉开头的编号(如 "1."、"2."、"8-1" 等) name = re.sub(r'^\d+[\.\-]\s*', '', name) # 去掉括号及其内容(如 "(XXX)"、"(初核谈话)" 等) name = re.sub(r'[((].*?[))]', '', name) # 去掉空格和特殊字符 name = name.strip() return name def identify_document_type(file_name: str) -> Optional[Dict]: """根据完整文件名识别文档类型""" base_name = Path(file_name).stem if base_name in DOCUMENT_TYPE_MAPPING: return DOCUMENT_TYPE_MAPPING[base_name] return None def scan_directory_structure(base_dir: Path) -> Dict: """扫描目录结构,构建树状层级""" structure = { 'directories': {}, # {path: {'name': ..., 'parent': ..., 'level': ...}} 'files': {} # {file_path: {'name': ..., 'parent': ..., 'template_code': ...}} } def process_path(path: Path, parent_path: Optional[str] = None, level: int = 0): """递归处理路径""" if path.is_file() and path.suffix == '.docx': # 处理文件 file_name = path.stem doc_config = identify_document_type(file_name) structure['files'][str(path)] = { 'name': file_name, 'parent': parent_path, 'level': level, 'template_code': doc_config['template_code'] if doc_config else None, 'full_path': str(path), 'normalized_name': normalize_name(file_name) } elif path.is_dir(): # 处理目录 dir_name = path.name structure['directories'][str(path)] = { 'name': dir_name, 'parent': parent_path, 'level': level, 'normalized_name': normalize_name(dir_name) } # 递归处理子目录和文件 for child in sorted(path.iterdir()): if child.name != '__pycache__': process_path(child, str(path), level + 1) # 从根目录开始扫描 if TEMPLATES_DIR.exists(): for item in sorted(TEMPLATES_DIR.iterdir()): if item.name != '__pycache__': process_path(item, None, 0) return structure def find_matching_config(file_info: Dict, existing_data: Dict) -> Optional[Dict]: """ 查找匹配的数据库记录 优先级:1. template_code 精确匹配 2. 名称精确匹配 3. 标准化名称匹配 """ template_code = file_info.get('template_code') file_name = file_info['name'] normalized_name = file_info.get('normalized_name', normalize_name(file_name)) # 优先级1: template_code 精确匹配 if template_code: matched = existing_data['by_template_code'].get(template_code) if matched: return matched # 优先级2: 名称精确匹配 matched = existing_data['by_name'].get(file_name) if matched: return matched # 优先级3: 标准化名称匹配 candidates = existing_data['by_normalized_name'].get(normalized_name, []) if candidates: # 如果有多个候选,优先选择有正确 template_code 的 for candidate in candidates: if candidate.get('extracted_template_code') == template_code: return candidate # 否则返回第一个 return candidates[0] return None def get_existing_data(conn) -> Dict: """获取数据库中的现有数据""" cursor = conn.cursor(pymysql.cursors.DictCursor) sql = """ SELECT id, name, parent_id, template_code, input_data, file_path, state FROM f_polic_file_config WHERE tenant_id = %s """ cursor.execute(sql, (TENANT_ID,)) configs = cursor.fetchall() result = { 'by_id': {}, 'by_name': {}, 'by_template_code': {}, 'by_normalized_name': {} # 新增:标准化名称索引 } for config in configs: config_id = config['id'] config_name = config['name'] # 尝试从 input_data 中提取 template_code template_code = config.get('template_code') if not template_code and config.get('input_data'): try: input_data = json.loads(config['input_data']) if isinstance(config['input_data'], str) else config['input_data'] if isinstance(input_data, dict): template_code = input_data.get('template_code') except: pass config['extracted_template_code'] = template_code config['normalized_name'] = normalize_name(config_name) result['by_id'][config_id] = config result['by_name'][config_name] = config if template_code: # 如果已存在相同 template_code,保留第一个 if template_code not in result['by_template_code']: result['by_template_code'][template_code] = config # 标准化名称索引(可能有多个记录匹配同一个标准化名称) normalized = config['normalized_name'] if normalized not in result['by_normalized_name']: result['by_normalized_name'][normalized] = [] result['by_normalized_name'][normalized].append(config) cursor.close() return result def plan_tree_structure(dir_structure: Dict, existing_data: Dict) -> List[Dict]: """规划树状结构""" plan = [] # 按层级排序目录 directories = sorted(dir_structure['directories'].items(), key=lambda x: (x[1]['level'], x[0])) # 按层级排序文件 files = sorted(dir_structure['files'].items(), key=lambda x: (x[1]['level'], x[0])) # 创建目录映射(用于查找父目录ID) dir_id_map = {} # {dir_path: config_id} # 处理目录(按层级顺序) for dir_path, dir_info in directories: dir_name = dir_info['name'] parent_path = dir_info['parent'] level = dir_info['level'] # 查找父目录ID parent_id = None if parent_path: parent_id = dir_id_map.get(parent_path) # 查找匹配的数据库记录(使用改进的匹配逻辑) existing = find_matching_config(dir_info, existing_data) if existing: # 使用现有记录 plan.append({ 'type': 'directory', 'name': dir_name, 'parent_name': dir_structure['directories'].get(parent_path, {}).get('name') if parent_path else None, 'parent_id': parent_id, 'level': level, 'action': 'update', 'config_id': existing['id'], 'current_parent_id': existing.get('parent_id') }) dir_id_map[dir_path] = existing['id'] else: # 创建新记录(目录节点) new_id = generate_id() plan.append({ 'type': 'directory', 'name': dir_name, 'parent_name': dir_structure['directories'].get(parent_path, {}).get('name') if parent_path else None, 'parent_id': parent_id, 'level': level, 'action': 'create', 'config_id': new_id, 'current_parent_id': None }) dir_id_map[dir_path] = new_id # 处理文件 for file_path, file_info in files: file_name = file_info['name'] parent_path = file_info['parent'] level = file_info['level'] template_code = file_info['template_code'] # 查找父目录ID parent_id = dir_id_map.get(parent_path) if parent_path else None # 查找匹配的数据库记录(使用改进的匹配逻辑) existing = find_matching_config(file_info, existing_data) if existing: # 更新现有记录 plan.append({ 'type': 'file', 'name': file_name, 'parent_name': dir_structure['directories'].get(parent_path, {}).get('name') if parent_path else None, 'parent_id': parent_id, 'level': level, 'action': 'update', 'config_id': existing['id'], 'template_code': template_code, 'current_parent_id': existing.get('parent_id') }) else: # 创建新记录(文件节点)- 这种情况应该很少,因为文件应该已经在数据库中 new_id = generate_id() plan.append({ 'type': 'file', 'name': file_name, 'parent_name': dir_structure['directories'].get(parent_path, {}).get('name') if parent_path else None, 'parent_id': parent_id, 'level': level, 'action': 'create', 'config_id': new_id, 'template_code': template_code, 'current_parent_id': None }) return plan def print_preview(plan: List[Dict]): """打印更新预览""" print("\n" + "="*80) print("更新预览") print("="*80) # 按层级分组 by_level = {} for item in plan: level = item['level'] if level not in by_level: by_level[level] = [] by_level[level].append(item) # 按层级顺序显示 for level in sorted(by_level.keys()): print(f"\n【层级 {level}】") for item in by_level[level]: indent = " " * level if item['action'] == 'create': print(f"{indent}+ 创建: {item['name']} (ID: {item['config_id']})") if item['parent_name']: print(f"{indent} 父节点: {item['parent_name']}") else: current = item.get('current_parent_id', 'None') new = item.get('parent_id', 'None') if current != new: print(f"{indent}→ 更新: {item['name']} (ID: {item['config_id']})") print(f"{indent} parent_id: {current} → {new}") if item['parent_name']: print(f"{indent} 父节点: {item['parent_name']}") else: print(f"{indent}✓ 无需更新: {item['name']} (parent_id 已正确)") def execute_update(conn, plan: List[Dict], dry_run: bool = True): """执行更新""" cursor = conn.cursor() try: if not dry_run: conn.autocommit(False) # 按层级分组 by_level = {} for item in plan: level = item['level'] if level not in by_level: by_level[level] = [] by_level[level].append(item) create_count = 0 update_count = 0 skip_count = 0 # 按层级顺序处理(从顶层到底层) for level in sorted(by_level.keys()): for item in by_level[level]: if item['action'] == 'create': # 创建新记录 if not dry_run: if item['type'] == 'directory': insert_sql = """ INSERT INTO f_polic_file_config (id, tenant_id, parent_id, name, input_data, file_path, created_time, created_by, updated_time, updated_by, state) VALUES (%s, %s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, %s) """ cursor.execute(insert_sql, ( item['config_id'], TENANT_ID, item['parent_id'], item['name'], None, None, CREATED_BY, UPDATED_BY, 1 )) else: # 文件节点 input_data = json.dumps({ 'template_code': item.get('template_code', ''), 'business_type': 'INVESTIGATION' }, ensure_ascii=False) insert_sql = """ INSERT INTO f_polic_file_config (id, tenant_id, parent_id, name, input_data, file_path, template_code, created_time, created_by, updated_time, updated_by, state) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, %s) """ cursor.execute(insert_sql, ( item['config_id'], TENANT_ID, item['parent_id'], item['name'], input_data, None, item.get('template_code'), CREATED_BY, UPDATED_BY, 1 )) create_count += 1 print(f" ✓ {'[模拟]' if dry_run else ''}创建: {item['name']}") else: # 更新现有记录 current_parent = item.get('current_parent_id') new_parent = item.get('parent_id') if current_parent != new_parent: if not dry_run: update_sql = """ UPDATE f_polic_file_config SET parent_id = %s, updated_time = NOW(), updated_by = %s WHERE id = %s AND tenant_id = %s """ cursor.execute(update_sql, ( new_parent, UPDATED_BY, item['config_id'], TENANT_ID )) update_count += 1 print(f" ✓ {'[模拟]' if dry_run else ''}更新: {item['name']} (parent_id: {current_parent} → {new_parent})") else: skip_count += 1 if not dry_run: conn.commit() print(f"\n✓ 更新完成!") else: print(f"\n[模拟模式] 未实际执行更新") print(f"\n统计:") print(f" - 创建: {create_count} 条") print(f" - 更新: {update_count} 条") print(f" - 跳过: {skip_count} 条") except Exception as e: if not dry_run: conn.rollback() print(f"\n✗ 更新失败: {e}") import traceback traceback.print_exc() raise finally: cursor.close() def main(): """主函数""" print("="*80) print("更新模板树状结构") print("="*80) # 连接数据库 try: conn = pymysql.connect(**DB_CONFIG) print("✓ 数据库连接成功\n") except Exception as e: print(f"✗ 数据库连接失败: {e}") return try: # 扫描目录结构 print("扫描目录结构...") dir_structure = scan_directory_structure(TEMPLATES_DIR) print(f" 找到 {len(dir_structure['directories'])} 个目录") print(f" 找到 {len(dir_structure['files'])} 个文件\n") # 获取数据库现有数据 print("获取数据库现有数据...") existing_data = get_existing_data(conn) print(f" 数据库中有 {len(existing_data['by_id'])} 条记录\n") # 规划树状结构 print("规划树状结构...") plan = plan_tree_structure(dir_structure, existing_data) print(f" 生成 {len(plan)} 个更新计划\n") # 打印预览 print_preview(plan) # 询问是否执行 print("\n" + "="*80) response = input("\n是否执行更新?(yes/no,默认no): ").strip().lower() if response == 'yes': # 先执行一次模拟 print("\n执行模拟更新...") execute_update(conn, plan, dry_run=True) # 再次确认 print("\n" + "="*80) confirm = input("\n确认执行实际更新?(yes/no,默认no): ").strip().lower() if confirm == 'yes': print("\n执行实际更新...") execute_update(conn, plan, dry_run=False) else: print("\n已取消更新") else: print("\n已取消更新") finally: conn.close() print("\n数据库连接已关闭") if __name__ == '__main__': main()