"""Analyze and update the template tree structure.

Plans a tree hierarchy from the layout of the ``template_finish`` directory,
compares it with the rows of ``f_polic_file_config`` for one tenant, and
writes an SQL script that creates the missing nodes and fixes ``parent_id``
on the existing ones.  The script is only generated, never executed here.
"""
import json
import os
import random
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import pymysql

# Database connection settings; every value can be overridden through the
# environment.
# NOTE(review): the fallback host and password are committed in source.
# Prefer supplying DB_HOST / DB_PASSWORD via the environment and rotating
# the credential; the defaults are kept only so existing usage keeps working.
DB_CONFIG = {
    'host': os.getenv('DB_HOST', '152.136.177.240'),
    'port': int(os.getenv('DB_PORT', 5012)),
    'user': os.getenv('DB_USER', 'finyx'),
    'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
    'database': os.getenv('DB_NAME', 'finyx'),
    'charset': 'utf8mb4'
}

TENANT_ID = 615873064429507639
CREATED_BY = 655162080928945152
UPDATED_BY = 655162080928945152
CURRENT_TIME = datetime.now()

# Project root and the directory whose layout defines the tree.
PROJECT_ROOT = Path(__file__).parent
TEMPLATES_DIR = PROJECT_ROOT / "template_finish"

# (document name, template code) pairs, copied from init_all_templates.py.
# The document name is the template file name without its extension.
_DOCUMENT_TYPES = (
    ("1.请示报告卡(XXX)", "REPORT_CARD"),
    ("2.初步核实审批表(XXX)", "PRELIMINARY_VERIFICATION_APPROVAL"),
    ("3.附件初核方案(XXX)", "INVESTIGATION_PLAN"),
    ("谈话通知书第一联", "NOTIFICATION_LETTER_1"),
    ("谈话通知书第二联", "NOTIFICATION_LETTER_2"),
    ("谈话通知书第三联", "NOTIFICATION_LETTER_3"),
    ("1.请示报告卡(初核谈话)", "REPORT_CARD_INTERVIEW"),
    ("2谈话审批表", "INTERVIEW_APPROVAL_FORM"),
    ("3.谈话前安全风险评估表", "PRE_INTERVIEW_RISK_ASSESSMENT"),
    ("4.谈话方案", "INTERVIEW_PLAN"),
    ("5.谈话后安全风险评估表", "POST_INTERVIEW_RISK_ASSESSMENT"),
    ("1.谈话笔录", "INTERVIEW_RECORD"),
    ("2.谈话询问对象情况摸底调查30问", "INVESTIGATION_30_QUESTIONS"),
    ("3.被谈话人权利义务告知书", "RIGHTS_OBLIGATIONS_NOTICE"),
    ("4.点对点交接单", "HANDOVER_FORM"),
    ("4.点对点交接单2", "HANDOVER_FORM_2"),
    ("5.陪送交接单(新)", "ESCORT_HANDOVER_FORM"),
    ("6.1保密承诺书(谈话对象使用-非中共党员用)", "CONFIDENTIALITY_COMMITMENT_NON_PARTY"),
    ("6.2保密承诺书(谈话对象使用-中共党员用)", "CONFIDENTIALITY_COMMITMENT_PARTY"),
    ("7.办案人员-办案安全保密承诺书", "INVESTIGATOR_CONFIDENTIALITY_COMMITMENT"),
    # The trailing space is part of the name.
    ("8-1请示报告卡(初核报告结论) ", "REPORT_CARD_CONCLUSION"),
    ("8.XXX初核情况报告", "INVESTIGATION_REPORT"),
)

# Document name -> {"template_code", "name", "business_type"}.
DOCUMENT_TYPE_MAPPING = {
    doc_name: {
        "template_code": doc_code,
        "name": doc_name,
        "business_type": "INVESTIGATION"
    }
    for doc_name, doc_code in _DOCUMENT_TYPES
}


def generate_id():
    """Generate a numeric ID from the current time and a random suffix.

    Returns:
        ``int(time.time() * 1000) * 1000`` plus a random integer in
        ``[100000, 999999]``.  Uniqueness is probabilistic, not guaranteed.
    """
    timestamp = int(time.time() * 1000)
    random_part = random.randint(100000, 999999)
    return timestamp * 1000 + random_part


def identify_document_type(file_name: str) -> Optional[Dict]:
    """Look up the document type for a file name.

    Args:
        file_name: Either a full file name (``"4.谈话方案.docx"``) or a name
            whose extension was already removed (``"4.谈话方案"``).

    Returns:
        The matching ``DOCUMENT_TYPE_MAPPING`` entry, or ``None``.
    """
    # Try the name verbatim first: most keys contain a dot ("1.请示报告卡…"),
    # so taking ``.stem`` of an already-stripped name would cut it down to
    # "1" and never match.
    if file_name in DOCUMENT_TYPE_MAPPING:
        return DOCUMENT_TYPE_MAPPING[file_name]
    return DOCUMENT_TYPE_MAPPING.get(Path(file_name).stem)


