929 lines
32 KiB
Python
929 lines
32 KiB
Python
"""
|
||
Template update script with configurable database connection and tenant ID.

Features:
1. Update the template hierarchy (based on the template_finish/ directory structure).
2. Update the template field relations (input fields and output fields).

Usage:
1. Command-line arguments:
python update_templates_custom.py --host 192.168.1.100 --port 3306 --user root --password 123456 --database finyx --tenant-id 1

2. Interactive prompts:
python update_templates_custom.py
|
||
"""
|
||
import os
|
||
import sys
|
||
import pymysql
|
||
import argparse
|
||
from pathlib import Path
|
||
from typing import Dict, List, Set, Optional
|
||
import re
|
||
from docx import Document
|
||
import getpass
|
||
|
||
# On Windows, re-wrap the standard streams so output is encoded as UTF-8.
if sys.platform == 'win32':
    import io

    sys.stdout, sys.stderr = (
        io.TextIOWrapper(stream.buffer, encoding='utf-8')
        for stream in (sys.stdout, sys.stderr)
    )

# Directory containing this script, and the template tree scanned below it.
PROJECT_ROOT = Path(__file__).parent
TEMPLATES_DIR = PROJECT_ROOT.joinpath("template_finish")

# User ID written into the created_by / updated_by columns by this script.
CREATED_BY = 655162080928945152
UPDATED_BY = CREATED_BY
|
||
|
||
|
||
def print_section(title):
    """Print *title* framed by two 70-character '=' rules, preceded by a blank line."""
    rule = "=" * 70
    print("\n" + rule)
    print(f" {title}")
    print(rule)
|
||
|
||
|
||
def print_result(success, message):
    """Print *message* prefixed with "[OK]" when *success* is truthy, otherwise "[FAIL]"."""
    if success:
        tag = "[OK]"
    else:
        tag = "[FAIL]"
    print(f"{tag} {message}")
|
||
|
||
|
||
def generate_id():
    """Return a new integer ID based on the current time in microseconds.

    IDs are strictly increasing within one process: when two calls land in
    the same microsecond (or the clock steps backwards), the previous ID plus
    one is returned instead of a duplicate. Uniqueness across processes is
    not guaranteed.

    Returns:
        int: the generated ID.
    """
    import time

    candidate = int(time.time() * 1000000)
    # The last issued ID is remembered on the function object so the block
    # stays self-contained (no extra module-level state).
    last_id = getattr(generate_id, "_last_id", 0)
    if candidate <= last_id:
        candidate = last_id + 1
    generate_id._last_id = candidate
    return candidate
