# ai-business-write/clean_and_resync_templates.py
# 2025-12-30 10:41:35 +08:00
#
# 875 lines
# 30 KiB
# Python
# Raw Permalink Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
清理并重新同步模板数据到指定数据库
功能:
1. 清理指定tenant_id下的旧数据包括MinIO路径的数据
2. 清理相关的字段关联关系
3. 重新扫描template_finish/目录
4. 重新创建/更新模板数据
5. 重新建立字段关联关系
使用方法:
python clean_and_resync_templates.py --host 10.100.31.21 --port 3306 --user finyx --password FknJYz3FA5WDYtsd --database finyx --tenant-id 1
"""
import os
import sys
import pymysql
import argparse
from pathlib import Path
from typing import Dict, List, Set, Optional
import re
from docx import Document
import getpass
# Force UTF-8 for stdout/stderr on Windows, whose console may use a legacy code
# page that cannot encode the Chinese messages this script prints.
if sys.platform == 'win32':
    import io
    if hasattr(sys.stdout, 'reconfigure'):
        # reconfigure() (Python 3.7+) keeps the existing stream objects and
        # their buffering mode, so progress lines still appear immediately.
        sys.stdout.reconfigure(encoding='utf-8')
        sys.stderr.reconfigure(encoding='utf-8')
    else:
        # Fallback for old interpreters: re-wrap the binary buffers. A new
        # TextIOWrapper defaults to line_buffering=False, so request it
        # explicitly to keep console output flushed per line.
        sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', line_buffering=True)
        sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', line_buffering=True)
# Folder that contains this script. Template file paths are resolved against
# it (e.g. "template_finish/<sub>/<name>.docx").
PROJECT_ROOT = Path(__file__).parent
# Root folder of the Word templates that are synchronised.
TEMPLATES_DIR = PROJECT_ROOT.joinpath("template_finish")
# Operator id written to created_by / updated_by on every row this script touches.
# NOTE(review): hard-coded account id — confirm it is the intended operator.
CREATED_BY = UPDATED_BY = 655162080928945152
def print_section(title):
    """Print *title* as a banner framed by two 70-character ``=`` rules.

    A blank line is emitted first so consecutive sections stay separated.
    """
    rule = "=" * 70
    print(f"\n{rule}\n {title}\n{rule}")
def print_result(success, message):
    """Print one status line: ``[OK] <message>`` when *success* is truthy, else ``[FAIL] <message>``."""
    if success:
        label = "[OK]"
    else:
        label = "[FAIL]"
    print(f"{label} {message}")
def generate_id():
    """Return a new integer row id derived from the current time in microseconds.

    ``time.time()`` can return the same value for calls made in quick
    succession, and can step backwards when the system clock is adjusted.
    Rows are inserted in tight loops (e.g. one relation per field), so equal
    ids would collide on the tables' ``id`` column (presumably a primary key).
    To prevent that, every id is forced to be strictly greater than the
    previous one returned in this process.

    Returns:
        int: a microsecond-timestamp-like id, unique within this process.
    """
    import time
    candidate = int(time.time() * 1000000)
    # Remember the last id on the function object so the guard needs no global.
    last = getattr(generate_id, "_last_id", 0)
    if candidate <= last:
        candidate = last + 1
    generate_id._last_id = candidate
    return candidate
def get_db_config_from_args() -> Optional[Dict]:
    """Parse the command line into the connection / run configuration.

    ``--password`` is optional. When it is omitted, the password is read
    interactively with ``getpass.getpass`` so it does not end up in shell
    history or the process list. Passing ``--password`` still works.

    Returns:
        dict with keys host, port, user, password, database, charset
        (always ``'utf8mb4'``), tenant_id, dry_run, skip_clean.
    """
    parser = argparse.ArgumentParser(
        description='清理并重新同步模板数据到指定数据库',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # The example must never contain a real credential.
        epilog="""
示例:
  python clean_and_resync_templates.py --host 10.100.31.21 --port 3306 --user finyx --database finyx --tenant-id 1
  (未指定 --password 时将提示输入密码)
