""" 根据 template_finish/ 目录结构更新 f_polic_file_config 表中的层级结构 使用路径作为唯一标识,确保正确建立层级关系 """ import os import sys import json import pymysql from pathlib import Path from typing import Dict, List, Optional, Tuple from collections import defaultdict # 设置输出编码为UTF-8(Windows兼容) if sys.platform == 'win32': import io sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') # 数据库连接配置 DB_CONFIG = { 'host': '152.136.177.240', 'port': 5012, 'user': 'finyx', 'password': '6QsGK6MpePZDE57Z', 'database': 'finyx', 'charset': 'utf8mb4' } TENANT_ID = 615873064429507639 CREATED_BY = 655162080928945152 UPDATED_BY = 655162080928945152 TEMPLATE_BASE_DIR = 'template_finish' def generate_id(): """生成ID""" import time import random timestamp = int(time.time() * 1000) random_part = random.randint(100000, 999999) return timestamp * 1000 + random_part def scan_directory_structure(base_dir: str) -> Dict: """ 扫描目录结构,构建层级关系 Returns: 字典,包含目录和文件的层级信息 """ base_path = Path(base_dir) if not base_path.exists(): print(f"错误: 目录不存在 - {base_dir}") return {} structure = { 'directories': [], # 目录节点列表 'files': [] # 文件节点列表 } print("=" * 80) print("扫描目录结构...") print("=" * 80) # 遍历所有目录和文件 for item in base_path.rglob("*"): relative_path = item.relative_to(base_path) parts = relative_path.parts if item.is_dir(): # 目录节点 level = len(parts) - 1 # 层级(从0开始) dir_name = parts[-1] parent_path = str(Path(*parts[:-1])) if len(parts) > 1 else None structure['directories'].append({ 'name': dir_name, 'path': str(relative_path), 'level': level, 'parent_path': parent_path }) elif item.is_file() and item.suffix == '.docx' and not item.name.startswith("~$"): # 文件节点 level = len(parts) - 1 file_name = item.name parent_path = str(Path(*parts[:-1])) if len(parts) > 1 else None structure['files'].append({ 'name': file_name, 'path': str(relative_path), 'level': level, 'parent_path': parent_path, 'file_path': str(item) }) # 按层级排序 structure['directories'].sort(key=lambda x: (x['level'], x['path'])) structure['files'].sort(key=lambda x: (x['level'], x['path'])) print(f"找到 {len(structure['directories'])} 个目录节点") print(f"找到 {len(structure['files'])} 个文件节点") return structure def get_existing_templates(conn) -> Dict: """ 获取数据库中现有的模板记录 Returns: 字典,key为file_path(如果存在)或名称,value为模板信息列表 """ cursor = conn.cursor(pymysql.cursors.DictCursor) sql = """ SELECT id, name, parent_id, file_path, state FROM f_polic_file_config WHERE tenant_id = %s """ cursor.execute(sql, (TENANT_ID,)) templates = cursor.fetchall() result = {} for template in templates: # 优先使用file_path作为key(更准确) if template['file_path']: key = template['file_path'] if key not in result: result[key] = [] result[key].append({ 'id': template['id'], 'name': template['name'], 'parent_id': template['parent_id'], 'file_path': template['file_path'], 'state': template['state'] }) # 同时使用名称作为key(用于目录节点和没有file_path的记录) name_key = template['name'] if name_key not in result: result[name_key] = [] result[name_key].append({ 'id': template['id'], 'name': template['name'], 'parent_id': template['parent_id'], 'file_path': template['file_path'], 'state': template['state'] }) cursor.close() return result def create_or_update_directory(conn, dir_name: str, parent_id: Optional[int], existing_templates: Dict) -> int: """ 创建或更新目录节点 Returns: 目录节点的ID """ cursor = conn.cursor() try: # 查找是否已存在(通过名称精确匹配,且file_path为None) candidates = existing_templates.get(dir_name, []) existing = None for candidate in candidates: if candidate.get('file_path') is None: # 目录节点 existing = candidate break if existing: # 更新现有目录记录 template_id = existing['id'] if existing['parent_id'] != parent_id: update_sql = """ UPDATE f_polic_file_config SET parent_id = %s, updated_time = NOW(), updated_by = %s, state = 1 WHERE id = %s AND tenant_id = %s """ cursor.execute(update_sql, (parent_id, UPDATED_BY, template_id, TENANT_ID)) conn.commit() print(f" [UPDATE] 更新目录: {dir_name} (ID: {template_id}, parent_id: {parent_id})") else: print(f" [KEEP] 保持目录: {dir_name} (ID: {template_id})") return template_id else: # 创建新目录记录 template_id = generate_id() insert_sql = """ INSERT INTO f_polic_file_config (id, tenant_id, parent_id, name, input_data, file_path, created_time, created_by, updated_time, updated_by, state) VALUES (%s, %s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, %s) """ cursor.execute(insert_sql, ( template_id, TENANT_ID, parent_id, dir_name, None, # input_data None, # file_path(目录节点没有文件路径) CREATED_BY, CREATED_BY, 1 # state: 1表示启用 )) conn.commit() print(f" [CREATE] 创建目录: {dir_name} (ID: {template_id}, parent_id: {parent_id})") # 更新existing_templates if dir_name not in existing_templates: existing_templates[dir_name] = [] existing_templates[dir_name].append({ 'id': template_id, 'name': dir_name, 'parent_id': parent_id, 'file_path': None, 'state': 1 }) return template_id except Exception as e: conn.rollback() raise Exception(f"创建或更新目录失败: {str(e)}") finally: cursor.close() def normalize_name(name: str) -> str: """ 标准化名称(去掉扩展名、括号内容、数字前缀等) """ import re # 去掉扩展名 name = Path(name).stem if '.' in name else name # 去掉括号内容 name = re.sub(r'[((].*?[))]', '', name) name = name.strip() # 去掉数字前缀和点号 name = re.sub(r'^\d+[\.\-]?\s*', '', name) name = name.strip() return name def update_file_parent(conn, file_info: Dict, parent_id: Optional[int], existing_templates: Dict) -> Optional[int]: """ 更新文件节点的parent_id Args: file_info: 文件信息,包含name和file_path parent_id: 父节点ID Returns: 文件节点的ID,如果未找到则返回None """ cursor = conn.cursor() try: file_name = file_info['name'] # 根据文件路径构建预期的MinIO路径 # 文件路径格式: template_finish/2-初核模版/1.初核请示/1.请示报告卡(XXX).docx # MinIO路径格式: /615873064429507639/TEMPLATE/2025/12/1.请示报告卡(XXX).docx expected_path = None if 'file_path' in file_info: # 从相对路径构建MinIO路径 from datetime import datetime now = datetime.now() relative_path = file_info.get('path', '') file_name_only = Path(file_name).name expected_path = f'/615873064429507639/TEMPLATE/{now.year}/{now.month:02d}/{file_name_only}' # 优先通过file_path匹配(最准确) existing = None if expected_path and expected_path in existing_templates: candidates = existing_templates[expected_path] if candidates: existing = candidates[0] # 如果没找到,通过标准化名称匹配 if not existing: normalized_file_name = normalize_name(file_name) candidates = [] for key, templates in existing_templates.items(): for template in templates: if template.get('file_path'): # 只匹配有file_path的(文件节点) normalized_template_name = normalize_name(template['name']) if normalized_template_name == normalized_file_name: # 检查file_path是否匹配 if expected_path and template['file_path'] == expected_path: existing = template break candidates.append(template) if existing: break if not existing and candidates: # 如果有多个候选,选择parent_id匹配的 for candidate in candidates: if candidate['parent_id'] == parent_id: existing = candidate break # 如果还是没找到,选择第一个 if not existing: existing = candidates[0] if not existing: print(f" [WARN] 未找到文件: {file_name} (标准化: {normalize_name(file_name)})") return None template_id = existing['id'] if existing['parent_id'] != parent_id: update_sql = """ UPDATE f_polic_file_config SET parent_id = %s, updated_time = NOW(), updated_by = %s WHERE id = %s AND tenant_id = %s """ cursor.execute(update_sql, (parent_id, UPDATED_BY, template_id, TENANT_ID)) conn.commit() print(f" [UPDATE] 更新文件: {file_name} -> {existing['name']} (ID: {template_id}, parent_id: {parent_id})") else: print(f" [KEEP] 保持文件: {file_name} -> {existing['name']} (ID: {template_id})") return template_id except Exception as e: conn.rollback() raise Exception(f"更新文件parent_id失败: {str(e)}") finally: cursor.close() def main(): """主函数""" print("=" * 80) print("更新模板层级结构") print("=" * 80) print() try: # 连接数据库 print("1. 连接数据库...") conn = pymysql.connect(**DB_CONFIG) print("[OK] 数据库连接成功\n") # 扫描目录结构 print("2. 扫描目录结构...") structure = scan_directory_structure(TEMPLATE_BASE_DIR) if not structure: print("错误: 未找到任何目录或文件") return # 获取现有模板 print("\n3. 获取现有模板...") existing_templates = get_existing_templates(conn) print(f"[OK] 找到 {len(existing_templates)} 个现有模板\n") # 构建路径到ID的映射(处理目录节点) print("4. 创建/更新目录节点...") print("=" * 80) path_to_id = {} # 按层级顺序处理目录 for dir_info in structure['directories']: parent_id = None if dir_info['parent_path']: parent_id = path_to_id.get(dir_info['parent_path']) dir_id = create_or_update_directory(conn, dir_info['name'], parent_id, existing_templates) path_to_id[dir_info['path']] = dir_id print(f"\n[OK] 处理了 {len(path_to_id)} 个目录节点\n") # 更新文件节点的parent_id print("5. 更新文件节点的parent_id...") print("=" * 80) for file_info in structure['files']: parent_id = None if file_info['parent_path']: parent_id = path_to_id.get(file_info['parent_path']) update_file_parent(conn, file_info, parent_id, existing_templates) print(f"\n[OK] 处理了 {len(structure['files'])} 个文件节点\n") # 打印层级结构 print("6. 最终层级结构:") print("=" * 80) print_hierarchy(conn) print("\n" + "=" * 80) print("更新完成!") print("=" * 80) except Exception as e: print(f"\n[ERROR] 发生错误: {e}") import traceback traceback.print_exc() if 'conn' in locals(): conn.rollback() finally: if 'conn' in locals(): conn.close() print("\n数据库连接已关闭") def print_hierarchy(conn, parent_id=None, level=0, prefix=""): """打印层级结构""" cursor = conn.cursor(pymysql.cursors.DictCursor) try: if parent_id is None: sql = """ SELECT id, name, parent_id, file_path FROM f_polic_file_config WHERE tenant_id = %s AND parent_id IS NULL ORDER BY name """ cursor.execute(sql, (TENANT_ID,)) else: sql = """ SELECT id, name, parent_id, file_path FROM f_polic_file_config WHERE tenant_id = %s AND parent_id = %s ORDER BY name """ cursor.execute(sql, (TENANT_ID, parent_id)) items = cursor.fetchall() for i, item in enumerate(items): is_last = i == len(items) - 1 current_prefix = prefix + ("└── " if is_last else "├── ") next_prefix = prefix + (" " if is_last else "│ ") node_type = "📁" if item['file_path'] is None else "📄" print(f"{current_prefix}{node_type} {item['name']} (ID: {item['id']})") # 递归打印子节点 print_hierarchy(conn, item['id'], level + 1, next_prefix) finally: cursor.close() if __name__ == '__main__': main()