""" 根据Excel数据设计文档同步更新模板的input_data、template_code和字段关联关系 """ import os import json import pymysql import pandas as pd from pathlib import Path from typing import Dict, List, Optional, Set from datetime import datetime from collections import defaultdict # 数据库连接配置 DB_CONFIG = { 'host': os.getenv('DB_HOST', '152.136.177.240'), 'port': int(os.getenv('DB_PORT', 5012)), 'user': os.getenv('DB_USER', 'finyx'), 'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'), 'database': os.getenv('DB_NAME', 'finyx'), 'charset': 'utf8mb4' } TENANT_ID = 615873064429507639 CREATED_BY = 655162080928945152 UPDATED_BY = 655162080928945152 # Excel文件路径 EXCEL_FILE = '技术文档/智慧监督项目模板数据结构设计表-20251125-一凡标注.xlsx' # 模板名称映射(Excel中的名称 -> 数据库中的名称) TEMPLATE_NAME_MAPPING = { '请示报告卡': '1.请示报告卡(XXX)', '初步核实审批表': '2.初步核实审批表(XXX)', '初核方案': '3.附件初核方案(XXX)', '谈话通知书': '谈话通知书', '谈话通知书第一联': '谈话通知书第一联', '谈话通知书第二联': '谈话通知书第二联', '谈话通知书第三联': '谈话通知书第三联', '走读式谈话审批': '走读式谈话审批', '走读式谈话流程': '走读式谈话流程', '请示报告卡(初核报告结论)': '8-1请示报告卡(初核报告结论) ', 'XXX初核情况报告': '8.XXX初核情况报告', } # 模板编码映射(Excel中的名称 -> template_code) TEMPLATE_CODE_MAPPING = { '请示报告卡': 'REPORT_CARD', '初步核实审批表': 'PRELIMINARY_VERIFICATION_APPROVAL', '初核方案': 'INVESTIGATION_PLAN', '谈话通知书第一联': 'NOTIFICATION_LETTER_1', '谈话通知书第二联': 'NOTIFICATION_LETTER_2', '谈话通知书第三联': 'NOTIFICATION_LETTER_3', '请示报告卡(初核报告结论)': 'REPORT_CARD_CONCLUSION', 'XXX初核情况报告': 'INVESTIGATION_REPORT', } # 字段名称到字段编码的映射 FIELD_NAME_TO_CODE_MAP = { # 输入字段 '线索信息': 'clue_info', '被核查人员工作基本情况线索': 'target_basic_info_clue', # 输出字段 - 基本信息 '被核查人姓名': 'target_name', '被核查人员单位及职务': 'target_organization_and_position', '被核查人员性别': 'target_gender', '被核查人员出生年月': 'target_date_of_birth', '被核查人员出生年月日': 'target_date_of_birth_full', '被核查人员政治面貌': 'target_political_status', '被核查人员职级': 'target_professional_rank', '被核查人员单位': 'target_organization', '被核查人员职务': 'target_position', # 输出字段 - 其他信息 '线索来源': 'clue_source', '主要问题线索': 'target_issue_description', '初步核实审批表承办部门意见': 'department_opinion', '初步核实审批表填表人': 'filler_name', '请示报告卡请示时间': 'report_card_request_time', '被核查人员身份证件及号码': 'target_id_number', '被核查人员身份证号': 'target_id_number', '应到时间': 'appointment_time', '应到地点': 'appointment_location', '批准时间': 'approval_time', '承办部门': 'handling_department', '承办人': 'handler_name', '谈话通知时间': 'notification_time', '谈话通知地点': 'notification_location', '被核查人员住址': 'target_address', '被核查人员户籍住址': 'target_registered_address', '被核查人员联系方式': 'target_contact', '被核查人员籍贯': 'target_place_of_origin', '被核查人员民族': 'target_ethnicity', '被核查人员工作基本情况': 'target_work_basic_info', '核查单位名称': 'investigation_unit_name', '核查组组长姓名': 'investigation_team_leader_name', '核查组成员姓名': 'investigation_team_member_names', '核查地点': 'investigation_location', } def generate_id(): """生成ID""" import time import random timestamp = int(time.time() * 1000) random_part = random.randint(100000, 999999) return timestamp * 1000 + random_part def normalize_template_name(name: str) -> str: """标准化模板名称,用于匹配""" import re # 去掉开头的编号和括号内容 name = re.sub(r'^\d+[\.\-]\s*', '', name) name = re.sub(r'[((].*?[))]', '', name) name = name.strip() return name def parse_excel_data() -> Dict: """解析Excel文件,提取模板和字段的关联关系""" print("="*80) print("解析Excel数据设计文档") print("="*80) if not Path(EXCEL_FILE).exists(): print(f"✗ Excel文件不存在: {EXCEL_FILE}") return None try: df = pd.read_excel(EXCEL_FILE) print(f"✓ 成功读取Excel文件,共 {len(df)} 行数据\n") templates = defaultdict(lambda: { 'template_name': '', 'template_code': '', 'input_fields': [], 'output_fields': [] }) current_template = None current_input_field = None for idx, row in df.iterrows(): level1 = row.get('一级分类') level2 = row.get('二级分类') level3 = row.get('三级分类') input_field = row.get('输入数据字段') output_field = row.get('输出数据字段') # 处理二级分类(模板名称) if pd.notna(level2) and level2: current_template = str(level2).strip() # 获取模板编码 template_code = TEMPLATE_CODE_MAPPING.get(current_template, '') if not template_code: # 如果没有映射,尝试生成 template_code = current_template.upper().replace(' ', '_') templates[current_template]['template_name'] = current_template templates[current_template]['template_code'] = template_code current_input_field = None # 重置输入字段 print(f" 模板: {current_template} (code: {template_code})") # 处理三级分类(子模板,如谈话通知书第一联) if pd.notna(level3) and level3: current_template = str(level3).strip() template_code = TEMPLATE_CODE_MAPPING.get(current_template, '') if not template_code: template_code = current_template.upper().replace(' ', '_') templates[current_template]['template_name'] = current_template templates[current_template]['template_code'] = template_code current_input_field = None print(f" 子模板: {current_template} (code: {template_code})") # 处理输入字段 if pd.notna(input_field) and input_field: input_field_name = str(input_field).strip() if input_field_name != current_input_field: current_input_field = input_field_name field_code = FIELD_NAME_TO_CODE_MAP.get(input_field_name, input_field_name.lower().replace(' ', '_')) if current_template: templates[current_template]['input_fields'].append({ 'name': input_field_name, 'field_code': field_code }) # 处理输出字段 if pd.notna(output_field) and output_field: output_field_name = str(output_field).strip() field_code = FIELD_NAME_TO_CODE_MAP.get(output_field_name, output_field_name.lower().replace(' ', '_')) if current_template: templates[current_template]['output_fields'].append({ 'name': output_field_name, 'field_code': field_code }) # 去重 for template_name, template_info in templates.items(): # 输入字段去重 seen_input = set() unique_input = [] for field in template_info['input_fields']: key = field['field_code'] if key not in seen_input: seen_input.add(key) unique_input.append(field) template_info['input_fields'] = unique_input # 输出字段去重 seen_output = set() unique_output = [] for field in template_info['output_fields']: key = field['field_code'] if key not in seen_output: seen_output.add(key) unique_output.append(field) template_info['output_fields'] = unique_output print(f"\n✓ 解析完成,共 {len(templates)} 个模板") for template_name, template_info in templates.items(): print(f" - {template_name}: {len(template_info['input_fields'])} 个输入字段, {len(template_info['output_fields'])} 个输出字段") return dict(templates) except Exception as e: print(f"✗ 解析Excel文件失败: {e}") import traceback traceback.print_exc() return None def get_database_templates(conn) -> Dict: """获取数据库中的模板配置""" cursor = conn.cursor(pymysql.cursors.DictCursor) sql = """ SELECT id, name, template_code, input_data, parent_id FROM f_polic_file_config WHERE tenant_id = %s """ cursor.execute(sql, (TENANT_ID,)) configs = cursor.fetchall() result = {} for config in configs: name = config['name'] result[name] = config # 也添加标准化名称的映射 normalized = normalize_template_name(name) if normalized not in result: result[normalized] = config cursor.close() return result def get_database_fields(conn) -> Dict: """获取数据库中的字段定义""" cursor = conn.cursor(pymysql.cursors.DictCursor) sql = """ SELECT id, name, filed_code, field_type FROM f_polic_field WHERE tenant_id = %s """ cursor.execute(sql, (TENANT_ID,)) fields = cursor.fetchall() result = { 'by_code': {}, 'by_name': {} } for field in fields: field_code = field['filed_code'] field_name = field['name'] result['by_code'][field_code] = field result['by_name'][field_name] = field cursor.close() return result def find_matching_template(excel_template_name: str, db_templates: Dict) -> Optional[Dict]: """查找匹配的数据库模板""" # 1. 精确匹配 if excel_template_name in db_templates: return db_templates[excel_template_name] # 2. 通过映射表匹配 mapped_name = TEMPLATE_NAME_MAPPING.get(excel_template_name) if mapped_name and mapped_name in db_templates: return db_templates[mapped_name] # 3. 标准化名称匹配 normalized = normalize_template_name(excel_template_name) if normalized in db_templates: return db_templates[normalized] # 4. 模糊匹配 for db_name, db_config in db_templates.items(): if normalized in normalize_template_name(db_name) or normalize_template_name(db_name) in normalized: return db_config return None def update_template_config(conn, template_id: int, template_code: str, input_fields: List[Dict], dry_run: bool = True): """更新模板配置的input_data和template_code""" cursor = conn.cursor() try: # 构建input_data input_data = { 'template_code': template_code, 'business_type': 'INVESTIGATION', 'input_fields': [f['field_code'] for f in input_fields] } input_data_json = json.dumps(input_data, ensure_ascii=False) if not dry_run: update_sql = """ UPDATE f_polic_file_config SET template_code = %s, input_data = %s, updated_time = NOW(), updated_by = %s WHERE id = %s AND tenant_id = %s """ cursor.execute(update_sql, (template_code, input_data_json, UPDATED_BY, template_id, TENANT_ID)) conn.commit() print(f" ✓ 更新模板配置") else: print(f" [模拟] 将更新模板配置: template_code={template_code}") finally: cursor.close() def update_template_field_relations(conn, template_id: int, input_fields: List[Dict], output_fields: List[Dict], db_fields: Dict, dry_run: bool = True): """更新模板和字段的关联关系""" cursor = conn.cursor() try: # 先删除旧的关联关系 if not dry_run: delete_sql = """ DELETE FROM f_polic_file_field WHERE tenant_id = %s AND file_id = %s """ cursor.execute(delete_sql, (TENANT_ID, template_id)) # 创建新的关联关系 relations_created = 0 # 关联输入字段(field_type=1) for field_info in input_fields: field_code = field_info['field_code'] field = db_fields['by_code'].get(field_code) if not field: print(f" ⚠ 输入字段不存在: {field_code}") continue if field['field_type'] != 1: print(f" ⚠ 字段类型不匹配: {field_code} (期望输入字段,实际为输出字段)") continue if not dry_run: # 检查是否已存在 check_sql = """ SELECT id FROM f_polic_file_field WHERE tenant_id = %s AND file_id = %s AND filed_id = %s """ cursor.execute(check_sql, (TENANT_ID, template_id, field['id'])) existing = cursor.fetchone() if not existing: relation_id = generate_id() insert_sql = """ INSERT INTO f_polic_file_field (id, tenant_id, file_id, filed_id, created_time, created_by, updated_time, updated_by, state) VALUES (%s, %s, %s, %s, NOW(), %s, NOW(), %s, %s) """ cursor.execute(insert_sql, ( relation_id, TENANT_ID, template_id, field['id'], CREATED_BY, UPDATED_BY, 1 )) relations_created += 1 else: relations_created += 1 # 关联输出字段(field_type=2) for field_info in output_fields: field_code = field_info['field_code'] field = db_fields['by_code'].get(field_code) if not field: # 尝试通过名称匹配 field_name = field_info['name'] field = db_fields['by_name'].get(field_name) if not field: print(f" ⚠ 输出字段不存在: {field_code} ({field_info['name']})") continue if field['field_type'] != 2: print(f" ⚠ 字段类型不匹配: {field_code} (期望输出字段,实际为输入字段)") continue if not dry_run: # 检查是否已存在 check_sql = """ SELECT id FROM f_polic_file_field WHERE tenant_id = %s AND file_id = %s AND filed_id = %s """ cursor.execute(check_sql, (TENANT_ID, template_id, field['id'])) existing = cursor.fetchone() if not existing: relation_id = generate_id() insert_sql = """ INSERT INTO f_polic_file_field (id, tenant_id, file_id, filed_id, created_time, created_by, updated_time, updated_by, state) VALUES (%s, %s, %s, %s, NOW(), %s, NOW(), %s, %s) """ cursor.execute(insert_sql, ( relation_id, TENANT_ID, template_id, field['id'], CREATED_BY, UPDATED_BY, 1 )) relations_created += 1 else: relations_created += 1 if not dry_run: conn.commit() print(f" ✓ 创建 {relations_created} 个字段关联关系") else: print(f" [模拟] 将创建 {relations_created} 个字段关联关系") finally: cursor.close() def main(): """主函数""" print("="*80) print("同步模板字段信息(根据Excel数据设计文档)") print("="*80) # 解析Excel excel_data = parse_excel_data() if not excel_data: return # 连接数据库 try: conn = pymysql.connect(**DB_CONFIG) print("\n✓ 数据库连接成功") except Exception as e: print(f"\n✗ 数据库连接失败: {e}") return try: # 获取数据库中的模板和字段 print("\n获取数据库中的模板和字段...") db_templates = get_database_templates(conn) db_fields = get_database_fields(conn) print(f" 数据库中有 {len(db_templates)} 个模板") print(f" 数据库中有 {len(db_fields['by_code'])} 个字段") # 匹配和更新 print("\n" + "="*80) print("匹配模板并更新配置") print("="*80) matched_count = 0 unmatched_templates = [] for excel_template_name, template_info in excel_data.items(): print(f"\n处理模板: {excel_template_name}") # 查找匹配的数据库模板 db_template = find_matching_template(excel_template_name, db_templates) if not db_template: print(f" ✗ 未找到匹配的数据库模板") unmatched_templates.append(excel_template_name) continue print(f" ✓ 匹配到数据库模板: {db_template['name']} (ID: {db_template['id']})") matched_count += 1 # 更新模板配置 template_code = template_info['template_code'] input_fields = template_info['input_fields'] output_fields = template_info['output_fields'] print(f" 模板编码: {template_code}") print(f" 输入字段: {len(input_fields)} 个") print(f" 输出字段: {len(output_fields)} 个") # 先执行模拟更新 print(" [模拟模式]") update_template_config(conn, db_template['id'], template_code, input_fields, dry_run=True) update_template_field_relations(conn, db_template['id'], input_fields, output_fields, db_fields, dry_run=True) # 显示统计 print("\n" + "="*80) print("统计信息") print("="*80) print(f"Excel中的模板数: {len(excel_data)}") print(f"成功匹配: {matched_count} 个") print(f"未匹配: {len(unmatched_templates)} 个") if unmatched_templates: print("\n未匹配的模板:") for template in unmatched_templates: print(f" - {template}") # 询问是否执行实际更新 print("\n" + "="*80) response = input("\n是否执行实际更新?(yes/no,默认no): ").strip().lower() if response == 'yes': print("\n执行实际更新...") for excel_template_name, template_info in excel_data.items(): db_template = find_matching_template(excel_template_name, db_templates) if db_template: print(f"\n更新: {db_template['name']}") update_template_config(conn, db_template['id'], template_info['template_code'], template_info['input_fields'], dry_run=False) update_template_field_relations(conn, db_template['id'], template_info['input_fields'], template_info['output_fields'], db_fields, dry_run=False) print("\n" + "="*80) print("✓ 同步完成!") print("="*80) else: print("\n已取消更新") finally: conn.close() print("\n数据库连接已关闭") if __name__ == '__main__': main()