ai-business-write/update_template_hierarchy.py

473 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
根据 template_finish/ 目录结构更新 f_polic_file_config 表中的层级结构
"""
import os
import sys
import json
import pymysql
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from collections import defaultdict
# On Windows, re-wrap stdout and stderr as UTF-8 text streams; characters that
# cannot be encoded are replaced instead of raising.
if sys.platform == 'win32':
    import io
    sys.stdout, sys.stderr = (
        io.TextIOWrapper(stream.buffer, encoding='utf-8', errors='replace')
        for stream in (sys.stdout, sys.stderr)
    )
# Database connection settings passed to pymysql.connect().
# NOTE(review): these credentials were hard-coded in source. Every value can
# now be overridden with a FINYX_DB_* environment variable; the literal
# defaults are kept only for backward compatibility and should be rotated and
# removed from the repository.
DB_CONFIG = {
    'host': os.environ.get('FINYX_DB_HOST', '152.136.177.240'),
    'port': int(os.environ.get('FINYX_DB_PORT', '5012')),
    'user': os.environ.get('FINYX_DB_USER', 'finyx'),
    'password': os.environ.get('FINYX_DB_PASSWORD', '6QsGK6MpePZDE57Z'),
    'database': os.environ.get('FINYX_DB_NAME', 'finyx'),
    'charset': 'utf8mb4',
}
# Tenant whose rows in f_polic_file_config are read and written.
TENANT_ID = 615873064429507639
# User ids recorded in created_by / updated_by.
CREATED_BY = 655162080928945152
UPDATED_BY = 655162080928945152
# Root directory scanned for templates (relative to the working directory).
TEMPLATE_BASE_DIR = 'template_finish'
def generate_id():
    """Return a new integer id built from the current time and a random number.

    The value is ``milliseconds_since_epoch * 1000`` plus a random integer in
    the inclusive range 100000..999999.
    """
    import random
    import time

    millis = int(time.time() * 1000)
    return millis * 1000 + random.randint(100000, 999999)
def normalize_name(name: str) -> str:
    """Normalize a file, directory or record name for matching.

    Steps, applied in order:
      1. Drop a trailing file extension such as ``.docx``.
      2. Remove bracketed segments, written with ASCII ``(...)`` or
         full-width ``（...）`` brackets.
      3. Remove a leading numeric prefix such as ``1.``, ``2-`` or ``03 ``.

    Args:
        name: Raw name to normalize.

    Returns:
        The normalized name. It may be empty when nothing is left.
    """
    import re

    # Only strip the text after the last dot when it looks like a real file
    # extension (short, ASCII alphanumeric, not purely digits). Otherwise a
    # numbered name such as "2.初核审批" would be cut down to "2" and then
    # to an empty string by the prefix removal below.
    as_path = Path(name)
    suffix = as_path.suffix
    if re.fullmatch(r'\.[A-Za-z0-9]{1,8}', suffix) and not suffix[1:].isdigit():
        name = as_path.stem
    # Remove bracketed content (ASCII and full-width brackets).
    name = re.sub(r'[(（].*?[)）]', '', name).strip()
    # Remove a leading number with an optional "." or "-" separator.
    name = re.sub(r'^\d+[\.\-]?\s*', '', name).strip()
    return name
def scan_directory_structure(base_dir: str) -> Dict:
    """Walk ``base_dir`` recursively and describe its directories and templates.

    Only ``.docx`` files are collected (the extension check is
    case-insensitive); names starting with ``~$`` are skipped.

    Args:
        base_dir: Directory to scan.

    Returns:
        An empty dict when ``base_dir`` does not exist. Otherwise a dict with:
          * ``'directories'``: list of directory nodes,
          * ``'files'``: list of file nodes,
          * ``'name_to_id'``: an empty dict (not filled in by this function).
        Each node has ``name``, ``path`` (relative to ``base_dir``), ``level``
        (0 for entries directly under ``base_dir``), ``parent_path`` (``None``
        at level 0) and ``parent_id`` (always ``None`` here). File nodes also
        carry ``file_path``, the path including ``base_dir``. Both lists are
        sorted by ``(level, path)``.
    """
    base_path = Path(base_dir)
    if not base_path.exists():
        print(f"错误: 目录不存在 - {base_dir}")
        return {}

    structure = {
        'directories': [],
        'files': [],
        'name_to_id': {},
    }
    print("=" * 80)
    print("扫描目录结构...")
    print("=" * 80)

    for item in base_path.rglob("*"):
        relative_path = item.relative_to(base_path)
        parts = relative_path.parts
        node = {
            'name': parts[-1],
            'path': str(relative_path),
            'level': len(parts) - 1,
            'parent_path': str(Path(*parts[:-1])) if len(parts) > 1 else None,
            'parent_id': None,  # resolved later by the caller
        }
        if item.is_dir():
            structure['directories'].append(node)
        elif (
            item.is_file()
            # Case-insensitive so "X.DOCX" is not silently skipped.
            and item.suffix.lower() == '.docx'
            and not item.name.startswith("~$")
        ):
            node['file_path'] = str(item)
            structure['files'].append(node)

    # Parents sort before their children because they have a lower level.
    for bucket in ('directories', 'files'):
        structure[bucket].sort(key=lambda x: (x['level'], x['path']))

    print(f"找到 {len(structure['directories'])} 个目录节点")
    print(f"找到 {len(structure['files'])} 个文件节点")
    return structure
def get_existing_templates(conn) -> Dict:
    """Load this tenant's rows from ``f_polic_file_config``, keyed by name.

    Args:
        conn: Open pymysql connection.

    Returns:
        Dict mapping ``normalize_name(row['name'])`` to a dict with ``id``,
        ``name``, ``normalized_name``, ``parent_id``, ``file_path`` and
        ``state``. When several rows normalize to the same name, the row
        fetched last replaces the earlier ones.
    """
    sql = """
        SELECT id, name, parent_id, file_path, state
        FROM f_polic_file_config
        WHERE tenant_id = %s
    """
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        cursor.execute(sql, (TENANT_ID,))
        rows = cursor.fetchall()
    finally:
        # Close the cursor even when the query fails.
        cursor.close()

    result = {}
    for template in rows:
        normalized_name = normalize_name(template['name'])
        result[normalized_name] = {
            'id': template['id'],
            'name': template['name'],
            'normalized_name': normalized_name,
            'parent_id': template['parent_id'],
            'file_path': template['file_path'],
            'state': template['state'],
        }
    return result
def find_template_by_name(existing_templates: Dict, name: str, prefer_directory: bool = False) -> Optional[Dict]:
    """Look up a template record by name, exactly first and then by substring.

    Args:
        existing_templates: Lookup map. Keys are normalized names, or
            ``"DIR:<path>"`` for directory nodes; values are record dicts.
        name: Name to search for; it is normalized before matching.
        prefer_directory: When True, an exact match that has a ``file_path``
            is skipped, and among substring matches the first one stored
            under a ``"DIR:"`` key is returned if there is one.

    Returns:
        The matching record dict, or ``None`` when nothing matches or the
        name normalizes to an empty string.
    """
    normalized = normalize_name(name)
    # An empty string is a substring of every string, so an empty name would
    # "match" every record in the substring search below.
    if not normalized:
        return None

    # Exact match on the normalized name.
    exact = existing_templates.get(normalized)
    if exact is not None:
        exact_is_file = exact.get('file_path') is not None
        if not (prefer_directory and exact_is_file):
            return exact
        # Otherwise a directory was requested but a file was found: keep looking.

    # Substring match in either direction; remember whether the key marks a directory.
    candidates = []
    for key, template in existing_templates.items():
        other = template.get('normalized_name', '')
        if not other:
            # Same reason as above: an empty name is contained in everything.
            continue
        if normalized in other or other in normalized:
            candidates.append((template, key.startswith("DIR:")))

    if prefer_directory:
        for template, is_dir in candidates:
            if is_dir:
                return template

    # Fall back to the first substring match of any kind.
    if candidates:
        return candidates[0][0]
    return None
def create_or_update_directory(conn, dir_info: Dict, parent_id: Optional[int], existing_templates: Dict) -> int:
    """Make sure a directory node exists for ``dir_info`` under ``parent_id``.

    The node is looked up under the key ``"DIR:<path>"`` first and by name
    second; a match that has a ``file_path`` is treated as a file and ignored.
    An existing node is updated only when its ``parent_id`` differs; otherwise
    a new row is inserted with a freshly generated id. Each write is committed
    immediately.

    Args:
        conn: Open pymysql connection.
        dir_info: Directory node with at least ``path`` and ``name``.
        parent_id: Id of the parent directory, or ``None`` for a root node.
        existing_templates: Lookup map of known records.

    Returns:
        The id of the existing or newly created directory node.

    Raises:
        Exception: On any failure, after rolling back; the original error is
            attached as ``__cause__``.
    """
    cursor = conn.cursor()
    try:
        # The path-based key is the most precise lookup.
        existing = existing_templates.get(f"DIR:{dir_info['path']}")
        if not existing:
            existing = find_template_by_name(existing_templates, dir_info['name'], prefer_directory=True)
        # Directory nodes have no file_path; anything else is a file record.
        if existing and existing.get('file_path') is not None:
            existing = None

        if existing:
            template_id = existing['id']
            if existing['parent_id'] != parent_id:
                update_sql = """
                    UPDATE f_polic_file_config
                    SET parent_id = %s, updated_time = NOW(), updated_by = %s, state = 1
                    WHERE id = %s AND tenant_id = %s
                """
                cursor.execute(update_sql, (parent_id, UPDATED_BY, template_id, TENANT_ID))
                conn.commit()
                print(f" [UPDATE] 更新目录: {dir_info['name']} (ID: {template_id}, parent_id: {parent_id})")
            else:
                print(f" [KEEP] 保持目录: {dir_info['name']} (ID: {template_id})")
            return template_id

        # No usable record: insert a new directory node.
        template_id = generate_id()
        insert_sql = """
            INSERT INTO f_polic_file_config
            (id, tenant_id, parent_id, name, input_data, file_path, created_time, created_by, updated_time, updated_by, state)
            VALUES (%s, %s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, %s)
        """
        cursor.execute(insert_sql, (
            template_id,
            TENANT_ID,
            parent_id,
            dir_info['name'],
            None,  # input_data
            None,  # file_path: directory nodes have none
            CREATED_BY,
            CREATED_BY,
            1,  # state: 1 means enabled
        ))
        conn.commit()
        print(f" [CREATE] 创建目录: {dir_info['name']} (ID: {template_id}, parent_id: {parent_id})")
        return template_id
    except Exception as e:
        conn.rollback()
        # Chain explicitly so the original error is kept as __cause__.
        raise Exception(f"创建或更新目录失败: {str(e)}") from e
    finally:
        cursor.close()
def update_file_parent(conn, file_info: Dict, parent_id: Optional[int], existing_templates: Dict) -> Optional[int]:
    """Point the database record of a template file at ``parent_id``.

    The record is looked up by file name. It is updated, and the change
    committed, only when its current ``parent_id`` differs.

    Args:
        conn: Open pymysql connection.
        file_info: File node with at least ``name``.
        parent_id: Id of the containing directory, or ``None`` for a root file.
        existing_templates: Lookup map of known records.

    Returns:
        The id of the matching file record, or ``None`` when no file record
        was found.

    Raises:
        Exception: On any failure, after rolling back; the original error is
            attached as ``__cause__``.
    """
    cursor = conn.cursor()
    try:
        # Look the file up by name.
        existing = find_template_by_name(existing_templates, file_info['name'])
        # The name lookup can return a directory node, for example a folder
        # named like the file inside it. Re-parenting that node would move the
        # folder, possibly onto itself. As elsewhere in this script, a record
        # without file_path is treated as a directory and is not a valid match.
        if existing is not None and existing.get('file_path') is None:
            existing = None

        if not existing:
            print(f" [WARN] 未找到文件: {file_info['name']}")
            return None

        template_id = existing['id']
        if existing['parent_id'] != parent_id:
            update_sql = """
                UPDATE f_polic_file_config
                SET parent_id = %s, updated_time = NOW(), updated_by = %s
                WHERE id = %s AND tenant_id = %s
            """
            cursor.execute(update_sql, (parent_id, UPDATED_BY, template_id, TENANT_ID))
            conn.commit()
            print(f" [UPDATE] 更新文件: {file_info['name']} (ID: {template_id}, parent_id: {parent_id})")
        else:
            print(f" [KEEP] 保持文件: {file_info['name']} (ID: {template_id})")
        return template_id
    except Exception as e:
        conn.rollback()
        # Chain explicitly so the original error is kept as __cause__.
        raise Exception(f"更新文件parent_id失败: {str(e)}") from e
    finally:
        cursor.close()
def build_path_to_id_map(structure: Dict, existing_templates: Dict, conn) -> Dict[str, int]:
    """Create or update every directory node and map its path to its id.

    Directories are processed in ``(level, path)`` order so a parent is
    handled before its children. ``existing_templates`` is mutated: each
    processed directory is registered under ``"DIR:<path>"``, and under its
    normalized name when that name is not already taken.

    Args:
        structure: Scan result containing a ``'directories'`` list.
        existing_templates: Lookup map of known records (updated in place).
        conn: Open pymysql connection.

    Returns:
        Dict mapping each directory's relative path to its node id.
    """
    path_to_id = {}
    sorted_dirs = sorted(structure['directories'], key=lambda x: (x['level'], x['path']))
    for dir_info in sorted_dirs:
        parent_id = None
        if dir_info['parent_path']:
            parent_id = path_to_id.get(dir_info['parent_path'])
            if parent_id is None:
                print(f" [WARN] 未找到父目录: {dir_info['parent_path']}")

        dir_id = create_or_update_directory(conn, dir_info, parent_id, existing_templates)
        path_to_id[dir_info['path']] = dir_id

        normalized_key = normalize_name(dir_info['name'])
        record = {
            'id': dir_id,
            'name': dir_info['name'],
            'normalized_name': normalized_key,
            'parent_id': parent_id,
            'file_path': None,
            'state': 1,
            'path': dir_info['path'],
        }
        # The full path is the key, so same-named directories do not collide.
        existing_templates[f"DIR:{dir_info['path']}"] = record
        # Also register under the normalized name, but never replace an entry
        # that is already there. Overwriting a file record made that file
        # impossible to find later, and the file update then re-parented this
        # directory instead (onto itself when the file sits inside it).
        if normalized_key not in existing_templates:
            existing_templates[normalized_key] = dict(record)
    return path_to_id
def update_file_hierarchy(structure: Dict, path_to_id: Dict[str, int], existing_templates: Dict, conn):
    """Update the parent of every scanned file node.

    For each entry in ``structure['files']`` the parent id is looked up in
    ``path_to_id`` by ``parent_path`` (``None`` when the file has no parent
    path or the path is unknown), then passed to ``update_file_parent``.
    """
    for file_info in structure['files']:
        parent_path = file_info['parent_path']
        parent_id = path_to_id.get(parent_path) if parent_path else None
        update_file_parent(conn, file_info, parent_id, existing_templates)
def main():
    """Run the whole update: connect, scan, sync directories, sync files, print.

    Any exception is printed with its traceback and the connection is rolled
    back; the connection, if it was opened, is always closed at the end.
    """
    banner = "=" * 80
    print(banner)
    print("更新模板层级结构")
    print(banner)
    print()

    conn = None  # stays None if the connection attempt itself fails
    try:
        # Step 1: connect to the database.
        print("1. 连接数据库...")
        conn = pymysql.connect(**DB_CONFIG)
        print("[OK] 数据库连接成功\n")

        # Step 2: scan the template directory tree.
        print("2. 扫描目录结构...")
        structure = scan_directory_structure(TEMPLATE_BASE_DIR)
        if not structure:
            print("错误: 未找到任何目录或文件")
            return

        # Step 3: load the records already stored for this tenant.
        print("\n3. 获取现有模板...")
        existing_templates = get_existing_templates(conn)
        print(f"[OK] 找到 {len(existing_templates)} 个现有模板\n")

        # Step 4: create or update directory nodes and collect their ids.
        print("4. 创建/更新目录节点...")
        print(banner)
        path_to_id = build_path_to_id_map(structure, existing_templates, conn)
        print(f"\n[OK] 处理了 {len(path_to_id)} 个目录节点\n")

        # Step 5: attach file nodes to their parent directories.
        print("5. 更新文件节点的parent_id...")
        print(banner)
        update_file_hierarchy(structure, path_to_id, existing_templates, conn)
        print(f"\n[OK] 处理了 {len(structure['files'])} 个文件节点\n")

        # Step 6: show the resulting tree.
        print("6. 最终层级结构:")
        print(banner)
        print_hierarchy(conn)
        print("\n" + banner)
        print("更新完成!")
        print(banner)
    except Exception as e:
        print(f"\n[ERROR] 发生错误: {e}")
        import traceback
        traceback.print_exc()
        if conn is not None:
            conn.rollback()
    finally:
        if conn is not None:
            conn.close()
            print("\n数据库连接已关闭")
def print_hierarchy(conn, parent_id=None, level=0, prefix=""):
    """Print this tenant's template tree below ``parent_id``, recursively.

    Children are listed in name order. Records without ``file_path`` are
    shown with a folder icon, all others with a document icon.

    Args:
        conn: Open pymysql connection.
        parent_id: Id whose children are printed; ``None`` selects the root
            nodes (rows whose ``parent_id`` is NULL).
        level: Current depth; only passed along to the recursive call.
        prefix: Text printed before the connector on each line at this depth.
    """
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        if parent_id is None:
            sql = """
                SELECT id, name, parent_id, file_path
                FROM f_polic_file_config
                WHERE tenant_id = %s AND parent_id IS NULL
                ORDER BY name
            """
            cursor.execute(sql, (TENANT_ID,))
        else:
            sql = """
                SELECT id, name, parent_id, file_path
                FROM f_polic_file_config
                WHERE tenant_id = %s AND parent_id = %s
                ORDER BY name
            """
            cursor.execute(sql, (TENANT_ID, parent_id))
        items = cursor.fetchall()

        for i, item in enumerate(items):
            is_last = i == len(items) - 1
            current_prefix = prefix + ("└── " if is_last else "├── ")
            # The continuation must be as wide as the 4-column connector so
            # children line up; a vertical bar is kept while this level still
            # has siblings to print.
            next_prefix = prefix + ("    " if is_last else "│   ")
            node_type = "📁" if item['file_path'] is None else "📄"
            print(f"{current_prefix}{node_type} {item['name']} (ID: {item['id']})")
            # Recurse into this node's children.
            print_hierarchy(conn, item['id'], level + 1, next_prefix)
    finally:
        cursor.close()
# Run the update only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()