"""
    )
    parser.add_argument('--host', type=str, required=True, help='MySQL服务器地址')
    parser.add_argument('--port', type=int, required=True, help='MySQL服务器端口')
    parser.add_argument('--user', type=str, required=True, help='MySQL用户名')
    parser.add_argument('--password', type=str, default=None, help='MySQL密码(省略时交互式输入)')
    parser.add_argument('--database', type=str, required=True, help='数据库名称')
    parser.add_argument('--tenant-id', type=int, required=True, help='租户ID')
    parser.add_argument('--dry-run', action='store_true', help='预览模式(不实际更新数据库)')
    parser.add_argument('--skip-clean', action='store_true', help='跳过清理步骤(只同步)')
    args = parser.parse_args()

    password = args.password
    if password is None:
        # Prompt without echo instead of requiring the secret on the command line.
        password = getpass.getpass('MySQL密码: ')

    return {
        'host': args.host,
        'port': args.port,
        'user': args.user,
        'password': password,
        'database': args.database,
        'charset': 'utf8mb4',
        'tenant_id': args.tenant_id,
        'dry_run': args.dry_run,
        'skip_clean': args.skip_clean
    }
def test_db_connection(config: Dict) -> Optional[pymysql.Connection]:
    """Open a MySQL connection using the settings in *config*.

    Reads host, port, user, password, database and charset from *config*.
    Any failure, including a missing key, is reported via ``print_result``
    and turned into a ``None`` return instead of an exception.
    """
    try:
        connect_kwargs = {
            key: config[key]
            for key in ('host', 'port', 'user', 'password', 'database', 'charset')
        }
        return pymysql.connect(**connect_kwargs)
    except Exception as e:
        print_result(False, f"数据库连接失败: {str(e)}")
        return None
def scan_local_templates() -> Dict[str, Path]:
    """Index every Word template (.docx / .doc) found anywhere under TEMPLATES_DIR.

    Returns:
        Mapping from the file's path relative to PROJECT_ROOT, using ``/``
        separators (e.g. ``"template_finish/sub/a.docx"``), to the Path that
        ``rglob`` yielded. Empty when TEMPLATES_DIR does not exist.
    """
    if not TEMPLATES_DIR.exists():
        return {}
    return {
        str(candidate.relative_to(PROJECT_ROOT)).replace('\\', '/'): candidate
        for candidate in TEMPLATES_DIR.rglob("*")
        if candidate.is_file() and candidate.suffix.lower() in ['.docx', '.doc']
    }
def clean_old_data(conn, tenant_id: int, local_templates: Dict[str, Path], dry_run: bool = False):
    """Soft-delete stale template rows of *tenant_id* and hard-delete their field links.

    An active row of ``f_polic_file_config`` (``state = 1``) is selected when
    its ``file_path``:

    * looks like a MinIO / HTTP(S) location,
    * is not a key of *local_templates* (no such local file), or
    * duplicates the ``file_path`` of another row. The row with the smallest
      ``id`` is kept.

    Rows without a ``file_path`` (directory nodes) are never selected.
    Selected rows get ``state = 0``, and their rows in ``f_polic_file_field``
    are deleted. Both statements are committed together, or rolled back and
    re-raised if either fails. In *dry_run* mode only the statistics are
    printed.

    Returns:
        dict with counts: total, deleted, minio_paths, invalid_paths,
        duplicate_paths.
    """
    print_section("清理旧数据")
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        # 1. Load all active templates. ORDER BY id makes the choice of the
        #    surviving duplicate deterministic (smallest id wins).
        cursor.execute("""
            SELECT id, name, file_path
            FROM f_polic_file_config
            WHERE tenant_id = %s
            AND state = 1
            ORDER BY id
        """, (tenant_id,))
        all_templates = cursor.fetchall()
        print(f" 数据库中的模板总数: {len(all_templates)}")

        # 2. Decide which templates to delete.
        to_delete = []
        minio_paths = []
        invalid_paths = []
        duplicate_paths = []

        # Group rows that share a file_path; the first one in each group is kept.
        rows_by_path = {}
        for template in all_templates:
            file_path = template.get('file_path')
            if file_path:
                rows_by_path.setdefault(file_path, []).append(template)

        for template in all_templates:
            file_path = template.get('file_path')
            if not file_path:
                continue  # directory node: never deleted here
            template_id = template['id']
            if 'minio' in file_path.lower() or file_path.startswith(('http://', 'https://')):
                minio_paths.append(template)
                to_delete.append(template_id)
            elif file_path not in local_templates:
                invalid_paths.append(template)
                to_delete.append(template_id)
            elif template is not rows_by_path[file_path][0]:
                duplicate_paths.append(template)
                to_delete.append(template_id)

        # 3. Report what will be deleted.
        print("\n 需要删除的模板:")
        print(f" - MinIO路径的模板: {len(minio_paths)}")
        print(f" - 无效路径的模板: {len(invalid_paths)}")
        print(f" - 重复路径的模板: {len(duplicate_paths)}")
        print(f" - 总计: {len(to_delete)}")

        if to_delete and not dry_run:
            # Only "%s" markers are interpolated; the ids are bound as parameters.
            placeholders = ','.join(['%s'] * len(to_delete))
            try:
                # 4. Hard-delete the field relations of the selected templates.
                print("\n 删除字段关联关系...")
                cursor.execute(f"""
                    DELETE FROM f_polic_file_field
                    WHERE tenant_id = %s
                    AND file_id IN ({placeholders})
                """, [tenant_id] + to_delete)
                deleted_relations = cursor.rowcount
                print(f" 删除了 {deleted_relations} 条字段关联关系")

                # 5. Soft-delete the template rows.
                print("\n 删除模板记录...")
                cursor.execute(f"""
                    UPDATE f_polic_file_config
                    SET state = 0, updated_time = NOW(), updated_by = %s
                    WHERE tenant_id = %s
                    AND id IN ({placeholders})
                """, [UPDATED_BY, tenant_id] + to_delete)
                deleted_templates = cursor.rowcount
                print(f" 删除了 {deleted_templates} 个模板记录标记为state=0")

                # One commit for both statements keeps relations and templates consistent.
                conn.commit()
            except Exception:
                conn.rollback()
                raise
            print_result(True, f"清理完成:删除了 {deleted_templates} 个模板记录")
        elif to_delete:
            print("\n [预览模式] 将删除上述模板记录")
        else:
            print_result(True, "没有需要清理的数据")

        return {
            'total': len(all_templates),
            'deleted': len(to_delete),
            'minio_paths': len(minio_paths),
            'invalid_paths': len(invalid_paths),
            'duplicate_paths': len(duplicate_paths)
        }
    finally:
        cursor.close()
def scan_directory_structure(base_dir: Path) -> Dict:
    """Walk *base_dir* depth-first and collect its sub-directories and Word files.

    Args:
        base_dir: Root directory to scan. If it does not exist or is not a
            directory, both result lists are empty.

    Returns:
        ``{'directories': [...], 'files': [...]}``. Each entry is a dict with
        ``name``, ``path`` (relative to *base_dir*, ``/``-separated) and
        ``parent_path``. Entries are produced in depth-first pre-order, with
        each directory's children visited in sorted order. Only ``.docx`` /
        ``.doc`` files (case-insensitive) are listed. Top-level directories
        carry ``parent_path == '.'``, while top-level files carry ``None``.
    """
    found_dirs = []
    found_files = []
    if not (base_dir.exists() and base_dir.is_dir()):
        return {'directories': found_dirs, 'files': found_files}

    def relative(entry: Path) -> str:
        return str(entry.relative_to(base_dir)).replace('\\', '/')

    # Explicit LIFO stack of (entry, relative path of its containing folder).
    # Children are pushed in reverse sorted order so they pop in sorted order,
    # which reproduces a recursive pre-order walk.
    stack = [(child, '.') for child in reversed(sorted(base_dir.iterdir()))]
    while stack:
        entry, container = stack.pop()
        if entry.is_dir():
            entry_rel = relative(entry)
            found_dirs.append({'name': entry.name, 'path': entry_rel, 'parent_path': container})
            stack.extend((child, entry_rel) for child in reversed(sorted(entry.iterdir())))
        elif entry.is_file() and entry.suffix.lower() in ['.docx', '.doc']:
            found_files.append({
                'name': entry.name,
                'path': relative(entry),
                'parent_path': None if container == '.' else container,
            })
    return {'directories': found_dirs, 'files': found_files}
def get_existing_templates(conn, tenant_id: int) -> Dict:
    """Load the active (``state = 1``) template rows of *tenant_id*, indexed three ways.

    Each row is a dict with id, name, file_path and parent_id.

    Returns:
        dict with
        * ``by_path``: file_path -> row, for rows with a non-empty file_path
          (a later row wins if two share a path);
        * ``by_name``: name -> list of rows, for rows without a file_path;
        * ``by_id``:   id -> row, for every row.
    """
    by_path = {}
    by_name = {}
    by_id = {}
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        cursor.execute("""
            SELECT id, name, file_path, parent_id
            FROM f_polic_file_config
            WHERE tenant_id = %s
            AND state = 1
        """, (tenant_id,))
        for row in cursor.fetchall():
            by_id[row['id']] = row
            if row['file_path']:
                by_path[row['file_path']] = row
            else:
                by_name.setdefault(row['name'], []).append(row)
    finally:
        cursor.close()
    return {'by_path': by_path, 'by_name': by_name, 'by_id': by_id}
def _execute_and_commit(conn, sql: str, params) -> None:
    """Execute one write statement on its own cursor, commit it, and always close the cursor."""
    cursor = conn.cursor()
    try:
        cursor.execute(sql, params)
        conn.commit()
    finally:
        cursor.close()


def sync_template_hierarchy(conn, tenant_id: int, dry_run: bool = False):
    """Mirror the template_finish/ directory tree into ``f_polic_file_config``.

    Directory nodes (rows with NULL file_path) are matched by name + parent_id
    and created when missing. File nodes are matched by file_path; they are
    created when missing, or updated when their parent or name changed.

    file_path is stored relative to PROJECT_ROOT with ``/`` separators (e.g.
    ``template_finish/sub/a.docx``). This is the form ``scan_local_templates``
    produces and ``sync_field_relations`` resolves via ``PROJECT_ROOT /
    file_path``. ``scan_directory_structure`` returns paths relative to
    TEMPLATES_DIR, so they are converted before lookup and insert.

    Every write is committed immediately. In *dry_run* mode nothing is
    written, but the counters still report what would change.

    Returns:
        dict with directories_created, directories_updated, files_created,
        files_updated; or None when template_finish/ holds no directories or
        files.
    """
    print_section("同步模板层级结构")

    # 1. Scan the local directory tree.
    print("1. 扫描目录结构...")
    structure = scan_directory_structure(TEMPLATES_DIR)
    print_result(True, f"找到 {len(structure['directories'])} 个目录,{len(structure['files'])} 个文件")
    if not structure['directories'] and not structure['files']:
        print_result(False, "未找到任何目录或文件")
        return None

    # 2. Load the rows that already exist.
    print("\n2. 获取现有模板...")
    existing_templates = get_existing_templates(conn, tenant_id)
    print_result(True, f"找到 {len(existing_templates['by_path'])} 个文件模板,{len(existing_templates['by_name'])} 个目录模板")

    # 3. Directory nodes. Parents are listed before their children, so a
    #    parent's id is already in path_to_id when a child needs it.
    print("\n3. 创建/更新目录节点...")
    path_to_id = {}
    dir_created = 0
    # Directories are matched on name AND parent_id, so a matched row never
    # needs its parent changed. The counter is kept for the report and return value.
    dir_updated = 0
    for dir_info in structure['directories']:
        # Top-level directories have parent_path '.', which is never a key of
        # path_to_id, so they resolve to parent_id None.
        parent_id = path_to_id.get(dir_info['parent_path']) if dir_info['parent_path'] else None
        existing = next(
            (candidate
             for candidate in existing_templates['by_name'].get(dir_info['name'], [])
             if candidate.get('parent_id') == parent_id and not candidate.get('file_path')),
            None,
        )
        if existing:
            dir_id = existing['id']
        else:
            dir_id = generate_id()
            dir_created += 1
            if not dry_run:
                _execute_and_commit(conn, """
                    INSERT INTO f_polic_file_config
                    (id, tenant_id, parent_id, name, file_path, created_time, created_by, updated_time, updated_by, state)
                    VALUES (%s, %s, %s, %s, NULL, NOW(), %s, NOW(), %s, 1)
                """, (dir_id, tenant_id, parent_id, dir_info['name'], CREATED_BY, UPDATED_BY))
        path_to_id[dir_info['path']] = dir_id
    print_result(True, f"创建 {dir_created} 个目录,更新 {dir_updated} 个目录")

    # 4. File nodes, keyed by their PROJECT_ROOT-relative path.
    print("\n4. 创建/更新文件节点...")
    templates_prefix = TEMPLATES_DIR.relative_to(PROJECT_ROOT)
    file_created = 0
    file_updated = 0
    for file_info in structure['files']:
        parent_id = path_to_id.get(file_info['parent_path']) if file_info['parent_path'] else None
        db_file_path = str(templates_prefix / file_info['path']).replace('\\', '/')
        existing = existing_templates['by_path'].get(db_file_path)
        if existing:
            file_id = existing['id']
            if existing.get('parent_id') != parent_id or existing.get('name') != file_info['name']:
                file_updated += 1
                if not dry_run:
                    _execute_and_commit(conn, """
                        UPDATE f_polic_file_config
                        SET parent_id = %s, name = %s, updated_time = NOW(), updated_by = %s
                        WHERE id = %s AND tenant_id = %s
                    """, (parent_id, file_info['name'], UPDATED_BY, file_id, tenant_id))
        else:
            file_id = generate_id()
            file_created += 1
            if not dry_run:
                _execute_and_commit(conn, """
                    INSERT INTO f_polic_file_config
                    (id, tenant_id, parent_id, name, file_path, created_time, created_by, updated_time, updated_by, state)
                    VALUES (%s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1)
                """, (file_id, tenant_id, parent_id, file_info['name'], db_file_path, CREATED_BY, UPDATED_BY))
    print_result(True, f"创建 {file_created} 个文件,更新 {file_updated} 个文件")

    return {
        'directories_created': dir_created,
        'directories_updated': dir_updated,
        'files_created': file_created,
        'files_updated': file_updated
    }
def get_input_fields(conn, tenant_id: int) -> Dict[str, int]:
    """Return ``{filed_code: id}`` for the tenant's active input fields.

    Only ``f_polic_field`` rows with ``field_type = 1``, ``state = 1`` and a
    ``filed_code`` of ``clue_info`` or ``target_basic_info_clue`` are read.
    (``filed_code`` is the column's spelling in the table.) If two rows share
    a code, the later one wins.
    """
    query = """
        SELECT id, filed_code, name
        FROM f_polic_field
        WHERE tenant_id = %s
        AND field_type = 1
        AND filed_code IN ('clue_info', 'target_basic_info_clue')
        AND state = 1
    """
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        cursor.execute(query, (tenant_id,))
        rows = cursor.fetchall()
    finally:
        cursor.close()
    return {row['filed_code']: row['id'] for row in rows}
def get_output_fields(conn, tenant_id: int) -> Dict[str, int]:
    """Return ``{filed_code: id}`` for every active output field of the tenant.

    Reads ``f_polic_field`` rows with ``field_type = 2`` and ``state = 1``.
    If two rows share a code, the later one wins.
    """
    query = """
        SELECT id, filed_code, name
        FROM f_polic_field
        WHERE tenant_id = %s
        AND field_type = 2
        AND state = 1
    """
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        cursor.execute(query, (tenant_id,))
        rows = cursor.fetchall()
    finally:
        cursor.close()
    return {row['filed_code']: row['id'] for row in rows}
def extract_placeholders_from_docx(file_path: Path) -> Set[str]:
    """Collect the ``{{field_code}}`` placeholders used in a Word document.

    Body paragraphs and the paragraphs of top-level table cells are scanned.
    Each match is stripped, and empty codes are ignored.

    Extraction is best-effort:
    * If the document cannot be opened (e.g. a legacy ``.doc``, which
      python-docx does not read), a warning is printed and an empty set is
      returned.
    * A table that fails to iterate is skipped with a warning.

    Args:
        file_path: Path to the document.

    Returns:
        The set of placeholder codes found (possibly empty).
    """
    placeholders = set()
    placeholder_pattern = re.compile(r'\{\{([^}]+)\}\}')

    def collect(paragraphs):
        """Add every non-empty placeholder found in *paragraphs* to the result."""
        for paragraph in paragraphs:
            for match in placeholder_pattern.findall(paragraph.text):
                field_code = match.strip()
                if field_code:
                    placeholders.add(field_code)

    # ``except Exception`` (not a bare except) so Ctrl+C still propagates.
    try:
        doc = Document(file_path)
        collect(doc.paragraphs)
        for table_index, table in enumerate(doc.tables):
            try:
                for row in table.rows:
                    for cell in row.cells:
                        collect(cell.paragraphs)
            except Exception as e:
                print(f" [WARN] 跳过无法解析的表格 #{table_index} ({file_path}): {e}")
    except Exception as e:
        print(f" [WARN] 无法读取模板文件 {file_path}: {e}")
    return placeholders
def create_missing_input_field(conn, tenant_id: int, field_code: str) -> Optional[int]:
    """Insert an active input field (``field_type = 1``) for *field_code* and return its id.

    The display name comes from a built-in map for ``clue_info`` and
    ``target_basic_info_clue``. Any other code uses the code with ``_``
    replaced by spaces. The insert is committed on success. On failure the
    transaction is rolled back, the error is printed, and ``None`` is
    returned.
    """
    cursor = conn.cursor()
    try:
        field_name_map = {
            'clue_info': '线索信息',
            'target_basic_info_clue': '被核查人基本信息(线索)'
        }
        field_name = field_name_map.get(field_code, field_code.replace('_', ' '))
        field_id = generate_id()
        cursor.execute("""
            INSERT INTO f_polic_field
            (id, tenant_id, name, filed_code, field_type, created_time, created_by, updated_time, updated_by, state)
            VALUES (%s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1)
        """, (
            field_id,
            tenant_id,
            field_name,
            field_code,
            1,
            CREATED_BY,
            UPDATED_BY
        ))
        conn.commit()
        return field_id
    except Exception as e:
        conn.rollback()
        # Surface the cause instead of failing silently.
        print(f" [WARN] 创建输入字段 {field_code} 失败: {e}")
        return None
    finally:
        cursor.close()
def create_missing_output_field(conn, tenant_id: int, field_code: str) -> Optional[int]:
    """Return the id of the tenant's field with *field_code*, creating an output field if none exists.

    The existence check matches ``filed_code`` only, regardless of
    ``field_type`` or ``state``, so an existing input or inactive field with
    the same code is reused.
    NOTE(review): confirm that reusing such rows is intended.

    Otherwise an active output field (``field_type = 2``) is inserted and
    committed. Its name is the code with ``_`` replaced by spaces. On failure
    the transaction is rolled back, the error is printed, and ``None`` is
    returned.
    """
    try:
        check_cursor = conn.cursor(pymysql.cursors.DictCursor)
        try:
            check_cursor.execute("""
                SELECT id FROM f_polic_field
                WHERE tenant_id = %s AND filed_code = %s
            """, (tenant_id, field_code))
            existing = check_cursor.fetchone()
        finally:
            # Closed even when the lookup fails.
            check_cursor.close()
        if existing:
            return existing['id']

        field_id = generate_id()
        field_name = field_code.replace('_', ' ')
        cursor = conn.cursor()
        try:
            cursor.execute("""
                INSERT INTO f_polic_field
                (id, tenant_id, name, filed_code, field_type, created_time, created_by, updated_time, updated_by, state)
                VALUES (%s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1)
            """, (
                field_id,
                tenant_id,
                field_name,
                field_code,
                2,
                CREATED_BY,
                UPDATED_BY
            ))
        finally:
            cursor.close()
        conn.commit()
        return field_id
    except Exception as e:
        conn.rollback()
        print(f" [WARN] 创建输出字段 {field_code} 失败: {e}")
        return None
def get_existing_relations(conn, tenant_id: int, file_id: int) -> Set[int]:
    """Return the ids of the fields linked to template *file_id*.

    Reads the ``filed_id`` column of ``f_polic_file_field`` rows with
    ``state = 1`` for the given tenant and file.
    """
    query = """
        SELECT filed_id
        FROM f_polic_file_field
        WHERE tenant_id = %s
        AND file_id = %s
        AND state = 1
    """
    cursor = conn.cursor()
    try:
        cursor.execute(query, (tenant_id, file_id))
        return set(record[0] for record in cursor.fetchall())
    finally:
        cursor.close()
def sync_field_relations(conn, tenant_id: int, dry_run: bool = False):
    """Rebuild every template/field link (``f_polic_file_field``) of *tenant_id*.

    Steps:
    1. Make sure the input fields exist.
    2. Load the output fields.
    3. Load the active templates that have a file_path.
    4. Delete ALL existing links of the tenant.
    5. For each template whose file exists under PROJECT_ROOT, link it to
       every input field and to the output field matching each
       ``{{placeholder}}`` in the document. Output fields missing for a
       placeholder are created on the fly.

    In *dry_run* mode nothing is written: missing input/output fields are
    only counted or reported, and no links are deleted or inserted.

    Returns:
        dict with total_templates, updated, errors, placeholders_found and
        fields_created (distinct placeholders that had no output field); or
        None when input fields cannot be obtained or no templates exist.
    """
    print_section("同步字段关联关系")

    # 1. Input fields are linked to every template.
    print("1. 获取输入字段...")
    input_fields = get_input_fields(conn, tenant_id)
    if not input_fields:
        if dry_run:
            # Creating fields writes to the database, so preview mode only reports it.
            print(" [预览模式] 将创建缺失的输入字段")
        else:
            print(" 创建缺失的输入字段...")
            for field_code in ['clue_info', 'target_basic_info_clue']:
                field_id = create_missing_input_field(conn, tenant_id, field_code)
                if field_id:
                    input_fields[field_code] = field_id
            if not input_fields:
                print_result(False, "无法获取或创建输入字段")
                return None
    input_field_ids = list(input_fields.values())
    print_result(True, f"找到 {len(input_field_ids)} 个输入字段")

    # 2. Output fields, keyed by filed_code (the placeholder text).
    print("\n2. 获取输出字段...")
    output_fields = get_output_fields(conn, tenant_id)
    print_result(True, f"找到 {len(output_fields)} 个输出字段")

    # 3. Active templates that point at a file.
    print("\n3. 获取所有模板...")
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        cursor.execute("""
            SELECT id, name, file_path
            FROM f_polic_file_config
            WHERE tenant_id = %s
            AND file_path IS NOT NULL
            AND file_path != ''
            AND state = 1
        """, (tenant_id,))
        templates = cursor.fetchall()
    finally:
        cursor.close()
    print_result(True, f"找到 {len(templates)} 个模板")
    if not templates:
        print_result(False, "未找到模板")
        return None

    # 4. Drop every existing link of the tenant; they are rebuilt below.
    print("\n4. 清理现有关联关系...")
    if not dry_run:
        cursor = conn.cursor()
        try:
            cursor.execute("""
                DELETE FROM f_polic_file_field
                WHERE tenant_id = %s
            """, (tenant_id,))
            deleted_count = cursor.rowcount
            conn.commit()
            print_result(True, f"删除了 {deleted_count} 条旧关联关系")
        finally:
            cursor.close()
    else:
        print(" [预览模式] 将清理所有现有关联关系")

    # 5. Link each template to the input fields and its placeholders' output fields.
    print("\n5. 扫描模板占位符并创建关联关系...")
    insert_sql = """
        INSERT INTO f_polic_file_field
        (id, tenant_id, file_id, filed_id, created_time, created_by, updated_time, updated_by, state)
        VALUES (%s, %s, %s, %s, NOW(), %s, NOW(), %s, 1)
    """
    total_updated = 0
    total_errors = 0
    all_placeholders_found = set()
    missing_fields = set()
    for i, template in enumerate(templates, 1):
        template_id = template['id']
        if i % 20 == 0:
            print(f" 处理进度: {i}/{len(templates)}")

        # A template whose file is missing locally counts as an error and gets no links.
        local_file = PROJECT_ROOT / template['file_path']
        if not local_file.exists():
            total_errors += 1
            continue

        placeholders = extract_placeholders_from_docx(local_file)
        all_placeholders_found.update(placeholders)

        output_field_ids = []
        for placeholder in placeholders:
            if placeholder in output_fields:
                output_field_ids.append(output_fields[placeholder])
                continue
            missing_fields.add(placeholder)
            if dry_run:
                continue  # would be created in a real run; no writes in preview mode
            field_id = create_missing_output_field(conn, tenant_id, placeholder)
            if field_id:
                output_fields[placeholder] = field_id
                output_field_ids.append(field_id)

        # A placeholder can resolve to an input field's id, because
        # create_missing_output_field reuses any row with the same filed_code.
        # De-duplicate (keeping order) so no field is linked twice.
        all_field_ids = list(dict.fromkeys(input_field_ids + output_field_ids))

        if not dry_run and all_field_ids:
            cursor = conn.cursor()
            try:
                for field_id in all_field_ids:
                    cursor.execute(insert_sql, (
                        generate_id(),
                        tenant_id,
                        template_id,
                        field_id,
                        CREATED_BY,
                        UPDATED_BY
                    ))
                # One commit per template: its links are inserted all-or-nothing.
                conn.commit()
                total_updated += 1
            except Exception as e:
                conn.rollback()
                total_errors += 1
                print(f" [WARN] 模板 {template['name']} 关联关系创建失败: {e}")
            finally:
                cursor.close()
        else:
            total_updated += 1

    # 6. Summary.
    print_section("字段关联同步结果")
    print(f" 总模板数: {len(templates)}")
    print(f" 已处理: {total_updated}")
    print(f" 错误: {total_errors}")
    print(f" 发现的占位符总数: {len(all_placeholders_found)}")
    print(f" 创建的字段数: {len(missing_fields)}")
    return {
        'total_templates': len(templates),
        'updated': total_updated,
        'errors': total_errors,
        'placeholders_found': len(all_placeholders_found),
        'fields_created': len(missing_fields)
    }
def _print_summary(results: Dict, dry_run: bool) -> None:
    """Print the final report for whichever steps produced a result."""
    print_section("同步完成")
    if dry_run:
        print(" 本次为预览模式,未实际更新数据库")
    else:
        print(" 数据库已更新")

    if 'clean' in results:
        c = results['clean']
        print("\n 清理结果:")
        print(f" - 总模板数: {c['total']}")
        print(f" - 删除模板: {c['deleted']}")
        print(f" * MinIO路径: {c['minio_paths']}")
        print(f" * 无效路径: {c['invalid_paths']}")
        print(f" * 重复路径: {c['duplicate_paths']}")

    if results.get('hierarchy'):
        h = results['hierarchy']
        print("\n 层级结构:")
        print(f" - 创建目录: {h['directories_created']}")
        print(f" - 更新目录: {h['directories_updated']}")
        print(f" - 创建文件: {h['files_created']}")
        print(f" - 更新文件: {h['files_updated']}")

    if results.get('fields'):
        fr = results['fields']
        print("\n 字段关联:")
        print(f" - 总模板数: {fr['total_templates']}")
        print(f" - 已处理: {fr['updated']}")
        print(f" - 发现的占位符: {fr['placeholders_found']}")
        print(f" - 创建的字段: {fr['fields_created']}")


def main():
    """Entry point: parse options, confirm, connect, then clean and resync.

    Steps, in order:
    1. Scan the local templates.
    2. Clean stale rows, unless ``--skip-clean`` is given.
    3. Sync the directory/file hierarchy.
    4. Rebuild the field relations.
    5. Print a summary.

    Outside dry-run mode the user must type ``yes`` before anything runs. The
    database connection is always closed.
    """
    print_section("清理并重新同步模板数据")
    config = get_db_config_from_args()

    print_section("配置信息")
    print(f" 数据库服务器: {config['host']}:{config['port']}")
    print(f" 数据库名称: {config['database']}")
    print(f" 用户名: {config['user']}")
    print(f" 租户ID: {config['tenant_id']}")
    print(f" 预览模式: {'是' if config['dry_run'] else '否'}")
    print(f" 跳过清理: {'是' if config['skip_clean'] else '否'}")
    if config['dry_run']:
        print("\n[注意] 当前为预览模式,不会实际更新数据库")

    # A real run is destructive: require an explicit "yes".
    if not config.get('dry_run'):
        print("\n[警告] 此操作将清理指定租户下的旧数据并重新同步")
        confirm = input("确认执行?[yes/N]: ").strip().lower()
        if confirm != 'yes':
            print("已取消")
            return

    print_section("连接数据库")
    conn = test_db_connection(config)
    if not conn:
        return
    print_result(True, "数据库连接成功")

    try:
        tenant_id = config['tenant_id']
        dry_run = config['dry_run']
        results = {}

        print_section("扫描本地模板")
        local_templates = scan_local_templates()
        print_result(True, f"找到 {len(local_templates)} 个本地模板文件")

        if not config['skip_clean']:
            results['clean'] = clean_old_data(conn, tenant_id, local_templates, dry_run)
        else:
            print_section("跳过清理步骤")
            print(" 已跳过清理步骤")

        results['hierarchy'] = sync_template_hierarchy(conn, tenant_id, dry_run)
        results['fields'] = sync_field_relations(conn, tenant_id, dry_run)

        _print_summary(results, dry_run)
    finally:
        conn.close()
        print_result(True, "数据库连接已关闭")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n\n[中断] 用户取消操作")
sys.exit(0)
except Exception as e:
print(f"\n[错误] 发生异常: {str(e)}")
import traceback
traceback.print_exc()
sys.exit(1)