""" 改进的匹配和更新脚本 增强匹配逻辑,能够匹配数据库中的已有数据 """ import os import json import pymysql import re from pathlib import Path from typing import Dict, List, Optional, Tuple from datetime import datetime # 数据库连接配置 DB_CONFIG = { 'host': os.getenv('DB_HOST', '152.136.177.240'), 'port': int(os.getenv('DB_PORT', 5012)), 'user': os.getenv('DB_USER', 'finyx'), 'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'), 'database': os.getenv('DB_NAME', 'finyx'), 'charset': 'utf8mb4' } TENANT_ID = 615873064429507639 CREATED_BY = 655162080928945152 UPDATED_BY = 655162080928945152 # 项目根目录 PROJECT_ROOT = Path(__file__).parent TEMPLATES_DIR = PROJECT_ROOT / "template_finish" # 文档类型映射 DOCUMENT_TYPE_MAPPING = { "1.请示报告卡(XXX)": { "template_code": "REPORT_CARD", "name": "1.请示报告卡(XXX)", "business_type": "INVESTIGATION" }, "2.初步核实审批表(XXX)": { "template_code": "PRELIMINARY_VERIFICATION_APPROVAL", "name": "2.初步核实审批表(XXX)", "business_type": "INVESTIGATION" }, "3.附件初核方案(XXX)": { "template_code": "INVESTIGATION_PLAN", "name": "3.附件初核方案(XXX)", "business_type": "INVESTIGATION" }, "谈话通知书第一联": { "template_code": "NOTIFICATION_LETTER_1", "name": "谈话通知书第一联", "business_type": "INVESTIGATION" }, "谈话通知书第二联": { "template_code": "NOTIFICATION_LETTER_2", "name": "谈话通知书第二联", "business_type": "INVESTIGATION" }, "谈话通知书第三联": { "template_code": "NOTIFICATION_LETTER_3", "name": "谈话通知书第三联", "business_type": "INVESTIGATION" }, "1.请示报告卡(初核谈话)": { "template_code": "REPORT_CARD_INTERVIEW", "name": "1.请示报告卡(初核谈话)", "business_type": "INVESTIGATION" }, "2谈话审批表": { "template_code": "INTERVIEW_APPROVAL_FORM", "name": "2谈话审批表", "business_type": "INVESTIGATION" }, "3.谈话前安全风险评估表": { "template_code": "PRE_INTERVIEW_RISK_ASSESSMENT", "name": "3.谈话前安全风险评估表", "business_type": "INVESTIGATION" }, "4.谈话方案": { "template_code": "INTERVIEW_PLAN", "name": "4.谈话方案", "business_type": "INVESTIGATION" }, "5.谈话后安全风险评估表": { "template_code": "POST_INTERVIEW_RISK_ASSESSMENT", "name": "5.谈话后安全风险评估表", "business_type": "INVESTIGATION" }, "1.谈话笔录": { "template_code": "INTERVIEW_RECORD", "name": "1.谈话笔录", "business_type": "INVESTIGATION" }, "2.谈话询问对象情况摸底调查30问": { "template_code": "INVESTIGATION_30_QUESTIONS", "name": "2.谈话询问对象情况摸底调查30问", "business_type": "INVESTIGATION" }, "3.被谈话人权利义务告知书": { "template_code": "RIGHTS_OBLIGATIONS_NOTICE", "name": "3.被谈话人权利义务告知书", "business_type": "INVESTIGATION" }, "4.点对点交接单": { "template_code": "HANDOVER_FORM", "name": "4.点对点交接单", "business_type": "INVESTIGATION" }, "5.陪送交接单(新)": { "template_code": "ESCORT_HANDOVER_FORM", "name": "5.陪送交接单(新)", "business_type": "INVESTIGATION" }, "6.1保密承诺书(谈话对象使用-非中共党员用)": { "template_code": "CONFIDENTIALITY_COMMITMENT_NON_PARTY", "name": "6.1保密承诺书(谈话对象使用-非中共党员用)", "business_type": "INVESTIGATION" }, "6.2保密承诺书(谈话对象使用-中共党员用)": { "template_code": "CONFIDENTIALITY_COMMITMENT_PARTY", "name": "6.2保密承诺书(谈话对象使用-中共党员用)", "business_type": "INVESTIGATION" }, "7.办案人员-办案安全保密承诺书": { "template_code": "INVESTIGATOR_CONFIDENTIALITY_COMMITMENT", "name": "7.办案人员-办案安全保密承诺书", "business_type": "INVESTIGATION" }, "8-1请示报告卡(初核报告结论) ": { "template_code": "REPORT_CARD_CONCLUSION", "name": "8-1请示报告卡(初核报告结论) ", "business_type": "INVESTIGATION" }, "8.XXX初核情况报告": { "template_code": "INVESTIGATION_REPORT", "name": "8.XXX初核情况报告", "business_type": "INVESTIGATION" } } def normalize_name(name: str) -> str: """标准化名称,用于模糊匹配""" # 去掉开头的编号(如 "1."、"2."、"8-1" 等) name = re.sub(r'^\d+[\.\-]\s*', '', name) # 去掉括号及其内容(如 "(XXX)"、"(初核谈话)" 等) name = re.sub(r'[((].*?[))]', '', name) # 去掉空格和特殊字符 name = name.strip() return name def generate_id(): """生成ID""" import time import random timestamp = int(time.time() * 1000) random_part = random.randint(100000, 999999) return timestamp * 1000 + random_part def identify_document_type(file_name: str) -> Optional[Dict]: """根据完整文件名识别文档类型""" base_name = Path(file_name).stem if base_name in DOCUMENT_TYPE_MAPPING: return DOCUMENT_TYPE_MAPPING[base_name] return None def scan_directory_structure(base_dir: Path) -> Dict: """扫描目录结构,构建树状层级""" structure = { 'directories': {}, 'files': {} } def process_path(path: Path, parent_path: Optional[str] = None, level: int = 0): """递归处理路径""" if path.is_file() and path.suffix == '.docx': file_name = path.stem doc_config = identify_document_type(file_name) structure['files'][str(path)] = { 'name': file_name, 'parent': parent_path, 'level': level, 'template_code': doc_config['template_code'] if doc_config else None, 'full_path': str(path), 'normalized_name': normalize_name(file_name) } elif path.is_dir(): dir_name = path.name structure['directories'][str(path)] = { 'name': dir_name, 'parent': parent_path, 'level': level, 'normalized_name': normalize_name(dir_name) } for child in sorted(path.iterdir()): if child.name != '__pycache__': process_path(child, str(path), level + 1) if TEMPLATES_DIR.exists(): for item in sorted(TEMPLATES_DIR.iterdir()): if item.name != '__pycache__': process_path(item, None, 0) return structure def get_existing_data(conn) -> Dict: """获取数据库中的现有数据,增强匹配能力""" cursor = conn.cursor(pymysql.cursors.DictCursor) sql = """ SELECT id, name, parent_id, template_code, input_data, file_path, state FROM f_polic_file_config WHERE tenant_id = %s """ cursor.execute(sql, (TENANT_ID,)) configs = cursor.fetchall() result = { 'by_id': {}, 'by_name': {}, 'by_template_code': {}, 'by_normalized_name': {} # 新增:标准化名称索引 } for config in configs: config_id = config['id'] config_name = config['name'] # 提取 template_code template_code = config.get('template_code') if not template_code and config.get('input_data'): try: input_data = json.loads(config['input_data']) if isinstance(config['input_data'], str) else config['input_data'] if isinstance(input_data, dict): template_code = input_data.get('template_code') except: pass config['extracted_template_code'] = template_code config['normalized_name'] = normalize_name(config_name) result['by_id'][config_id] = config result['by_name'][config_name] = config if template_code: if template_code not in result['by_template_code']: result['by_template_code'][template_code] = config # 标准化名称索引(可能有多个记录匹配同一个标准化名称) normalized = config['normalized_name'] if normalized not in result['by_normalized_name']: result['by_normalized_name'][normalized] = [] result['by_normalized_name'][normalized].append(config) cursor.close() return result def find_matching_config(file_info: Dict, existing_data: Dict) -> Optional[Dict]: """ 查找匹配的数据库记录 优先级:1. template_code 精确匹配 2. 名称精确匹配 3. 标准化名称匹配 """ template_code = file_info.get('template_code') file_name = file_info['name'] normalized_name = file_info.get('normalized_name', normalize_name(file_name)) # 优先级1: template_code 精确匹配 if template_code: matched = existing_data['by_template_code'].get(template_code) if matched: return matched # 优先级2: 名称精确匹配 matched = existing_data['by_name'].get(file_name) if matched: return matched # 优先级3: 标准化名称匹配 candidates = existing_data['by_normalized_name'].get(normalized_name, []) if candidates: # 如果有多个候选,优先选择有正确 template_code 的 for candidate in candidates: if candidate.get('extracted_template_code') == template_code: return candidate # 否则返回第一个 return candidates[0] return None def plan_tree_structure(dir_structure: Dict, existing_data: Dict) -> List[Dict]: """规划树状结构,使用改进的匹配逻辑""" plan = [] directories = sorted(dir_structure['directories'].items(), key=lambda x: (x[1]['level'], x[0])) files = sorted(dir_structure['files'].items(), key=lambda x: (x[1]['level'], x[0])) dir_id_map = {} # 处理目录 for dir_path, dir_info in directories: dir_name = dir_info['name'] parent_path = dir_info['parent'] level = dir_info['level'] parent_id = None if parent_path: parent_id = dir_id_map.get(parent_path) # 查找匹配的数据库记录 matched = find_matching_config(dir_info, existing_data) if matched: plan.append({ 'type': 'directory', 'name': dir_name, 'parent_name': dir_structure['directories'].get(parent_path, {}).get('name') if parent_path else None, 'parent_id': parent_id, 'level': level, 'action': 'update', 'config_id': matched['id'], 'current_parent_id': matched.get('parent_id'), 'matched_by': 'existing' }) dir_id_map[dir_path] = matched['id'] else: new_id = generate_id() plan.append({ 'type': 'directory', 'name': dir_name, 'parent_name': dir_structure['directories'].get(parent_path, {}).get('name') if parent_path else None, 'parent_id': parent_id, 'level': level, 'action': 'create', 'config_id': new_id, 'current_parent_id': None, 'matched_by': 'new' }) dir_id_map[dir_path] = new_id # 处理文件 for file_path, file_info in files: file_name = file_info['name'] parent_path = file_info['parent'] level = file_info['level'] template_code = file_info['template_code'] parent_id = dir_id_map.get(parent_path) if parent_path else None # 查找匹配的数据库记录 matched = find_matching_config(file_info, existing_data) if matched: plan.append({ 'type': 'file', 'name': file_name, 'parent_name': dir_structure['directories'].get(parent_path, {}).get('name') if parent_path else None, 'parent_id': parent_id, 'level': level, 'action': 'update', 'config_id': matched['id'], 'template_code': template_code, 'current_parent_id': matched.get('parent_id'), 'matched_by': 'existing' }) else: new_id = generate_id() plan.append({ 'type': 'file', 'name': file_name, 'parent_name': dir_structure['directories'].get(parent_path, {}).get('name') if parent_path else None, 'parent_id': parent_id, 'level': level, 'action': 'create', 'config_id': new_id, 'template_code': template_code, 'current_parent_id': None, 'matched_by': 'new' }) return plan def print_matching_report(plan: List[Dict]): """打印匹配报告""" print("\n" + "="*80) print("匹配报告") print("="*80) matched = [p for p in plan if p.get('matched_by') == 'existing'] unmatched = [p for p in plan if p.get('matched_by') == 'new'] print(f"\n已匹配的记录: {len(matched)} 条") print(f"未匹配的记录(将创建): {len(unmatched)} 条\n") if unmatched: print("未匹配的记录列表:") for item in unmatched: print(f" - {item['name']} ({item['type']})") print("\n匹配详情:") by_level = {} for item in plan: level = item['level'] if level not in by_level: by_level[level] = [] by_level[level].append(item) for level in sorted(by_level.keys()): print(f"\n【层级 {level}】") for item in by_level[level]: indent = " " * level match_status = "✓" if item.get('matched_by') == 'existing' else "✗" print(f"{indent}{match_status} {item['name']} (ID: {item['config_id']})") if item.get('parent_name'): print(f"{indent} 父节点: {item['parent_name']}") if item['action'] == 'update': current = item.get('current_parent_id', 'None') new = item.get('parent_id', 'None') if current != new: print(f"{indent} parent_id: {current} → {new}") def main(): """主函数""" print("="*80) print("改进的模板树状结构分析和更新") print("="*80) try: conn = pymysql.connect(**DB_CONFIG) print("✓ 数据库连接成功\n") except Exception as e: print(f"✗ 数据库连接失败: {e}") return try: print("扫描目录结构...") dir_structure = scan_directory_structure(TEMPLATES_DIR) print(f" 找到 {len(dir_structure['directories'])} 个目录") print(f" 找到 {len(dir_structure['files'])} 个文件\n") print("获取数据库现有数据...") existing_data = get_existing_data(conn) print(f" 数据库中有 {len(existing_data['by_id'])} 条记录\n") print("规划树状结构(使用改进的匹配逻辑)...") plan = plan_tree_structure(dir_structure, existing_data) print(f" 生成 {len(plan)} 个更新计划\n") print_matching_report(plan) # 询问是否继续 print("\n" + "="*80) response = input("\n是否生成更新SQL脚本?(yes/no,默认no): ").strip().lower() if response == 'yes': from analyze_and_update_template_tree import generate_update_sql sql_file = generate_update_sql(plan) print(f"\n✓ SQL脚本已生成: {sql_file}") else: print("\n已取消") finally: conn.close() if __name__ == '__main__': main()