def scan_directory_structure(base_dir: Path) -> Dict:
    """Scan ``base_dir`` recursively and record its directories and files.

    Only ``.docx`` files are recorded; entries named ``__pycache__`` are
    skipped.  Entries directly under ``base_dir`` get level 0 and no parent.

    Args:
        base_dir: Root directory to scan.

    Returns:
        ``{'directories': {path: {'name', 'parent', 'level'}},
        'files': {path: {'name', 'parent', 'level', 'template_code',
        'full_path'}}}``.  Both maps are empty when ``base_dir`` is not a
        directory.
    """
    base_dir = Path(base_dir)
    structure = {
        'directories': {},
        'files': {}
    }

    def process_path(path: Path, parent_path: Optional[str] = None, level: int = 0):
        """Record ``path`` and, for a directory, recurse into its children."""
        if path.is_file() and path.suffix == '.docx':
            doc_config = identify_document_type(path.name)
            structure['files'][str(path)] = {
                'name': path.stem,
                'parent': parent_path,
                'level': level,
                'template_code': doc_config['template_code'] if doc_config else None,
                'full_path': str(path)
            }
        elif path.is_dir():
            structure['directories'][str(path)] = {
                'name': path.name,
                'parent': parent_path,
                'level': level
            }
            # Sorted so the scan order is deterministic.
            for child in sorted(path.iterdir()):
                if child.name != '__pycache__':
                    process_path(child, str(path), level + 1)

    if base_dir.is_dir():
        for item in sorted(base_dir.iterdir()):
            if item.name != '__pycache__':
                process_path(item, None, 0)

    return structure


def _extract_template_code(config: Dict) -> Optional[str]:
    """Return a row's template code, falling back to its ``input_data`` JSON."""
    template_code = config.get('template_code')
    raw_input = config.get('input_data')
    if template_code or not raw_input:
        return template_code
    try:
        input_data = json.loads(raw_input) if isinstance(raw_input, str) else raw_input
    except (ValueError, TypeError):
        # Malformed JSON: treat the row as having no template code.
        return template_code
    if isinstance(input_data, dict):
        return input_data.get('template_code')
    return template_code


def get_existing_data(conn) -> Dict:
    """Load this tenant's ``f_polic_file_config`` rows and index them.

    Args:
        conn: An open pymysql connection.

    Returns:
        ``{'by_id': {id: row}, 'by_name': {name: row},
        'by_template_code': {template_code: row}}``.  For duplicate names
        the last row wins; for duplicate template codes the first row wins.
    """
    sql = """
        SELECT id, name, parent_id, template_code, input_data, file_path, state
        FROM f_polic_file_config
        WHERE tenant_id = %s
    """
    # The context manager closes the cursor even if the query raises.
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute(sql, (TENANT_ID,))
        configs = cursor.fetchall()

    result = {
        'by_id': {},
        'by_name': {},
        'by_template_code': {}
    }
    for config in configs:
        result['by_id'][config['id']] = config
        result['by_name'][config['name']] = config
        template_code = _extract_template_code(config)
        if template_code:
            # Keep the first row seen for a given template code.
            result['by_template_code'].setdefault(template_code, config)
    return result


def analyze_structure() -> Tuple[Optional[Dict], Optional[Dict]]:
    """Scan ``TEMPLATES_DIR``, load the database rows and print a summary.

    Returns:
        ``(dir_structure, existing_data)``, or ``(None, None)`` when the
        database connection cannot be opened.
    """
    print("="*80)
    print("分析模板目录结构和数据库数据")
    print("="*80)

    try:
        conn = pymysql.connect(**DB_CONFIG)
        print("✓ 数据库连接成功\n")
    except Exception as e:
        print(f"✗ 数据库连接失败: {e}")
        return None, None

    try:
        print("扫描目录结构...")
        dir_structure = scan_directory_structure(TEMPLATES_DIR)
        print(f" 找到 {len(dir_structure['directories'])} 个目录")
        print(f" 找到 {len(dir_structure['files'])} 个文件\n")

        print("获取数据库现有数据...")
        existing_data = get_existing_data(conn)
        print(f" 数据库中有 {len(existing_data['by_id'])} 条记录\n")

        print("分析缺少 parent_id 的记录...")
        missing_parent = [
            config for config in existing_data['by_id'].values()
            if config.get('parent_id') is None
        ]
        print(f" 有 {len(missing_parent)} 条记录缺少 parent_id\n")
    finally:
        # Release the connection even when scanning or querying fails.
        conn.close()

    return dir_structure, existing_data


