ai-business-write/rebuild_template_field_relations.py

537 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
重新建立模板和字段的关联关系
根据模板名称,重新建立 f_polic_file_field 表的关联关系
不再依赖 input_data 和 template_code 字段
"""
import pymysql
import os
import json
from typing import Dict, List, Set, Optional
from datetime import datetime
from collections import defaultdict
# Database connection settings. Every value except the charset can be overridden
# through an environment variable; the literals are the fallbacks.
# NOTE(review): the fallback host and password are literal credentials kept in
# source — confirm whether they should remain as defaults.
DB_CONFIG = dict(
    host=os.getenv('DB_HOST', '152.136.177.240'),
    port=int(os.getenv('DB_PORT', 5012)),
    user=os.getenv('DB_USER', 'finyx'),
    password=os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
    database=os.getenv('DB_NAME', 'finyx'),
    charset='utf8mb4',
)

# Tenant whose rows are read and rewritten, and the user ids written to the
# created_by / updated_by columns of inserted relation rows.
TENANT_ID = 615873064429507639
CREATED_BY = UPDATED_BY = 655162080928945152
# Mapping from template name to the field codes it uses (defined by business rules).
# Shape: {template name: {'input_fields': [field codes], 'output_fields': [field codes]}}
#
# Code sequences shared by several templates are declared once as tuples and
# copied into a fresh list for every entry, so no two entries share a list object.
_INPUT_CLUE_ONLY = ('clue_info',)
_INPUT_TARGET_ONLY = ('target_basic_info_clue',)
_INPUT_CLUE_AND_TARGET = ('clue_info', 'target_basic_info_clue')

# Output codes of the interview-approval style templates.
_OUTPUT_APPROVAL = (
    'target_name', 'target_organization_and_position', 'target_id_number',
    'appointment_time', 'appointment_location', 'approval_time',
    'handling_department', 'handler_name',
)
# The interview notices use the approval codes plus the notification details.
_OUTPUT_NOTIFICATION = _OUTPUT_APPROVAL + ('notification_time', 'notification_location')
# Output codes describing the target person in detail.
_OUTPUT_PERSONAL_PROFILE = (
    'target_name', 'target_organization_and_position', 'target_gender',
    'target_date_of_birth_full', 'target_political_status', 'target_address',
    'target_registered_address', 'target_contact', 'target_place_of_origin',
    'target_ethnicity', 'target_id_number', 'investigation_team_code',
)


def _template_fields(input_codes, output_codes):
    """Build one mapping entry, copying both code sequences into new lists."""
    return {'input_fields': list(input_codes), 'output_fields': list(output_codes)}


# Insertion order is significant: lookups that fall back to substring matching
# walk this dict in order and take the first hit.
TEMPLATE_FIELD_MAPPING = {
    # Preliminary verification approval form
    '初步核实审批表': _template_fields(_INPUT_CLUE_AND_TARGET, (
        'target_name', 'target_organization_and_position', 'target_organization',
        'target_position', 'target_gender', 'target_date_of_birth', 'target_age',
        'target_education_level', 'target_political_status', 'target_professional_rank',
        'clue_source', 'target_issue_description', 'department_opinion', 'filler_name',
    )),
    # Pre-interview safety risk assessment form
    '谈话前安全风险评估表': _template_fields(_INPUT_CLUE_AND_TARGET, (
        'target_family_situation', 'target_social_relations', 'target_health_status',
        'target_personality', 'target_tolerance', 'target_issue_severity',
        'target_other_issues_possibility', 'target_previous_investigation',
        'target_negative_events', 'target_other_situation', 'risk_level',
    )),
    # Request/report card
    '请示报告卡': _template_fields(_INPUT_CLUE_ONLY, (
        'target_name', 'target_organization_and_position', 'report_card_request_time',
    )),
    # Preliminary verification plan
    '初核方案': _template_fields(_INPUT_CLUE_AND_TARGET, (
        'target_name', 'target_organization_and_position', 'target_work_basic_info',
        'target_issue_description', 'investigation_unit_name', 'investigation_team_leader_name',
        'investigation_team_member_names', 'investigation_location',
    )),
    # Interview notice and its first / second / third copies
    '谈话通知书': _template_fields(_INPUT_TARGET_ONLY, _OUTPUT_NOTIFICATION),
    '谈话通知书第一联': _template_fields(_INPUT_TARGET_ONLY, _OUTPUT_NOTIFICATION),
    '谈话通知书第二联': _template_fields(_INPUT_TARGET_ONLY, _OUTPUT_NOTIFICATION),
    '谈话通知书第三联': _template_fields(_INPUT_TARGET_ONLY, _OUTPUT_NOTIFICATION),
    # Interview transcript
    '谈话笔录': _template_fields(_INPUT_CLUE_AND_TARGET, _OUTPUT_PERSONAL_PROFILE),
    # Post-interview safety risk assessment form
    '谈话后安全风险评估表': _template_fields(_INPUT_CLUE_AND_TARGET, _OUTPUT_PERSONAL_PROFILE),
    # "XXX" preliminary verification report
    'XXX初核情况报告': _template_fields(_INPUT_CLUE_AND_TARGET, (
        'target_name', 'target_organization_and_position', 'target_issue_description',
        'target_work_basic_info', 'investigation_unit_name', 'investigation_team_leader_name',
    )),
    # Walk-in interview approval
    '走读式谈话审批': _template_fields(_INPUT_TARGET_ONLY, _OUTPUT_APPROVAL),
    # Walk-in interview procedure
    '走读式谈话流程': _template_fields(_INPUT_TARGET_ONLY, _OUTPUT_APPROVAL),
    # Interview approval / interview approval form
    '谈话审批': _template_fields(_INPUT_TARGET_ONLY, _OUTPUT_APPROVAL),
    '谈话审批表': _template_fields(_INPUT_CLUE_AND_TARGET, _OUTPUT_PERSONAL_PROFILE),
}
# Alias table: template names as they are stored (numbered, with "XXX"
# placeholders, etc.) mapped to the canonical key used in TEMPLATE_FIELD_MAPPING.
TEMPLATE_NAME_NORMALIZE = dict((
    ('1.请示报告卡XXX', '请示报告卡'),
    ('2.初步核实审批表XXX', '初步核实审批表'),
    ('3.附件初核方案(XXX)', '初核方案'),
    ('8.XXX初核情况报告', 'XXX初核情况报告'),
    ('2.谈话审批', '谈话审批'),
    ('2谈话审批表', '谈话审批表'),
))
# Highest id handed out so far by generate_id() in this process.
_last_generated_id = 0


def generate_id():
    """Generate a numeric id from a millisecond timestamp plus a random part.

    The id is ``timestamp_ms * 1000 + random 6-digit number``. Because the random
    part is wider than the factor of 1000, ids produced in neighbouring
    milliseconds overlap and could repeat, so the function also guarantees that
    every id returned within this process is strictly greater than the previous
    one. Uniqueness across processes is still only probabilistic.

    Returns:
        int: a positive id, unique and increasing within the current process.
    """
    global _last_generated_id
    import random
    import time

    timestamp = int(time.time() * 1000)
    candidate = timestamp * 1000 + random.randint(100000, 999999)
    # Bump past the previous id when the random draw (or a clock step backwards)
    # would otherwise repeat or precede it.
    if candidate <= _last_generated_id:
        candidate = _last_generated_id + 1
    _last_generated_id = candidate
    return candidate
def normalize_template_name(name: str) -> str:
    """Reduce a stored template name to its canonical form.

    The name is first looked up in TEMPLATE_NAME_NORMALIZE, both as given and
    with surrounding whitespace removed. If no alias matches, bracketed content
    (ASCII ``()`` or full-width ``（）``) and a leading ``<digits>.`` prefix are
    removed.

    Args:
        name: Template name as stored.

    Returns:
        The alias target when one exists, otherwise the cleaned-up name.
    """
    import re

    stripped = name.strip()
    # Try the raw value first so existing exact aliases keep working, then the
    # trimmed value so stray whitespace does not defeat the alias table.
    for candidate in (name, stripped):
        if candidate in TEMPLATE_NAME_NORMALIZE:
            return TEMPLATE_NAME_NORMALIZE[candidate]

    # Drop bracketed content, accepting ASCII and full-width parentheses.
    cleaned = re.sub(r'[(（].*?[)）]', '', stripped).strip()
    # Drop a leading numeric prefix followed by a dot, e.g. "3.".
    cleaned = re.sub(r'^\d+\.', '', cleaned).strip()
    return cleaned
def get_all_templates(conn) -> Dict:
    """Load every template configuration row of the tenant.

    Reads f_polic_file_config filtered by TENANT_ID. The cursor is used as a
    context manager so it is closed even when the query raises.

    Args:
        conn: Open pymysql connection.

    Returns:
        Dict keyed by template id. Each value is a dict with the keys ``id``,
        ``name``, ``normalized_name`` (from normalize_template_name),
        ``parent_id`` and ``state`` (always an int).
    """
    sql = """
        SELECT id, name, parent_id, state
        FROM f_polic_file_config
        WHERE tenant_id = %s
        ORDER BY name
    """
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute(sql, (TENANT_ID,))
        templates = cursor.fetchall()

    result = {}
    for template in templates:
        name = template['name']

        # The state column may come back as raw bytes; convert it to an int.
        state = template['state']
        if isinstance(state, bytes):
            state = int.from_bytes(state, byteorder='big')
        elif isinstance(state, (int, str)):
            state = int(state)
        else:
            # Anything else (e.g. NULL) is recorded as state 0.
            state = 0

        result[template['id']] = {
            'id': template['id'],
            'name': name,
            'normalized_name': normalize_template_name(name),
            'parent_id': template['parent_id'],
            'state': state,
        }
    return result
def get_all_fields(conn) -> Dict:
    """Load every field definition of the tenant and index it several ways.

    Reads f_polic_field filtered by TENANT_ID. The cursor is used as a context
    manager so it is closed even when the query raises.

    Args:
        conn: Open pymysql connection.

    Returns:
        Dict with four keys:
        ``by_code`` maps the ``filed_code`` column (spelled that way in the
        schema) to the row; ``by_name`` maps the field name to the row (a later
        row with the same name replaces an earlier one); ``input_fields`` lists
        rows whose ``field_type`` is 1; ``output_fields`` lists rows whose
        ``field_type`` is 2. Rows with any other type appear only in the two
        lookup dicts.
    """
    sql = """
        SELECT id, name, filed_code, field_type, state
        FROM f_polic_field
        WHERE tenant_id = %s
        ORDER BY field_type, filed_code
    """
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute(sql, (TENANT_ID,))
        fields = cursor.fetchall()

    result = {
        'by_code': {},
        'by_name': {},
        'input_fields': [],
        'output_fields': [],
    }
    for field in fields:
        result['by_code'][field['filed_code']] = field
        result['by_name'][field['name']] = field

        field_type = field['field_type']
        if field_type == 1:
            result['input_fields'].append(field)
        elif field_type == 2:
            result['output_fields'].append(field)
    return result
def get_existing_relations(conn) -> Set[tuple]:
    """Return the template/field pairs currently stored for the tenant.

    Reads f_polic_file_field filtered by TENANT_ID. The cursor is used as a
    context manager so it is closed even when the query raises.

    Args:
        conn: Open pymysql connection.

    Returns:
        Set of ``(file_id, filed_id)`` tuples, one per distinct relation row.
    """
    sql = """
        SELECT file_id, filed_id
        FROM f_polic_file_field
        WHERE tenant_id = %s
    """
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute(sql, (TENANT_ID,))
        relations = cursor.fetchall()
    return {(rel['file_id'], rel['filed_id']) for rel in relations}
def rebuild_template_relations(conn, template_id: int, template_name: str,
                               normalized_name: str, field_mapping: Dict,
                               dry_run: bool = True) -> Dict:
    """Rebuild the f_polic_file_field rows of a single template.

    The template's field configuration is looked up in TEMPLATE_FIELD_MAPPING,
    its field codes are resolved to ids through ``field_mapping['by_code']``,
    and — unless ``dry_run`` is set — all existing relation rows of the
    template are deleted, the resolved ones are inserted, and the transaction
    is committed.

    Args:
        conn: Open pymysql connection.
        template_id: Id of the template (``file_id`` in the relation table).
        template_name: Template name as stored.
        normalized_name: Canonical template name.
        field_mapping: Structure returned by get_all_fields; only ``by_code``
            is read here.
        dry_run: When True nothing is written; the counts describe what would
            be created.

    Returns:
        Dict with ``template_id``, ``template_name``, ``normalized_name``,
        ``status`` (``'success'`` or ``'skipped'``), ``deleted_count``,
        ``input_count``, ``output_count`` and ``created_count``; a skipped
        result additionally carries ``reason``.
    """
    template_config = _find_template_config(template_name, normalized_name)
    if not template_config:
        return {
            'template_id': template_id,
            'template_name': template_name,
            'normalized_name': normalized_name,
            'status': 'skipped',
            'reason': '未找到字段配置映射',
            'deleted_count': 0,
            'input_count': 0,
            'output_count': 0,
            'created_count': 0,
        }

    fields_by_code = field_mapping['by_code']
    input_field_ids = _resolve_field_ids(
        template_config.get('input_fields', []), fields_by_code, 1, '输入')
    output_field_ids = _resolve_field_ids(
        template_config.get('output_fields', []), fields_by_code, 2, '输出')

    # Insert each field at most once, keeping the configured order. This
    # replaces a per-row existence query: after the DELETE below the only rows
    # that could already exist are ones inserted earlier in the same loop.
    unique_field_ids = list(dict.fromkeys(input_field_ids + output_field_ids))

    deleted_count = 0
    if not dry_run:
        delete_sql = """
            DELETE FROM f_polic_file_field
            WHERE tenant_id = %s AND file_id = %s
        """
        insert_sql = """
            INSERT INTO f_polic_file_field
            (id, tenant_id, file_id, filed_id, created_time, created_by, updated_time, updated_by, state)
            VALUES (%s, %s, %s, %s, NOW(), %s, NOW(), %s, %s)
        """
        # NOTE(review): existing relations are deleted even when no field could
        # be resolved, which leaves the template without relations.
        with conn.cursor() as cursor:
            cursor.execute(delete_sql, (TENANT_ID, template_id))
            deleted_count = cursor.rowcount
            for field_id in unique_field_ids:
                cursor.execute(insert_sql, (
                    generate_id(), TENANT_ID, template_id, field_id,
                    CREATED_BY, UPDATED_BY, 1,  # state=1 marks the relation as enabled
                ))
        conn.commit()

    return {
        'template_id': template_id,
        'template_name': template_name,
        'normalized_name': normalized_name,
        'status': 'success',
        'deleted_count': deleted_count,
        'input_count': len(input_field_ids),
        'output_count': len(output_field_ids),
        'created_count': len(unique_field_ids),
    }


def _find_template_config(template_name, normalized_name):
    """Return the TEMPLATE_FIELD_MAPPING entry for a template, or None if nothing matches."""
    # An exact hit on the canonical name always wins.
    if normalized_name in TEMPLATE_FIELD_MAPPING:
        return TEMPLATE_FIELD_MAPPING[normalized_name]

    # Otherwise take the first key that contains, or is contained in, one of
    # the two names. Empty names are excluded: '' is a substring of every key
    # and would bind the template to an arbitrary configuration.
    for name, config in TEMPLATE_FIELD_MAPPING.items():
        if normalized_name and (name in normalized_name or normalized_name in name):
            return config
        if template_name and (name in template_name or template_name in name):
            return config
    return None


def _resolve_field_ids(field_codes, fields_by_code, expected_type, kind_label):
    """Map field codes to ids, printing a warning for unknown or wrongly-typed codes."""
    resolved = []
    for field_code in field_codes:
        field = fields_by_code.get(field_code)
        if not field:
            print(f" ⚠ 警告: 字段 {field_code} 不存在")
        elif field['field_type'] != expected_type:
            print(f" ⚠ 警告: 字段 {field_code} 应该是{kind_label}字段,但实际类型为 {field['field_type']}")
        else:
            resolved.append(field['id'])
    return resolved
def main(dry_run: bool = True):
    """Rebuild the template/field relations of every template and print a report.

    Connects to the database, loads templates, fields and the current
    relations, calls rebuild_template_relations for each template and prints
    per-template results followed by totals. Any exception is printed with its
    traceback; in execute mode the open transaction is rolled back.

    Args:
        dry_run: When True (the default) nothing is written to the database.
    """
    print("="*80)
    print("重新建立模板和字段的关联关系")
    print("="*80)
    if dry_run:
        print("\n[DRY RUN模式 - 不会实际修改数据库]")
    else:
        print("\n[实际执行模式 - 将修改数据库]")

    # Bound before the try block so the except/finally clauses can tell whether
    # a connection was ever established.
    conn = None
    try:
        conn = pymysql.connect(**DB_CONFIG)
        print("✓ 数据库连接成功\n")

        # Step 1: load all templates.
        print("1. 获取所有模板配置...")
        templates = get_all_templates(conn)
        print(f" 找到 {len(templates)} 个模板")

        # Step 2: load all field definitions.
        print("\n2. 获取所有字段定义...")
        field_mapping = get_all_fields(conn)
        print(f" 输入字段: {len(field_mapping['input_fields'])}")
        print(f" 输出字段: {len(field_mapping['output_fields'])}")
        print(f" 总字段数: {len(field_mapping['by_code'])}")

        # Step 3: load the current relations (reported as a count only).
        print("\n3. 获取现有关联关系...")
        existing_relations = get_existing_relations(conn)
        print(f" 现有关联关系: {len(existing_relations)}")

        # Step 4: rebuild the relations template by template.
        print("\n4. 重建模板和字段的关联关系...")
        print("="*80)
        results = []
        for template_id, template_info in templates.items():
            template_name = template_info['name']
            normalized_name = template_info['normalized_name']
            state = template_info['state']

            # Templates that are not enabled are processed as well; their
            # state is only called out in the log.
            status_note = f" (state={state})" if state != 1 else ""
            if state != 1:
                print(f"\n处理未启用的模板: {template_name}{status_note}")
            print(f"\n处理模板: {template_name}")
            print(f" 标准化名称: {normalized_name}")

            result = rebuild_template_relations(
                conn, template_id, template_name, normalized_name,
                field_mapping, dry_run=dry_run
            )
            results.append(result)

            if result['status'] == 'success':
                print(f" ✓ 成功: 删除 {result['deleted_count']} 条旧关联, "
                      f"创建 {result['created_count']} 条新关联 "
                      f"(输入字段: {result['input_count']}, 输出字段: {result['output_count']})")
            else:
                print(f"{result['status']}: {result.get('reason', '')}")

        # Totals.
        print("\n" + "="*80)
        print("处理结果统计")
        print("="*80)
        success_count = sum(1 for r in results if r['status'] == 'success')
        skipped_count = sum(1 for r in results if r['status'] == 'skipped')
        total_input = sum(r.get('input_count', 0) for r in results)
        total_output = sum(r.get('output_count', 0) for r in results)
        total_created = sum(r.get('created_count', 0) for r in results)
        print(f"\n成功处理: {success_count} 个模板")
        print(f"跳过: {skipped_count} 个模板")
        print(f"总输入字段关联: {total_input}")
        print(f"总输出字段关联: {total_output}")
        print(f"总关联关系: {total_created}")

        # Per-template details.
        print("\n详细结果:")
        for result in results:
            if result['status'] == 'success':
                print(f" - {result['template_name']}: "
                      f"输入字段 {result['input_count']} 个, "
                      f"输出字段 {result['output_count']}")
            else:
                print(f" - {result['template_name']}: {result['status']} - {result.get('reason', '')}")

        print("\n" + "="*80)
        if dry_run:
            print("\n这是DRY RUN模式未实际修改数据库。")
            print("要实际执行,请运行: python rebuild_template_field_relations.py --execute")
        else:
            print("\n✓ 关联关系已更新完成")
    except Exception as e:
        print(f"\n✗ 发生错误: {e}")
        import traceback
        traceback.print_exc()
        # Nothing to roll back if the connection attempt itself failed.
        if not dry_run and conn is not None:
            conn.rollback()
    finally:
        # Close (and report closing) only a connection that was actually opened.
        if conn is not None:
            conn.close()
            print("\n数据库连接已关闭")
if __name__ == '__main__':
    import sys

    # The run stays read-only unless --execute appears on the command line.
    dry_run = not ('--execute' in sys.argv)
    if not dry_run:
        # Writing to the database requires an explicit "yes" from the operator.
        print("\n⚠ 警告: 这将修改数据库!")
        response = input("确认要继续吗? (yes/no): ")
        confirmed = response.lower() == 'yes'
        if not confirmed:
            print("操作已取消")
            sys.exit(0)
    main(dry_run=dry_run)