""" 更新所有模板的字段关联关系 1. 输入字段:所有模板都关联 clue_info 和 target_basic_info_clue 2. 输出字段:根据模板中的占位符自动关联对应的输出字段 """ import os import pymysql from pathlib import Path from typing import Dict, List, Set, Optional from dotenv import load_dotenv import re from docx import Document # 加载环境变量 load_dotenv() # 数据库配置 DB_CONFIG = { 'host': os.getenv('DB_HOST', '152.136.177.240'), 'port': int(os.getenv('DB_PORT', 5012)), 'user': os.getenv('DB_USER', 'finyx'), 'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'), 'database': os.getenv('DB_NAME', 'finyx'), 'charset': 'utf8mb4' } CREATED_BY = 655162080928945152 UPDATED_BY = 655162080928945152 # 项目根目录 PROJECT_ROOT = Path(__file__).parent TEMPLATES_DIR = PROJECT_ROOT / "template_finish" def print_section(title): """打印章节标题""" print("\n" + "="*70) print(f" {title}") print("="*70) def print_result(success, message): """打印结果""" status = "[OK]" if success else "[FAIL]" print(f"{status} {message}") def generate_id(): """生成ID""" import time return int(time.time() * 1000000) def get_actual_tenant_id(conn) -> int: """获取数据库中的实际tenant_id""" cursor = conn.cursor(pymysql.cursors.DictCursor) try: cursor.execute("SELECT DISTINCT tenant_id FROM f_polic_file_config LIMIT 1") result = cursor.fetchone() if result: return result['tenant_id'] return 1 finally: cursor.close() def get_input_fields(conn, tenant_id: int) -> Dict[str, int]: """ 获取输入字段(clue_info 和 target_basic_info_clue) Returns: 字典,key为field_code,value为field_id """ cursor = conn.cursor(pymysql.cursors.DictCursor) try: sql = """ SELECT id, filed_code, name FROM f_polic_field WHERE tenant_id = %s AND field_type = 1 AND filed_code IN ('clue_info', 'target_basic_info_clue') AND state = 1 """ cursor.execute(sql, (tenant_id,)) fields = cursor.fetchall() result = {} for field in fields: result[field['filed_code']] = field['id'] print(f" 输入字段: {field['name']} ({field['filed_code']}) - ID: {field['id']}") return result finally: cursor.close() def get_output_fields(conn, tenant_id: int) -> Dict[str, int]: """ 获取所有输出字段 Returns: 字典,key为filed_code,value为field_id """ cursor = conn.cursor(pymysql.cursors.DictCursor) try: sql = """ SELECT id, filed_code, name FROM f_polic_field WHERE tenant_id = %s AND field_type = 2 AND state = 1 """ cursor.execute(sql, (tenant_id,)) fields = cursor.fetchall() result = {} for field in fields: result[field['filed_code']] = field['id'] return result finally: cursor.close() def extract_placeholders_from_docx(file_path: Path) -> Set[str]: """从docx文件中提取所有占位符""" placeholders = set() placeholder_pattern = re.compile(r'\{\{([^}]+)\}\}') try: doc = Document(file_path) # 从段落中提取 for paragraph in doc.paragraphs: text = paragraph.text matches = placeholder_pattern.findall(text) for match in matches: field_code = match.strip() if field_code: placeholders.add(field_code) # 从表格中提取 for table in doc.tables: try: for row in table.rows: for cell in row.cells: for paragraph in cell.paragraphs: text = paragraph.text matches = placeholder_pattern.findall(text) for match in matches: field_code = match.strip() if field_code: placeholders.add(field_code) except: continue except Exception as e: print(f" [错误] 读取文件失败: {str(e)}") return placeholders def get_all_templates(conn, tenant_id: int) -> List[Dict]: """获取所有模板(只获取文件节点,不包括目录节点)""" cursor = conn.cursor(pymysql.cursors.DictCursor) try: sql = """ SELECT id, name, file_path FROM f_polic_file_config WHERE tenant_id = %s AND file_path IS NOT NULL AND file_path != '' AND state = 1 """ cursor.execute(sql, (tenant_id,)) templates = cursor.fetchall() return templates finally: cursor.close() def get_existing_relations(conn, tenant_id: int, file_id: int) -> Set[int]: """获取模板的现有关联关系""" cursor = conn.cursor() try: sql = """ SELECT filed_id FROM f_polic_file_field WHERE tenant_id = %s AND file_id = %s AND state = 1 """ cursor.execute(sql, (tenant_id, file_id)) results = cursor.fetchall() return {row[0] for row in results} finally: cursor.close() def update_template_field_relations(conn, tenant_id: int, file_id: int, file_name: str, input_field_ids: List[int], output_field_ids: List[int], dry_run: bool = False): """ 更新模板的字段关联关系 Args: conn: 数据库连接 tenant_id: 租户ID file_id: 模板ID file_name: 模板名称 input_field_ids: 输入字段ID列表 output_field_ids: 输出字段ID列表 dry_run: 是否只是预览(不实际更新) """ cursor = conn.cursor() try: all_field_ids = set(input_field_ids + output_field_ids) if not all_field_ids: print(f" [跳过] {file_name}: 没有字段需要关联") return # 获取现有关联 existing_field_ids = get_existing_relations(conn, tenant_id, file_id) # 需要添加的字段 to_add = all_field_ids - existing_field_ids # 需要删除的字段(如果某个字段不在新列表中,但存在于现有关联中,且不是必需的输入字段,则删除) # 注意:我们保留所有现有关联,只添加新的 to_remove = existing_field_ids - all_field_ids if not to_add and not to_remove: print(f" [保持] {file_name}: 关联关系已是最新") return if dry_run: print(f" [预览] {file_name}:") if to_add: print(f" 将添加: {len(to_add)} 个字段") if to_remove: print(f" 将删除: {len(to_remove)} 个字段") return # 删除需要移除的关联 if to_remove: placeholders = ','.join(['%s'] * len(to_remove)) delete_sql = f""" DELETE FROM f_polic_file_field WHERE tenant_id = %s AND file_id = %s AND filed_id IN ({placeholders}) """ cursor.execute(delete_sql, [tenant_id, file_id] + list(to_remove)) deleted_count = cursor.rowcount # 添加新的关联 added_count = 0 for field_id in to_add: # 检查是否已存在(防止重复) check_sql = """ SELECT id FROM f_polic_file_field WHERE tenant_id = %s AND file_id = %s AND filed_id = %s """ cursor.execute(check_sql, (tenant_id, file_id, field_id)) if cursor.fetchone(): continue relation_id = generate_id() insert_sql = """ INSERT INTO f_polic_file_field (id, tenant_id, file_id, filed_id, created_time, created_by, updated_time, updated_by, state) VALUES (%s, %s, %s, %s, NOW(), %s, NOW(), %s, 1) """ cursor.execute(insert_sql, ( relation_id, tenant_id, file_id, field_id, CREATED_BY, UPDATED_BY )) added_count += 1 conn.commit() action_parts = [] if added_count > 0: action_parts.append(f"添加 {added_count} 个") if to_remove and deleted_count > 0: action_parts.append(f"删除 {deleted_count} 个") if action_parts: print(f" [更新] {file_name}: {', '.join(action_parts)}") except Exception as e: conn.rollback() print(f" [错误] {file_name}: 更新失败 - {str(e)}") finally: cursor.close() def create_missing_output_field(conn, tenant_id: int, field_code: str) -> Optional[int]: """ 如果输出字段不存在,创建它 Returns: 字段ID,如果创建失败则返回None """ cursor = conn.cursor() try: # 先检查是否已存在 check_cursor = conn.cursor(pymysql.cursors.DictCursor) check_cursor.execute(""" SELECT id FROM f_polic_field WHERE tenant_id = %s AND filed_code = %s """, (tenant_id, field_code)) existing = check_cursor.fetchone() check_cursor.close() if existing: return existing['id'] # 创建新字段 field_id = generate_id() field_name = field_code.replace('_', ' ') # 简单的名称生成 insert_sql = """ INSERT INTO f_polic_field (id, tenant_id, name, filed_code, field_type, created_time, created_by, updated_time, updated_by, state) VALUES (%s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1) """ cursor.execute(insert_sql, ( field_id, tenant_id, field_name, field_code, 2, # field_type=2 表示输出字段 CREATED_BY, UPDATED_BY )) conn.commit() print(f" [创建字段] {field_code} (ID: {field_id})") return field_id except Exception as e: conn.rollback() print(f" [错误] 创建字段失败 {field_code}: {str(e)}") return None finally: cursor.close() def main(): """主函数""" print_section("更新所有模板的字段关联关系") # 1. 连接数据库 print_section("1. 连接数据库") try: conn = pymysql.connect(**DB_CONFIG) print_result(True, "数据库连接成功") except Exception as e: print_result(False, f"数据库连接失败: {str(e)}") return try: # 2. 获取实际的tenant_id print_section("2. 获取实际的tenant_id") tenant_id = get_actual_tenant_id(conn) print_result(True, f"实际tenant_id: {tenant_id}") # 3. 获取输入字段 print_section("3. 获取输入字段") input_fields = get_input_fields(conn, tenant_id) if not input_fields: print_result(False, "未找到输入字段 clue_info 和 target_basic_info_clue") print(" 将尝试创建这些字段...") # 创建缺失的输入字段 for field_code in ['clue_info', 'target_basic_info_clue']: field_id = create_missing_input_field(conn, tenant_id, field_code) if field_id: input_fields[field_code] = field_id if not input_fields: print_result(False, "无法获取或创建输入字段,终止操作") return input_field_ids = list(input_fields.values()) print_result(True, f"找到 {len(input_field_ids)} 个输入字段") # 4. 获取输出字段 print_section("4. 获取输出字段") output_fields = get_output_fields(conn, tenant_id) print_result(True, f"找到 {len(output_fields)} 个输出字段") # 5. 获取所有模板 print_section("5. 获取所有模板") templates = get_all_templates(conn, tenant_id) print_result(True, f"找到 {len(templates)} 个模板") if not templates: print_result(False, "未找到模板") return # 6. 扫描模板占位符并更新关联关系 print_section("6. 扫描模板占位符并更新关联关系") total_updated = 0 total_kept = 0 total_errors = 0 all_placeholders_found = set() missing_fields = set() for i, template in enumerate(templates, 1): template_id = template['id'] template_name = template['name'] file_path = template['file_path'] if i % 10 == 0: print(f" 处理进度: {i}/{len(templates)}") # 检查本地文件是否存在 local_file = PROJECT_ROOT / file_path if not local_file.exists(): print(f" [跳过] {template_name}: 文件不存在 - {file_path}") total_errors += 1 continue # 提取占位符 placeholders = extract_placeholders_from_docx(local_file) all_placeholders_found.update(placeholders) # 根据占位符找到对应的输出字段ID output_field_ids = [] for placeholder in placeholders: if placeholder in output_fields: output_field_ids.append(output_fields[placeholder]) else: # 字段不存在,尝试创建 missing_fields.add(placeholder) field_id = create_missing_output_field(conn, tenant_id, placeholder) if field_id: output_fields[placeholder] = field_id output_field_ids.append(field_id) # 更新关联关系 try: update_template_field_relations( conn, tenant_id, template_id, template_name, input_field_ids, output_field_ids, dry_run=False ) total_updated += 1 except Exception as e: print(f" [错误] {template_name}: {str(e)}") total_errors += 1 # 7. 统计结果 print_section("7. 更新结果统计") print(f" 总模板数: {len(templates)}") print(f" 已更新: {total_updated} 个") print(f" 错误: {total_errors} 个") print(f" 发现的占位符总数: {len(all_placeholders_found)} 个") print(f" 缺失的字段(已创建): {len(missing_fields)} 个") if missing_fields: print(f"\n 创建的字段列表:") for field_code in sorted(missing_fields): print(f" - {field_code}") # 8. 验证关联关系 print_section("8. 验证关联关系") cursor = conn.cursor(pymysql.cursors.DictCursor) try: # 统计有输入字段关联的模板 cursor.execute(""" SELECT COUNT(DISTINCT fff.file_id) as count FROM f_polic_file_field fff INNER JOIN f_polic_field f ON fff.filed_id = f.id WHERE fff.tenant_id = %s AND f.field_type = 1 AND fff.state = 1 """, (tenant_id,)) templates_with_input = cursor.fetchone()['count'] print(f" 有输入字段关联的模板: {templates_with_input} 个") # 统计有输出字段关联的模板 cursor.execute(""" SELECT COUNT(DISTINCT fff.file_id) as count FROM f_polic_file_field fff INNER JOIN f_polic_field f ON fff.filed_id = f.id WHERE fff.tenant_id = %s AND f.field_type = 2 AND fff.state = 1 """, (tenant_id,)) templates_with_output = cursor.fetchone()['count'] print(f" 有输出字段关联的模板: {templates_with_output} 个") # 统计总关联数 cursor.execute(""" SELECT COUNT(*) as count FROM f_polic_file_field WHERE tenant_id = %s AND state = 1 """, (tenant_id,)) total_relations = cursor.fetchone()['count'] print(f" 总关联关系数: {total_relations} 条") finally: cursor.close() finally: conn.close() print_result(True, "数据库连接已关闭") print_section("完成") def create_missing_input_field(conn, tenant_id: int, field_code: str) -> Optional[int]: """创建缺失的输入字段""" cursor = conn.cursor() try: field_id = generate_id() field_name_map = { 'clue_info': '线索信息', 'target_basic_info_clue': '被核查人基本信息(线索)' } field_name = field_name_map.get(field_code, field_code.replace('_', ' ')) insert_sql = """ INSERT INTO f_polic_field (id, tenant_id, name, filed_code, field_type, created_time, created_by, updated_time, updated_by, state) VALUES (%s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1) """ cursor.execute(insert_sql, ( field_id, tenant_id, field_name, field_code, 1, # field_type=1 表示输入字段 CREATED_BY, UPDATED_BY )) conn.commit() print(f" [创建输入字段] {field_code} ({field_name}) - ID: {field_id}") return field_id except Exception as e: conn.rollback() print(f" [错误] 创建输入字段失败 {field_code}: {str(e)}") return None finally: cursor.close() if __name__ == "__main__": main()