def _parent_name(dir_structure: Dict, parent_path: Optional[str]) -> Optional[str]:
    """Return the name of the directory at ``parent_path``, if it is known."""
    if not parent_path:
        return None
    return dir_structure['directories'].get(parent_path, {}).get('name')


def plan_tree_structure(dir_structure: Dict, existing_data: Dict) -> List[Dict]:
    """Build the list of create/update actions that realize the tree.

    Directories are planned before files and in ascending level order, so a
    parent's ID is always known before its children are planned.

    Args:
        dir_structure: Result of ``scan_directory_structure``.
        existing_data: Result of ``get_existing_data``.

    Returns:
        One dict per node with the keys ``type`` (``'directory'`` or
        ``'file'``), ``name``, ``parent_name``, ``parent_id``, ``level``,
        ``action`` (``'create'`` or ``'update'``), ``config_id``,
        ``current_parent_id`` and, for files, ``template_code``.
    """
    plan = []

    def by_level_then_path(entry):
        return entry[1]['level'], entry[0]

    directories = sorted(dir_structure['directories'].items(), key=by_level_then_path)
    files = sorted(dir_structure['files'].items(), key=by_level_then_path)

    # Directory path -> config ID (existing or newly generated).
    dir_id_map = {}

    for dir_path, dir_info in directories:
        parent_path = dir_info['parent']
        parent_id = dir_id_map.get(parent_path) if parent_path else None

        # Directories are matched to existing rows by name only.
        existing = existing_data['by_name'].get(dir_info['name'])
        config_id = existing['id'] if existing else generate_id()
        plan.append({
            'type': 'directory',
            'name': dir_info['name'],
            'parent_name': _parent_name(dir_structure, parent_path),
            'parent_id': parent_id,
            'level': dir_info['level'],
            'action': 'update' if existing else 'create',
            'config_id': config_id,
            'current_parent_id': existing.get('parent_id') if existing else None
        })
        dir_id_map[dir_path] = config_id

    for _file_path, file_info in files:
        parent_path = file_info['parent']
        template_code = file_info['template_code']
        parent_id = dir_id_map.get(parent_path) if parent_path else None

        # Files are matched by template code first, then by name.
        existing = None
        if template_code:
            existing = existing_data['by_template_code'].get(template_code)
        if not existing:
            existing = existing_data['by_name'].get(file_info['name'])

        plan.append({
            'type': 'file',
            'name': file_info['name'],
            'parent_name': _parent_name(dir_structure, parent_path),
            'parent_id': parent_id,
            'level': file_info['level'],
            'action': 'update' if existing else 'create',
            'config_id': existing['id'] if existing else generate_id(),
            'template_code': template_code,
            'current_parent_id': existing.get('parent_id') if existing else None
        })

    return plan


def _sql_quote(value: str) -> str:
    """Return ``value`` as a single-quoted SQL string literal.

    Backslashes and single quotes are doubled.
    NOTE(review): doubling backslashes assumes the server runs without the
    NO_BACKSLASH_ESCAPES sql_mode (the MySQL default) - confirm.
    """
    escaped = value.replace("\\", "\\\\").replace("'", "''")
    return f"'{escaped}'"


def _insert_statements(item: Dict) -> List[str]:
    """Return the script lines that INSERT one planned node."""
    parent_id_sql = f"{item['parent_id']}" if item['parent_id'] else "NULL"
    name_sql = _sql_quote(item['name'])

    if item['type'] == 'directory':
        # Directory nodes carry neither input_data nor a template code.
        return [
            f"-- 创建目录节点: {item['name']}",
            "INSERT INTO f_polic_file_config",
            " (id, tenant_id, parent_id, name, input_data, file_path, created_time, created_by, updated_time, updated_by, state)",
            f"VALUES ({item['config_id']}, {TENANT_ID}, {parent_id_sql}, {name_sql}, NULL, NULL, NOW(), {CREATED_BY}, NOW(), {UPDATED_BY}, 1);",
            "",
        ]

    input_data_sql = _sql_quote(json.dumps({
        'template_code': item.get('template_code', ''),
        'business_type': 'INVESTIGATION'
    }, ensure_ascii=False))
    template_code = item.get('template_code')
    template_code_sql = _sql_quote(template_code) if template_code else "NULL"
    return [
        f"-- 创建文件节点: {item['name']}",
        "INSERT INTO f_polic_file_config",
        " (id, tenant_id, parent_id, name, input_data, file_path, template_code, created_time, created_by, updated_time, updated_by, state)",
        f"VALUES ({item['config_id']}, {TENANT_ID}, {parent_id_sql}, {name_sql}, {input_data_sql}, NULL, {template_code_sql}, NOW(), {CREATED_BY}, NOW(), {UPDATED_BY}, 1);",
        "",
    ]


