ai-business-write/update_template_hierarchy_v2.py

446 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
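Update the hierarchy stored in the f_polic_file_config table so that it mirrors the template_finish/ directory tree.
Paths are used as the unique identifier so that the parent/child relationships are established correctly.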
"""
import os
import sys
import json
import pymysql
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from collections import defaultdict
# On Windows, re-wrap the standard streams so that everything printed by this
# script is encoded as UTF-8; characters that cannot be encoded are replaced
# instead of raising.
if sys.platform == 'win32':
    import io

    sys.stdout = io.TextIOWrapper(
        sys.stdout.buffer,
        encoding='utf-8',
        errors='replace',
    )
    sys.stderr = io.TextIOWrapper(
        sys.stderr.buffer,
        encoding='utf-8',
        errors='replace',
    )
# Database connection settings.
#
# Every value can be overridden through an environment variable (DB_HOST,
# DB_PORT, DB_USER, DB_PASSWORD, DB_NAME); the literals below are only
# fallbacks so that existing invocations keep working unchanged.
#
# NOTE(security): the fallback password is a real credential committed to
# source control. It should be rotated and the literal removed once every
# environment supplies DB_PASSWORD.
DB_CONFIG = {
    'host': os.environ.get('DB_HOST', '152.136.177.240'),
    'port': int(os.environ.get('DB_PORT', '5012')),
    'user': os.environ.get('DB_USER', 'finyx'),
    'password': os.environ.get('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
    'database': os.environ.get('DB_NAME', 'finyx'),
    'charset': 'utf8mb4',
}

# Tenant whose rows in f_polic_file_config are read and modified.
TENANT_ID = 615873064429507639
# User IDs written to the created_by / updated_by audit columns.
CREATED_BY = 655162080928945152
UPDATED_BY = 655162080928945152
# Directory (relative to the working directory) that is scanned for templates.
TEMPLATE_BASE_DIR = 'template_finish'
def generate_id():
    """Build a numeric record ID from the wall clock plus a random component.

    Returns:
        int: the current Unix time in milliseconds multiplied by 1000, plus a
        random integer drawn from [100000, 999999].
    """
    import random
    import time

    millis = int(time.time() * 1000)
    # The random part has six digits while the multiplier only frees three,
    # so it overlaps the low digits of the scaled timestamp.
    return millis * 1000 + random.randint(100000, 999999)
def scan_directory_structure(base_dir: str) -> Dict:
    """Walk ``base_dir`` recursively and describe its directories and templates.

    Args:
        base_dir: Root directory to scan.

    Returns:
        ``{}`` when ``base_dir`` does not exist. Otherwise a dict with two
        lists, each sorted by ``(level, path)``:

        * ``'directories'`` - one entry per sub-directory with the keys
          ``name``, ``path`` (relative to ``base_dir``), ``level`` (0 for
          direct children of ``base_dir``) and ``parent_path`` (``None`` for
          direct children of ``base_dir``).
        * ``'files'`` - one entry per ``.docx`` file with the same keys plus
          ``file_path`` (the path as produced by the scan, including
          ``base_dir``).
    """
    base_path = Path(base_dir)
    if not base_path.exists():
        print(f"错误: 目录不存在 - {base_dir}")
        return {}

    structure = {
        'directories': [],  # directory nodes
        'files': [],        # file nodes
    }

    print("=" * 80)
    print("扫描目录结构...")
    print("=" * 80)

    for item in base_path.rglob("*"):
        relative_path = item.relative_to(base_path)
        parts = relative_path.parts
        node = {
            'name': parts[-1],
            'path': str(relative_path),
            'level': len(parts) - 1,  # levels are counted from 0
            'parent_path': str(Path(*parts[:-1])) if len(parts) > 1 else None,
        }

        if item.is_dir():
            structure['directories'].append(node)
        elif (
            item.is_file()
            # Compare the extension case-insensitively so that files saved as
            # ".DOCX" / ".Docx" are not silently ignored.
            and item.suffix.lower() == '.docx'
            # "~$" prefixed files are skipped (Word lock/temp files).
            and not item.name.startswith("~$")
        ):
            node['file_path'] = str(item)
            structure['files'].append(node)

    # Sorting by level guarantees that a parent precedes its children.
    structure['directories'].sort(key=lambda x: (x['level'], x['path']))
    structure['files'].sort(key=lambda x: (x['level'], x['path']))

    print(f"找到 {len(structure['directories'])} 个目录节点")
    print(f"找到 {len(structure['files'])} 个文件节点")
    return structure
def get_existing_templates(conn) -> Dict:
    """Load every f_polic_file_config row of the tenant and index it for lookup.

    Args:
        conn: Open database connection (used with ``pymysql`` cursors).

    Returns:
        A dict mapping a lookup key to a list of record dicts (``id``,
        ``name``, ``parent_id``, ``file_path``, ``state``). Each row is
        indexed under its ``name`` and, when it has a non-empty
        ``file_path``, under that path as well. Both index entries reference
        the same record dict, so an in-place change made through one key is
        visible through the other.
    """
    sql = """
        SELECT id, name, parent_id, file_path, state
        FROM f_polic_file_config
        WHERE tenant_id = %s
    """
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        cursor.execute(sql, (TENANT_ID,))
        templates = cursor.fetchall()
    finally:
        # Release the cursor even when the query fails.
        cursor.close()

    result = {}
    for template in templates:
        record = {
            'id': template['id'],
            'name': template['name'],
            'parent_id': template['parent_id'],
            'file_path': template['file_path'],
            'state': template['state'],
        }
        # file_path is the more precise key, so index by it when present.
        if template['file_path']:
            result.setdefault(template['file_path'], []).append(record)
        # The name key serves directory nodes and rows without a file_path.
        result.setdefault(template['name'], []).append(record)
    return result
def create_or_update_directory(conn, dir_name: str, parent_id: Optional[int], existing_templates: Dict) -> int:
    """Ensure a directory node named ``dir_name`` exists under ``parent_id``.

    A cached record counts as a directory node when its ``file_path`` is
    ``None``. Selection order:

    1. a directory record with this name whose ``parent_id`` already equals
       ``parent_id`` (nothing to change);
    2. otherwise the first directory record with this name that has not yet
       been handed out by this function for the same cache - it is
       re-parented and its ``state`` set to 1;
    3. otherwise a new row is inserted.

    Records handed out are flagged with a private ``'_claimed'`` key in the
    cache, so that two same-named directories under different parents do not
    collapse onto a single database row.

    Args:
        conn: Open database connection; committed on success, rolled back on
            failure.
        dir_name: Directory name to look up or create.
        parent_id: ID of the parent node, or ``None`` for a top-level node.
        existing_templates: Cache of known records keyed by name/file_path;
            updated in place.

    Returns:
        The ID of the directory node.

    Raises:
        Exception: wraps any error raised while reading or writing the row.
    """
    cursor = conn.cursor()
    try:
        directory_records = [
            record for record in existing_templates.get(dir_name, [])
            if record.get('file_path') is None
        ]

        # Same name and same parent means the same directory: reuse it.
        existing = next(
            (r for r in directory_records if r.get('parent_id') == parent_id),
            None,
        )
        if existing is None:
            # Fall back to a record no other directory has claimed yet.
            existing = next(
                (r for r in directory_records if not r.get('_claimed')),
                None,
            )

        if existing:
            template_id = existing['id']
            existing['_claimed'] = True
            if existing['parent_id'] != parent_id:
                update_sql = """
                    UPDATE f_polic_file_config
                    SET parent_id = %s, updated_time = NOW(), updated_by = %s, state = 1
                    WHERE id = %s AND tenant_id = %s
                """
                cursor.execute(update_sql, (parent_id, UPDATED_BY, template_id, TENANT_ID))
                conn.commit()
                # Keep the cache consistent with what was just written.
                existing['parent_id'] = parent_id
                existing['state'] = 1
                print(f" [UPDATE] 更新目录: {dir_name} (ID: {template_id}, parent_id: {parent_id})")
            else:
                print(f" [KEEP] 保持目录: {dir_name} (ID: {template_id})")
            return template_id

        # No reusable record: insert a new directory row.
        template_id = generate_id()
        insert_sql = """
            INSERT INTO f_polic_file_config
            (id, tenant_id, parent_id, name, input_data, file_path, created_time, created_by, updated_time, updated_by, state)
            VALUES (%s, %s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, %s)
        """
        cursor.execute(insert_sql, (
            template_id,
            TENANT_ID,
            parent_id,
            dir_name,
            None,  # input_data
            None,  # file_path: directory nodes have no file path
            CREATED_BY,
            CREATED_BY,
            1,     # state: 1 means enabled
        ))
        conn.commit()
        print(f" [CREATE] 创建目录: {dir_name} (ID: {template_id}, parent_id: {parent_id})")

        # Register the new row so later lookups in this run can find it.
        existing_templates.setdefault(dir_name, []).append({
            'id': template_id,
            'name': dir_name,
            'parent_id': parent_id,
            'file_path': None,
            'state': 1,
            '_claimed': True,
        })
        return template_id
    except Exception as e:
        conn.rollback()
        raise Exception(f"创建或更新目录失败: {str(e)}") from e
    finally:
        cursor.close()
def normalize_name(name: str) -> str:
    """Reduce a file or template name to a comparable core.

    Steps, in order:

    1. drop a trailing file extension - only when the suffix is ASCII and
       starts with a letter (``.docx``, ``.pdf`` ...), so that a numbered
       name without an extension such as ``"2.谈话通知书"`` is not cut down
       to ``"2"``;
    2. remove parenthesised content, for both ASCII ``()`` and full-width
       parentheses;
    3. remove a leading number with an optional ``.`` or ``-`` separator.

    Args:
        name: Raw file or template name.

    Returns:
        The normalized name with surrounding whitespace stripped.
    """
    import re

    # Path.suffix is everything after the last dot, which for "1.标题" is the
    # title itself; require an extension-shaped suffix before stripping it.
    if re.fullmatch(r'\.[A-Za-z][A-Za-z0-9]{0,4}', Path(name).suffix):
        name = Path(name).stem

    # Parenthesised content, ASCII or full-width (U+FF08 / U+FF09).
    name = re.sub(r'[(\uff08].*?[)\uff09]', '', name)
    name = name.strip()

    # Leading number plus optional "." or "-" separator.
    name = re.sub(r'^\d+[\.\-]?\s*', '', name)
    return name.strip()
def update_file_parent(conn, file_info: Dict, parent_id: Optional[int], existing_templates: Dict) -> Optional[int]:
    """Point the database record of a template file at ``parent_id``.

    The record is looked up in ``existing_templates`` in this order:

    1. by the storage path the file would have if uploaded in the current
       month: ``/<TENANT_ID>/TEMPLATE/<year>/<month>/<file name>`` (only
       attempted when ``file_info`` has a ``'file_path'`` key);
    2. among records that have a ``file_path`` and whose normalized name
       equals the normalized file name: first one with that expected path,
       else one whose ``parent_id`` already matches, else the first found.

    Args:
        conn: Open database connection; committed on success, rolled back on
            failure.
        file_info: Scan entry for the file; ``'name'`` is required.
        parent_id: ID of the parent node, or ``None`` for a top-level node.
        existing_templates: Cache of known records keyed by name/file_path;
            the matched record's ``parent_id`` is refreshed in place after an
            update.

    Returns:
        The ID of the matched record, or ``None`` when no record matches.

    Raises:
        Exception: wraps any error raised while matching or updating.
    """
    cursor = conn.cursor()
    try:
        file_name = file_info['name']

        # Local path:   template_finish/2-初核模版/1.初核请示/1.请示报告卡XXX.docx
        # Storage path: /<tenant id>/TEMPLATE/2025/12/1.请示报告卡XXX.docx
        expected_path = None
        if 'file_path' in file_info:
            from datetime import datetime
            now = datetime.now()
            file_name_only = Path(file_name).name
            expected_path = f'/{TENANT_ID}/TEMPLATE/{now.year}/{now.month:02d}/{file_name_only}'

        # 1) Exact storage-path match is the most reliable.
        existing = None
        if expected_path and expected_path in existing_templates:
            candidates = existing_templates[expected_path]
            if candidates:
                existing = candidates[0]

        # 2) Otherwise compare normalized names of file records.
        if not existing:
            normalized_file_name = normalize_name(file_name)
            candidates = []
            for templates in existing_templates.values():
                for template in templates:
                    if not template.get('file_path'):
                        continue  # directory node or row without a file
                    if normalize_name(template['name']) != normalized_file_name:
                        continue
                    if expected_path and template['file_path'] == expected_path:
                        existing = template
                        break
                    candidates.append(template)
                if existing:
                    break

            if not existing and candidates:
                # Prefer a candidate that already sits under the right parent.
                existing = next(
                    (c for c in candidates if c['parent_id'] == parent_id),
                    candidates[0],
                )

        if not existing:
            print(f" [WARN] 未找到文件: {file_name} (标准化: {normalize_name(file_name)})")
            return None

        template_id = existing['id']
        if existing['parent_id'] != parent_id:
            update_sql = """
                UPDATE f_polic_file_config
                SET parent_id = %s, updated_time = NOW(), updated_by = %s
                WHERE id = %s AND tenant_id = %s
            """
            cursor.execute(update_sql, (parent_id, UPDATED_BY, template_id, TENANT_ID))
            conn.commit()
            # A row may be cached under several keys; refresh every copy so
            # later comparisons in this run see the value just written.
            for records in existing_templates.values():
                for record in records:
                    if record.get('id') == template_id:
                        record['parent_id'] = parent_id
            print(f" [UPDATE] 更新文件: {file_name} -> {existing['name']} (ID: {template_id}, parent_id: {parent_id})")
        else:
            print(f" [KEEP] 保持文件: {file_name} -> {existing['name']} (ID: {template_id})")
        return template_id
    except Exception as e:
        conn.rollback()
        raise Exception(f"更新文件parent_id失败: {str(e)}") from e
    finally:
        cursor.close()
def main():
    """Synchronise the template hierarchy in the database with the directory tree.

    Steps: connect, scan ``TEMPLATE_BASE_DIR``, load the existing records,
    create or update one directory node per scanned directory (parents
    first), re-parent the file records, then print the resulting tree.

    Any exception is printed with its traceback and the connection is rolled
    back; the connection is always closed before returning.
    """
    print("=" * 80)
    print("更新模板层级结构")
    print("=" * 80)
    print()

    conn = None
    try:
        # 1. Connect to the database.
        print("1. 连接数据库...")
        conn = pymysql.connect(**DB_CONFIG)
        print("[OK] 数据库连接成功\n")

        # 2. Scan the directory tree.
        print("2. 扫描目录结构...")
        structure = scan_directory_structure(TEMPLATE_BASE_DIR)
        # An existing but empty directory yields two empty lists, which is
        # still "nothing found".
        if not structure or not (structure['directories'] or structure['files']):
            print("错误: 未找到任何目录或文件")
            return

        # 3. Load the existing records.
        print("\n3. 获取现有模板...")
        existing_templates = get_existing_templates(conn)
        # The cache indexes a row under several keys, so count distinct IDs
        # rather than keys.
        template_count = len({
            record['id']
            for records in existing_templates.values()
            for record in records
        })
        print(f"[OK] 找到 {template_count} 个现有模板\n")

        # 4. Directory nodes; the scan is sorted by level, so a parent's ID
        #    is always known before its children are processed.
        print("4. 创建/更新目录节点...")
        print("=" * 80)
        path_to_id = {}
        for dir_info in structure['directories']:
            parent_id = None
            if dir_info['parent_path']:
                parent_id = path_to_id.get(dir_info['parent_path'])
            dir_id = create_or_update_directory(conn, dir_info['name'], parent_id, existing_templates)
            path_to_id[dir_info['path']] = dir_id
        print(f"\n[OK] 处理了 {len(path_to_id)} 个目录节点\n")

        # 5. File nodes.
        print("5. 更新文件节点的parent_id...")
        print("=" * 80)
        for file_info in structure['files']:
            parent_id = None
            if file_info['parent_path']:
                parent_id = path_to_id.get(file_info['parent_path'])
            update_file_parent(conn, file_info, parent_id, existing_templates)
        print(f"\n[OK] 处理了 {len(structure['files'])} 个文件节点\n")

        # 6. Show the result.
        print("6. 最终层级结构:")
        print("=" * 80)
        print_hierarchy(conn)

        print("\n" + "=" * 80)
        print("更新完成!")
        print("=" * 80)
    except Exception as e:
        print(f"\n[ERROR] 发生错误: {e}")
        import traceback
        traceback.print_exc()
        if conn is not None:
            conn.rollback()
    finally:
        if conn is not None:
            conn.close()
            print("\n数据库连接已关闭")
def print_hierarchy(conn, parent_id=None, level=0, prefix=""):
    """Print the tenant's node tree below ``parent_id`` as an indented listing.

    Args:
        conn: Open database connection (used with ``pymysql`` cursors).
        parent_id: Node whose children are printed; ``None`` prints the
            nodes whose ``parent_id`` is NULL.
        level: Depth of the current call. It is passed down on recursion but
            does not influence the output.
        prefix: Text printed before the branch marker of every child;
            extended by one indentation step per level.
    """
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        if parent_id is None:
            sql = """
                SELECT id, name, parent_id, file_path
                FROM f_polic_file_config
                WHERE tenant_id = %s AND parent_id IS NULL
                ORDER BY name
            """
            cursor.execute(sql, (TENANT_ID,))
        else:
            sql = """
                SELECT id, name, parent_id, file_path
                FROM f_polic_file_config
                WHERE tenant_id = %s AND parent_id = %s
                ORDER BY name
            """
            cursor.execute(sql, (TENANT_ID, parent_id))
        items = cursor.fetchall()
    finally:
        # All rows are fetched, so the cursor need not stay open while the
        # children are visited.
        cursor.close()

    for i, item in enumerate(items):
        is_last = i == len(items) - 1
        current_prefix = prefix + ("└── " if is_last else "├── ")
        # Children are indented by the width of a branch marker; a vertical
        # bar is kept while the parent still has siblings below it.
        next_prefix = prefix + ("    " if is_last else "│   ")
        # Rows without a file_path are shown as folders.
        node_type = "📁" if item['file_path'] is None else "📄"
        print(f"{current_prefix}{node_type} {item['name']} (ID: {item['id']})")
        # Recurse into the children of this node.
        print_hierarchy(conn, item['id'], level + 1, next_prefix)
# Run the synchronisation only when the file is executed as a script.
if __name__ == "__main__":
    main()