ai-business-write/sync_template_fields_from_excel.py

553 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
根据Excel数据设计文档同步更新模板的input_data、template_code和字段关联关系
"""
import os
import json
import pymysql
import pandas as pd
from pathlib import Path
from typing import Dict, List, Optional, Set
from datetime import datetime
from collections import defaultdict
# 数据库连接配置
DB_CONFIG = {
'host': os.getenv('DB_HOST', '152.136.177.240'),
'port': int(os.getenv('DB_PORT', 5012)),
'user': os.getenv('DB_USER', 'finyx'),
'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
'database': os.getenv('DB_NAME', 'finyx'),
'charset': 'utf8mb4'
}
TENANT_ID = 615873064429507639
CREATED_BY = 655162080928945152
UPDATED_BY = 655162080928945152
# Excel文件路径
EXCEL_FILE = '技术文档/智慧监督项目模板数据结构设计表-20251125-一凡标注.xlsx'
# 模板名称映射Excel中的名称 -> 数据库中的名称)
TEMPLATE_NAME_MAPPING = {
'请示报告卡': '1.请示报告卡XXX',
'初步核实审批表': '2.初步核实审批表XXX',
'初核方案': '3.附件初核方案(XXX)',
'谈话通知书': '谈话通知书',
'谈话通知书第一联': '谈话通知书第一联',
'谈话通知书第二联': '谈话通知书第二联',
'谈话通知书第三联': '谈话通知书第三联',
'走读式谈话审批': '走读式谈话审批',
'走读式谈话流程': '走读式谈话流程',
'请示报告卡(初核报告结论)': '8-1请示报告卡初核报告结论 ',
'XXX初核情况报告': '8.XXX初核情况报告',
}
# 模板编码映射Excel中的名称 -> template_code
TEMPLATE_CODE_MAPPING = {
'请示报告卡': 'REPORT_CARD',
'初步核实审批表': 'PRELIMINARY_VERIFICATION_APPROVAL',
'初核方案': 'INVESTIGATION_PLAN',
'谈话通知书第一联': 'NOTIFICATION_LETTER_1',
'谈话通知书第二联': 'NOTIFICATION_LETTER_2',
'谈话通知书第三联': 'NOTIFICATION_LETTER_3',
'请示报告卡(初核报告结论)': 'REPORT_CARD_CONCLUSION',
'XXX初核情况报告': 'INVESTIGATION_REPORT',
}
# 字段名称到字段编码的映射
FIELD_NAME_TO_CODE_MAP = {
# 输入字段
'线索信息': 'clue_info',
'被核查人员工作基本情况线索': 'target_basic_info_clue',
# 输出字段 - 基本信息
'被核查人姓名': 'target_name',
'被核查人员单位及职务': 'target_organization_and_position',
'被核查人员性别': 'target_gender',
'被核查人员出生年月': 'target_date_of_birth',
'被核查人员出生年月日': 'target_date_of_birth_full',
'被核查人员政治面貌': 'target_political_status',
'被核查人员职级': 'target_professional_rank',
'被核查人员单位': 'target_organization',
'被核查人员职务': 'target_position',
# 输出字段 - 其他信息
'线索来源': 'clue_source',
'主要问题线索': 'target_issue_description',
'初步核实审批表承办部门意见': 'department_opinion',
'初步核实审批表填表人': 'filler_name',
'请示报告卡请示时间': 'report_card_request_time',
'被核查人员身份证件及号码': 'target_id_number',
'被核查人员身份证号': 'target_id_number',
'应到时间': 'appointment_time',
'应到地点': 'appointment_location',
'批准时间': 'approval_time',
'承办部门': 'handling_department',
'承办人': 'handler_name',
'谈话通知时间': 'notification_time',
'谈话通知地点': 'notification_location',
'被核查人员住址': 'target_address',
'被核查人员户籍住址': 'target_registered_address',
'被核查人员联系方式': 'target_contact',
'被核查人员籍贯': 'target_place_of_origin',
'被核查人员民族': 'target_ethnicity',
'被核查人员工作基本情况': 'target_work_basic_info',
'核查单位名称': 'investigation_unit_name',
'核查组组长姓名': 'investigation_team_leader_name',
'核查组成员姓名': 'investigation_team_member_names',
'核查地点': 'investigation_location',
}
def generate_id():
"""生成ID"""
import time
import random
timestamp = int(time.time() * 1000)
random_part = random.randint(100000, 999999)
return timestamp * 1000 + random_part
def normalize_template_name(name: str) -> str:
"""标准化模板名称,用于匹配"""
import re
# 去掉开头的编号和括号内容
name = re.sub(r'^\d+[\.\-]\s*', '', name)
name = re.sub(r'[(].*?[)]', '', name)
name = name.strip()
return name
def parse_excel_data() -> Dict:
"""解析Excel文件提取模板和字段的关联关系"""
print("="*80)
print("解析Excel数据设计文档")
print("="*80)
if not Path(EXCEL_FILE).exists():
print(f"✗ Excel文件不存在: {EXCEL_FILE}")
return None
try:
df = pd.read_excel(EXCEL_FILE)
print(f"✓ 成功读取Excel文件{len(df)} 行数据\n")
templates = defaultdict(lambda: {
'template_name': '',
'template_code': '',
'input_fields': [],
'output_fields': []
})
current_template = None
current_input_field = None
for idx, row in df.iterrows():
level1 = row.get('一级分类')
level2 = row.get('二级分类')
level3 = row.get('三级分类')
input_field = row.get('输入数据字段')
output_field = row.get('输出数据字段')
# 处理二级分类(模板名称)
if pd.notna(level2) and level2:
current_template = str(level2).strip()
# 获取模板编码
template_code = TEMPLATE_CODE_MAPPING.get(current_template, '')
if not template_code:
# 如果没有映射,尝试生成
template_code = current_template.upper().replace(' ', '_')
templates[current_template]['template_name'] = current_template
templates[current_template]['template_code'] = template_code
current_input_field = None # 重置输入字段
print(f" 模板: {current_template} (code: {template_code})")
# 处理三级分类(子模板,如谈话通知书第一联)
if pd.notna(level3) and level3:
current_template = str(level3).strip()
template_code = TEMPLATE_CODE_MAPPING.get(current_template, '')
if not template_code:
template_code = current_template.upper().replace(' ', '_')
templates[current_template]['template_name'] = current_template
templates[current_template]['template_code'] = template_code
current_input_field = None
print(f" 子模板: {current_template} (code: {template_code})")
# 处理输入字段
if pd.notna(input_field) and input_field:
input_field_name = str(input_field).strip()
if input_field_name != current_input_field:
current_input_field = input_field_name
field_code = FIELD_NAME_TO_CODE_MAP.get(input_field_name, input_field_name.lower().replace(' ', '_'))
if current_template:
templates[current_template]['input_fields'].append({
'name': input_field_name,
'field_code': field_code
})
# 处理输出字段
if pd.notna(output_field) and output_field:
output_field_name = str(output_field).strip()
field_code = FIELD_NAME_TO_CODE_MAP.get(output_field_name, output_field_name.lower().replace(' ', '_'))
if current_template:
templates[current_template]['output_fields'].append({
'name': output_field_name,
'field_code': field_code
})
# 去重
for template_name, template_info in templates.items():
# 输入字段去重
seen_input = set()
unique_input = []
for field in template_info['input_fields']:
key = field['field_code']
if key not in seen_input:
seen_input.add(key)
unique_input.append(field)
template_info['input_fields'] = unique_input
# 输出字段去重
seen_output = set()
unique_output = []
for field in template_info['output_fields']:
key = field['field_code']
if key not in seen_output:
seen_output.add(key)
unique_output.append(field)
template_info['output_fields'] = unique_output
print(f"\n✓ 解析完成,共 {len(templates)} 个模板")
for template_name, template_info in templates.items():
print(f" - {template_name}: {len(template_info['input_fields'])} 个输入字段, {len(template_info['output_fields'])} 个输出字段")
return dict(templates)
except Exception as e:
print(f"✗ 解析Excel文件失败: {e}")
import traceback
traceback.print_exc()
return None
def get_database_templates(conn) -> Dict:
"""获取数据库中的模板配置"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = """
SELECT id, name, template_code, input_data, parent_id
FROM f_polic_file_config
WHERE tenant_id = %s
"""
cursor.execute(sql, (TENANT_ID,))
configs = cursor.fetchall()
result = {}
for config in configs:
name = config['name']
result[name] = config
# 也添加标准化名称的映射
normalized = normalize_template_name(name)
if normalized not in result:
result[normalized] = config
cursor.close()
return result
def get_database_fields(conn) -> Dict:
"""获取数据库中的字段定义"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = """
SELECT id, name, filed_code, field_type
FROM f_polic_field
WHERE tenant_id = %s
"""
cursor.execute(sql, (TENANT_ID,))
fields = cursor.fetchall()
result = {
'by_code': {},
'by_name': {}
}
for field in fields:
field_code = field['filed_code']
field_name = field['name']
result['by_code'][field_code] = field
result['by_name'][field_name] = field
cursor.close()
return result
def find_matching_template(excel_template_name: str, db_templates: Dict) -> Optional[Dict]:
"""查找匹配的数据库模板"""
# 1. 精确匹配
if excel_template_name in db_templates:
return db_templates[excel_template_name]
# 2. 通过映射表匹配
mapped_name = TEMPLATE_NAME_MAPPING.get(excel_template_name)
if mapped_name and mapped_name in db_templates:
return db_templates[mapped_name]
# 3. 标准化名称匹配
normalized = normalize_template_name(excel_template_name)
if normalized in db_templates:
return db_templates[normalized]
# 4. 模糊匹配
for db_name, db_config in db_templates.items():
if normalized in normalize_template_name(db_name) or normalize_template_name(db_name) in normalized:
return db_config
return None
def update_template_config(conn, template_id: int, template_code: str, input_fields: List[Dict], dry_run: bool = True):
"""更新模板配置的input_data和template_code"""
cursor = conn.cursor()
try:
# 构建input_data
input_data = {
'template_code': template_code,
'business_type': 'INVESTIGATION',
'input_fields': [f['field_code'] for f in input_fields]
}
input_data_json = json.dumps(input_data, ensure_ascii=False)
if not dry_run:
update_sql = """
UPDATE f_polic_file_config
SET template_code = %s, input_data = %s, updated_time = NOW(), updated_by = %s
WHERE id = %s AND tenant_id = %s
"""
cursor.execute(update_sql, (template_code, input_data_json, UPDATED_BY, template_id, TENANT_ID))
conn.commit()
print(f" ✓ 更新模板配置")
else:
print(f" [模拟] 将更新模板配置: template_code={template_code}")
finally:
cursor.close()
def update_template_field_relations(conn, template_id: int, input_fields: List[Dict], output_fields: List[Dict],
db_fields: Dict, dry_run: bool = True):
"""更新模板和字段的关联关系"""
cursor = conn.cursor()
try:
# 先删除旧的关联关系
if not dry_run:
delete_sql = """
DELETE FROM f_polic_file_field
WHERE tenant_id = %s AND file_id = %s
"""
cursor.execute(delete_sql, (TENANT_ID, template_id))
# 创建新的关联关系
relations_created = 0
# 关联输入字段field_type=1
for field_info in input_fields:
field_code = field_info['field_code']
field = db_fields['by_code'].get(field_code)
if not field:
print(f" ⚠ 输入字段不存在: {field_code}")
continue
if field['field_type'] != 1:
print(f" ⚠ 字段类型不匹配: {field_code} (期望输入字段,实际为输出字段)")
continue
if not dry_run:
# 检查是否已存在
check_sql = """
SELECT id FROM f_polic_file_field
WHERE tenant_id = %s AND file_id = %s AND filed_id = %s
"""
cursor.execute(check_sql, (TENANT_ID, template_id, field['id']))
existing = cursor.fetchone()
if not existing:
relation_id = generate_id()
insert_sql = """
INSERT INTO f_polic_file_field
(id, tenant_id, file_id, filed_id, created_time, created_by, updated_time, updated_by, state)
VALUES (%s, %s, %s, %s, NOW(), %s, NOW(), %s, %s)
"""
cursor.execute(insert_sql, (
relation_id, TENANT_ID, template_id, field['id'],
CREATED_BY, UPDATED_BY, 1
))
relations_created += 1
else:
relations_created += 1
# 关联输出字段field_type=2
for field_info in output_fields:
field_code = field_info['field_code']
field = db_fields['by_code'].get(field_code)
if not field:
# 尝试通过名称匹配
field_name = field_info['name']
field = db_fields['by_name'].get(field_name)
if not field:
print(f" ⚠ 输出字段不存在: {field_code} ({field_info['name']})")
continue
if field['field_type'] != 2:
print(f" ⚠ 字段类型不匹配: {field_code} (期望输出字段,实际为输入字段)")
continue
if not dry_run:
# 检查是否已存在
check_sql = """
SELECT id FROM f_polic_file_field
WHERE tenant_id = %s AND file_id = %s AND filed_id = %s
"""
cursor.execute(check_sql, (TENANT_ID, template_id, field['id']))
existing = cursor.fetchone()
if not existing:
relation_id = generate_id()
insert_sql = """
INSERT INTO f_polic_file_field
(id, tenant_id, file_id, filed_id, created_time, created_by, updated_time, updated_by, state)
VALUES (%s, %s, %s, %s, NOW(), %s, NOW(), %s, %s)
"""
cursor.execute(insert_sql, (
relation_id, TENANT_ID, template_id, field['id'],
CREATED_BY, UPDATED_BY, 1
))
relations_created += 1
else:
relations_created += 1
if not dry_run:
conn.commit()
print(f" ✓ 创建 {relations_created} 个字段关联关系")
else:
print(f" [模拟] 将创建 {relations_created} 个字段关联关系")
finally:
cursor.close()
def main():
"""主函数"""
print("="*80)
print("同步模板字段信息根据Excel数据设计文档")
print("="*80)
# 解析Excel
excel_data = parse_excel_data()
if not excel_data:
return
# 连接数据库
try:
conn = pymysql.connect(**DB_CONFIG)
print("\n✓ 数据库连接成功")
except Exception as e:
print(f"\n✗ 数据库连接失败: {e}")
return
try:
# 获取数据库中的模板和字段
print("\n获取数据库中的模板和字段...")
db_templates = get_database_templates(conn)
db_fields = get_database_fields(conn)
print(f" 数据库中有 {len(db_templates)} 个模板")
print(f" 数据库中有 {len(db_fields['by_code'])} 个字段")
# 匹配和更新
print("\n" + "="*80)
print("匹配模板并更新配置")
print("="*80)
matched_count = 0
unmatched_templates = []
for excel_template_name, template_info in excel_data.items():
print(f"\n处理模板: {excel_template_name}")
# 查找匹配的数据库模板
db_template = find_matching_template(excel_template_name, db_templates)
if not db_template:
print(f" ✗ 未找到匹配的数据库模板")
unmatched_templates.append(excel_template_name)
continue
print(f" ✓ 匹配到数据库模板: {db_template['name']} (ID: {db_template['id']})")
matched_count += 1
# 更新模板配置
template_code = template_info['template_code']
input_fields = template_info['input_fields']
output_fields = template_info['output_fields']
print(f" 模板编码: {template_code}")
print(f" 输入字段: {len(input_fields)}")
print(f" 输出字段: {len(output_fields)}")
# 先执行模拟更新
print(" [模拟模式]")
update_template_config(conn, db_template['id'], template_code, input_fields, dry_run=True)
update_template_field_relations(conn, db_template['id'], input_fields, output_fields, db_fields, dry_run=True)
# 显示统计
print("\n" + "="*80)
print("统计信息")
print("="*80)
print(f"Excel中的模板数: {len(excel_data)}")
print(f"成功匹配: {matched_count}")
print(f"未匹配: {len(unmatched_templates)}")
if unmatched_templates:
print("\n未匹配的模板:")
for template in unmatched_templates:
print(f" - {template}")
# 询问是否执行实际更新
print("\n" + "="*80)
response = input("\n是否执行实际更新?(yes/no默认no): ").strip().lower()
if response == 'yes':
print("\n执行实际更新...")
for excel_template_name, template_info in excel_data.items():
db_template = find_matching_template(excel_template_name, db_templates)
if db_template:
print(f"\n更新: {db_template['name']}")
update_template_config(conn, db_template['id'], template_info['template_code'],
template_info['input_fields'], dry_run=False)
update_template_field_relations(conn, db_template['id'],
template_info['input_fields'],
template_info['output_fields'],
db_fields, dry_run=False)
print("\n" + "="*80)
print("✓ 同步完成!")
print("="*80)
else:
print("\n已取消更新")
finally:
conn.close()
print("\n数据库连接已关闭")
if __name__ == '__main__':
main()