def _update_statements(item: Dict) -> List[str]:
    """Return the script lines that fix ``parent_id`` on an existing node.

    The list is empty when the stored parent already matches the plan.
    """
    current_parent = item.get('current_parent_id')
    new_parent = item.get('parent_id')
    if current_parent == new_parent:
        return []
    parent_id_sql = f"{new_parent}" if new_parent else "NULL"
    return [
        f"-- 更新: {item['name']} (parent_id: {current_parent} -> {new_parent})",
        "UPDATE f_polic_file_config",
        f"SET parent_id = {parent_id_sql}, updated_time = NOW(), updated_by = {UPDATED_BY}",
        f"WHERE id = {item['config_id']} AND tenant_id = {TENANT_ID};",
        "",
    ]


def generate_update_sql(plan: List[Dict], output_file: str = 'update_template_tree.sql'):
    """Write the SQL script that applies ``plan``.

    Statements are grouped by tree level from the top down and wrapped in a
    single transaction.

    Args:
        plan: Result of ``plan_tree_structure``.
        output_file: Path of the script to write (UTF-8).

    Returns:
        ``output_file``.
    """
    sql_lines = [
        "-- 模板树状结构更新脚本",
        f"-- 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "-- 注意:执行前请备份数据库!",
        "",
        # Target the configured database instead of a hard-coded name.
        f"USE {DB_CONFIG['database']};",
        "",
        "START TRANSACTION;",
        ""
    ]

    by_level = {}
    for item in plan:
        by_level.setdefault(item['level'], []).append(item)

    # Parents first, so every INSERT can reference an already created row.
    for level in sorted(by_level):
        sql_lines.append(f"-- ===== 层级 {level} =====")
        sql_lines.append("")
        for item in by_level[level]:
            if item['action'] == 'create':
                sql_lines.extend(_insert_statements(item))
            else:
                sql_lines.extend(_update_statements(item))

    sql_lines.append("COMMIT;")
    sql_lines.append("")
    sql_lines.append("-- 更新完成")

    with open(output_file, 'w', encoding='utf-8') as f:
        f.write('\n'.join(sql_lines))

    print(f"✓ SQL脚本已生成: {output_file}")
    return output_file


def print_analysis_report(dir_structure: Dict, existing_data: Dict, plan: List[Dict]):
    """Print counts for the scanned tree, the database rows and the plan.

    Args:
        dir_structure: Result of ``scan_directory_structure``.
        existing_data: Result of ``get_existing_data``.
        plan: Result of ``plan_tree_structure``.
    """
    print("\n" + "="*80)
    print("分析报告")
    print("="*80)

    print("\n目录结构:")
    print(f" - 目录数量: {len(dir_structure['directories'])}")
    print(f" - 文件数量: {len(dir_structure['files'])}")

    print("\n数据库现状:")
    print(f" - 总记录数: {len(existing_data['by_id'])}")
    missing_parent = sum(
        1 for c in existing_data['by_id'].values() if c.get('parent_id') is None
    )
    print(f" - 缺少 parent_id 的记录: {missing_parent}")

    print("\n更新计划:")
    create_count = sum(1 for p in plan if p['action'] == 'create')
    update_count = sum(1 for p in plan if p['action'] == 'update')
    print(f" - 需要创建: {create_count} 条")
    print(f" - 需要更新: {update_count} 条")

    print("\n层级分布:")
    by_level = {}
    for item in plan:
        by_level[item['level']] = by_level.get(item['level'], 0) + 1
    for level in sorted(by_level):
        print(f" - 层级 {level}: {by_level[level]} 个节点")

    print("\n" + "="*80)


def main():
    """Analyze, plan, report, and write the SQL script."""
    dir_structure, existing_data = analyze_structure()
    # analyze_structure returns (None, None) when the database is unreachable.
    if dir_structure is None or existing_data is None:
        return

    print("规划树状结构...")
    plan = plan_tree_structure(dir_structure, existing_data)
    print(f" 生成 {len(plan)} 个更新计划\n")

    print_analysis_report(dir_structure, existing_data, plan)

    print("\n生成SQL更新脚本...")
    sql_file = generate_update_sql(plan)

    print("\n" + "="*80)
    print("分析完成!")
    print("="*80)
    print(f"\n请检查生成的SQL脚本: {sql_file}")
    print("确认无误后,可以执行该脚本更新数据库。")
    print("\n注意:执行前请备份数据库!")


if __name__ == '__main__':
    main()