ai-business-write/analyze_and_update_template_tree.py

556 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
分析和更新模板树状结构
根据 template_finish 目录结构规划树状层级,并更新数据库中的 parent_id 字段
"""
import os
import json
import pymysql
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from datetime import datetime
# Database connection settings, overridable via environment variables.
# NOTE(review): real-looking host/user/password are committed here as
# fallback defaults — these credentials should be rotated and removed
# from source control; require the environment variables instead.
DB_CONFIG = {
    'host': os.getenv('DB_HOST', '152.136.177.240'),
    'port': int(os.getenv('DB_PORT', 5012)),
    'user': os.getenv('DB_USER', 'finyx'),
    'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
    'database': os.getenv('DB_NAME', 'finyx'),
    'charset': 'utf8mb4'
}

# Tenant and audit identity stamped onto every row this script touches.
TENANT_ID = 615873064429507639
CREATED_BY = 655162080928945152
UPDATED_BY = 655162080928945152
# Captured once at import time, not per-row — presumably intentional for a
# one-shot migration script; verify if reused as a long-running process.
CURRENT_TIME = datetime.now()

# Project root (directory containing this script) and the template
# directory that is scanned for .docx files.
PROJECT_ROOT = Path(__file__).parent
TEMPLATES_DIR = PROJECT_ROOT / "template_finish"
# Document-type mapping copied from init_all_templates.py.
# Keys are exact .docx file stems (including numbering, full-width
# punctuation and, in one case, a trailing space) mapped to the template
# code and business type recorded in the database.
DOCUMENT_TYPE_MAPPING = {
    "1.请示报告卡XXX": {
        "template_code": "REPORT_CARD",
        "name": "1.请示报告卡XXX",
        "business_type": "INVESTIGATION"
    },
    "2.初步核实审批表XXX": {
        "template_code": "PRELIMINARY_VERIFICATION_APPROVAL",
        "name": "2.初步核实审批表XXX",
        "business_type": "INVESTIGATION"
    },
    "3.附件初核方案(XXX)": {
        "template_code": "INVESTIGATION_PLAN",
        "name": "3.附件初核方案(XXX)",
        "business_type": "INVESTIGATION"
    },
    "谈话通知书第一联": {
        "template_code": "NOTIFICATION_LETTER_1",
        "name": "谈话通知书第一联",
        "business_type": "INVESTIGATION"
    },
    "谈话通知书第二联": {
        "template_code": "NOTIFICATION_LETTER_2",
        "name": "谈话通知书第二联",
        "business_type": "INVESTIGATION"
    },
    "谈话通知书第三联": {
        "template_code": "NOTIFICATION_LETTER_3",
        "name": "谈话通知书第三联",
        "business_type": "INVESTIGATION"
    },
    "1.请示报告卡(初核谈话)": {
        "template_code": "REPORT_CARD_INTERVIEW",
        "name": "1.请示报告卡(初核谈话)",
        "business_type": "INVESTIGATION"
    },
    "2谈话审批表": {
        "template_code": "INTERVIEW_APPROVAL_FORM",
        "name": "2谈话审批表",
        "business_type": "INVESTIGATION"
    },
    "3.谈话前安全风险评估表": {
        "template_code": "PRE_INTERVIEW_RISK_ASSESSMENT",
        "name": "3.谈话前安全风险评估表",
        "business_type": "INVESTIGATION"
    },
    "4.谈话方案": {
        "template_code": "INTERVIEW_PLAN",
        "name": "4.谈话方案",
        "business_type": "INVESTIGATION"
    },
    "5.谈话后安全风险评估表": {
        "template_code": "POST_INTERVIEW_RISK_ASSESSMENT",
        "name": "5.谈话后安全风险评估表",
        "business_type": "INVESTIGATION"
    },
    "1.谈话笔录": {
        "template_code": "INTERVIEW_RECORD",
        "name": "1.谈话笔录",
        "business_type": "INVESTIGATION"
    },
    "2.谈话询问对象情况摸底调查30问": {
        "template_code": "INVESTIGATION_30_QUESTIONS",
        "name": "2.谈话询问对象情况摸底调查30问",
        "business_type": "INVESTIGATION"
    },
    "3.被谈话人权利义务告知书": {
        "template_code": "RIGHTS_OBLIGATIONS_NOTICE",
        "name": "3.被谈话人权利义务告知书",
        "business_type": "INVESTIGATION"
    },
    "4.点对点交接单": {
        "template_code": "HANDOVER_FORM",
        "name": "4.点对点交接单",
        "business_type": "INVESTIGATION"
    },
    "4.点对点交接单2": {
        "template_code": "HANDOVER_FORM_2",
        "name": "4.点对点交接单2",
        "business_type": "INVESTIGATION"
    },
    "5.陪送交接单(新)": {
        "template_code": "ESCORT_HANDOVER_FORM",
        "name": "5.陪送交接单(新)",
        "business_type": "INVESTIGATION"
    },
    "6.1保密承诺书(谈话对象使用-非中共党员用)": {
        "template_code": "CONFIDENTIALITY_COMMITMENT_NON_PARTY",
        "name": "6.1保密承诺书(谈话对象使用-非中共党员用)",
        "business_type": "INVESTIGATION"
    },
    "6.2保密承诺书(谈话对象使用-中共党员用)": {
        "template_code": "CONFIDENTIALITY_COMMITMENT_PARTY",
        "name": "6.2保密承诺书(谈话对象使用-中共党员用)",
        "business_type": "INVESTIGATION"
    },
    "7.办案人员-办案安全保密承诺书": {
        "template_code": "INVESTIGATOR_CONFIDENTIALITY_COMMITMENT",
        "name": "7.办案人员-办案安全保密承诺书",
        "business_type": "INVESTIGATION"
    },
    # NOTE(review): this key deliberately ends with a trailing space —
    # it must match the actual file stem on disk; confirm before "fixing".
    "8-1请示报告卡初核报告结论 ": {
        "template_code": "REPORT_CARD_CONCLUSION",
        "name": "8-1请示报告卡初核报告结论 ",
        "business_type": "INVESTIGATION"
    },
    "8.XXX初核情况报告": {
        "template_code": "INVESTIGATION_REPORT",
        "name": "8.XXX初核情况报告",
        "business_type": "INVESTIGATION"
    }
}
def generate_id():
    """Generate a snowflake-style integer ID (millisecond timestamp + random suffix).

    The millisecond Unix timestamp occupies the high digits and a 6-digit
    random number the low digits, so IDs sort roughly by creation time.

    Returns:
        int: a positive integer well within a signed 64-bit range
        (~1.7e18 today vs the 9.2e18 BIGINT maximum).
    """
    import time
    import random
    timestamp = int(time.time() * 1000)
    # Bug fix: the original computed ``timestamp * 1000 +
    # randint(100000, 999999)`` — the 6-digit random part overflows the
    # 3-digit slot left by ``* 1000`` and spills into the timestamp digits,
    # so different (timestamp, random) pairs could yield the same ID.
    # Multiply by 1_000_000 to reserve a full 6-digit suffix.
    random_part = random.randint(0, 999999)
    return timestamp * 1_000_000 + random_part
def identify_document_type(file_name: str) -> Optional[Dict]:
    """Look up the document-type configuration for a template file.

    Args:
        file_name: file name, with or without extension; only its stem is
            matched against DOCUMENT_TYPE_MAPPING.

    Returns:
        The mapping entry (template_code / name / business_type) for a
        recognised file, otherwise None.
    """
    return DOCUMENT_TYPE_MAPPING.get(Path(file_name).stem)
def scan_directory_structure(base_dir: Path) -> Dict:
    """Scan *base_dir* recursively and build a directory/file hierarchy.

    Only ``.docx`` files are recorded; any entry named ``__pycache__`` is
    skipped at every level. Children are visited in sorted order so the
    result is deterministic.

    Args:
        base_dir: root directory to scan. If it does not exist the result
            is simply empty.

    Returns:
        dict with two keys:
            'directories': {abs_path: {'name', 'parent', 'level'}}
            'files':       {abs_path: {'name', 'parent', 'level',
                                       'template_code', 'full_path'}}
        where 'parent' is the parent directory path (None at the top) and
        'level' is the depth below *base_dir* (top entries are level 0).
    """
    structure = {
        'directories': {},  # {path: {'name': ..., 'parent': ..., 'level': ...}}
        'files': {}         # {path: {'name': ..., 'parent': ..., 'template_code': ...}}
    }

    def process_path(path: Path, parent_path: Optional[str] = None, level: int = 0):
        """Recursively record *path* and, for directories, its children."""
        if path.is_file() and path.suffix == '.docx':
            file_name = path.stem
            doc_config = identify_document_type(file_name)
            structure['files'][str(path)] = {
                'name': file_name,
                'parent': parent_path,
                'level': level,
                'template_code': doc_config['template_code'] if doc_config else None,
                'full_path': str(path)
            }
        elif path.is_dir():
            structure['directories'][str(path)] = {
                'name': path.name,
                'parent': parent_path,
                'level': level
            }
            for child in sorted(path.iterdir()):
                if child.name != '__pycache__':
                    process_path(child, str(path), level + 1)

    # Bug fix: the original ignored the ``base_dir`` parameter and always
    # scanned the module-level TEMPLATES_DIR; honour the argument instead
    # (existing callers pass TEMPLATES_DIR, so behavior is unchanged).
    if base_dir.exists():
        for item in sorted(base_dir.iterdir()):
            if item.name != '__pycache__':
                process_path(item, None, 0)
    return structure
def get_existing_data(conn) -> Dict:
    """Load the existing template-config rows for TENANT_ID from the database.

    Args:
        conn: an open pymysql connection.

    Returns:
        {
            'by_id': {id: row},
            'by_name': {name: row},
            'by_template_code': {template_code: row}  # first row seen wins
        }
        Rows are DictCursor dicts. A row's template_code falls back to the
        'template_code' field inside its JSON input_data when the column
        itself is empty.
    """
    sql = """
    SELECT id, name, parent_id, template_code, input_data, file_path, state
    FROM f_polic_file_config
    WHERE tenant_id = %s
    """
    # ``with`` guarantees the cursor is closed even when the query raises
    # (the original leaked it on error).
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute(sql, (TENANT_ID,))
        configs = cursor.fetchall()

    result = {
        'by_id': {},
        'by_name': {},
        'by_template_code': {}
    }
    for config in configs:
        # Fall back to the template_code embedded in input_data JSON.
        template_code = config.get('template_code')
        if not template_code and config.get('input_data'):
            raw = config['input_data']
            try:
                input_data = json.loads(raw) if isinstance(raw, str) else raw
                if isinstance(input_data, dict):
                    template_code = input_data.get('template_code')
            except (json.JSONDecodeError, TypeError, ValueError):
                # Malformed input_data: behave as if no template_code exists
                # (narrowed from the original bare ``except``, which would
                # also have swallowed KeyboardInterrupt/SystemExit).
                pass
        result['by_id'][config['id']] = config
        result['by_name'][config['name']] = config
        # If several rows share a template_code, keep the first one seen.
        if template_code and template_code not in result['by_template_code']:
            result['by_template_code'][template_code] = config
    return result
def analyze_structure():
    """Scan the template directory and load current DB state for comparison.

    Connects to the database, scans TEMPLATES_DIR, fetches existing
    config rows and reports how many lack a parent_id.

    Returns:
        (dir_structure, existing_data) on success, or (None, None) when
        the database connection cannot be established.
    """
    print("="*80)
    print("分析模板目录结构和数据库数据")
    print("="*80)
    try:
        conn = pymysql.connect(**DB_CONFIG)
        print("✓ 数据库连接成功\n")
    except Exception as e:
        print(f"✗ 数据库连接失败: {e}")
        return None, None

    # Fix: close the connection even when scanning or querying raises
    # (the original only closed it on the success path).
    try:
        print("扫描目录结构...")
        dir_structure = scan_directory_structure(TEMPLATES_DIR)
        print(f" 找到 {len(dir_structure['directories'])} 个目录")
        print(f" 找到 {len(dir_structure['files'])} 个文件\n")

        print("获取数据库现有数据...")
        existing_data = get_existing_data(conn)
        print(f" 数据库中有 {len(existing_data['by_id'])} 条记录\n")

        print("分析缺少 parent_id 的记录...")
        missing_parent = [c for c in existing_data['by_id'].values()
                          if c.get('parent_id') is None]
        print(f"{len(missing_parent)} 条记录缺少 parent_id\n")
    finally:
        conn.close()
    return dir_structure, existing_data
def plan_tree_structure(dir_structure: Dict, existing_data: Dict) -> List[Dict]:
    """
    Plan the tree structure: decide, node by node, whether each scanned
    directory/file needs a new DB row or an update to an existing one.

    Args:
        dir_structure: output of scan_directory_structure().
        existing_data: output of get_existing_data().

    Returns:
        A list of plan items, each:
        {
            'type': 'directory' | 'file',
            'name': ...,
            'parent_name': ...,
            'level': ...,
            'action': 'create' | 'update',
            'config_id': ...  (existing id for updates, new id for creates),
            'template_code': ...  (files only)
        }
    """
    plan = []
    # Sort directories by level so parents are always planned (and their
    # ids registered in dir_id_map) before any of their children.
    directories = sorted(dir_structure['directories'].items(),
    key=lambda x: (x[1]['level'], x[0]))
    # Sort files by level as well, for deterministic output order.
    files = sorted(dir_structure['files'].items(),
    key=lambda x: (x[1]['level'], x[0]))
    # Directory-path -> config id, used to resolve each node's parent_id.
    dir_id_map = {}  # {dir_path: config_id}
    # Process directories in level order.
    for dir_path, dir_info in directories:
        dir_name = dir_info['name']
        parent_path = dir_info['parent']
        level = dir_info['level']
        # Resolve the parent directory's config id (None for root nodes;
        # also None if the parent was somehow not registered — TODO confirm
        # that case cannot occur given the level-ordered traversal).
        parent_id = None
        if parent_path:
            parent_id = dir_id_map.get(parent_path)
        # Directories are matched against the DB purely by name.
        # NOTE(review): by_name also contains file rows, so a file and a
        # directory sharing a name would collide — verify names are unique.
        existing = existing_data['by_name'].get(dir_name)
        if existing:
            # Update the existing row's parent pointer.
            plan.append({
            'type': 'directory',
            'name': dir_name,
            'parent_name': dir_structure['directories'].get(parent_path, {}).get('name') if parent_path else None,
            'parent_id': parent_id,
            'level': level,
            'action': 'update',
            'config_id': existing['id'],
            'current_parent_id': existing.get('parent_id')
            })
            dir_id_map[dir_path] = existing['id']
        else:
            # Create a new row (directory node) with a freshly generated id.
            new_id = generate_id()
            plan.append({
            'type': 'directory',
            'name': dir_name,
            'parent_name': dir_structure['directories'].get(parent_path, {}).get('name') if parent_path else None,
            'parent_id': parent_id,
            'level': level,
            'action': 'create',
            'config_id': new_id,
            'current_parent_id': None
            })
            dir_id_map[dir_path] = new_id
    # Process files (after all directories, so every parent id is known).
    for file_path, file_info in files:
        file_name = file_info['name']
        parent_path = file_info['parent']
        level = file_info['level']
        template_code = file_info['template_code']
        # Resolve the containing directory's config id.
        parent_id = dir_id_map.get(parent_path) if parent_path else None
        # Match the DB row by template_code first, then fall back to name.
        existing = None
        if template_code:
            existing = existing_data['by_template_code'].get(template_code)
        if not existing:
            existing = existing_data['by_name'].get(file_name)
        if existing:
            # Update the existing row's parent pointer.
            plan.append({
            'type': 'file',
            'name': file_name,
            'parent_name': dir_structure['directories'].get(parent_path, {}).get('name') if parent_path else None,
            'parent_id': parent_id,
            'level': level,
            'action': 'update',
            'config_id': existing['id'],
            'template_code': template_code,
            'current_parent_id': existing.get('parent_id')
            })
        else:
            # Create a new row (file node) with a freshly generated id.
            new_id = generate_id()
            plan.append({
            'type': 'file',
            'name': file_name,
            'parent_name': dir_structure['directories'].get(parent_path, {}).get('name') if parent_path else None,
            'parent_id': parent_id,
            'level': level,
            'action': 'create',
            'config_id': new_id,
            'template_code': template_code,
            'current_parent_id': None
            })
    return plan
def _sql_str(value: str) -> str:
    """Escape embedded single quotes for a single-quoted MySQL literal."""
    return value.replace("'", "''")


def generate_update_sql(plan: List[Dict], output_file: str = 'update_template_tree.sql'):
    """Render the update plan as a reviewable SQL script.

    Emits INSERTs for new nodes and parent_id UPDATEs for existing ones,
    grouped by tree level (top level first) inside one transaction.

    Args:
        plan: plan items from plan_tree_structure().
        output_file: path the UTF-8 script is written to.

    Returns:
        str: the path of the generated SQL file.
    """
    sql_lines = [
        "-- 模板树状结构更新脚本",
        f"-- 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "-- 注意:执行前请备份数据库!",
        "",
        "USE finyx;",
        "",
        "START TRANSACTION;",
        ""
    ]
    # Group plan items by tree level so parents are inserted before children.
    by_level = {}
    for item in plan:
        by_level.setdefault(item['level'], []).append(item)
    for level in sorted(by_level.keys()):
        sql_lines.append(f"-- ===== 层级 {level} =====")
        sql_lines.append("")
        for item in by_level[level]:
            if item['action'] == 'create':
                # Fix 1: ``is not None`` instead of truthiness, so a
                # legitimate parent_id of 0 would not become NULL.
                parent_id_sql = str(item['parent_id']) if item['parent_id'] is not None else "NULL"
                # Fix 2: escape single quotes in names/codes — the original
                # escaped input_data but interpolated names raw, so a name
                # containing ' produced broken (injectable) SQL.
                name_sql = _sql_str(item['name'])
                if item['type'] == 'directory':
                    sql_lines.append(f"-- 创建目录节点: {item['name']}")
                    sql_lines.append("INSERT INTO f_polic_file_config")
                    sql_lines.append(" (id, tenant_id, parent_id, name, input_data, file_path, created_time, created_by, updated_time, updated_by, state)")
                    sql_lines.append(f"VALUES ({item['config_id']}, {TENANT_ID}, {parent_id_sql}, '{name_sql}', NULL, NULL, NOW(), {CREATED_BY}, NOW(), {UPDATED_BY}, 1);")
                else:
                    # File node: carries template_code and a JSON input_data blob.
                    sql_lines.append(f"-- 创建文件节点: {item['name']}")
                    input_data = json.dumps({
                        'template_code': item.get('template_code', ''),
                        'business_type': 'INVESTIGATION'
                    }, ensure_ascii=False).replace("'", "''")
                    sql_lines.append("INSERT INTO f_polic_file_config")
                    sql_lines.append(" (id, tenant_id, parent_id, name, input_data, file_path, template_code, created_time, created_by, updated_time, updated_by, state)")
                    template_code_sql = f"'{_sql_str(item['template_code'])}'" if item.get('template_code') else "NULL"
                    sql_lines.append(f"VALUES ({item['config_id']}, {TENANT_ID}, {parent_id_sql}, '{name_sql}', '{input_data}', NULL, {template_code_sql}, NOW(), {CREATED_BY}, NOW(), {UPDATED_BY}, 1);")
                sql_lines.append("")
            else:
                # Update path: only emit SQL when the parent actually changes.
                current_parent = item.get('current_parent_id')
                new_parent = item.get('parent_id')
                if current_parent != new_parent:
                    sql_lines.append(f"-- 更新: {item['name']} (parent_id: {current_parent} -> {new_parent})")
                    parent_id_sql = str(new_parent) if new_parent is not None else "NULL"
                    sql_lines.append("UPDATE f_polic_file_config")
                    sql_lines.append(f"SET parent_id = {parent_id_sql}, updated_time = NOW(), updated_by = {UPDATED_BY}")
                    sql_lines.append(f"WHERE id = {item['config_id']} AND tenant_id = {TENANT_ID};")
                    sql_lines.append("")
    sql_lines.append("COMMIT;")
    sql_lines.append("")
    sql_lines.append("-- 更新完成")
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write('\n'.join(sql_lines))
    print(f"✓ SQL脚本已生成: {output_file}")
    return output_file
def print_analysis_report(dir_structure: Dict, existing_data: Dict, plan: List[Dict]):
    """Print a human-readable summary of the scan, the DB state and the plan."""
    separator = "=" * 80
    print("\n" + separator)
    print("分析报告")
    print(separator)

    # Directory scan summary.
    print(f"\n目录结构:")
    print(f" - 目录数量: {len(dir_structure['directories'])}")
    print(f" - 文件数量: {len(dir_structure['files'])}")

    # Current database state.
    print(f"\n数据库现状:")
    print(f" - 总记录数: {len(existing_data['by_id'])}")
    orphan_count = sum(1 for row in existing_data['by_id'].values() if row.get('parent_id') is None)
    print(f" - 缺少 parent_id 的记录: {orphan_count}")

    # Planned actions.
    print(f"\n更新计划:")
    creates = sum(1 for entry in plan if entry['action'] == 'create')
    updates = sum(1 for entry in plan if entry['action'] == 'update')
    print(f" - 需要创建: {creates}")
    print(f" - 需要更新: {updates}")

    # Node count per tree level.
    print(f"\n层级分布:")
    level_counts = {}
    for entry in plan:
        level_counts.setdefault(entry['level'], 0)
        level_counts[entry['level']] += 1
    for level in sorted(level_counts):
        print(f" - 层级 {level}: {level_counts[level]} 个节点")
    print("\n" + separator)
def main():
    """Entry point: analyse the tree, build the plan, report, emit the SQL script."""
    # Step 1: scan the template directory and load current DB state.
    dir_structure, existing_data = analyze_structure()
    if not dir_structure or not existing_data:
        return

    # Step 2: build the create/update plan.
    print("规划树状结构...")
    plan = plan_tree_structure(dir_structure, existing_data)
    print(f" 生成 {len(plan)} 个更新计划\n")

    # Step 3: show the summary report.
    print_analysis_report(dir_structure, existing_data, plan)

    # Step 4: write the SQL script for manual review and execution.
    print("\n生成SQL更新脚本...")
    sql_file = generate_update_sql(plan)

    banner = "=" * 80
    print("\n" + banner)
    print("分析完成!")
    print(banner)
    print(f"\n请检查生成的SQL脚本: {sql_file}")
    print("确认无误后,可以执行该脚本更新数据库。")
    print("\n注意:执行前请备份数据库!")


if __name__ == '__main__':
    main()