|
||
|
||
|
||
def get_db_config_from_args() -> Optional[Dict]:
    """Build the run configuration from command-line arguments.

    Parses ``sys.argv``. A configuration is returned only when host, port,
    user, password, database and tenant ID were all supplied; otherwise
    ``None`` is returned.

    The update steps are enabled by default and can be switched off with
    ``--no-update-hierarchy`` / ``--no-update-fields``. The original
    ``--update-hierarchy`` / ``--update-fields`` flags are still accepted.

    Returns:
        Optional[Dict]: keys ``host``, ``port``, ``user``, ``password``,
        ``database``, ``charset``, ``tenant_id``, ``dry_run``,
        ``update_hierarchy`` and ``update_fields``; or ``None`` when the
        connection arguments are incomplete.
    """
    parser = argparse.ArgumentParser(
        description='模板更新脚本 - 支持自定义数据库连接和租户ID',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
# 使用命令行参数
python update_templates_custom.py --host 192.168.1.100 --port 3306 --user root --password 123456 --database finyx --tenant-id 1

# 使用交互式输入
python update_templates_custom.py
"""
    )

    parser.add_argument('--host', type=str, help='MySQL服务器地址')
    parser.add_argument('--port', type=int, help='MySQL服务器端口')
    parser.add_argument('--user', type=str, help='MySQL用户名')
    # NOTE(security): a password given on the command line is visible in the
    # process list and shell history; the interactive mode avoids this.
    parser.add_argument('--password', type=str, help='MySQL密码')
    parser.add_argument('--database', type=str, help='数据库名称')
    parser.add_argument('--tenant-id', type=int, help='租户ID')
    parser.add_argument('--dry-run', action='store_true', help='预览模式(不实际更新数据库)')

    # store_true with default=True can never yield False, so each step gets a
    # matching --no-* switch that writes False into the same destination.
    parser.add_argument('--update-hierarchy', dest='update_hierarchy', action='store_true',
                        default=True, help='更新模板层级结构(默认启用)')
    parser.add_argument('--no-update-hierarchy', dest='update_hierarchy', action='store_false',
                        help='跳过模板层级结构更新')
    parser.add_argument('--update-fields', dest='update_fields', action='store_true',
                        default=True, help='更新字段关联关系(默认启用)')
    parser.add_argument('--no-update-fields', dest='update_fields', action='store_false',
                        help='跳过字段关联关系更新')

    args = parser.parse_args()

    # Name-like values must be non-empty. Numeric values and the password are
    # only checked for presence, so tenant ID 0 and an empty password pass.
    names_given = all([args.host, args.user, args.database])
    values_given = all(
        value is not None for value in (args.port, args.password, args.tenant_id)
    )
    if not (names_given and values_given):
        return None

    return {
        'host': args.host,
        'port': args.port,
        'user': args.user,
        'password': args.password,
        'database': args.database,
        'charset': 'utf8mb4',
        'tenant_id': args.tenant_id,
        'dry_run': args.dry_run,
        'update_hierarchy': args.update_hierarchy,
        'update_fields': args.update_fields
    }
|
||
|
||
|
||
def get_db_config_interactive() -> Dict:
    """Prompt on the console for connection settings, tenant ID and options.

    Pressing Enter accepts the default shown in brackets. The password is
    read with ``getpass`` so it is not echoed. An invalid port is reported
    and replaced by the default port. An empty or non-numeric tenant ID
    terminates the process with exit status 1.

    Returns:
        Dict: keys ``host``, ``port``, ``user``, ``password``, ``database``,
        ``charset``, ``tenant_id``, ``dry_run``, ``update_hierarchy`` and
        ``update_fields``.
    """
    default_port = 5012

    print_section("数据库连接配置")
    print("请输入数据库连接信息(直接回车使用默认值):")

    # NOTE(security): the default host and password below are hard-coded in
    # the source. They are kept so existing usage is unchanged, but should be
    # moved out of the repository (review).
    host = input("MySQL服务器地址 [152.136.177.240]: ").strip() or "152.136.177.240"

    port_str = input("MySQL服务器端口 [5012]: ").strip() or "5012"
    try:
        port = int(port_str)
    except ValueError:
        port = None
    if port is None or not 0 < port <= 65535:
        # Tell the user about the fallback instead of applying it silently.
        print(f"[警告] 端口 '{port_str}' 无效,使用默认端口 {default_port}")
        port = default_port

    user = input("MySQL用户名 [finyx]: ").strip() or "finyx"
    password = getpass.getpass("MySQL密码 [留空使用默认]: ").strip()
    if not password:
        password = "6QsGK6MpePZDE57Z"
    database = input("数据库名称 [finyx]: ").strip() or "finyx"

    print("\n租户配置:")
    tenant_id_str = input("租户ID (tenant_id) [必填]: ").strip()
    if not tenant_id_str:
        print("[错误] 租户ID不能为空")
        sys.exit(1)
    try:
        tenant_id = int(tenant_id_str)
    except ValueError:
        print("[错误] 租户ID必须是数字")
        sys.exit(1)

    print("\n更新选项:")
    # The two update steps default to yes; preview mode defaults to no.
    update_hierarchy = input("更新模板层级结构?[Y/n]: ").strip().lower() != 'n'
    update_fields = input("更新字段关联关系?[Y/n]: ").strip().lower() != 'n'
    dry_run = input("预览模式(不实际更新)?[y/N]: ").strip().lower() == 'y'

    return {
        'host': host,
        'port': port,
        'user': user,
        'password': password,
        'database': database,
        'charset': 'utf8mb4',
        'tenant_id': tenant_id,
        'dry_run': dry_run,
        'update_hierarchy': update_hierarchy,
        'update_fields': update_fields
    }
|
||
|
||
|
||
def test_db_connection(config: Dict) -> Optional[pymysql.Connection]:
    """Open a MySQL connection from *config*.

    Reads the ``host``, ``port``, ``user``, ``password``, ``database`` and
    ``charset`` entries. Returns the open connection, or ``None`` after
    printing a failure line when anything goes wrong.
    """
    try:
        connect_kwargs = {
            key: config[key]
            for key in ('host', 'port', 'user', 'password', 'database', 'charset')
        }
        connection = pymysql.connect(**connect_kwargs)
    except Exception as exc:
        print_result(False, f"数据库连接失败: {str(exc)}")
        return None
    return connection
|
||
|
||
|
||
# ==================== 模板层级结构更新 ====================
|
||
|
||
def scan_directory_structure(base_dir: Path) -> Dict:
    """Walk *base_dir* and list its sub-directories and Word template files.

    Paths in the result are relative to *base_dir* and use '/' separators.
    Entries directly under *base_dir* have ``parent_path`` set to ``None``,
    for directories and files alike. Only ``.docx`` / ``.doc`` files are
    collected. Office lock files (names starting with ``~$``) are skipped
    because they are not templates.

    Args:
        base_dir: root directory to scan. A missing directory yields an
            empty result.

    Returns:
        Dict: ``{'directories': [...], 'files': [...]}`` where every entry is
        a dict with ``name``, ``path`` and ``parent_path``. A directory is
        always listed before anything inside it.
    """
    directories: List[Dict] = []
    files: List[Dict] = []
    template_suffixes = ('.docx', '.doc')

    def scan_recursive(current_path: Path, parent_path: Optional[str] = None):
        """Record *current_path* (unless it is the root) and descend into it."""
        # is_dir() is already False for a path that does not exist.
        if not current_path.is_dir():
            return

        rel_path_str = current_path.relative_to(base_dir).as_posix()
        is_root = rel_path_str == '.'
        # The root has no node of its own, so its children get parent None.
        own_path = None if is_root else rel_path_str

        if not is_root:
            directories.append({
                'name': current_path.name,
                'path': rel_path_str,
                'parent_path': parent_path
            })

        # Sorted so the scan order is the same on every run.
        for item in sorted(current_path.iterdir()):
            if item.is_dir():
                scan_recursive(item, own_path)
            elif (item.is_file()
                  and item.suffix.lower() in template_suffixes
                  and not item.name.startswith('~$')):
                files.append({
                    'name': item.name,
                    'path': item.relative_to(base_dir).as_posix(),
                    'parent_path': own_path
                })

    scan_recursive(base_dir)

    return {
        'directories': directories,
        'files': files
    }
|
||
|
||
|
||
def get_existing_templates(conn, tenant_id: int) -> Dict:
    """Load the tenant's rows with state = 1 from f_polic_file_config and index them.

    Returns a dict with three lookups:
      * ``by_path``: file_path -> row, for rows that have a file_path
      * ``by_name``: name -> list of rows, for rows without a file_path
        (directory nodes)
      * ``by_id``: id -> row, for every row
    """
    query = """
        SELECT id, name, file_path, parent_id
        FROM f_polic_file_config
        WHERE tenant_id = %s
        AND state = 1
    """
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        cursor.execute(query, (tenant_id,))
        rows = cursor.fetchall()

        by_path, by_name, by_id = {}, {}, {}
        for row in rows:
            by_id[row['id']] = row
            path = row['file_path']
            if path:
                by_path[path] = row
                continue
            # No file_path: directory node, grouped by name.
            by_name.setdefault(row['name'], []).append(row)

        return {'by_path': by_path, 'by_name': by_name, 'by_id': by_id}
    finally:
        cursor.close()
|
||
|
||
|
||
def create_or_update_directory(conn, tenant_id: int, name: str, parent_id: Optional[int],
                               existing_templates: Dict, dry_run: bool = False) -> int:
    """Ensure a directory node called *name* exists under *parent_id*.

    A directory is matched among ``existing_templates['by_name']`` by equal
    ``parent_id`` and an empty ``file_path``. A match is updated; otherwise a
    new row is inserted with a freshly generated ID. With *dry_run* nothing
    is written, but the progress line is still printed.

    Returns:
        int: ID of the matched or newly created directory node.
    """
    cursor = conn.cursor(pymysql.cursors.DictCursor)

    try:
        # First candidate with the same name, the same parent and no file_path.
        match = next(
            (
                node
                for node in existing_templates['by_name'].get(name, [])
                if node.get('parent_id') == parent_id and not node.get('file_path')
            ),
            None,
        )

        if match:
            if not dry_run:
                cursor.execute("""
                    UPDATE f_polic_file_config
                    SET parent_id = %s, updated_time = NOW(), updated_by = %s
                    WHERE id = %s AND tenant_id = %s
                """, (parent_id, UPDATED_BY, match['id'], tenant_id))
                conn.commit()
            print(f" [更新目录] {name} (ID: {match['id']})")
            return match['id']

        # No match: insert a new directory row (file_path stays NULL).
        new_id = generate_id()
        if not dry_run:
            cursor.execute("""
                INSERT INTO f_polic_file_config
                (id, tenant_id, parent_id, name, file_path, created_time, created_by, updated_time, updated_by, state)
                VALUES (%s, %s, %s, %s, NULL, NOW(), %s, NOW(), %s, 1)
            """, (new_id, tenant_id, parent_id, name, CREATED_BY, UPDATED_BY))
            conn.commit()
        print(f" [创建目录] {name} (ID: {new_id})")
        return new_id
    finally:
        cursor.close()
|
||
|
||
|
||
def create_or_update_file(conn, tenant_id: int, file_info: Dict, parent_id: Optional[int],
                          existing_templates: Dict, dry_run: bool = False) -> int:
    """Ensure a file node exists for *file_info* under *parent_id*.

    The file is looked up in ``existing_templates['by_path']`` by
    ``file_info['path']``. A match gets its parent and name updated;
    otherwise a new row is inserted with a freshly generated ID. With
    *dry_run* nothing is written, but the progress line is still printed.

    Returns:
        int: ID of the matched or newly created file node.
    """
    cursor = conn.cursor(pymysql.cursors.DictCursor)

    try:
        rel_path, display_name = file_info['path'], file_info['name']
        known = existing_templates['by_path'].get(rel_path)

        if not known:
            # Unknown path: insert a new file row.
            new_id = generate_id()
            if not dry_run:
                cursor.execute("""
                    INSERT INTO f_polic_file_config
                    (id, tenant_id, parent_id, name, file_path, created_time, created_by, updated_time, updated_by, state)
                    VALUES (%s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1)
                """, (new_id, tenant_id, parent_id, display_name, rel_path, CREATED_BY, UPDATED_BY))
                conn.commit()
            print(f" [创建文件] {display_name} (ID: {new_id})")
            return new_id

        # Known path: refresh parent and name on the existing row.
        if not dry_run:
            cursor.execute("""
                UPDATE f_polic_file_config
                SET parent_id = %s, name = %s, updated_time = NOW(), updated_by = %s
                WHERE id = %s AND tenant_id = %s
            """, (parent_id, display_name, UPDATED_BY, known['id'], tenant_id))
            conn.commit()
        print(f" [更新文件] {display_name} (ID: {known['id']})")
        return known['id']
    finally:
        cursor.close()
|
||
|
||
|
||
def update_template_hierarchy(conn, tenant_id: int, dry_run: bool = False):
    """Synchronise f_polic_file_config with the directory tree under TEMPLATES_DIR.

    Steps: scan the tree, load the tenant's existing rows, create directory
    nodes that are missing, then create or update file nodes.

    Args:
        conn: open database connection.
        tenant_id: tenant whose rows are read and written.
        dry_run: when true, only count what would change; nothing is written.

    Returns:
        dict: counters ``directories_created``, ``directories_updated``,
        ``files_created`` and ``files_updated``. The dict is also returned,
        with all counters zero, when the scan finds nothing, so callers can
        always index the result.
    """
    print_section("更新模板层级结构")

    stats = {
        'directories_created': 0,
        'directories_updated': 0,
        'files_created': 0,
        'files_updated': 0
    }

    # 1. Scan the directory tree
    print("1. 扫描目录结构...")
    structure = scan_directory_structure(TEMPLATES_DIR)
    print_result(True, f"找到 {len(structure['directories'])} 个目录,{len(structure['files'])} 个文件")

    if not structure['directories'] and not structure['files']:
        print_result(False, "未找到任何目录或文件")
        return stats

    # 2. Load existing templates
    print("\n2. 获取现有模板...")
    existing_templates = get_existing_templates(conn, tenant_id)
    print_result(True, f"找到 {len(existing_templates['by_path'])} 个文件模板,{len(existing_templates['by_name'])} 个目录模板")

    # 3. Create directory nodes
    print("\n3. 创建/更新目录节点...")
    path_to_id = {}

    for dir_info in structure['directories']:
        parent_path = dir_info['parent_path']
        parent_id = path_to_id.get(parent_path) if parent_path else None

        # A directory is matched by (name, parent_id, empty file_path).
        existing = None
        for candidate in existing_templates['by_name'].get(dir_info['name'], []):
            if candidate.get('parent_id') == parent_id and not candidate.get('file_path'):
                existing = candidate
                break

        if existing:
            # The match already has this parent_id, so there is nothing to
            # re-parent; 'directories_updated' therefore stays 0 and is kept
            # only so the result shape is unchanged.
            dir_id = existing['id']
        else:
            dir_id = generate_id()
            stats['directories_created'] += 1
            if not dry_run:
                _execute_and_commit(conn, """
                    INSERT INTO f_polic_file_config
                    (id, tenant_id, parent_id, name, file_path, created_time, created_by, updated_time, updated_by, state)
                    VALUES (%s, %s, %s, %s, NULL, NOW(), %s, NOW(), %s, 1)
                """, (dir_id, tenant_id, parent_id, dir_info['name'], CREATED_BY, UPDATED_BY))

        # Children look their parent up by relative path.
        path_to_id[dir_info['path']] = dir_id

    print_result(True, f"创建 {stats['directories_created']} 个目录,更新 {stats['directories_updated']} 个目录")

    # 4. Create/update file nodes
    print("\n4. 创建/更新文件节点...")

    for file_info in structure['files']:
        parent_path = file_info['parent_path']
        parent_id = path_to_id.get(parent_path) if parent_path else None

        existing = existing_templates['by_path'].get(file_info['path'])

        if existing:
            changed = (existing.get('parent_id') != parent_id
                       or existing.get('name') != file_info['name'])
            if changed:
                stats['files_updated'] += 1
                if not dry_run:
                    _execute_and_commit(conn, """
                        UPDATE f_polic_file_config
                        SET parent_id = %s, name = %s, updated_time = NOW(), updated_by = %s
                        WHERE id = %s AND tenant_id = %s
                    """, (parent_id, file_info['name'], UPDATED_BY, existing['id'], tenant_id))
        else:
            file_id = generate_id()
            stats['files_created'] += 1
            if not dry_run:
                _execute_and_commit(conn, """
                    INSERT INTO f_polic_file_config
                    (id, tenant_id, parent_id, name, file_path, created_time, created_by, updated_time, updated_by, state)
                    VALUES (%s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1)
                """, (file_id, tenant_id, parent_id, file_info['name'], file_info['path'], CREATED_BY, UPDATED_BY))

    print_result(True, f"创建 {stats['files_created']} 个文件,更新 {stats['files_updated']} 个文件")

    return stats


def _execute_and_commit(conn, sql: str, params: tuple) -> None:
    """Run one write statement on a short-lived cursor and commit it.

    The cursor is closed even when the statement fails. On failure the
    transaction is rolled back and the exception is re-raised.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(sql, params)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()
|
||
|
||
|
||
# ==================== 字段关联关系更新 ====================
|
||
|
||
def get_input_fields(conn, tenant_id: int) -> Dict[str, int]:
    """Return ``{filed_code: id}`` for the tenant's input fields.

    Only rows with field_type = 1, state = 1 and a filed_code of
    'clue_info' or 'target_basic_info_clue' are selected.
    """
    query = """
        SELECT id, filed_code, name
        FROM f_polic_field
        WHERE tenant_id = %s
        AND field_type = 1
        AND filed_code IN ('clue_info', 'target_basic_info_clue')
        AND state = 1
    """
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        cursor.execute(query, (tenant_id,))
        rows = cursor.fetchall()
        return {row['filed_code']: row['id'] for row in rows}
    finally:
        cursor.close()
|
||
|
||
|
||
def get_output_fields(conn, tenant_id: int) -> Dict[str, int]:
    """Return ``{filed_code: id}`` for every tenant field with field_type = 2 and state = 1."""
    query = """
        SELECT id, filed_code, name
        FROM f_polic_field
        WHERE tenant_id = %s
        AND field_type = 2
        AND state = 1
    """
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        cursor.execute(query, (tenant_id,))
        rows = cursor.fetchall()
        return {row['filed_code']: row['id'] for row in rows}
    finally:
        cursor.close()
|
||
|
||
|
||
def extract_placeholders_from_docx(file_path: Path) -> Set[str]:
    """Collect the ``{{name}}`` placeholders found in a .docx document.

    Body paragraphs and the paragraphs inside table cells are scanned.
    Placeholder names are stripped of surrounding whitespace, and empty
    names are ignored.

    Extraction is best-effort. A document that cannot be opened, or a table
    that cannot be read, produces a warning on stderr instead of an
    exception, and whatever was collected up to that point is returned.

    Args:
        file_path: path of the document to read.

    Returns:
        Set[str]: the distinct placeholder names. Empty if none were found
        or the file could not be parsed.
    """
    placeholder_pattern = re.compile(r'\{\{([^}]+)\}\}')
    placeholders: Set[str] = set()

    def collect(paragraphs) -> None:
        """Add every placeholder found in *paragraphs* to the result set."""
        for paragraph in paragraphs:
            for match in placeholder_pattern.findall(paragraph.text):
                field_code = match.strip()
                if field_code:
                    placeholders.add(field_code)

    try:
        doc = Document(file_path)

        # Body paragraphs
        collect(doc.paragraphs)

        # Table cells. A broken table is skipped so the others are still read.
        for table in doc.tables:
            try:
                for row in table.rows:
                    for cell in row.cells:
                        collect(cell.paragraphs)
            except Exception as exc:
                print(f"[警告] 解析表格失败 ({file_path}): {exc}", file=sys.stderr)
                continue
    except Exception as exc:
        # Report the failure: an empty result is otherwise indistinguishable
        # from "this template has no placeholders".
        print(f"[警告] 无法解析模板文件 {file_path}: {exc}", file=sys.stderr)

    return placeholders
|
||
|
||
|
||
def get_all_templates(conn, tenant_id: int) -> List[Dict]:
    """Return the tenant's file nodes: rows with state = 1 and a non-empty file_path.

    Each row is a dict with ``id``, ``name`` and ``file_path``.
    """
    query = """
        SELECT id, name, file_path
        FROM f_polic_file_config
        WHERE tenant_id = %s
        AND file_path IS NOT NULL
        AND file_path != ''
        AND state = 1
    """
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        cursor.execute(query, (tenant_id,))
        return cursor.fetchall()
    finally:
        cursor.close()
|
||
|
||
|
||
def get_existing_relations(conn, tenant_id: int, file_id: int) -> Set[int]:
    """Return the filed_id values linked to *file_id* with state = 1 in f_polic_file_field."""
    query = """
        SELECT filed_id
        FROM f_polic_file_field
        WHERE tenant_id = %s
        AND file_id = %s
        AND state = 1
    """
    cursor = conn.cursor()
    try:
        cursor.execute(query, (tenant_id, file_id))
        linked_ids = set()
        # Rows are indexed by position; the single selected column is filed_id.
        for row in cursor.fetchall():
            linked_ids.add(row[0])
        return linked_ids
    finally:
        cursor.close()
|
||
|
||
|
||
def create_missing_input_field(conn, tenant_id: int, field_code: str) -> Optional[int]:
    """Insert an input field (field_type = 1) for *field_code* and commit.

    The display name comes from a built-in map for the two known codes.
    Any other code uses the code itself with underscores replaced by spaces.

    Returns:
        Optional[int]: the new field ID, or ``None`` if the insert failed.
        On failure the transaction is rolled back and the reason is printed.
    """
    cursor = conn.cursor()

    try:
        field_id = generate_id()
        field_name_map = {
            'clue_info': '线索信息',
            'target_basic_info_clue': '被核查人基本信息(线索)'
        }
        field_name = field_name_map.get(field_code, field_code.replace('_', ' '))

        insert_sql = """
            INSERT INTO f_polic_field
            (id, tenant_id, name, filed_code, field_type, created_time, created_by, updated_time, updated_by, state)
            VALUES (%s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1)
        """
        cursor.execute(insert_sql, (
            field_id,
            tenant_id,
            field_name,
            field_code,
            1,  # field_type=1 marks an input field
            CREATED_BY,
            UPDATED_BY
        ))
        conn.commit()

        return field_id

    except Exception as e:
        conn.rollback()
        # Report why the insert failed instead of returning None silently.
        print_result(False, f"创建输入字段失败 ({field_code}): {e}")
        return None
    finally:
        cursor.close()
|
||
|
||
|
||
def create_missing_output_field(conn, tenant_id: int, field_code: str) -> Optional[int]:
    """Return the ID of the tenant's field with *field_code*, creating it if absent.

    If a row with this tenant and filed_code already exists, its ID is
    returned unchanged. Otherwise an output field (field_type = 2) is
    inserted and committed. Its name is the code with underscores replaced
    by spaces.

    Returns:
        Optional[int]: the existing or new field ID, or ``None`` on failure.
        On failure the transaction is rolled back and the reason is printed.
    """
    # One cursor serves both the lookup and the insert, so a failing lookup
    # cannot leave a second cursor open.
    cursor = conn.cursor(pymysql.cursors.DictCursor)

    try:
        # Check for an existing row first.
        # NOTE(review): this lookup filters neither field_type nor state, so
        # an input field or a row whose state is not 1 with the same code is
        # returned too - confirm that this is intended.
        cursor.execute("""
            SELECT id FROM f_polic_field
            WHERE tenant_id = %s AND filed_code = %s
        """, (tenant_id, field_code))
        existing = cursor.fetchone()

        if existing:
            return existing['id']

        # Create the new field
        field_id = generate_id()
        field_name = field_code.replace('_', ' ')

        insert_sql = """
            INSERT INTO f_polic_field
            (id, tenant_id, name, filed_code, field_type, created_time, created_by, updated_time, updated_by, state)
            VALUES (%s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1)
        """
        cursor.execute(insert_sql, (
            field_id,
            tenant_id,
            field_name,
            field_code,
            2,  # field_type=2 marks an output field
            CREATED_BY,
            UPDATED_BY
        ))
        conn.commit()

        return field_id

    except Exception as e:
        conn.rollback()
        # Report why the lookup or insert failed instead of returning None silently.
        print_result(False, f"创建输出字段失败 ({field_code}): {e}")
        return None
    finally:
        cursor.close()
|
||
|
||
|
||
def update_template_field_relations(conn, tenant_id: int, file_id: int, file_name: str,
                                    input_field_ids: List[int], output_field_ids: List[int],
                                    dry_run: bool = False):
    """Make the field links of one template equal to the given input + output IDs.

    Links with state = 1 that are no longer wanted are deleted, and missing
    ones are inserted, in one transaction. Nothing happens when the wanted
    set is empty, when it already equals the existing links, or when
    *dry_run* is set. Any error rolls the transaction back and is re-raised.
    *file_name* is accepted but not used.
    """
    cursor = conn.cursor()

    try:
        wanted = set(input_field_ids + output_field_ids)
        if not wanted:
            return

        current = get_existing_relations(conn, tenant_id, file_id)
        to_add = wanted - current
        to_remove = current - wanted

        # Nothing to change, or preview only.
        if dry_run or not (to_add or to_remove):
            return

        # Delete the links that are no longer wanted.
        if to_remove:
            stale_ids = list(to_remove)
            marks = ','.join('%s' for _ in stale_ids)
            delete_sql = f"""
                DELETE FROM f_polic_file_field
                WHERE tenant_id = %s
                AND file_id = %s
                AND filed_id IN ({marks})
            """
            cursor.execute(delete_sql, [tenant_id, file_id] + stale_ids)

        check_sql = """
            SELECT id FROM f_polic_file_field
            WHERE tenant_id = %s AND file_id = %s AND filed_id = %s
        """
        insert_sql = """
            INSERT INTO f_polic_file_field
            (id, tenant_id, file_id, filed_id, created_time, created_by, updated_time, updated_by, state)
            VALUES (%s, %s, %s, %s, NOW(), %s, NOW(), %s, 1)
        """

        # Insert the missing links.
        for field_id in to_add:
            # Skip when a row for this pair already exists in any state.
            # NOTE(review): a row whose state is not 1 is left as it is, so
            # such a link is not re-activated - confirm that this is intended.
            cursor.execute(check_sql, (tenant_id, file_id, field_id))
            if cursor.fetchone():
                continue

            cursor.execute(insert_sql, (
                generate_id(),
                tenant_id,
                file_id,
                field_id,
                CREATED_BY,
                UPDATED_BY
            ))

        conn.commit()

    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()
|
||
|
||
|
||
def update_all_field_relations(conn, tenant_id: int, dry_run: bool = False):
    """Rebuild the field links of every template of the tenant.

    For each file template the local document is scanned for ``{{name}}``
    placeholders. The template is then linked to all input fields plus the
    output fields whose filed_code matches a placeholder. Output fields that
    do not exist yet are created first.

    With *dry_run* nothing is written to the database: missing fields are
    counted but not created, and links are compared but not changed.

    Args:
        conn: open database connection.
        tenant_id: tenant whose templates and fields are processed.
        dry_run: preview only.

    Returns:
        dict: counters ``total_templates``, ``updated``, ``kept``,
        ``errors``, ``placeholders_found`` and ``fields_created``. The dict
        is also returned, with zero counters, when the run stops early, so
        callers can always index the result.
    """
    print_section("更新字段关联关系")

    stats = {
        'total_templates': 0,
        'updated': 0,
        'kept': 0,
        'errors': 0,
        'placeholders_found': 0,
        'fields_created': 0
    }

    # 1. Load input fields
    print("1. 获取输入字段...")
    input_fields = get_input_fields(conn, tenant_id)

    # NOTE(review): the fields are created only when none of them exists. If
    # just one of the two is missing it is not created - confirm intent.
    if not input_fields:
        if dry_run:
            # Preview mode must not write: only announce what would be created.
            print(" [预览] 将创建缺失的输入字段: clue_info, target_basic_info_clue")
        else:
            print(" 创建缺失的输入字段...")
            for field_code in ['clue_info', 'target_basic_info_clue']:
                field_id = create_missing_input_field(conn, tenant_id, field_code)
                if field_id:
                    input_fields[field_code] = field_id

            if not input_fields:
                print_result(False, "无法获取或创建输入字段")
                return stats

    input_field_ids = list(input_fields.values())
    print_result(True, f"找到 {len(input_field_ids)} 个输入字段")

    # 2. Load output fields
    print("\n2. 获取输出字段...")
    output_fields = get_output_fields(conn, tenant_id)
    print_result(True, f"找到 {len(output_fields)} 个输出字段")

    # 3. Load all templates
    print("\n3. 获取所有模板...")
    templates = get_all_templates(conn, tenant_id)
    print_result(True, f"找到 {len(templates)} 个模板")

    if not templates:
        print_result(False, "未找到模板")
        return stats

    # 4. Scan placeholders and update the links
    print("\n4. 扫描模板占位符并更新关联关系...")

    total_updated = 0
    total_kept = 0
    total_errors = 0
    all_placeholders_found = set()
    # Placeholders for which a field was created (or would be, in preview).
    created_fields = set()

    for i, template in enumerate(templates, 1):
        template_id = template['id']
        template_name = template['name']
        file_path = template['file_path']

        if i % 20 == 0:
            print(f" 处理进度: {i}/{len(templates)}")

        # Locate the local file. The hierarchy step stores paths relative to
        # TEMPLATES_DIR, so that base is tried after PROJECT_ROOT.
        local_file = next(
            (
                candidate
                for candidate in (PROJECT_ROOT / file_path, TEMPLATES_DIR / file_path)
                if candidate.exists()
            ),
            None,
        )
        if local_file is None:
            print(f" [跳过] 本地文件不存在: {file_path}")
            total_errors += 1
            continue

        # Extract placeholders.
        # NOTE(review): an unreadable document yields an empty set, which
        # makes the template's existing output links look stale and removes
        # them - confirm that this is acceptable.
        placeholders = extract_placeholders_from_docx(local_file)
        all_placeholders_found.update(placeholders)

        # Map placeholders to output field IDs.
        output_field_ids = []
        for placeholder in placeholders:
            field_id = output_fields.get(placeholder)
            if field_id is None:
                if dry_run:
                    # Do not touch the database in preview mode.
                    created_fields.add(placeholder)
                    continue
                field_id = create_missing_output_field(conn, tenant_id, placeholder)
                if not field_id:
                    continue
                created_fields.add(placeholder)
                # Remember the new field so later templates reuse it.
                output_fields[placeholder] = field_id
            output_field_ids.append(field_id)

        # Update the links.
        try:
            desired = set(input_field_ids + output_field_ids)
            existing = get_existing_relations(conn, tenant_id, template_id)

            if desired != existing:
                update_template_field_relations(
                    conn, tenant_id, template_id, template_name,
                    input_field_ids, output_field_ids, dry_run
                )
                total_updated += 1
            else:
                total_kept += 1
        except Exception as e:
            total_errors += 1
            # Show the failure, then carry on with the remaining templates.
            print(f" [错误] 更新模板关联失败: {template_name} - {e}")

    # 5. Summary
    print_section("字段关联更新结果")
    print(f" 总模板数: {len(templates)}")
    print(f" 已更新: {total_updated} 个")
    print(f" 保持不变: {total_kept} 个")
    print(f" 错误: {total_errors} 个")
    print(f" 发现的占位符总数: {len(all_placeholders_found)} 个")
    print(f" 创建的字段数: {len(created_fields)} 个")

    stats.update({
        'total_templates': len(templates),
        'updated': total_updated,
        'kept': total_kept,
        'errors': total_errors,
        'placeholders_found': len(all_placeholders_found),
        'fields_created': len(created_fields)
    })
    return stats
|
||
|
||
|
||
# ==================== 主函数 ====================
|
||
|
||
def main():
    """Entry point: gather the configuration, confirm, connect and run the updates.

    The configuration comes from the command line when all connection
    arguments are given, otherwise from interactive prompts. Outside preview
    mode the user must confirm before anything runs. The connection is
    always closed at the end.
    """
    print_section("模板更新脚本")
    print("支持自定义数据库连接和租户ID配置")

    # Get the configuration
    config = get_db_config_from_args()
    if not config:
        config = get_db_config_interactive()

    # Show the configuration (the password is deliberately not printed)
    print_section("配置信息")
    print(f" 数据库服务器: {config['host']}:{config['port']}")
    print(f" 数据库名称: {config['database']}")
    print(f" 用户名: {config['user']}")
    print(f" 租户ID: {config['tenant_id']}")
    print(f" 预览模式: {'是' if config['dry_run'] else '否'}")
    print(f" 更新层级结构: {'是' if config['update_hierarchy'] else '否'}")
    print(f" 更新字段关联: {'是' if config['update_fields'] else '否'}")

    if config['dry_run']:
        print("\n[注意] 当前为预览模式,不会实际更新数据库")

    # Ask for confirmation
    if not config.get('dry_run'):
        confirm = input("\n确认执行更新?[y/N]: ").strip().lower()
        if confirm != 'y':
            print("已取消")
            return

    # Connect to the database
    print_section("连接数据库")
    conn = test_db_connection(config)
    if not conn:
        return

    print_result(True, "数据库连接成功")

    try:
        tenant_id = config['tenant_id']
        dry_run = config['dry_run']

        results = {}

        # Update the template hierarchy
        if config['update_hierarchy']:
            results['hierarchy'] = update_template_hierarchy(conn, tenant_id, dry_run)

        # Update the field relations
        if config['update_fields']:
            results['fields'] = update_all_field_relations(conn, tenant_id, dry_run)

        # Summary
        print_section("更新完成")
        if config['dry_run']:
            print(" 本次为预览模式,未实际更新数据库")
        else:
            print(" 数据库已更新")

        # A step that stopped early may have returned no statistics, so its
        # section is skipped instead of indexing into None.
        h = results.get('hierarchy')
        if h:
            print("\n 层级结构:")
            print(f" - 创建目录: {h['directories_created']} 个")
            print(f" - 更新目录: {h['directories_updated']} 个")
            print(f" - 创建文件: {h['files_created']} 个")
            print(f" - 更新文件: {h['files_updated']} 个")

        f = results.get('fields')
        if f:
            print("\n 字段关联:")
            print(f" - 总模板数: {f['total_templates']} 个")
            print(f" - 已更新: {f['updated']} 个")
            print(f" - 保持不变: {f['kept']} 个")
            print(f" - 发现的占位符: {f['placeholders_found']} 个")
            print(f" - 创建的字段: {f['fields_created']} 个")

    finally:
        conn.close()
        print_result(True, "数据库连接已关闭")
|
||
|
||
|
||
# Script entry: exit 0 on Ctrl-C, exit 1 (with traceback) on any other error.
if __name__ == "__main__":
    import traceback

    try:
        main()
    except KeyboardInterrupt:
        print("\n\n[中断] 用户取消操作")
        raise SystemExit(0)
    except Exception as exc:
        print(f"\n[错误] 发生异常: {exc}")
        traceback.print_exc()
        raise SystemExit(1)
|