""" 根据template_finish/目录结构更新数据库中的parent_id层级关系 1. 扫描目录结构 2. 创建/更新目录节点(作为父级) 3. 更新文件节点的parent_id """ import os import pymysql from pathlib import Path from typing import Dict, List, Optional from dotenv import load_dotenv # 加载环境变量 load_dotenv() # 数据库配置 DB_CONFIG = { 'host': os.getenv('DB_HOST', '152.136.177.240'), 'port': int(os.getenv('DB_PORT', 5012)), 'user': os.getenv('DB_USER', 'finyx'), 'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'), 'database': os.getenv('DB_NAME', 'finyx'), 'charset': 'utf8mb4' } CREATED_BY = 655162080928945152 UPDATED_BY = 655162080928945152 # 项目根目录 PROJECT_ROOT = Path(__file__).parent TEMPLATES_DIR = PROJECT_ROOT / "template_finish" def print_section(title): """打印章节标题""" print("\n" + "="*70) print(f" {title}") print("="*70) def print_result(success, message): """打印结果""" status = "[OK]" if success else "[FAIL]" print(f"{status} {message}") def generate_id(): """生成ID""" import time return int(time.time() * 1000000) def get_actual_tenant_id(conn) -> int: """获取数据库中的实际tenant_id""" cursor = conn.cursor(pymysql.cursors.DictCursor) try: cursor.execute("SELECT DISTINCT tenant_id FROM f_polic_file_config LIMIT 1") result = cursor.fetchone() if result: return result['tenant_id'] return 1 finally: cursor.close() def scan_directory_structure(base_dir: Path) -> Dict: """ 扫描目录结构,构建层级关系 Returns: 字典,包含目录和文件的层级信息 """ structure = { 'directories': [], # 目录节点列表 'files': [] # 文件节点列表 } if not base_dir.exists(): return structure # 遍历所有目录和文件 for item in base_dir.rglob("*"): relative_path = item.relative_to(PROJECT_ROOT) relative_path_str = str(relative_path).replace('\\', '/') parts = relative_path.parts if item.is_dir(): # 目录节点 level = len(parts) - 1 # 层级(从0开始,template_finish是0级) dir_name = parts[-1] parent_path = str(Path(*parts[:-1])).replace('\\', '/') if len(parts) > 1 else None structure['directories'].append({ 'name': dir_name, 'path': relative_path_str, 'level': level, 'parent_path': parent_path }) elif item.is_file() and item.suffix.lower() in ['.docx', '.doc', '.wps']: # 文件节点 level = len(parts) - 1 file_name = item.name parent_path = str(Path(*parts[:-1])).replace('\\', '/') if len(parts) > 1 else None structure['files'].append({ 'name': file_name, 'path': relative_path_str, 'level': level, 'parent_path': parent_path }) # 按层级排序目录(确保父目录先处理) structure['directories'].sort(key=lambda x: (x['level'], x['path'])) return structure def get_existing_templates(conn, tenant_id: int) -> Dict: """ 获取数据库中现有的模板记录 Returns: 字典,包含按路径和名称索引的模板 """ cursor = conn.cursor(pymysql.cursors.DictCursor) try: sql = """ SELECT id, name, parent_id, file_path, state FROM f_polic_file_config WHERE tenant_id = %s """ cursor.execute(sql, (tenant_id,)) templates = cursor.fetchall() # 按file_path索引(用于文件匹配) by_path = {} # 按name索引(用于目录匹配) by_name = {} for template in templates: if template['file_path']: by_path[template['file_path']] = template else: # 目录节点(file_path为NULL) name = template['name'] if name not in by_name: by_name[name] = [] by_name[name].append(template) return { 'by_path': by_path, 'by_name': by_name } finally: cursor.close() def create_or_update_directory(conn, tenant_id: int, dir_name: str, dir_path: str, parent_id: Optional[int], existing_templates: Dict) -> int: """ 创建或更新目录节点 Returns: 目录节点的ID """ cursor = conn.cursor() try: # 查找现有目录(优先通过名称查找,且file_path为NULL) existing = None if dir_name in existing_templates['by_name']: # 找到同名的目录节点 for template in existing_templates['by_name'][dir_name]: if template['file_path'] is None: existing = template break if existing: # 更新现有目录记录 template_id = existing['id'] if existing['parent_id'] != parent_id: update_sql = """ UPDATE f_polic_file_config SET parent_id = %s, updated_time = NOW(), updated_by = %s, state = 1 WHERE id = %s AND tenant_id = %s """ cursor.execute(update_sql, (parent_id, UPDATED_BY, template_id, tenant_id)) conn.commit() print(f" [UPDATE] 更新目录: {dir_name} (ID: {template_id}, parent_id: {parent_id})") else: print(f" [KEEP] 保持目录: {dir_name} (ID: {template_id})") return template_id else: # 创建新目录记录 template_id = generate_id() insert_sql = """ INSERT INTO f_polic_file_config (id, tenant_id, parent_id, name, input_data, file_path, created_time, created_by, updated_time, updated_by, state) VALUES (%s, %s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1) """ cursor.execute(insert_sql, ( template_id, tenant_id, parent_id, dir_name, '{}', # input_data None, # file_path(目录节点没有文件路径) CREATED_BY, UPDATED_BY )) conn.commit() print(f" [CREATE] 创建目录: {dir_name} (ID: {template_id}, parent_id: {parent_id})") # 更新existing_templates if dir_name not in existing_templates['by_name']: existing_templates['by_name'][dir_name] = [] existing_templates['by_name'][dir_name].append({ 'id': template_id, 'name': dir_name, 'parent_id': parent_id, 'file_path': None, 'state': 1 }) return template_id finally: cursor.close() def update_file_parent(conn, tenant_id: int, file_info: Dict, parent_id: Optional[int], existing_templates: Dict): """更新文件节点的parent_id""" cursor = conn.cursor() try: file_path = file_info['path'] # 查找现有文件记录 existing = existing_templates['by_path'].get(file_path) if existing: template_id = existing['id'] if existing['parent_id'] != parent_id: update_sql = """ UPDATE f_polic_file_config SET parent_id = %s, updated_time = NOW(), updated_by = %s WHERE id = %s AND tenant_id = %s """ cursor.execute(update_sql, (parent_id, UPDATED_BY, template_id, tenant_id)) conn.commit() print(f" [UPDATE] 更新文件: {file_info['name']} (ID: {template_id}, parent_id: {parent_id})") else: print(f" [KEEP] 保持文件: {file_info['name']} (ID: {template_id})") else: # 文件不存在,创建新记录 template_id = generate_id() insert_sql = """ INSERT INTO f_polic_file_config (id, tenant_id, parent_id, name, input_data, file_path, created_time, created_by, updated_time, updated_by, state) VALUES (%s, %s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1) """ cursor.execute(insert_sql, ( template_id, tenant_id, parent_id, file_info['name'], '{}', # input_data file_path, CREATED_BY, UPDATED_BY )) conn.commit() print(f" [CREATE] 创建文件: {file_info['name']} (ID: {template_id}, parent_id: {parent_id})") finally: cursor.close() def main(): """主函数""" print_section("更新模板层级结构") # 1. 连接数据库 print_section("1. 连接数据库") try: conn = pymysql.connect(**DB_CONFIG) print_result(True, "数据库连接成功") except Exception as e: print_result(False, f"数据库连接失败: {str(e)}") return try: # 2. 获取实际的tenant_id print_section("2. 获取实际的tenant_id") tenant_id = get_actual_tenant_id(conn) print_result(True, f"实际tenant_id: {tenant_id}") # 3. 扫描目录结构 print_section("3. 扫描目录结构") structure = scan_directory_structure(TEMPLATES_DIR) if not structure['directories'] and not structure['files']: print_result(False, "未找到任何目录或文件") return print(f" 找到 {len(structure['directories'])} 个目录") print(f" 找到 {len(structure['files'])} 个文件") # 4. 获取现有模板 print_section("4. 获取现有模板") existing_templates = get_existing_templates(conn, tenant_id) print(f" 现有文件模板: {len(existing_templates['by_path'])} 个") print(f" 现有目录模板: {sum(len(v) for v in existing_templates['by_name'].values())} 个") # 5. 创建/更新目录节点 print_section("5. 创建/更新目录节点") path_to_id = {} # 路径到ID的映射 # 按层级顺序处理目录(确保父目录先处理) for dir_info in structure['directories']: parent_id = None if dir_info['parent_path']: parent_id = path_to_id.get(dir_info['parent_path']) if parent_id is None: print(f" [WARN] 未找到父目录: {dir_info['parent_path']}") dir_id = create_or_update_directory( conn, tenant_id, dir_info['name'], dir_info['path'], parent_id, existing_templates ) path_to_id[dir_info['path']] = dir_id print_result(True, f"处理了 {len(path_to_id)} 个目录节点") # 6. 更新文件节点的parent_id print_section("6. 更新文件节点的parent_id") updated_count = 0 created_count = 0 kept_count = 0 for file_info in structure['files']: parent_id = None if file_info['parent_path']: parent_id = path_to_id.get(file_info['parent_path']) if parent_id is None: print(f" [WARN] 未找到父目录: {file_info['parent_path']}") # 检查是否需要更新 file_path = file_info['path'] existing = existing_templates['by_path'].get(file_path) if existing: if existing['parent_id'] != parent_id: update_file_parent(conn, tenant_id, file_info, parent_id, existing_templates) updated_count += 1 else: kept_count += 1 else: update_file_parent(conn, tenant_id, file_info, parent_id, existing_templates) created_count += 1 print_result(True, f"处理了 {len(structure['files'])} 个文件节点") print(f" - 更新: {updated_count} 个") print(f" - 创建: {created_count} 个") print(f" - 保持: {kept_count} 个") # 7. 验证层级结构 print_section("7. 验证层级结构") cursor = conn.cursor(pymysql.cursors.DictCursor) try: # 统计各层级的节点数 cursor.execute(""" SELECT CASE WHEN parent_id IS NULL THEN '根节点' ELSE '子节点' END as node_type, COUNT(*) as count FROM f_polic_file_config WHERE tenant_id = %s GROUP BY node_type """, (tenant_id,)) stats = cursor.fetchall() for stat in stats: print(f" {stat['node_type']}: {stat['count']} 个") # 统计有parent_id的文件 cursor.execute(""" SELECT COUNT(*) as count FROM f_polic_file_config WHERE tenant_id = %s AND file_path IS NOT NULL AND parent_id IS NOT NULL """, (tenant_id,)) result = cursor.fetchone() print(f" 有父级的文件: {result['count']} 个") finally: cursor.close() finally: conn.close() print_result(True, "数据库连接已关闭") print_section("完成") if __name__ == "__main__": main()