"""Clean up and re-sync template data into a target database.

What the script does:
    1. Soft-deletes stale template rows of the given tenant (rows pointing at
       MinIO/HTTP paths, rows whose file no longer exists locally, and
       duplicate rows for the same path).
    2. Deletes the field relations that belong to those templates.
    3. Re-scans the ``template_finish/`` directory.
    4. Re-creates / updates the template rows.
    5. Re-builds the template <-> field relations.

Usage:
    python clean_and_resync_templates.py --host <host> --port 3306 \
        --user <user> --database <database> --tenant-id 1

The password may be supplied with ``--password``; when it is omitted the
script prompts for it so that it does not end up in the shell history.
"""

import argparse
import getpass
import os
import re
import sys
import time
from pathlib import Path
from typing import Dict, List, Set, Optional

import pymysql
from docx import Document

# Force UTF-8 output so the Chinese messages print correctly on Windows.
if sys.platform == 'win32':
    import io
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')

# Project root directory (the directory that contains this script).
PROJECT_ROOT = Path(__file__).parent
TEMPLATES_DIR = PROJECT_ROOT / "template_finish"

CREATED_BY = 655162080928945152
UPDATED_BY = 655162080928945152

# Matches ``{{ field_code }}`` placeholders inside template text.
_PLACEHOLDER_PATTERN = re.compile(r'\{\{([^}]+)\}\}')

# File suffixes that are treated as templates.
_TEMPLATE_SUFFIXES = ('.docx', '.doc')

# Last value handed out by generate_id(); used to guarantee uniqueness.
_last_generated_id = 0


def print_section(title):
    """Print a section banner with the given title."""
    print("\n" + "="*70)
    print(f" {title}")
    print("="*70)


def print_result(success, message):
    """Print a message prefixed with [OK] or [FAIL] depending on *success*."""
    status = "[OK]" if success else "[FAIL]"
    print(f"{status} {message}")


def generate_id():
    """Return a new integer id based on the current time in microseconds.

    The value is forced to be strictly greater than the previous one returned
    by this process, so back-to-back calls (or a coarse system clock) can
    never yield the same id twice.
    """
    global _last_generated_id
    candidate = int(time.time() * 1000000)
    if candidate <= _last_generated_id:
        candidate = _last_generated_id + 1
    _last_generated_id = candidate
    return candidate


