ai-business-write/improved_match_and_update.py

479 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
改进的匹配和更新脚本
增强匹配逻辑,能够匹配数据库中的已有数据
"""
import os
import json
import pymysql
import re
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from datetime import datetime
# Database connection settings. Each value may be overridden via the
# corresponding environment variable; otherwise the hard-coded default applies.
# NOTE(review): live-looking credentials are committed in source — move them to
# environment/secret storage and drop the defaults.
DB_CONFIG = {
    'host': os.getenv('DB_HOST', '152.136.177.240'),
    'port': int(os.getenv('DB_PORT', 5012)),
    'user': os.getenv('DB_USER', 'finyx'),
    'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
    'database': os.getenv('DB_NAME', 'finyx'),
    'charset': 'utf8mb4'
}
# Fixed tenant and operator IDs stamped onto every record this script touches.
TENANT_ID = 615873064429507639
CREATED_BY = 655162080928945152
UPDATED_BY = 655162080928945152
# Project root (directory containing this script) and the folder that holds
# the finished .docx templates to be scanned.
PROJECT_ROOT = Path(__file__).parent
TEMPLATES_DIR = PROJECT_ROOT / "template_finish"
# Document-type mapping: exact .docx file stem -> template configuration.
# Keys must match file names on disk byte-for-byte (note: some keys carry a
# trailing space, e.g. "8-1请示报告卡初核报告结论 " — presumably mirroring the
# actual file names; verify against the template directory).
DOCUMENT_TYPE_MAPPING = {
    "1.请示报告卡XXX": {
        "template_code": "REPORT_CARD",
        "name": "1.请示报告卡XXX",
        "business_type": "INVESTIGATION"
    },
    "2.初步核实审批表XXX": {
        "template_code": "PRELIMINARY_VERIFICATION_APPROVAL",
        "name": "2.初步核实审批表XXX",
        "business_type": "INVESTIGATION"
    },
    "3.附件初核方案(XXX)": {
        "template_code": "INVESTIGATION_PLAN",
        "name": "3.附件初核方案(XXX)",
        "business_type": "INVESTIGATION"
    },
    "谈话通知书第一联": {
        "template_code": "NOTIFICATION_LETTER_1",
        "name": "谈话通知书第一联",
        "business_type": "INVESTIGATION"
    },
    "谈话通知书第二联": {
        "template_code": "NOTIFICATION_LETTER_2",
        "name": "谈话通知书第二联",
        "business_type": "INVESTIGATION"
    },
    "谈话通知书第三联": {
        "template_code": "NOTIFICATION_LETTER_3",
        "name": "谈话通知书第三联",
        "business_type": "INVESTIGATION"
    },
    "1.请示报告卡(初核谈话)": {
        "template_code": "REPORT_CARD_INTERVIEW",
        "name": "1.请示报告卡(初核谈话)",
        "business_type": "INVESTIGATION"
    },
    "2谈话审批表": {
        "template_code": "INTERVIEW_APPROVAL_FORM",
        "name": "2谈话审批表",
        "business_type": "INVESTIGATION"
    },
    "3.谈话前安全风险评估表": {
        "template_code": "PRE_INTERVIEW_RISK_ASSESSMENT",
        "name": "3.谈话前安全风险评估表",
        "business_type": "INVESTIGATION"
    },
    "4.谈话方案": {
        "template_code": "INTERVIEW_PLAN",
        "name": "4.谈话方案",
        "business_type": "INVESTIGATION"
    },
    "5.谈话后安全风险评估表": {
        "template_code": "POST_INTERVIEW_RISK_ASSESSMENT",
        "name": "5.谈话后安全风险评估表",
        "business_type": "INVESTIGATION"
    },
    "1.谈话笔录": {
        "template_code": "INTERVIEW_RECORD",
        "name": "1.谈话笔录",
        "business_type": "INVESTIGATION"
    },
    "2.谈话询问对象情况摸底调查30问": {
        "template_code": "INVESTIGATION_30_QUESTIONS",
        "name": "2.谈话询问对象情况摸底调查30问",
        "business_type": "INVESTIGATION"
    },
    "3.被谈话人权利义务告知书": {
        "template_code": "RIGHTS_OBLIGATIONS_NOTICE",
        "name": "3.被谈话人权利义务告知书",
        "business_type": "INVESTIGATION"
    },
    "4.点对点交接单": {
        "template_code": "HANDOVER_FORM",
        "name": "4.点对点交接单",
        "business_type": "INVESTIGATION"
    },
    "5.陪送交接单(新)": {
        "template_code": "ESCORT_HANDOVER_FORM",
        "name": "5.陪送交接单(新)",
        "business_type": "INVESTIGATION"
    },
    "6.1保密承诺书(谈话对象使用-非中共党员用)": {
        "template_code": "CONFIDENTIALITY_COMMITMENT_NON_PARTY",
        "name": "6.1保密承诺书(谈话对象使用-非中共党员用)",
        "business_type": "INVESTIGATION"
    },
    "6.2保密承诺书(谈话对象使用-中共党员用)": {
        "template_code": "CONFIDENTIALITY_COMMITMENT_PARTY",
        "name": "6.2保密承诺书(谈话对象使用-中共党员用)",
        "business_type": "INVESTIGATION"
    },
    "7.办案人员-办案安全保密承诺书": {
        "template_code": "INVESTIGATOR_CONFIDENTIALITY_COMMITMENT",
        "name": "7.办案人员-办案安全保密承诺书",
        "business_type": "INVESTIGATION"
    },
    "8-1请示报告卡初核报告结论 ": {
        "template_code": "REPORT_CARD_CONCLUSION",
        "name": "8-1请示报告卡初核报告结论 ",
        "business_type": "INVESTIGATION"
    },
    "8.XXX初核情况报告": {
        "template_code": "INVESTIGATION_REPORT",
        "name": "8.XXX初核情况报告",
        "business_type": "INVESTIGATION"
    }
}
def normalize_name(name: str) -> str:
    """Normalize a document/directory name into a fuzzy-match key.

    Strips a leading numeric prefix that is followed by ``.`` or ``-``
    (e.g. ``"1."``, ``"8-"``; a bare number like ``"2谈话审批表"`` is kept),
    removes any parenthesized qualifier, and trims surrounding whitespace.

    Args:
        name: Raw file or directory name (without extension).

    Returns:
        The normalized name. Both the filesystem side and the database
        side must be normalized with this same function for matching to
        stay consistent.
    """
    # Drop leading numbering such as "1." or "8-" (separator required).
    name = re.sub(r'^\d+[.\-]\s*', '', name)
    # Remove parenthesized content, e.g. "(XXX)" or "（初核谈话）".
    # FIX: the mapping uses both full-width （…） and ASCII (…) parentheses,
    # so the character classes must accept either width.
    name = re.sub(r'[（(].*?[）)]', '', name)
    return name.strip()
def generate_id() -> int:
    """Generate a unique-ish integer ID for a new config record.

    Combines the current epoch-millisecond timestamp with a 6-digit
    random suffix.

    FIX: the original multiplied the timestamp by only 1000 while the
    random suffix spans 100000-999999, so the suffix spilled into the
    timestamp's digits and IDs from adjacent milliseconds could collide.
    Shifting by 1_000_000 reserves a full six-digit slot for the suffix.

    Returns:
        Integer ID; fits comfortably in a signed 64-bit (BIGINT) column.
    """
    import random
    import time
    timestamp_ms = int(time.time() * 1000)
    random_part = random.randint(100000, 999999)
    return timestamp_ms * 1_000_000 + random_part
def identify_document_type(file_name: str) -> Optional[Dict]:
    """Return the template configuration for *file_name*, or None.

    The lookup key is the file's stem (name without extension), matched
    exactly against DOCUMENT_TYPE_MAPPING.
    """
    stem = Path(file_name).stem
    return DOCUMENT_TYPE_MAPPING.get(stem)
def scan_directory_structure(base_dir: Path) -> Dict:
    """Recursively scan *base_dir* and build a flat directory/file index.

    Only ``.docx`` files are indexed; any entry named ``__pycache__`` is
    skipped. Children are visited in sorted order.

    FIX: the original ignored the *base_dir* parameter and always walked
    the module-level TEMPLATES_DIR; the parameter is now honored (the
    sole caller passes TEMPLATES_DIR, so behavior there is unchanged).

    Args:
        base_dir: Root directory to scan.

    Returns:
        Dict with two maps keyed by absolute path string:
        ``'directories'`` -> {name, parent, level, normalized_name} and
        ``'files'`` -> {name, parent, level, template_code, full_path,
        normalized_name}.
    """
    structure: Dict = {
        'directories': {},
        'files': {}
    }

    def process_path(path: Path, parent_path: Optional[str] = None, level: int = 0):
        """Record *path* in the index, recursing into directories."""
        if path.is_file() and path.suffix == '.docx':
            file_name = path.stem
            doc_config = identify_document_type(file_name)
            structure['files'][str(path)] = {
                'name': file_name,
                'parent': parent_path,
                'level': level,
                # template_code is None for files not in the mapping.
                'template_code': doc_config['template_code'] if doc_config else None,
                'full_path': str(path),
                'normalized_name': normalize_name(file_name)
            }
        elif path.is_dir():
            structure['directories'][str(path)] = {
                'name': path.name,
                'parent': parent_path,
                'level': level,
                'normalized_name': normalize_name(path.name)
            }
            for child in sorted(path.iterdir()):
                if child.name != '__pycache__':
                    process_path(child, str(path), level + 1)

    if base_dir.exists():
        for item in sorted(base_dir.iterdir()):
            if item.name != '__pycache__':
                process_path(item, None, 0)
    return structure
def get_existing_data(conn) -> Dict:
    """Load existing f_polic_file_config rows for TENANT_ID and index them.

    Each record gains two derived fields: ``extracted_template_code``
    (the explicit column, falling back to ``template_code`` inside the
    ``input_data`` JSON) and ``normalized_name``.

    Args:
        conn: An open pymysql connection.

    Returns:
        Dict with four lookup indexes:
        ``'by_id'``, ``'by_name'``, ``'by_template_code'`` (first record
        per code wins), and ``'by_normalized_name'`` (list of records per
        normalized key, since several rows may normalize identically).
    """
    sql = """
    SELECT id, name, parent_id, template_code, input_data, file_path, state
    FROM f_polic_file_config
    WHERE tenant_id = %s
    """
    # FIX: context manager guarantees the cursor is closed even if the
    # query raises (the original leaked it on error).
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute(sql, (TENANT_ID,))
        configs = cursor.fetchall()
    result = {
        'by_id': {},
        'by_name': {},
        'by_template_code': {},
        'by_normalized_name': {}  # normalized name -> list of records
    }
    for config in configs:
        template_code = config.get('template_code')
        if not template_code and config.get('input_data'):
            raw = config['input_data']
            try:
                input_data = json.loads(raw) if isinstance(raw, str) else raw
            # FIX: narrow the former bare `except: pass` — only malformed
            # JSON / wrong input types are expected and safely ignorable.
            except (ValueError, TypeError):
                input_data = None
            if isinstance(input_data, dict):
                template_code = input_data.get('template_code')
        config['extracted_template_code'] = template_code
        config['normalized_name'] = normalize_name(config['name'])
        result['by_id'][config['id']] = config
        result['by_name'][config['name']] = config
        if template_code:
            # First record for a given template_code wins.
            result['by_template_code'].setdefault(template_code, config)
        result['by_normalized_name'].setdefault(config['normalized_name'], []).append(config)
    return result
def find_matching_config(file_info: Dict, existing_data: Dict) -> Optional[Dict]:
    """Locate the database record that corresponds to *file_info*.

    Match priority:
        1. exact ``template_code`` match
        2. exact name match
        3. normalized-name match — preferring a candidate whose extracted
           template_code agrees with the file's, else the first candidate

    Returns:
        The matching record dict, or ``None`` if nothing matched.
    """
    code = file_info.get('template_code')
    raw_name = file_info['name']
    if 'normalized_name' in file_info:
        norm_key = file_info['normalized_name']
    else:
        norm_key = normalize_name(raw_name)

    # 1) exact template_code hit
    if code:
        by_code = existing_data['by_template_code'].get(code)
        if by_code:
            return by_code

    # 2) exact name hit
    by_name = existing_data['by_name'].get(raw_name)
    if by_name:
        return by_name

    # 3) normalized-name candidates
    candidates = existing_data['by_normalized_name'].get(norm_key, [])
    for candidate in candidates:
        if candidate.get('extracted_template_code') == code:
            return candidate
    return candidates[0] if candidates else None
def plan_tree_structure(dir_structure: Dict, existing_data: Dict) -> List[Dict]:
    """Build the ordered create/update plan for the template tree.

    Directories are planned first, sorted by (level, path), so that
    ``dir_id_map`` already contains every parent directory's config id
    by the time its children — and then the files — are planned.

    FIX (decomposition): the original duplicated the match/plan logic
    verbatim for directories and files; it is now shared in a helper.

    Args:
        dir_structure: Output of :func:`scan_directory_structure`.
        existing_data: Output of :func:`get_existing_data`.

    Returns:
        List of plan entries. Each entry carries type/name/parent info
        plus ``action`` = ``'update'`` (matched an existing record,
        reusing its id) or ``'create'`` (fresh id generated); file
        entries additionally carry ``template_code``.
    """
    plan: List[Dict] = []
    # directory filesystem path -> planned config id, for parent lookups
    dir_id_map: Dict[str, int] = {}

    def _parent_name(parent_path: Optional[str]) -> Optional[str]:
        # Display name of the parent directory; None for root entries.
        if not parent_path:
            return None
        return dir_structure['directories'].get(parent_path, {}).get('name')

    def _make_entry(entry_type: str, info: Dict) -> Dict:
        # Shared planning logic for both directories and files.
        parent_path = info['parent']
        entry = {
            'type': entry_type,
            'name': info['name'],
            'parent_name': _parent_name(parent_path),
            'parent_id': dir_id_map.get(parent_path) if parent_path else None,
            'level': info['level'],
        }
        matched = find_matching_config(info, existing_data)
        if matched:
            entry['action'] = 'update'
            entry['config_id'] = matched['id']
            entry['current_parent_id'] = matched.get('parent_id')
            entry['matched_by'] = 'existing'
        else:
            entry['action'] = 'create'
            entry['config_id'] = generate_id()
            entry['current_parent_id'] = None
            entry['matched_by'] = 'new'
        return entry

    # Parents must be planned before children: sort by (level, path).
    directories = sorted(dir_structure['directories'].items(),
                         key=lambda x: (x[1]['level'], x[0]))
    files = sorted(dir_structure['files'].items(),
                   key=lambda x: (x[1]['level'], x[0]))

    for dir_path, dir_info in directories:
        entry = _make_entry('directory', dir_info)
        plan.append(entry)
        dir_id_map[dir_path] = entry['config_id']

    for _file_path, file_info in files:
        entry = _make_entry('file', file_info)
        # Files also record their identified template_code (may be None).
        entry['template_code'] = file_info['template_code']
        plan.append(entry)
    return plan
def print_matching_report(plan: List[Dict]):
    """Print a human-readable report of the matching plan.

    Shows counts of matched vs. to-be-created entries, lists the
    unmatched entries, then prints a per-level breakdown with parent
    links and any parent_id change for entries being updated.
    """
    print("\n" + "="*80)
    print("匹配报告")
    print("="*80)
    matched = [p for p in plan if p.get('matched_by') == 'existing']
    unmatched = [p for p in plan if p.get('matched_by') == 'new']
    print(f"\n已匹配的记录: {len(matched)}")
    print(f"未匹配的记录(将创建): {len(unmatched)}\n")
    if unmatched:
        print("未匹配的记录列表:")
        for item in unmatched:
            print(f" - {item['name']} ({item['type']})")
    print("\n匹配详情:")
    # Group plan entries by tree level for the breakdown below.
    by_level = {}
    for item in plan:
        level = item['level']
        if level not in by_level:
            by_level[level] = []
        by_level[level].append(item)
    for level in sorted(by_level.keys()):
        print(f"\n【层级 {level}")
        for item in by_level[level]:
            indent = " " * level
            # NOTE(review): both status markers render as empty strings here —
            # they may originally have been symbols (e.g. ✓/✗) lost to an
            # encoding issue; confirm against the repository copy.
            match_status = "" if item.get('matched_by') == 'existing' else ""
            print(f"{indent}{match_status} {item['name']} (ID: {item['config_id']})")
            if item.get('parent_name'):
                print(f"{indent} 父节点: {item['parent_name']}")
            if item['action'] == 'update':
                current = item.get('current_parent_id', 'None')
                new = item.get('parent_id', 'None')
                # Only show parent_id when the planned parent differs from
                # the one currently stored in the database.
                if current != new:
                    print(f"{indent} parent_id: {current}{new}")
def main():
    """Entry point: connect to the database, scan the template directory,
    match against existing records, report, and optionally hand the plan
    to the SQL generator."""
    print("="*80)
    print("改进的模板树状结构分析和更新")
    print("="*80)
    try:
        conn = pymysql.connect(**DB_CONFIG)
        print("✓ 数据库连接成功\n")
    except Exception as exc:
        print(f"✗ 数据库连接失败: {exc}")
        return
    try:
        print("扫描目录结构...")
        dir_structure = scan_directory_structure(TEMPLATES_DIR)
        dir_count = len(dir_structure['directories'])
        file_count = len(dir_structure['files'])
        print(f" 找到 {dir_count} 个目录")
        print(f" 找到 {file_count} 个文件\n")

        print("获取数据库现有数据...")
        existing_data = get_existing_data(conn)
        print(f" 数据库中有 {len(existing_data['by_id'])} 条记录\n")

        print("规划树状结构(使用改进的匹配逻辑)...")
        plan = plan_tree_structure(dir_structure, existing_data)
        print(f" 生成 {len(plan)} 个更新计划\n")
        print_matching_report(plan)

        # Ask the operator before producing any SQL.
        print("\n" + "=" * 80)
        answer = input("\n是否生成更新SQL脚本(yes/no默认no): ").strip().lower()
        if answer != 'yes':
            print("\n已取消")
        else:
            # Imported lazily so the analysis can run without the generator module.
            from analyze_and_update_template_tree import generate_update_sql
            sql_file = generate_update_sql(plan)
            print(f"\n✓ SQL脚本已生成: {sql_file}")
    finally:
        conn.close()
# Script entry point: run the interactive analysis/update workflow.
if __name__ == '__main__':
    main()