479 lines
16 KiB
Python
479 lines
16 KiB
Python
"""
|
||
改进的匹配和更新脚本
|
||
增强匹配逻辑,能够匹配数据库中的已有数据
|
||
"""
|
||
import os
|
||
import json
|
||
import pymysql
|
||
import re
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Tuple
|
||
from datetime import datetime
|
||
|
||
# 数据库连接配置
|
||
DB_CONFIG = {
|
||
'host': os.getenv('DB_HOST', '152.136.177.240'),
|
||
'port': int(os.getenv('DB_PORT', 5012)),
|
||
'user': os.getenv('DB_USER', 'finyx'),
|
||
'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
|
||
'database': os.getenv('DB_NAME', 'finyx'),
|
||
'charset': 'utf8mb4'
|
||
}
|
||
|
||
TENANT_ID = 615873064429507639
|
||
CREATED_BY = 655162080928945152
|
||
UPDATED_BY = 655162080928945152
|
||
|
||
# 项目根目录
|
||
PROJECT_ROOT = Path(__file__).parent
|
||
TEMPLATES_DIR = PROJECT_ROOT / "template_finish"
|
||
|
||
# 文档类型映射
|
||
DOCUMENT_TYPE_MAPPING = {
|
||
"1.请示报告卡(XXX)": {
|
||
"template_code": "REPORT_CARD",
|
||
"name": "1.请示报告卡(XXX)",
|
||
"business_type": "INVESTIGATION"
|
||
},
|
||
"2.初步核实审批表(XXX)": {
|
||
"template_code": "PRELIMINARY_VERIFICATION_APPROVAL",
|
||
"name": "2.初步核实审批表(XXX)",
|
||
"business_type": "INVESTIGATION"
|
||
},
|
||
"3.附件初核方案(XXX)": {
|
||
"template_code": "INVESTIGATION_PLAN",
|
||
"name": "3.附件初核方案(XXX)",
|
||
"business_type": "INVESTIGATION"
|
||
},
|
||
"谈话通知书第一联": {
|
||
"template_code": "NOTIFICATION_LETTER_1",
|
||
"name": "谈话通知书第一联",
|
||
"business_type": "INVESTIGATION"
|
||
},
|
||
"谈话通知书第二联": {
|
||
"template_code": "NOTIFICATION_LETTER_2",
|
||
"name": "谈话通知书第二联",
|
||
"business_type": "INVESTIGATION"
|
||
},
|
||
"谈话通知书第三联": {
|
||
"template_code": "NOTIFICATION_LETTER_3",
|
||
"name": "谈话通知书第三联",
|
||
"business_type": "INVESTIGATION"
|
||
},
|
||
"1.请示报告卡(初核谈话)": {
|
||
"template_code": "REPORT_CARD_INTERVIEW",
|
||
"name": "1.请示报告卡(初核谈话)",
|
||
"business_type": "INVESTIGATION"
|
||
},
|
||
"2谈话审批表": {
|
||
"template_code": "INTERVIEW_APPROVAL_FORM",
|
||
"name": "2谈话审批表",
|
||
"business_type": "INVESTIGATION"
|
||
},
|
||
"3.谈话前安全风险评估表": {
|
||
"template_code": "PRE_INTERVIEW_RISK_ASSESSMENT",
|
||
"name": "3.谈话前安全风险评估表",
|
||
"business_type": "INVESTIGATION"
|
||
},
|
||
"4.谈话方案": {
|
||
"template_code": "INTERVIEW_PLAN",
|
||
"name": "4.谈话方案",
|
||
"business_type": "INVESTIGATION"
|
||
},
|
||
"5.谈话后安全风险评估表": {
|
||
"template_code": "POST_INTERVIEW_RISK_ASSESSMENT",
|
||
"name": "5.谈话后安全风险评估表",
|
||
"business_type": "INVESTIGATION"
|
||
},
|
||
"1.谈话笔录": {
|
||
"template_code": "INTERVIEW_RECORD",
|
||
"name": "1.谈话笔录",
|
||
"business_type": "INVESTIGATION"
|
||
},
|
||
"2.谈话询问对象情况摸底调查30问": {
|
||
"template_code": "INVESTIGATION_30_QUESTIONS",
|
||
"name": "2.谈话询问对象情况摸底调查30问",
|
||
"business_type": "INVESTIGATION"
|
||
},
|
||
"3.被谈话人权利义务告知书": {
|
||
"template_code": "RIGHTS_OBLIGATIONS_NOTICE",
|
||
"name": "3.被谈话人权利义务告知书",
|
||
"business_type": "INVESTIGATION"
|
||
},
|
||
"4.点对点交接单": {
|
||
"template_code": "HANDOVER_FORM",
|
||
"name": "4.点对点交接单",
|
||
"business_type": "INVESTIGATION"
|
||
},
|
||
"5.陪送交接单(新)": {
|
||
"template_code": "ESCORT_HANDOVER_FORM",
|
||
"name": "5.陪送交接单(新)",
|
||
"business_type": "INVESTIGATION"
|
||
},
|
||
"6.1保密承诺书(谈话对象使用-非中共党员用)": {
|
||
"template_code": "CONFIDENTIALITY_COMMITMENT_NON_PARTY",
|
||
"name": "6.1保密承诺书(谈话对象使用-非中共党员用)",
|
||
"business_type": "INVESTIGATION"
|
||
},
|
||
"6.2保密承诺书(谈话对象使用-中共党员用)": {
|
||
"template_code": "CONFIDENTIALITY_COMMITMENT_PARTY",
|
||
"name": "6.2保密承诺书(谈话对象使用-中共党员用)",
|
||
"business_type": "INVESTIGATION"
|
||
},
|
||
"7.办案人员-办案安全保密承诺书": {
|
||
"template_code": "INVESTIGATOR_CONFIDENTIALITY_COMMITMENT",
|
||
"name": "7.办案人员-办案安全保密承诺书",
|
||
"business_type": "INVESTIGATION"
|
||
},
|
||
"8-1请示报告卡(初核报告结论) ": {
|
||
"template_code": "REPORT_CARD_CONCLUSION",
|
||
"name": "8-1请示报告卡(初核报告结论) ",
|
||
"business_type": "INVESTIGATION"
|
||
},
|
||
"8.XXX初核情况报告": {
|
||
"template_code": "INVESTIGATION_REPORT",
|
||
"name": "8.XXX初核情况报告",
|
||
"business_type": "INVESTIGATION"
|
||
}
|
||
}
|
||
|
||
|
||
def normalize_name(name: str) -> str:
|
||
"""标准化名称,用于模糊匹配"""
|
||
# 去掉开头的编号(如 "1."、"2."、"8-1" 等)
|
||
name = re.sub(r'^\d+[\.\-]\s*', '', name)
|
||
# 去掉括号及其内容(如 "(XXX)"、"(初核谈话)" 等)
|
||
name = re.sub(r'[((].*?[))]', '', name)
|
||
# 去掉空格和特殊字符
|
||
name = name.strip()
|
||
return name
|
||
|
||
|
||
def generate_id():
|
||
"""生成ID"""
|
||
import time
|
||
import random
|
||
timestamp = int(time.time() * 1000)
|
||
random_part = random.randint(100000, 999999)
|
||
return timestamp * 1000 + random_part
|
||
|
||
|
||
def identify_document_type(file_name: str) -> Optional[Dict]:
|
||
"""根据完整文件名识别文档类型"""
|
||
base_name = Path(file_name).stem
|
||
if base_name in DOCUMENT_TYPE_MAPPING:
|
||
return DOCUMENT_TYPE_MAPPING[base_name]
|
||
return None
|
||
|
||
|
||
def scan_directory_structure(base_dir: Path) -> Dict:
|
||
"""扫描目录结构,构建树状层级"""
|
||
structure = {
|
||
'directories': {},
|
||
'files': {}
|
||
}
|
||
|
||
def process_path(path: Path, parent_path: Optional[str] = None, level: int = 0):
|
||
"""递归处理路径"""
|
||
if path.is_file() and path.suffix == '.docx':
|
||
file_name = path.stem
|
||
doc_config = identify_document_type(file_name)
|
||
|
||
structure['files'][str(path)] = {
|
||
'name': file_name,
|
||
'parent': parent_path,
|
||
'level': level,
|
||
'template_code': doc_config['template_code'] if doc_config else None,
|
||
'full_path': str(path),
|
||
'normalized_name': normalize_name(file_name)
|
||
}
|
||
elif path.is_dir():
|
||
dir_name = path.name
|
||
structure['directories'][str(path)] = {
|
||
'name': dir_name,
|
||
'parent': parent_path,
|
||
'level': level,
|
||
'normalized_name': normalize_name(dir_name)
|
||
}
|
||
|
||
for child in sorted(path.iterdir()):
|
||
if child.name != '__pycache__':
|
||
process_path(child, str(path), level + 1)
|
||
|
||
if TEMPLATES_DIR.exists():
|
||
for item in sorted(TEMPLATES_DIR.iterdir()):
|
||
if item.name != '__pycache__':
|
||
process_path(item, None, 0)
|
||
|
||
return structure
|
||
|
||
|
||
def get_existing_data(conn) -> Dict:
|
||
"""获取数据库中的现有数据,增强匹配能力"""
|
||
cursor = conn.cursor(pymysql.cursors.DictCursor)
|
||
|
||
sql = """
|
||
SELECT id, name, parent_id, template_code, input_data, file_path, state
|
||
FROM f_polic_file_config
|
||
WHERE tenant_id = %s
|
||
"""
|
||
cursor.execute(sql, (TENANT_ID,))
|
||
configs = cursor.fetchall()
|
||
|
||
result = {
|
||
'by_id': {},
|
||
'by_name': {},
|
||
'by_template_code': {},
|
||
'by_normalized_name': {} # 新增:标准化名称索引
|
||
}
|
||
|
||
for config in configs:
|
||
config_id = config['id']
|
||
config_name = config['name']
|
||
|
||
# 提取 template_code
|
||
template_code = config.get('template_code')
|
||
if not template_code and config.get('input_data'):
|
||
try:
|
||
input_data = json.loads(config['input_data']) if isinstance(config['input_data'], str) else config['input_data']
|
||
if isinstance(input_data, dict):
|
||
template_code = input_data.get('template_code')
|
||
except:
|
||
pass
|
||
|
||
config['extracted_template_code'] = template_code
|
||
config['normalized_name'] = normalize_name(config_name)
|
||
|
||
result['by_id'][config_id] = config
|
||
result['by_name'][config_name] = config
|
||
|
||
if template_code:
|
||
if template_code not in result['by_template_code']:
|
||
result['by_template_code'][template_code] = config
|
||
|
||
# 标准化名称索引(可能有多个记录匹配同一个标准化名称)
|
||
normalized = config['normalized_name']
|
||
if normalized not in result['by_normalized_name']:
|
||
result['by_normalized_name'][normalized] = []
|
||
result['by_normalized_name'][normalized].append(config)
|
||
|
||
cursor.close()
|
||
return result
|
||
|
||
|
||
def find_matching_config(file_info: Dict, existing_data: Dict) -> Optional[Dict]:
|
||
"""
|
||
查找匹配的数据库记录
|
||
优先级:1. template_code 精确匹配 2. 名称精确匹配 3. 标准化名称匹配
|
||
"""
|
||
template_code = file_info.get('template_code')
|
||
file_name = file_info['name']
|
||
normalized_name = file_info.get('normalized_name', normalize_name(file_name))
|
||
|
||
# 优先级1: template_code 精确匹配
|
||
if template_code:
|
||
matched = existing_data['by_template_code'].get(template_code)
|
||
if matched:
|
||
return matched
|
||
|
||
# 优先级2: 名称精确匹配
|
||
matched = existing_data['by_name'].get(file_name)
|
||
if matched:
|
||
return matched
|
||
|
||
# 优先级3: 标准化名称匹配
|
||
candidates = existing_data['by_normalized_name'].get(normalized_name, [])
|
||
if candidates:
|
||
# 如果有多个候选,优先选择有正确 template_code 的
|
||
for candidate in candidates:
|
||
if candidate.get('extracted_template_code') == template_code:
|
||
return candidate
|
||
# 否则返回第一个
|
||
return candidates[0]
|
||
|
||
return None
|
||
|
||
|
||
def plan_tree_structure(dir_structure: Dict, existing_data: Dict) -> List[Dict]:
|
||
"""规划树状结构,使用改进的匹配逻辑"""
|
||
plan = []
|
||
|
||
directories = sorted(dir_structure['directories'].items(),
|
||
key=lambda x: (x[1]['level'], x[0]))
|
||
files = sorted(dir_structure['files'].items(),
|
||
key=lambda x: (x[1]['level'], x[0]))
|
||
|
||
dir_id_map = {}
|
||
|
||
# 处理目录
|
||
for dir_path, dir_info in directories:
|
||
dir_name = dir_info['name']
|
||
parent_path = dir_info['parent']
|
||
level = dir_info['level']
|
||
|
||
parent_id = None
|
||
if parent_path:
|
||
parent_id = dir_id_map.get(parent_path)
|
||
|
||
# 查找匹配的数据库记录
|
||
matched = find_matching_config(dir_info, existing_data)
|
||
|
||
if matched:
|
||
plan.append({
|
||
'type': 'directory',
|
||
'name': dir_name,
|
||
'parent_name': dir_structure['directories'].get(parent_path, {}).get('name') if parent_path else None,
|
||
'parent_id': parent_id,
|
||
'level': level,
|
||
'action': 'update',
|
||
'config_id': matched['id'],
|
||
'current_parent_id': matched.get('parent_id'),
|
||
'matched_by': 'existing'
|
||
})
|
||
dir_id_map[dir_path] = matched['id']
|
||
else:
|
||
new_id = generate_id()
|
||
plan.append({
|
||
'type': 'directory',
|
||
'name': dir_name,
|
||
'parent_name': dir_structure['directories'].get(parent_path, {}).get('name') if parent_path else None,
|
||
'parent_id': parent_id,
|
||
'level': level,
|
||
'action': 'create',
|
||
'config_id': new_id,
|
||
'current_parent_id': None,
|
||
'matched_by': 'new'
|
||
})
|
||
dir_id_map[dir_path] = new_id
|
||
|
||
# 处理文件
|
||
for file_path, file_info in files:
|
||
file_name = file_info['name']
|
||
parent_path = file_info['parent']
|
||
level = file_info['level']
|
||
template_code = file_info['template_code']
|
||
|
||
parent_id = dir_id_map.get(parent_path) if parent_path else None
|
||
|
||
# 查找匹配的数据库记录
|
||
matched = find_matching_config(file_info, existing_data)
|
||
|
||
if matched:
|
||
plan.append({
|
||
'type': 'file',
|
||
'name': file_name,
|
||
'parent_name': dir_structure['directories'].get(parent_path, {}).get('name') if parent_path else None,
|
||
'parent_id': parent_id,
|
||
'level': level,
|
||
'action': 'update',
|
||
'config_id': matched['id'],
|
||
'template_code': template_code,
|
||
'current_parent_id': matched.get('parent_id'),
|
||
'matched_by': 'existing'
|
||
})
|
||
else:
|
||
new_id = generate_id()
|
||
plan.append({
|
||
'type': 'file',
|
||
'name': file_name,
|
||
'parent_name': dir_structure['directories'].get(parent_path, {}).get('name') if parent_path else None,
|
||
'parent_id': parent_id,
|
||
'level': level,
|
||
'action': 'create',
|
||
'config_id': new_id,
|
||
'template_code': template_code,
|
||
'current_parent_id': None,
|
||
'matched_by': 'new'
|
||
})
|
||
|
||
return plan
|
||
|
||
|
||
def print_matching_report(plan: List[Dict]):
|
||
"""打印匹配报告"""
|
||
print("\n" + "="*80)
|
||
print("匹配报告")
|
||
print("="*80)
|
||
|
||
matched = [p for p in plan if p.get('matched_by') == 'existing']
|
||
unmatched = [p for p in plan if p.get('matched_by') == 'new']
|
||
|
||
print(f"\n已匹配的记录: {len(matched)} 条")
|
||
print(f"未匹配的记录(将创建): {len(unmatched)} 条\n")
|
||
|
||
if unmatched:
|
||
print("未匹配的记录列表:")
|
||
for item in unmatched:
|
||
print(f" - {item['name']} ({item['type']})")
|
||
|
||
print("\n匹配详情:")
|
||
by_level = {}
|
||
for item in plan:
|
||
level = item['level']
|
||
if level not in by_level:
|
||
by_level[level] = []
|
||
by_level[level].append(item)
|
||
|
||
for level in sorted(by_level.keys()):
|
||
print(f"\n【层级 {level}】")
|
||
for item in by_level[level]:
|
||
indent = " " * level
|
||
match_status = "✓" if item.get('matched_by') == 'existing' else "✗"
|
||
print(f"{indent}{match_status} {item['name']} (ID: {item['config_id']})")
|
||
if item.get('parent_name'):
|
||
print(f"{indent} 父节点: {item['parent_name']}")
|
||
if item['action'] == 'update':
|
||
current = item.get('current_parent_id', 'None')
|
||
new = item.get('parent_id', 'None')
|
||
if current != new:
|
||
print(f"{indent} parent_id: {current} → {new}")
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
print("="*80)
|
||
print("改进的模板树状结构分析和更新")
|
||
print("="*80)
|
||
|
||
try:
|
||
conn = pymysql.connect(**DB_CONFIG)
|
||
print("✓ 数据库连接成功\n")
|
||
except Exception as e:
|
||
print(f"✗ 数据库连接失败: {e}")
|
||
return
|
||
|
||
try:
|
||
print("扫描目录结构...")
|
||
dir_structure = scan_directory_structure(TEMPLATES_DIR)
|
||
print(f" 找到 {len(dir_structure['directories'])} 个目录")
|
||
print(f" 找到 {len(dir_structure['files'])} 个文件\n")
|
||
|
||
print("获取数据库现有数据...")
|
||
existing_data = get_existing_data(conn)
|
||
print(f" 数据库中有 {len(existing_data['by_id'])} 条记录\n")
|
||
|
||
print("规划树状结构(使用改进的匹配逻辑)...")
|
||
plan = plan_tree_structure(dir_structure, existing_data)
|
||
print(f" 生成 {len(plan)} 个更新计划\n")
|
||
|
||
print_matching_report(plan)
|
||
|
||
# 询问是否继续
|
||
print("\n" + "="*80)
|
||
response = input("\n是否生成更新SQL脚本?(yes/no,默认no): ").strip().lower()
|
||
|
||
if response == 'yes':
|
||
from analyze_and_update_template_tree import generate_update_sql
|
||
sql_file = generate_update_sql(plan)
|
||
print(f"\n✓ SQL脚本已生成: {sql_file}")
|
||
else:
|
||
print("\n已取消")
|
||
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|
||
|