def get_db_config_from_args() -> Optional[Dict]:
    """Parse the command line and return the run configuration as a dict.

    Returns:
        A dict with the keys ``host``, ``port``, ``user``, ``password``,
        ``database``, ``charset``, ``tenant_id``, ``dry_run`` and
        ``skip_clean``.

    ``--password`` is optional: when it is not given, the password is read
    interactively with ``getpass`` instead.
    """
    parser = argparse.ArgumentParser(
        description='清理并重新同步模板数据到指定数据库',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  python clean_and_resync_templates.py --host 10.100.31.21 --port 3306 --user finyx --database finyx --tenant-id 1
"""
    )
    parser.add_argument('--host', type=str, required=True, help='MySQL服务器地址')
    parser.add_argument('--port', type=int, required=True, help='MySQL服务器端口')
    parser.add_argument('--user', type=str, required=True, help='MySQL用户名')
    parser.add_argument('--password', type=str, default=None,
                        help='MySQL密码(省略时将交互式提示输入)')
    parser.add_argument('--database', type=str, required=True, help='数据库名称')
    parser.add_argument('--tenant-id', type=int, required=True, help='租户ID')
    parser.add_argument('--dry-run', action='store_true', help='预览模式(不实际更新数据库)')
    parser.add_argument('--skip-clean', action='store_true', help='跳过清理步骤(只同步)')

    args = parser.parse_args()

    password = args.password
    if password is None:
        # Prompt instead of requiring the secret on the command line.
        password = getpass.getpass('MySQL密码: ')

    return {
        'host': args.host,
        'port': args.port,
        'user': args.user,
        'password': password,
        'database': args.database,
        'charset': 'utf8mb4',
        'tenant_id': args.tenant_id,
        'dry_run': args.dry_run,
        'skip_clean': args.skip_clean
    }


def test_db_connection(config: Dict) -> Optional[pymysql.Connection]:
    """Open a database connection from *config*.

    Returns:
        The open connection, or ``None`` (after printing the error) when the
        connection attempt fails.
    """
    try:
        conn = pymysql.connect(
            host=config['host'],
            port=config['port'],
            user=config['user'],
            password=config['password'],
            database=config['database'],
            charset=config['charset']
        )
        return conn
    except Exception as e:
        print_result(False, f"数据库连接失败: {str(e)}")
        return None


def scan_local_templates() -> Dict[str, Path]:
    """Scan ``template_finish/`` and map each template's path to its file.

    Keys are POSIX-style paths relative to ``PROJECT_ROOT``; values are the
    corresponding ``Path`` objects. Returns an empty dict when the templates
    directory does not exist.
    """
    templates = {}
    if not TEMPLATES_DIR.exists():
        return templates

    for item in TEMPLATES_DIR.rglob("*"):
        if item.is_file() and item.suffix.lower() in _TEMPLATE_SUFFIXES:
            rel_path = item.relative_to(PROJECT_ROOT)
            rel_path_str = str(rel_path).replace('\\', '/')
            templates[rel_path_str] = item

    return templates


def clean_old_data(conn, tenant_id: int, local_templates: Dict[str, Path], dry_run: bool = False):
    """Soft-delete stale templates of a tenant and drop their field relations.

    A template (state = 1, non-empty ``file_path``) is considered stale when
    its path looks like a MinIO/HTTP location, when its path is not a key of
    *local_templates*, or when it is a later duplicate of another row with
    the same path (the first row is kept).

    Args:
        conn: Open pymysql connection.
        tenant_id: Tenant whose rows are inspected.
        local_templates: Mapping returned by ``scan_local_templates``.
        dry_run: When true, only report what would be deleted.

    Returns:
        A dict with the counters ``total``, ``deleted``, ``minio_paths``,
        ``invalid_paths`` and ``duplicate_paths``.
    """
    print_section("清理旧数据")

    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        # 1. Load every active template of the tenant.
        cursor.execute("""
            SELECT id, name, file_path
            FROM f_polic_file_config
            WHERE tenant_id = %s AND state = 1
        """, (tenant_id,))
        all_templates = cursor.fetchall()

        print(f" 数据库中的模板总数: {len(all_templates)}")

        # 2. Work out which templates have to go.
        to_delete = []
        minio_paths = []
        invalid_paths = []
        duplicate_paths = []

        # Group rows by file_path so duplicates can be detected.
        path_count = {}
        for template in all_templates:
            file_path = template.get('file_path')
            if file_path:
                path_count.setdefault(file_path, []).append(template)

        for template in all_templates:
            file_path = template.get('file_path')
            template_id = template['id']

            # Rows without a file_path are directory nodes; never cleaned here.
            if not file_path:
                continue

            # MinIO / remote paths.
            if 'minio' in file_path.lower() or file_path.startswith(('http://', 'https://')):
                minio_paths.append(template)
                to_delete.append(template_id)
                continue

            # Paths that no longer exist locally.
            if file_path not in local_templates:
                invalid_paths.append(template)
                to_delete.append(template_id)
                continue

            # Duplicate paths: keep the first row, delete the others.
            same_path = path_count.get(file_path, [])
            if len(same_path) > 1 and template != same_path[0]:
                duplicate_paths.append(template)
                to_delete.append(template_id)
                continue

        # 3. Report what will be deleted.
        print(f"\n 需要删除的模板:")
        print(f" - MinIO路径的模板: {len(minio_paths)} 个")
        print(f" - 无效路径的模板: {len(invalid_paths)} 个")
        print(f" - 重复路径的模板: {len(duplicate_paths)} 个")
        print(f" - 总计: {len(to_delete)} 个")

        if to_delete and not dry_run:
            placeholders = ','.join(['%s'] * len(to_delete))
            try:
                # 4. Delete the field relations of the stale templates.
                print("\n 删除字段关联关系...")
                delete_relations_sql = f"""
                    DELETE FROM f_polic_file_field
                    WHERE tenant_id = %s AND file_id IN ({placeholders})
                """
                cursor.execute(delete_relations_sql, [tenant_id] + to_delete)
                deleted_relations = cursor.rowcount
                print(f" 删除了 {deleted_relations} 条字段关联关系")

                # 5. Soft-delete the template rows (state = 0).
                print("\n 删除模板记录...")
                delete_templates_sql = f"""
                    UPDATE f_polic_file_config
                    SET state = 0, updated_time = NOW(), updated_by = %s
                    WHERE tenant_id = %s AND id IN ({placeholders})
                """
                cursor.execute(delete_templates_sql, [UPDATED_BY, tenant_id] + to_delete)
                deleted_templates = cursor.rowcount
                print(f" 删除了 {deleted_templates} 个模板记录(标记为state=0)")

                conn.commit()
            except Exception:
                # Do not leave a half-applied clean-up in the open transaction.
                conn.rollback()
                raise
            print_result(True, f"清理完成:删除了 {deleted_templates} 个模板记录")
        elif to_delete:
            print("\n [预览模式] 将删除上述模板记录")
        else:
            print_result(True, "没有需要清理的数据")

        return {
            'total': len(all_templates),
            'deleted': len(to_delete),
            'minio_paths': len(minio_paths),
            'invalid_paths': len(invalid_paths),
            'duplicate_paths': len(duplicate_paths)
        }

    finally:
        cursor.close()


def scan_directory_structure(base_dir: Path) -> Dict:
    """Walk *base_dir* and describe its directories and template files.

    Returns:
        ``{'directories': [...], 'files': [...]}`` where every entry is a dict
        with ``name``, ``path`` and ``parent_path``. Paths are POSIX-style and
        relative to *base_dir*; ``parent_path`` is ``None`` for entries that
        sit directly in *base_dir*. Parents are always listed before their
        children.
    """
    directories = []
    files = []

    def scan_recursive(current_path: Path, parent_path: Optional[str] = None):
        """Record *current_path* and descend into it."""
        if not current_path.exists() or not current_path.is_dir():
            return

        rel_path = current_path.relative_to(base_dir)
        rel_path_str = str(rel_path).replace('\\', '/')

        # The base directory itself ('.') is not recorded as a node.
        if rel_path_str != '.':
            directories.append({
                'name': current_path.name,
                'path': rel_path_str,
                'parent_path': parent_path
            })

        for item in sorted(current_path.iterdir()):
            if item.is_dir():
                scan_recursive(item, rel_path_str)
            elif item.is_file() and item.suffix.lower() in _TEMPLATE_SUFFIXES:
                file_rel_path = item.relative_to(base_dir)
                file_rel_path_str = str(file_rel_path).replace('\\', '/')
                files.append({
                    'name': item.name,
                    'path': file_rel_path_str,
                    'parent_path': rel_path_str if rel_path_str != '.' else None
                })

    scan_recursive(base_dir)

    return {
        'directories': directories,
        'files': files
    }


def get_existing_templates(conn, tenant_id: int) -> Dict:
    """Load the tenant's active (state = 1) templates, indexed three ways.

    Returns:
        A dict with ``by_id`` (id -> row), ``by_path`` (file_path -> row, for
        rows that have a file_path) and ``by_name`` (name -> list of rows, for
        rows without a file_path, i.e. directory nodes).
    """
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        cursor.execute("""
            SELECT id, name, file_path, parent_id
            FROM f_polic_file_config
            WHERE tenant_id = %s AND state = 1
        """, (tenant_id,))
        templates = cursor.fetchall()

        result = {
            'by_path': {},
            'by_name': {},
            'by_id': {}
        }

        for t in templates:
            result['by_id'][t['id']] = t
            if t['file_path']:
                result['by_path'][t['file_path']] = t
            else:
                result['by_name'].setdefault(t['name'], []).append(t)

        return result
    finally:
        cursor.close()


def _execute_and_commit(conn, sql: str, params) -> None:
    """Run one write statement on its own cursor and commit it."""
    cursor = conn.cursor()
    try:
        cursor.execute(sql, params)
        conn.commit()
    finally:
        cursor.close()


def _db_file_path(path_in_templates_dir: str) -> str:
    """Convert a path relative to TEMPLATES_DIR into the stored file_path.

    The stored value is relative to PROJECT_ROOT, which is the base that
    ``scan_local_templates``, ``clean_old_data`` and ``sync_field_relations``
    all use when they interpret ``file_path``.
    """
    rel_path = (TEMPLATES_DIR / path_in_templates_dir).relative_to(PROJECT_ROOT)
    return str(rel_path).replace('\\', '/')


def sync_template_hierarchy(conn, tenant_id: int, dry_run: bool = False):
    """Create / update directory and file rows so they mirror the disk layout.

    Args:
        conn: Open pymysql connection.
        tenant_id: Tenant whose rows are synchronised.
        dry_run: When true, only count what would be created or updated.

    Returns:
        A dict with ``directories_created``, ``directories_updated``,
        ``files_created`` and ``files_updated``; ``None`` when the templates
        directory contains neither directories nor template files.
    """
    print_section("同步模板层级结构")

    # 1. Scan the directory structure.
    print("1. 扫描目录结构...")
    structure = scan_directory_structure(TEMPLATES_DIR)
    print_result(True, f"找到 {len(structure['directories'])} 个目录,{len(structure['files'])} 个文件")

    if not structure['directories'] and not structure['files']:
        print_result(False, "未找到任何目录或文件")
        return None

    # 2. Load the existing templates.
    print("\n2. 获取现有模板...")
    existing_templates = get_existing_templates(conn, tenant_id)
    print_result(True, f"找到 {len(existing_templates['by_path'])} 个文件模板,{len(existing_templates['by_name'])} 个目录模板")

    # 3. Create / update directory nodes.
    print("\n3. 创建/更新目录节点...")
    path_to_id = {}
    dir_created = 0
    dir_updated = 0

    for dir_info in structure['directories']:
        parent_id = None
        if dir_info['parent_path']:
            parent_id = path_to_id.get(dir_info['parent_path'])

        existing = None
        candidates = existing_templates['by_name'].get(dir_info['name'], [])
        for candidate in candidates:
            if candidate.get('parent_id') == parent_id and not candidate.get('file_path'):
                existing = candidate
                break

        if existing:
            dir_id = existing['id']
            # NOTE(review): candidates are matched on the same parent_id, so
            # this condition cannot currently be true; kept as in the original.
            if existing.get('parent_id') != parent_id:
                dir_updated += 1
                if not dry_run:
                    _execute_and_commit(conn, """
                        UPDATE f_polic_file_config
                        SET parent_id = %s, updated_time = NOW(), updated_by = %s
                        WHERE id = %s AND tenant_id = %s
                    """, (parent_id, UPDATED_BY, dir_id, tenant_id))
        else:
            dir_id = generate_id()
            dir_created += 1
            if not dry_run:
                _execute_and_commit(conn, """
                    INSERT INTO f_polic_file_config
                    (id, tenant_id, parent_id, name, file_path, created_time, created_by, updated_time, updated_by, state)
                    VALUES (%s, %s, %s, %s, NULL, NOW(), %s, NOW(), %s, 1)
                """, (dir_id, tenant_id, parent_id, dir_info['name'], CREATED_BY, UPDATED_BY))

        path_to_id[dir_info['path']] = dir_id

    print_result(True, f"创建 {dir_created} 个目录,更新 {dir_updated} 个目录")

    # 4. Create / update file nodes.
    print("\n4. 创建/更新文件节点...")
    file_created = 0
    file_updated = 0

    for file_info in structure['files']:
        parent_id = None
        if file_info['parent_path']:
            parent_id = path_to_id.get(file_info['parent_path'])

        # Stored paths are relative to PROJECT_ROOT so that the clean-up step
        # and the relation sync can resolve them to real files.
        stored_path = _db_file_path(file_info['path'])
        existing = existing_templates['by_path'].get(stored_path)

        if existing:
            file_id = existing['id']
            if existing.get('parent_id') != parent_id or existing.get('name') != file_info['name']:
                file_updated += 1
                if not dry_run:
                    _execute_and_commit(conn, """
                        UPDATE f_polic_file_config
                        SET parent_id = %s, name = %s, updated_time = NOW(), updated_by = %s
                        WHERE id = %s AND tenant_id = %s
                    """, (parent_id, file_info['name'], UPDATED_BY, file_id, tenant_id))
        else:
            file_id = generate_id()
            file_created += 1
            if not dry_run:
                _execute_and_commit(conn, """
                    INSERT INTO f_polic_file_config
                    (id, tenant_id, parent_id, name, file_path, created_time, created_by, updated_time, updated_by, state)
                    VALUES (%s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1)
                """, (file_id, tenant_id, parent_id, file_info['name'], stored_path, CREATED_BY, UPDATED_BY))

    print_result(True, f"创建 {file_created} 个文件,更新 {file_updated} 个文件")

    return {
        'directories_created': dir_created,
        'directories_updated': dir_updated,
        'files_created': file_created,
        'files_updated': file_updated
    }


def get_input_fields(conn, tenant_id: int) -> Dict[str, int]:
    """Return ``filed_code -> id`` for the tenant's two active input fields.

    Only rows with ``field_type = 1``, ``state = 1`` and a code of
    ``clue_info`` or ``target_basic_info_clue`` are returned.
    """
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        sql = """
            SELECT id, filed_code, name
            FROM f_polic_field
            WHERE tenant_id = %s
            AND field_type = 1
            AND filed_code IN ('clue_info', 'target_basic_info_clue')
            AND state = 1
        """
        cursor.execute(sql, (tenant_id,))
        fields = cursor.fetchall()
        return {field['filed_code']: field['id'] for field in fields}
    finally:
        cursor.close()


def get_output_fields(conn, tenant_id: int) -> Dict[str, int]:
    """Return ``filed_code -> id`` for all active output fields of the tenant.

    Output fields are the rows with ``field_type = 2`` and ``state = 1``.
    """
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        sql = """
            SELECT id, filed_code, name
            FROM f_polic_field
            WHERE tenant_id = %s
            AND field_type = 2
            AND state = 1
        """
        cursor.execute(sql, (tenant_id,))
        fields = cursor.fetchall()
        return {field['filed_code']: field['id'] for field in fields}
    finally:
        cursor.close()


def _collect_placeholders(paragraphs, found: Set[str]) -> None:
    """Add every non-empty ``{{code}}`` found in *paragraphs* to *found*."""
    for paragraph in paragraphs:
        for match in _PLACEHOLDER_PATTERN.findall(paragraph.text):
            field_code = match.strip()
            if field_code:
                found.add(field_code)


def extract_placeholders_from_docx(file_path: Path) -> Set[str]:
    """Extract all ``{{placeholder}}`` codes from a docx file.

    Body paragraphs and the paragraphs of every table cell are scanned.
    Extraction is best-effort: a file or table that cannot be read is reported
    with a warning and skipped, and whatever was collected so far is returned.
    """
    placeholders = set()

    try:
        doc = Document(file_path)

        # Body paragraphs.
        _collect_placeholders(doc.paragraphs, placeholders)

        # Table cells.
        for table in doc.tables:
            try:
                for row in table.rows:
                    for cell in row.cells:
                        _collect_placeholders(cell.paragraphs, placeholders)
            except Exception as e:
                # A single malformed table must not abort the whole document.
                print(f" [警告] 跳过无法解析的表格 ({file_path}): {str(e)}")
                continue
    except Exception as e:
        print(f" [警告] 无法解析模板文件 {file_path}: {str(e)}")

    return placeholders


def create_missing_input_field(conn, tenant_id: int, field_code: str) -> Optional[int]:
    """Insert an input field (field_type = 1) for *field_code*.

    Returns:
        The id of the new row, or ``None`` (after rolling back and printing
        the error) when the insert fails.
    """
    cursor = conn.cursor()
    try:
        field_id = generate_id()
        field_name_map = {
            'clue_info': '线索信息',
            'target_basic_info_clue': '被核查人基本信息(线索)'
        }
        field_name = field_name_map.get(field_code, field_code.replace('_', ' '))

        insert_sql = """
            INSERT INTO f_polic_field
            (id, tenant_id, name, filed_code, field_type, created_time, created_by, updated_time, updated_by, state)
            VALUES (%s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1)
        """
        cursor.execute(insert_sql, (
            field_id,
            tenant_id,
            field_name,
            field_code,
            1,
            CREATED_BY,
            UPDATED_BY
        ))
        conn.commit()
        return field_id
    except Exception as e:
        conn.rollback()
        print_result(False, f"创建输入字段失败 ({field_code}): {str(e)}")
        return None
    finally:
        cursor.close()


def create_missing_output_field(conn, tenant_id: int, field_code: str) -> Optional[int]:
    """Return the id of the field *field_code*, creating it when absent.

    Any existing row of the tenant with that ``filed_code`` is reused,
    whatever its type or state; otherwise a new output field
    (field_type = 2) is inserted.

    Returns:
        The existing or new id, or ``None`` (after rolling back and printing
        the error) when the database operation fails.
    """
    cursor = conn.cursor()
    try:
        # Reuse an existing row first.
        cursor.execute("""
            SELECT id FROM f_polic_field
            WHERE tenant_id = %s AND filed_code = %s
        """, (tenant_id, field_code))
        existing = cursor.fetchone()
        if existing:
            return existing[0]

        # Otherwise create a new field.
        field_id = generate_id()
        field_name = field_code.replace('_', ' ')

        insert_sql = """
            INSERT INTO f_polic_field
            (id, tenant_id, name, filed_code, field_type, created_time, created_by, updated_time, updated_by, state)
            VALUES (%s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1)
        """
        cursor.execute(insert_sql, (
            field_id,
            tenant_id,
            field_name,
            field_code,
            2,
            CREATED_BY,
            UPDATED_BY
        ))
        conn.commit()
        return field_id
    except Exception as e:
        conn.rollback()
        print_result(False, f"创建输出字段失败 ({field_code}): {str(e)}")
        return None
    finally:
        cursor.close()


def get_existing_relations(conn, tenant_id: int, file_id: int) -> Set[int]:
    """Return the ids of the fields actively related to template *file_id*."""
    cursor = conn.cursor()
    try:
        sql = """
            SELECT filed_id
            FROM f_polic_file_field
            WHERE tenant_id = %s AND file_id = %s AND state = 1
        """
        cursor.execute(sql, (tenant_id, file_id))
        results = cursor.fetchall()
        return {row[0] for row in results}
    finally:
        cursor.close()


def sync_field_relations(conn, tenant_id: int, dry_run: bool = False):
    """Rebuild the template <-> field relations of a tenant.

    All existing relations of the tenant are deleted; then every active
    template with a ``file_path`` is opened, its placeholders are mapped to
    output fields (missing ones are created) and one relation row is inserted
    per input field and per matched output field.

    In dry-run mode nothing is written: relations are not deleted or
    inserted and missing fields are only counted, not created.

    Args:
        conn: Open pymysql connection.
        tenant_id: Tenant whose relations are rebuilt.
        dry_run: When true, only report what would happen.

    Returns:
        A dict with ``total_templates``, ``updated``, ``errors``,
        ``placeholders_found`` and ``fields_created``; ``None`` when the input
        fields cannot be obtained or no template exists.
    """
    print_section("同步字段关联关系")

    # 1. Input fields.
    print("1. 获取输入字段...")
    input_fields = get_input_fields(conn, tenant_id)

    if not input_fields:
        if dry_run:
            # Creating fields would write to the database.
            print(" [预览模式] 将创建缺失的输入字段")
        else:
            print(" 创建缺失的输入字段...")
            for field_code in ['clue_info', 'target_basic_info_clue']:
                field_id = create_missing_input_field(conn, tenant_id, field_code)
                if field_id:
                    input_fields[field_code] = field_id

            if not input_fields:
                print_result(False, "无法获取或创建输入字段")
                return None

    input_field_ids = list(input_fields.values())
    print_result(True, f"找到 {len(input_field_ids)} 个输入字段")

    # 2. Output fields.
    print("\n2. 获取输出字段...")
    output_fields = get_output_fields(conn, tenant_id)
    print_result(True, f"找到 {len(output_fields)} 个输出字段")

    # 3. Templates.
    print("\n3. 获取所有模板...")
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        sql = """
            SELECT id, name, file_path
            FROM f_polic_file_config
            WHERE tenant_id = %s
            AND file_path IS NOT NULL
            AND file_path != ''
            AND state = 1
        """
        cursor.execute(sql, (tenant_id,))
        templates = cursor.fetchall()
    finally:
        cursor.close()

    print_result(True, f"找到 {len(templates)} 个模板")

    if not templates:
        print_result(False, "未找到模板")
        return None

    # 4. Remove all existing relations first.
    print("\n4. 清理现有关联关系...")
    if not dry_run:
        cursor = conn.cursor()
        try:
            cursor.execute("""
                DELETE FROM f_polic_file_field
                WHERE tenant_id = %s
            """, (tenant_id,))
            deleted_count = cursor.rowcount
            conn.commit()
            print_result(True, f"删除了 {deleted_count} 条旧关联关系")
        finally:
            cursor.close()
    else:
        print(" [预览模式] 将清理所有现有关联关系")

    # 5. Scan the placeholders of each template and create the relations.
    print("\n5. 扫描模板占位符并创建关联关系...")

    insert_sql = """
        INSERT INTO f_polic_file_field
        (id, tenant_id, file_id, filed_id, created_time, created_by, updated_time, updated_by, state)
        VALUES (%s, %s, %s, %s, NOW(), %s, NOW(), %s, 1)
    """

    total_updated = 0
    total_errors = 0
    all_placeholders_found = set()
    missing_fields = set()

    for i, template in enumerate(templates, 1):
        template_id = template['id']
        template_name = template['name']
        file_path = template['file_path']

        if i % 20 == 0:
            print(f" 处理进度: {i}/{len(templates)}")

        # The stored path is resolved against the project root.
        local_file = PROJECT_ROOT / file_path
        if not local_file.exists():
            total_errors += 1
            continue

        placeholders = extract_placeholders_from_docx(local_file)
        all_placeholders_found.update(placeholders)

        # Map placeholders to output field ids (sorted for a stable order).
        output_field_ids = []
        for placeholder in sorted(placeholders):
            if placeholder in output_fields:
                output_field_ids.append(output_fields[placeholder])
                continue

            missing_fields.add(placeholder)
            if dry_run:
                # Preview mode must not insert new fields.
                continue

            field_id = create_missing_output_field(conn, tenant_id, placeholder)
            if field_id:
                output_fields[placeholder] = field_id
                output_field_ids.append(field_id)

        # De-duplicate while keeping order: a placeholder may resolve to an
        # id that is already present among the input fields.
        all_field_ids = list(dict.fromkeys(input_field_ids + output_field_ids))

        if dry_run or not all_field_ids:
            total_updated += 1
            continue

        cursor = conn.cursor()
        try:
            for field_id in all_field_ids:
                relation_id = generate_id()
                cursor.execute(insert_sql, (
                    relation_id,
                    tenant_id,
                    template_id,
                    field_id,
                    CREATED_BY,
                    UPDATED_BY
                ))
            conn.commit()
            total_updated += 1
        except Exception as e:
            conn.rollback()
            total_errors += 1
            print(f" [警告] 模板 {template_name} 的关联关系创建失败: {str(e)}")
        finally:
            cursor.close()

    # 6. Summary.
    print_section("字段关联同步结果")
    print(f" 总模板数: {len(templates)}")
    print(f" 已处理: {total_updated} 个")
    print(f" 错误: {total_errors} 个")
    print(f" 发现的占位符总数: {len(all_placeholders_found)} 个")
    print(f" 创建的字段数: {len(missing_fields)} 个")

    return {
        'total_templates': len(templates),
        'updated': total_updated,
        'errors': total_errors,
        'placeholders_found': len(all_placeholders_found),
        'fields_created': len(missing_fields)
    }


def main():
    """Entry point: parse arguments, confirm, then clean and re-sync."""
    print_section("清理并重新同步模板数据")

    # Read the configuration.
    config = get_db_config_from_args()

    # Show the configuration (never the password).
    print_section("配置信息")
    print(f" 数据库服务器: {config['host']}:{config['port']}")
    print(f" 数据库名称: {config['database']}")
    print(f" 用户名: {config['user']}")
    print(f" 租户ID: {config['tenant_id']}")
    print(f" 预览模式: {'是' if config['dry_run'] else '否'}")
    print(f" 跳过清理: {'是' if config['skip_clean'] else '否'}")

    if config['dry_run']:
        print("\n[注意] 当前为预览模式,不会实际更新数据库")

    # Ask for confirmation before touching real data.
    if not config.get('dry_run'):
        print("\n[警告] 此操作将清理指定租户下的旧数据并重新同步")
        confirm = input("确认执行?[yes/N]: ").strip().lower()
        if confirm != 'yes':
            print("已取消")
            return

    # Connect to the database.
    print_section("连接数据库")
    conn = test_db_connection(config)
    if not conn:
        return

    print_result(True, "数据库连接成功")

    try:
        tenant_id = config['tenant_id']
        dry_run = config['dry_run']
        skip_clean = config['skip_clean']

        results = {}

        # 1. Scan the local templates.
        print_section("扫描本地模板")
        local_templates = scan_local_templates()
        print_result(True, f"找到 {len(local_templates)} 个本地模板文件")

        # 2. Clean stale data.
        if not skip_clean:
            clean_result = clean_old_data(conn, tenant_id, local_templates, dry_run)
            results['clean'] = clean_result
        else:
            print_section("跳过清理步骤")
            print(" 已跳过清理步骤")

        # 3. Sync the template hierarchy.
        hierarchy_result = sync_template_hierarchy(conn, tenant_id, dry_run)
        results['hierarchy'] = hierarchy_result

        # 4. Sync the field relations.
        fields_result = sync_field_relations(conn, tenant_id, dry_run)
        results['fields'] = fields_result

        # 5. Summary.
        print_section("同步完成")
        if config['dry_run']:
            print(" 本次为预览模式,未实际更新数据库")
        else:
            print(" 数据库已更新")

        if 'clean' in results:
            c = results['clean']
            print(f"\n 清理结果:")
            print(f" - 总模板数: {c['total']} 个")
            print(f" - 删除模板: {c['deleted']} 个")
            print(f" * MinIO路径: {c['minio_paths']} 个")
            print(f" * 无效路径: {c['invalid_paths']} 个")
            print(f" * 重复路径: {c['duplicate_paths']} 个")

        if 'hierarchy' in results and results['hierarchy']:
            h = results['hierarchy']
            print(f"\n 层级结构:")
            print(f" - 创建目录: {h['directories_created']} 个")
            print(f" - 更新目录: {h['directories_updated']} 个")
            print(f" - 创建文件: {h['files_created']} 个")
            print(f" - 更新文件: {h['files_updated']} 个")

        if 'fields' in results and results['fields']:
            f = results['fields']
            print(f"\n 字段关联:")
            print(f" - 总模板数: {f['total_templates']} 个")
            print(f" - 已处理: {f['updated']} 个")
            print(f" - 发现的占位符: {f['placeholders_found']} 个")
            print(f" - 创建的字段: {f['fields_created']} 个")

    finally:
        conn.close()
        print_result(True, "数据库连接已关闭")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n[中断] 用户取消操作")
        sys.exit(0)
    except Exception as e:
        print(f"\n[错误] 发生异常: {str(e)}")
        import traceback
        traceback.print_exc()
        sys.exit(1)