""" 根据 template_finish/ 目录结构更新 f_polic_file_config 表中的层级结构 """ import os import sys import json import pymysql from pathlib import Path from typing import Dict, List, Optional, Tuple from collections import defaultdict # 设置输出编码为UTF-8(Windows兼容) if sys.platform == 'win32': import io sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') # 数据库连接配置 DB_CONFIG = { 'host': '152.136.177.240', 'port': 5012, 'user': 'finyx', 'password': '6QsGK6MpePZDE57Z', 'database': 'finyx', 'charset': 'utf8mb4' } TENANT_ID = 615873064429507639 CREATED_BY = 655162080928945152 UPDATED_BY = 655162080928945152 TEMPLATE_BASE_DIR = 'template_finish' def generate_id(): """生成ID""" import time import random timestamp = int(time.time() * 1000) random_part = random.randint(100000, 999999) return timestamp * 1000 + random_part def normalize_name(name: str) -> str: """ 标准化名称(去掉扩展名、括号内容、数字前缀等) 用于匹配数据库中的记录 """ # 去掉扩展名 name = Path(name).stem if '.' in name else name # 去掉括号内容 import re name = re.sub(r'[((].*?[))]', '', name) name = name.strip() # 去掉数字前缀和点号 name = re.sub(r'^\d+[\.\-]?\s*', '', name) name = name.strip() return name def scan_directory_structure(base_dir: str) -> Dict: """ 扫描目录结构,构建层级关系 Returns: 字典,包含目录和文件的层级信息 """ base_path = Path(base_dir) if not base_path.exists(): print(f"错误: 目录不存在 - {base_dir}") return {} structure = { 'directories': [], # 目录节点列表 'files': [], # 文件节点列表 'name_to_id': {} # 名称到ID的映射(用于查找parent_id) } print("=" * 80) print("扫描目录结构...") print("=" * 80) # 遍历所有目录和文件 for item in base_path.rglob("*"): relative_path = item.relative_to(base_path) parts = relative_path.parts if item.is_dir(): # 目录节点 level = len(parts) - 1 # 层级(从0开始) dir_name = parts[-1] parent_path = str(Path(*parts[:-1])) if len(parts) > 1 else None structure['directories'].append({ 'name': dir_name, 'path': str(relative_path), 'level': level, 'parent_path': parent_path, 'parent_id': None # 稍后设置 }) elif item.is_file() and item.suffix == '.docx' and not item.name.startswith("~$"): # 文件节点 level = len(parts) - 1 file_name = item.name parent_path = str(Path(*parts[:-1])) if len(parts) > 1 else None structure['files'].append({ 'name': file_name, 'path': str(relative_path), 'level': level, 'parent_path': parent_path, 'parent_id': None, # 稍后设置 'file_path': str(item) }) # 按层级排序 structure['directories'].sort(key=lambda x: (x['level'], x['path'])) structure['files'].sort(key=lambda x: (x['level'], x['path'])) print(f"找到 {len(structure['directories'])} 个目录节点") print(f"找到 {len(structure['files'])} 个文件节点") return structure def get_existing_templates(conn) -> Dict: """ 获取数据库中现有的模板记录 Returns: 字典,key为标准化名称,value为模板信息 """ cursor = conn.cursor(pymysql.cursors.DictCursor) sql = """ SELECT id, name, parent_id, file_path, state FROM f_polic_file_config WHERE tenant_id = %s """ cursor.execute(sql, (TENANT_ID,)) templates = cursor.fetchall() result = {} for template in templates: normalized_name = normalize_name(template['name']) result[normalized_name] = { 'id': template['id'], 'name': template['name'], 'normalized_name': normalized_name, 'parent_id': template['parent_id'], 'file_path': template['file_path'], 'state': template['state'] } cursor.close() return result def find_template_by_name(existing_templates: Dict, name: str, prefer_directory: bool = False) -> Optional[Dict]: """ 根据名称查找模板(支持标准化匹配) Args: name: 模板名称 prefer_directory: 是否优先匹配目录节点 """ normalized = normalize_name(name) # 精确匹配标准化名称 if normalized in existing_templates: template = existing_templates[normalized] # 如果prefer_directory为True,且找到的是文件,继续查找目录 if prefer_directory and template.get('file_path') is not None: pass # 继续查找 else: return template # 模糊匹配(包含关系) candidates = [] for key, template in existing_templates.items(): if key.startswith("DIR:"): # 目录节点 if normalized in template.get('normalized_name', '') or template.get('normalized_name', '') in normalized: candidates.append((template, True)) else: # 文件节点 if normalized in template.get('normalized_name', '') or template.get('normalized_name', '') in normalized: candidates.append((template, False)) # 如果prefer_directory,优先返回目录节点 if prefer_directory: for template, is_dir in candidates: if is_dir: return template # 返回第一个匹配的 if candidates: return candidates[0][0] return None def create_or_update_directory(conn, dir_info: Dict, parent_id: Optional[int], existing_templates: Dict) -> int: """ 创建或更新目录节点 Returns: 目录节点的ID """ cursor = conn.cursor() try: # 先通过路径查找(最准确) path_key = f"DIR:{dir_info['path']}" existing = existing_templates.get(path_key) # 如果没找到,再通过名称查找(优先目录节点) if not existing: existing = find_template_by_name(existing_templates, dir_info['name'], prefer_directory=True) # 确保找到的是目录节点(file_path为None) if existing and existing.get('file_path') is not None: existing = None if existing: # 更新现有记录 template_id = existing['id'] if existing['parent_id'] != parent_id: update_sql = """ UPDATE f_polic_file_config SET parent_id = %s, updated_time = NOW(), updated_by = %s, state = 1 WHERE id = %s AND tenant_id = %s """ cursor.execute(update_sql, (parent_id, UPDATED_BY, template_id, TENANT_ID)) conn.commit() print(f" [UPDATE] 更新目录: {dir_info['name']} (ID: {template_id}, parent_id: {parent_id})") else: print(f" [KEEP] 保持目录: {dir_info['name']} (ID: {template_id})") return template_id else: # 创建新记录 template_id = generate_id() insert_sql = """ INSERT INTO f_polic_file_config (id, tenant_id, parent_id, name, input_data, file_path, created_time, created_by, updated_time, updated_by, state) VALUES (%s, %s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, %s) """ cursor.execute(insert_sql, ( template_id, TENANT_ID, parent_id, dir_info['name'], None, # input_data None, # file_path(目录节点没有文件路径) CREATED_BY, CREATED_BY, 1 # state: 1表示启用 )) conn.commit() print(f" [CREATE] 创建目录: {dir_info['name']} (ID: {template_id}, parent_id: {parent_id})") return template_id except Exception as e: conn.rollback() raise Exception(f"创建或更新目录失败: {str(e)}") finally: cursor.close() def update_file_parent(conn, file_info: Dict, parent_id: Optional[int], existing_templates: Dict) -> Optional[int]: """ 更新文件节点的parent_id Returns: 文件节点的ID,如果未找到则返回None """ cursor = conn.cursor() try: # 查找文件(使用文件名匹配) existing = find_template_by_name(existing_templates, file_info['name']) if existing: template_id = existing['id'] if existing['parent_id'] != parent_id: update_sql = """ UPDATE f_polic_file_config SET parent_id = %s, updated_time = NOW(), updated_by = %s WHERE id = %s AND tenant_id = %s """ cursor.execute(update_sql, (parent_id, UPDATED_BY, template_id, TENANT_ID)) conn.commit() print(f" [UPDATE] 更新文件: {file_info['name']} (ID: {template_id}, parent_id: {parent_id})") else: print(f" [KEEP] 保持文件: {file_info['name']} (ID: {template_id})") return template_id else: print(f" [WARN] 未找到文件: {file_info['name']}") return None except Exception as e: conn.rollback() raise Exception(f"更新文件parent_id失败: {str(e)}") finally: cursor.close() def build_path_to_id_map(structure: Dict, existing_templates: Dict, conn) -> Dict[str, int]: """ 构建路径到ID的映射 Returns: 字典,key为路径,value为ID """ path_to_id = {} # 处理目录节点(按层级顺序,确保父节点先处理) # 按层级和路径排序 sorted_dirs = sorted(structure['directories'], key=lambda x: (x['level'], x['path'])) for dir_info in sorted_dirs: parent_id = None if dir_info['parent_path']: parent_id = path_to_id.get(dir_info['parent_path']) if parent_id is None: print(f" [WARN] 未找到父目录: {dir_info['parent_path']}") dir_id = create_or_update_directory(conn, dir_info, parent_id, existing_templates) path_to_id[dir_info['path']] = dir_id # 更新existing_templates,以便后续查找(使用完整路径作为key避免冲突) key = f"DIR:{dir_info['path']}" existing_templates[key] = { 'id': dir_id, 'name': dir_info['name'], 'normalized_name': normalize_name(dir_info['name']), 'parent_id': parent_id, 'file_path': None, 'state': 1, 'path': dir_info['path'] } # 同时用标准化名称存储(用于文件查找父目录) normalized_key = normalize_name(dir_info['name']) if normalized_key not in existing_templates or existing_templates[normalized_key].get('file_path') is not None: # 只有当不存在或存在的是文件时才更新 existing_templates[normalized_key] = { 'id': dir_id, 'name': dir_info['name'], 'normalized_name': normalized_key, 'parent_id': parent_id, 'file_path': None, 'state': 1, 'path': dir_info['path'] } return path_to_id def update_file_hierarchy(structure: Dict, path_to_id: Dict[str, int], existing_templates: Dict, conn): """ 更新文件节点的parent_id """ for file_info in structure['files']: parent_id = None if file_info['parent_path']: parent_id = path_to_id.get(file_info['parent_path']) update_file_parent(conn, file_info, parent_id, existing_templates) def main(): """主函数""" print("=" * 80) print("更新模板层级结构") print("=" * 80) print() try: # 连接数据库 print("1. 连接数据库...") conn = pymysql.connect(**DB_CONFIG) print("[OK] 数据库连接成功\n") # 扫描目录结构 print("2. 扫描目录结构...") structure = scan_directory_structure(TEMPLATE_BASE_DIR) if not structure: print("错误: 未找到任何目录或文件") return # 获取现有模板 print("\n3. 获取现有模板...") existing_templates = get_existing_templates(conn) print(f"[OK] 找到 {len(existing_templates)} 个现有模板\n") # 构建路径到ID的映射(处理目录节点) print("4. 创建/更新目录节点...") print("=" * 80) path_to_id = build_path_to_id_map(structure, existing_templates, conn) print(f"\n[OK] 处理了 {len(path_to_id)} 个目录节点\n") # 更新文件节点的parent_id print("5. 更新文件节点的parent_id...") print("=" * 80) update_file_hierarchy(structure, path_to_id, existing_templates, conn) print(f"\n[OK] 处理了 {len(structure['files'])} 个文件节点\n") # 打印层级结构 print("6. 最终层级结构:") print("=" * 80) print_hierarchy(conn) print("\n" + "=" * 80) print("更新完成!") print("=" * 80) except Exception as e: print(f"\n[ERROR] 发生错误: {e}") import traceback traceback.print_exc() if 'conn' in locals(): conn.rollback() finally: if 'conn' in locals(): conn.close() print("\n数据库连接已关闭") def print_hierarchy(conn, parent_id=None, level=0, prefix=""): """打印层级结构""" cursor = conn.cursor(pymysql.cursors.DictCursor) try: if parent_id is None: sql = """ SELECT id, name, parent_id, file_path FROM f_polic_file_config WHERE tenant_id = %s AND parent_id IS NULL ORDER BY name """ cursor.execute(sql, (TENANT_ID,)) else: sql = """ SELECT id, name, parent_id, file_path FROM f_polic_file_config WHERE tenant_id = %s AND parent_id = %s ORDER BY name """ cursor.execute(sql, (TENANT_ID, parent_id)) items = cursor.fetchall() for i, item in enumerate(items): is_last = i == len(items) - 1 current_prefix = prefix + ("└── " if is_last else "├── ") next_prefix = prefix + (" " if is_last else "│ ") node_type = "📁" if item['file_path'] is None else "📄" print(f"{current_prefix}{node_type} {item['name']} (ID: {item['id']})") # 递归打印子节点 print_hierarchy(conn, item['id'], level + 1, next_prefix) finally: cursor.close() if __name__ == '__main__': main()