# ai-business-write/update_template_hierarchy_final.py
#
# NOTE: this header is residue from the web code viewer the file was copied
# from (406 lines, 13 KiB, Python; "Raw / Blame / History" links).
#
# Viewer warning: this file contains ambiguous Unicode characters.
#
# This file contains Unicode characters that might be confused with other
# characters. If this is intentional, the warning can safely be ignored. Use
# the viewer's Escape button to reveal them.

"""
根据 template_finish/ 目录结构更新 f_polic_file_config 表中的层级结构
使用file_path作为唯一标识确保正确建立层级关系
"""
import os
import sys
import json
import pymysql
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from collections import defaultdict
from datetime import datetime
# On Windows, re-wrap stdout and stderr as UTF-8 text streams; characters that
# cannot be encoded are replaced instead of raising.
if sys.platform == 'win32':
    import io
    for _stream_name in ('stdout', 'stderr'):
        _raw_buffer = getattr(sys, _stream_name).buffer
        setattr(
            sys,
            _stream_name,
            io.TextIOWrapper(_raw_buffer, encoding='utf-8', errors='replace'),
        )
# Database connection settings, passed as keyword arguments to pymysql.connect.
#
# SECURITY NOTE(review): live credentials are committed here in plain text.
# Every connection value can be overridden through an environment variable
# (DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_NAME); the literals below remain
# only as defaults so existing invocations keep working. They should be rotated
# and removed from the source once the environment variables are in place.
DB_CONFIG = {
    'host': os.environ.get('DB_HOST', '152.136.177.240'),
    'port': int(os.environ.get('DB_PORT', '5012')),
    'user': os.environ.get('DB_USER', 'finyx'),
    'password': os.environ.get('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
    'database': os.environ.get('DB_NAME', 'finyx'),
    'charset': 'utf8mb4',
}

# Tenant whose rows in f_polic_file_config are read and written by this script.
TENANT_ID = 615873064429507639
# User ids written to the created_by / updated_by columns.
CREATED_BY = 655162080928945152
UPDATED_BY = 655162080928945152
# Directory (relative to the working directory) whose tree is mirrored.
TEMPLATE_BASE_DIR = 'template_finish'
def generate_id():
    """Return an integer ID derived from the current time plus a random part.

    The Unix timestamp in milliseconds is multiplied by 1000 and a random
    integer in the range [100000, 999999] is added to it.
    """
    import random
    import time

    millis = int(time.time() * 1000)
    return millis * 1000 + random.randint(100000, 999999)
def scan_directory_structure(base_dir: str) -> Dict:
    """Walk ``base_dir`` recursively and describe its folders and .docx files.

    Args:
        base_dir: Directory to scan.

    Returns:
        An empty dict when ``base_dir`` does not exist. Otherwise a dict with
        two lists, each sorted by ``(level, path)``:

        * ``'directories'``: one entry per sub-directory with the keys
          ``name``, ``path`` (relative to ``base_dir``), ``level`` (0 for
          direct children of ``base_dir``) and ``parent_path`` (``None`` for
          direct children).
        * ``'files'``: one entry per ``.docx`` file whose name does not start
          with ``~$``, with the same keys plus ``file_path`` (the path as
          yielded by the scan) and ``minio_path``.
    """
    base_path = Path(base_dir)
    if not base_path.exists():
        print(f"错误: 目录不存在 - {base_dir}")
        return {}

    structure = {
        'directories': [],  # directory nodes
        'files': [],        # file nodes
    }

    print("=" * 80)
    print("扫描目录结构...")
    print("=" * 80)

    # Capture the clock once so every file of this run gets the same
    # year/month prefix, even if the scan crosses a month boundary.
    # NOTE(review): the tenant id in the prefix duplicates the module-level
    # TENANT_ID constant, and the year/month is the time of this run.
    now = datetime.now()
    minio_prefix = f'/615873064429507639/TEMPLATE/{now.year}/{now.month:02d}'

    for item in base_path.rglob("*"):
        relative_path = item.relative_to(base_path)
        parts = relative_path.parts
        level = len(parts) - 1  # levels start at 0
        parent_path = str(Path(*parts[:-1])) if len(parts) > 1 else None

        if item.is_dir():
            structure['directories'].append({
                'name': parts[-1],
                'path': str(relative_path),
                'level': level,
                'parent_path': parent_path,
            })
        elif item.is_file() and item.suffix == '.docx' and not item.name.startswith("~$"):
            # Names starting with "~$" are skipped.
            file_name = item.name
            structure['files'].append({
                'name': file_name,
                'path': str(relative_path),
                'level': level,
                'parent_path': parent_path,
                'file_path': str(item),
                'minio_path': f'{minio_prefix}/{file_name}',
            })

    # Parents sort before their children, which callers rely on when they
    # resolve parent ids in order.
    structure['directories'].sort(key=lambda x: (x['level'], x['path']))
    structure['files'].sort(key=lambda x: (x['level'], x['path']))

    print(f"找到 {len(structure['directories'])} 个目录节点")
    print(f"找到 {len(structure['files'])} 个文件节点")
    return structure
def get_existing_templates(conn) -> Dict:
    """Load this tenant's rows from f_polic_file_config and index them.

    Args:
        conn: Open database connection; a dict-style cursor is requested from
            it.

    Returns:
        A dict with two indexes:

        * ``'by_path'``: ``file_path -> record`` for rows with a non-empty
          ``file_path``. If several rows share a path, the last one fetched
          wins.
        * ``'by_name'``: ``name -> [record, ...]`` for rows without a
          ``file_path`` (treated as directory nodes); their ``file_path`` is
          stored as ``None``.

        Each record has the keys ``id``, ``name``, ``parent_id``,
        ``file_path`` and ``state``.
    """
    sql = """
        SELECT id, name, parent_id, file_path, state
        FROM f_polic_file_config
        WHERE tenant_id = %s
    """
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        cursor.execute(sql, (TENANT_ID,))
        templates = cursor.fetchall()
    finally:
        # Release the cursor even when the query fails.
        cursor.close()

    result_by_path = {}  # file nodes, keyed by file_path
    result_by_name = {}  # directory nodes, grouped by name
    for template in templates:
        if template['file_path']:
            result_by_path[template['file_path']] = {
                'id': template['id'],
                'name': template['name'],
                'parent_id': template['parent_id'],
                'file_path': template['file_path'],
                'state': template['state'],
            }
        else:
            result_by_name.setdefault(template['name'], []).append({
                'id': template['id'],
                'name': template['name'],
                'parent_id': template['parent_id'],
                'file_path': None,
                'state': template['state'],
            })

    return {
        'by_path': result_by_path,
        'by_name': result_by_name,
    }
def create_or_update_directory(conn, dir_name: str, parent_id: Optional[int], existing_templates: Dict) -> int:
    """Ensure a directory node named ``dir_name`` exists under ``parent_id``.

    Lookup order among the cached directory records with this name:

    1. a record whose ``parent_id`` already equals ``parent_id`` (kept as is);
    2. otherwise the first record that has not yet been returned by an earlier
       call in this run; it is re-parented and its state set to 1;
    3. otherwise a new row is inserted.

    Records handed out by this function are flagged with a private
    ``'_claimed'`` key in the cache. Without that flag, two same-named folders
    under different parents would be mapped onto one database row, and the
    second call would silently move the row away from the first parent.

    Args:
        conn: Open database connection; each write is committed immediately.
        dir_name: Name of the directory node.
        parent_id: Id of the parent node, or ``None`` for a top-level node.
        existing_templates: Cache with a ``'by_name'`` index of directory
            records. It is updated in place: re-parented records get the new
            ``parent_id`` and newly created records are appended.

    Returns:
        The id of the existing or newly created directory node.

    Raises:
        Exception: Wraps any failure, after rolling back the connection. The
            original error is attached as ``__cause__``.
    """
    cursor = conn.cursor()
    try:
        candidates = existing_templates['by_name'].get(dir_name, [])
        existing = None
        for candidate in candidates:
            if candidate.get('file_path') is not None:
                continue  # only directory nodes qualify
            if candidate['parent_id'] == parent_id:
                # Exact (name, parent) match always wins.
                existing = candidate
                break
            if existing is None and not candidate.get('_claimed'):
                # Fallback: a same-named directory not yet assigned to another
                # path in this run may be moved under the new parent.
                existing = candidate

        if existing:
            template_id = existing['id']
            if existing['parent_id'] != parent_id:
                update_sql = """
                    UPDATE f_polic_file_config
                    SET parent_id = %s, updated_time = NOW(), updated_by = %s, state = 1
                    WHERE id = %s AND tenant_id = %s
                """
                cursor.execute(update_sql, (parent_id, UPDATED_BY, template_id, TENANT_ID))
                conn.commit()
                # Keep the cache in step with the database so later lookups
                # see the new parent.
                existing['parent_id'] = parent_id
                existing['state'] = 1
                print(f" [UPDATE] 更新目录: {dir_name} (ID: {template_id}, parent_id: {parent_id})")
            else:
                print(f" [KEEP] 保持目录: {dir_name} (ID: {template_id})")
            existing['_claimed'] = True
            return template_id

        # No reusable record: insert a new directory row.
        template_id = generate_id()
        insert_sql = """
            INSERT INTO f_polic_file_config
            (id, tenant_id, parent_id, name, input_data, file_path, created_time, created_by, updated_time, updated_by, state)
            VALUES (%s, %s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, %s)
        """
        cursor.execute(insert_sql, (
            template_id,
            TENANT_ID,
            parent_id,
            dir_name,
            None,  # input_data
            None,  # file_path: directory nodes have no file path
            CREATED_BY,
            CREATED_BY,
            1,  # state: 1 means enabled
        ))
        conn.commit()
        print(f" [CREATE] 创建目录: {dir_name} (ID: {template_id}, parent_id: {parent_id})")

        # Register the new node in the cache so later calls can find it.
        existing_templates['by_name'].setdefault(dir_name, []).append({
            'id': template_id,
            'name': dir_name,
            'parent_id': parent_id,
            'file_path': None,
            'state': 1,
            '_claimed': True,
        })
        return template_id
    except Exception as e:
        conn.rollback()
        raise Exception(f"创建或更新目录失败: {str(e)}") from e
    finally:
        cursor.close()
def update_file_parent(conn, file_info: Dict, parent_id: Optional[int], existing_templates: Dict) -> Optional[int]:
    """Point an existing file node at ``parent_id``.

    The file is looked up in ``existing_templates['by_path']`` by its
    ``minio_path``. Nothing is created here: a file without a matching record
    is reported and skipped.

    Args:
        conn: Open database connection; an update is committed immediately.
        file_info: File description; ``name`` is required and ``minio_path``
            is used as the lookup key.
        parent_id: Id of the parent node, or ``None``.
        existing_templates: Cache holding the ``'by_path'`` index.

    Returns:
        The id of the file node, or ``None`` when no record matches.

    Raises:
        Exception: Wraps any failure, after rolling back the connection.
    """
    cursor = conn.cursor()
    try:
        file_name = file_info['name']
        minio_path = file_info.get('minio_path')

        # The MinIO path is the only lookup key.
        record = existing_templates['by_path'].get(minio_path) if minio_path else None
        if not record:
            print(f" [WARN] 未找到文件: {file_name} (MinIO路径: {minio_path})")
            return None

        template_id = record['id']
        if record['parent_id'] == parent_id:
            # Already attached to the right parent.
            print(f" [KEEP] 保持文件: {file_name} (ID: {template_id})")
            return template_id

        update_sql = """
            UPDATE f_polic_file_config
            SET parent_id = %s, updated_time = NOW(), updated_by = %s
            WHERE id = %s AND tenant_id = %s
        """
        cursor.execute(update_sql, (parent_id, UPDATED_BY, template_id, TENANT_ID))
        conn.commit()
        print(f" [UPDATE] 更新文件: {file_name} (ID: {template_id}, parent_id: {parent_id})")
        return template_id
    except Exception as e:
        conn.rollback()
        raise Exception(f"更新文件parent_id失败: {str(e)}")
    finally:
        cursor.close()
def main():
    """Run the whole update: connect, scan, sync directories, then files.

    Steps, in order: open the database connection, scan TEMPLATE_BASE_DIR,
    load the existing records, create or update one directory node per scanned
    folder (parents first), re-parent the scanned files, and print the
    resulting tree. Any error is printed with its traceback and the connection
    is rolled back; the connection is always closed at the end.
    """
    banner = "=" * 80
    print(banner)
    print("更新模板层级结构")
    print(banner)
    print()

    conn = None  # stays None when connecting fails
    try:
        # Connect to the database.
        print("1. 连接数据库...")
        conn = pymysql.connect(**DB_CONFIG)
        print("[OK] 数据库连接成功\n")

        # Scan the directory tree.
        print("2. 扫描目录结构...")
        structure = scan_directory_structure(TEMPLATE_BASE_DIR)
        if not structure:
            print("错误: 未找到任何目录或文件")
            return

        # Load the records already stored for this tenant.
        print("\n3. 获取现有模板...")
        existing_templates = get_existing_templates(conn)
        print(f"[OK] 找到 {len(existing_templates['by_path'])} 个文件模板")
        directory_total = sum(len(v) for v in existing_templates['by_name'].values())
        print(f"[OK] 找到 {directory_total} 个目录模板\n")

        # Directory nodes: the scan result is ordered by level, so a parent's
        # id is already in the map when its children are processed.
        print("4. 创建/更新目录节点...")
        print(banner)
        path_to_id = {}
        for dir_info in structure['directories']:
            parent_path = dir_info['parent_path']
            parent_id = path_to_id.get(parent_path) if parent_path else None
            path_to_id[dir_info['path']] = create_or_update_directory(
                conn, dir_info['name'], parent_id, existing_templates
            )
        print(f"\n[OK] 处理了 {len(path_to_id)} 个目录节点\n")

        # File nodes: attach each file to the node of its containing folder.
        print("5. 更新文件节点的parent_id...")
        print(banner)
        for file_info in structure['files']:
            parent_path = file_info['parent_path']
            parent_id = path_to_id.get(parent_path) if parent_path else None
            update_file_parent(conn, file_info, parent_id, existing_templates)
        print(f"\n[OK] 处理了 {len(structure['files'])} 个文件节点\n")

        # Show the resulting tree.
        print("6. 最终层级结构:")
        print(banner)
        print_hierarchy(conn)
        print("\n" + banner)
        print("更新完成!")
        print(banner)
    except Exception as e:
        print(f"\n[ERROR] 发生错误: {e}")
        import traceback
        traceback.print_exc()
        if conn is not None:
            conn.rollback()
    finally:
        if conn is not None:
            conn.close()
            print("\n数据库连接已关闭")
def print_hierarchy(conn, parent_id=None, level=0, prefix=""):
    """Print the tenant's node tree below ``parent_id`` as an indented tree.

    Children are listed in name order. Nodes whose ``file_path`` is ``None``
    are shown with a folder icon, all others with a document icon. The
    function calls itself for every printed node, issuing one query per node.

    Args:
        conn: Open database connection.
        parent_id: Id of the node whose children are printed; ``None`` prints
            the nodes that have no parent.
        level: Depth of the current call; passed along but not otherwise used.
        prefix: Text printed before the branch marker of each child; carries
            the vertical guide lines of the ancestors.
    """
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        if parent_id is None:
            sql = """
                SELECT id, name, parent_id, file_path
                FROM f_polic_file_config
                WHERE tenant_id = %s AND parent_id IS NULL
                ORDER BY name
            """
            cursor.execute(sql, (TENANT_ID,))
        else:
            sql = """
                SELECT id, name, parent_id, file_path
                FROM f_polic_file_config
                WHERE tenant_id = %s AND parent_id = %s
                ORDER BY name
            """
            cursor.execute(sql, (TENANT_ID, parent_id))
        items = cursor.fetchall()
    finally:
        # All rows are fetched, so release the cursor before recursing rather
        # than keeping one open per tree level.
        cursor.close()

    last_index = len(items) - 1
    for i, item in enumerate(items):
        is_last = i == last_index
        current_prefix = prefix + ("└── " if is_last else "├── ")
        # Children of a non-last node keep a vertical guide; children of the
        # last node are padded with spaces so the columns stay aligned.
        next_prefix = prefix + ("    " if is_last else "│   ")
        node_type = "📁" if item['file_path'] is None else "📄"
        print(f"{current_prefix}{node_type} {item['name']} (ID: {item['id']})")
        # Recurse into the children of this node.
        print_hierarchy(conn, item['id'], level + 1, next_prefix)
# Run the update only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()