ai-business-write/validate_and_update_templates.py

610 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
重新校验数据库中模板和数据字段对应关系
删除旧的或者无效的模板信息
根据template_finish文件夹下的模板文件重新上传模板到minio并更新数据库
"""
import os
import re
import json
import sys
import pymysql
from minio import Minio
from minio.error import S3Error
from datetime import datetime
from pathlib import Path
from docx import Document
from typing import Dict, List, Set, Optional, Tuple
from collections import defaultdict
# 设置输出编码为UTF-8Windows兼容
if sys.platform == 'win32':
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
# MinIO连接配置
MINIO_CONFIG = {
'endpoint': 'minio.datacubeworld.com:9000',
'access_key': 'JOLXFXny3avFSzB0uRA5',
'secret_key': 'G1BR8jStNfovkfH5ou39EmPl34E4l7dGrnd3Cz0I',
'secure': True
}
# 数据库连接配置
DB_CONFIG = {
'host': '152.136.177.240',
'port': 5012,
'user': 'finyx',
'password': '6QsGK6MpePZDE57Z',
'database': 'finyx',
'charset': 'utf8mb4'
}
# 固定值
TENANT_ID = 615873064429507639
CREATED_BY = 655162080928945152
UPDATED_BY = 655162080928945152
BUCKET_NAME = 'finyx'
TEMPLATE_BASE_DIR = 'template_finish'
def generate_id():
"""生成ID"""
import time
import random
timestamp = int(time.time() * 1000)
random_part = random.randint(100000, 999999)
return timestamp * 1000 + random_part
def extract_placeholders_from_docx(file_path: str) -> List[str]:
"""
从docx文件中提取所有占位符
Args:
file_path: docx文件路径
Returns:
占位符列表,格式: ['field_code1', 'field_code2', ...]
"""
placeholders = set()
pattern = r'\{\{([^}]+)\}\}' # 匹配 {{field_code}} 格式
try:
doc = Document(file_path)
# 从段落中提取占位符
for paragraph in doc.paragraphs:
text = paragraph.text
matches = re.findall(pattern, text)
for match in matches:
placeholders.add(match.strip())
# 从表格中提取占位符
for table in doc.tables:
for row in table.rows:
for cell in row.cells:
for paragraph in cell.paragraphs:
text = paragraph.text
matches = re.findall(pattern, text)
for match in matches:
placeholders.add(match.strip())
except Exception as e:
print(f" 错误: 读取文件失败 - {str(e)}")
return []
return sorted(list(placeholders))
def normalize_template_name(file_name: str) -> str:
"""
标准化模板名称(去掉扩展名、括号内容、数字前缀等)
Args:
file_name: 文件名,如 "2.初步核实审批表XXX.docx"
Returns:
标准化后的名称,如 "初步核实审批表"
"""
# 去掉扩展名
name = Path(file_name).stem
# 去掉括号内容
name = re.sub(r'[(].*?[)]', '', name)
name = name.strip()
# 去掉数字前缀和点号
name = re.sub(r'^\d+[\.\-]?\s*', '', name)
name = name.strip()
return name
def scan_template_files(base_dir: str) -> Dict[str, Dict]:
"""
扫描模板文件夹,提取所有模板文件信息
Args:
base_dir: 模板文件夹路径
Returns:
字典key为文件相对路径value为模板信息
"""
base_path = Path(base_dir)
if not base_path.exists():
print(f"错误: 目录不存在 - {base_dir}")
return {}
templates = {}
print("=" * 80)
print("扫描模板文件...")
print("=" * 80)
for docx_file in sorted(base_path.rglob("*.docx")):
# 跳过临时文件
if docx_file.name.startswith("~$"):
continue
relative_path = docx_file.relative_to(base_path)
file_name = docx_file.name
print(f"\n处理文件: {relative_path}")
# 提取占位符
placeholders = extract_placeholders_from_docx(str(docx_file))
print(f" 占位符数量: {len(placeholders)}")
if placeholders:
print(f" 占位符: {', '.join(placeholders[:10])}{'...' if len(placeholders) > 10 else ''}")
# 标准化模板名称
normalized_name = normalize_template_name(file_name)
templates[str(relative_path)] = {
'file_path': str(docx_file),
'relative_path': str(relative_path),
'file_name': file_name,
'normalized_name': normalized_name,
'placeholders': placeholders
}
print(f"\n总共扫描到 {len(templates)} 个模板文件")
return templates
def get_database_templates(conn) -> Dict[int, Dict]:
"""获取数据库中的所有模板配置"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = """
SELECT id, name, file_path, parent_id, state, input_data
FROM f_polic_file_config
WHERE tenant_id = %s
"""
cursor.execute(sql, (TENANT_ID,))
templates = cursor.fetchall()
result = {}
for template in templates:
result[template['id']] = {
'id': template['id'],
'name': template['name'],
'file_path': template['file_path'],
'parent_id': template['parent_id'],
'state': template['state'],
'input_data': template['input_data']
}
cursor.close()
return result
def get_database_fields(conn) -> Dict[str, Dict]:
"""
获取数据库中的所有字段定义
Returns:
字典key为field_codevalue为字段信息
"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = """
SELECT id, name, filed_code, field_type, state
FROM f_polic_field
WHERE tenant_id = %s
"""
cursor.execute(sql, (TENANT_ID,))
fields = cursor.fetchall()
result = {}
for field in fields:
field_code = field['filed_code']
result[field_code] = {
'id': field['id'],
'name': field['name'],
'field_code': field_code,
'field_type': field['field_type'],
'state': field['state']
}
cursor.close()
return result
def match_placeholders_to_fields(placeholders: List[str], fields: Dict[str, Dict]) -> Tuple[List[int], List[str]]:
"""
匹配占位符到数据库字段
Args:
placeholders: 占位符列表field_code
fields: 数据库字段字典
Returns:
(匹配的字段ID列表, 未匹配的占位符列表)
"""
matched_field_ids = []
unmatched_placeholders = []
for placeholder in placeholders:
field = fields.get(placeholder)
if field:
# 只匹配输出字段field_type=2
if field['field_type'] == 2:
matched_field_ids.append(field['id'])
else:
print(f" [WARN] 警告: 占位符 {placeholder} 对应的字段类型为 {field['field_type']},不是输出字段")
unmatched_placeholders.append(placeholder)
else:
unmatched_placeholders.append(placeholder)
return matched_field_ids, unmatched_placeholders
def upload_to_minio(client: Minio, file_path: str, template_name: str) -> str:
"""上传文件到MinIO"""
try:
now = datetime.now()
object_name = f'{TENANT_ID}/TEMPLATE/{now.year}/{now.month:02d}/{template_name}'
client.fput_object(
BUCKET_NAME,
object_name,
file_path,
content_type='application/vnd.openxmlformats-officedocument.wordprocessingml.document'
)
return f"/{object_name}"
except Exception as e:
raise Exception(f"上传到MinIO失败: {str(e)}")
def find_template_by_name(conn, template_name: str) -> Optional[int]:
"""根据模板名称查找数据库中的模板ID"""
cursor = conn.cursor()
try:
sql = """
SELECT id FROM f_polic_file_config
WHERE tenant_id = %s AND name = %s
"""
cursor.execute(sql, (TENANT_ID, template_name))
result = cursor.fetchone()
return result[0] if result else None
finally:
cursor.close()
def create_or_update_template(conn, template_info: Dict, file_path: str, minio_path: str) -> int:
"""
创建或更新模板配置
Returns:
模板ID
"""
cursor = conn.cursor()
try:
# 检查是否已存在
existing_id = find_template_by_name(conn, template_info['normalized_name'])
# 准备input_data
input_data = json.dumps({
'template_code': template_info.get('template_code', ''),
'business_type': 'INVESTIGATION',
'placeholders': template_info['placeholders']
}, ensure_ascii=False)
if existing_id:
# 更新现有记录
update_sql = """
UPDATE f_polic_file_config
SET file_path = %s, input_data = %s, updated_time = NOW(), updated_by = %s, state = 1
WHERE id = %s AND tenant_id = %s
"""
cursor.execute(update_sql, (
minio_path,
input_data,
UPDATED_BY,
existing_id,
TENANT_ID
))
print(f" [OK] 更新模板配置: {template_info['normalized_name']}, ID: {existing_id}")
conn.commit()
return existing_id
else:
# 创建新记录
template_id = generate_id()
insert_sql = """
INSERT INTO f_polic_file_config
(id, tenant_id, parent_id, name, input_data, file_path, created_time, created_by, updated_time, updated_by, state)
VALUES (%s, %s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, %s)
"""
cursor.execute(insert_sql, (
template_id,
TENANT_ID,
template_info.get('parent_id'),
template_info['normalized_name'],
input_data,
minio_path,
CREATED_BY,
CREATED_BY,
1 # state: 1表示启用
))
print(f" [OK] 创建模板配置: {template_info['normalized_name']}, ID: {template_id}")
conn.commit()
return template_id
except Exception as e:
conn.rollback()
raise Exception(f"创建或更新模板配置失败: {str(e)}")
finally:
cursor.close()
def update_template_field_relations(conn, template_id: int, field_ids: List[int]):
"""
更新模板和字段的关联关系
Args:
template_id: 模板ID
field_ids: 字段ID列表
"""
cursor = conn.cursor()
try:
# 删除旧的关联关系
delete_sql = """
DELETE FROM f_polic_file_field
WHERE tenant_id = %s AND file_id = %s
"""
cursor.execute(delete_sql, (TENANT_ID, template_id))
deleted_count = cursor.rowcount
# 创建新的关联关系
created_count = 0
for field_id in field_ids:
relation_id = generate_id()
insert_sql = """
INSERT INTO f_polic_file_field
(id, tenant_id, file_id, filed_id, created_time, created_by, updated_time, updated_by, state)
VALUES (%s, %s, %s, %s, NOW(), %s, NOW(), %s, %s)
"""
cursor.execute(insert_sql, (
relation_id, TENANT_ID, template_id, field_id,
CREATED_BY, UPDATED_BY, 1 # state=1 表示启用
))
created_count += 1
conn.commit()
print(f" [OK] 更新字段关联: 删除 {deleted_count} 条,创建 {created_count}")
except Exception as e:
conn.rollback()
raise Exception(f"更新字段关联失败: {str(e)}")
finally:
cursor.close()
def mark_invalid_templates(conn, valid_template_names: Set[str]):
"""
标记无效的模板不在template_finish文件夹中的模板
Args:
conn: 数据库连接
valid_template_names: 有效的模板名称集合
"""
cursor = conn.cursor()
try:
# 查找所有模板
sql = """
SELECT id, name FROM f_polic_file_config
WHERE tenant_id = %s
"""
cursor.execute(sql, (TENANT_ID,))
all_templates = cursor.fetchall()
invalid_count = 0
for template in all_templates:
template_id = template[0]
template_name = template[1]
# 标准化名称进行匹配
normalized_name = normalize_template_name(template_name)
# 检查是否在有效模板列表中
is_valid = False
for valid_name in valid_template_names:
if normalized_name == normalize_template_name(valid_name) or normalized_name in valid_name or valid_name in normalized_name:
is_valid = True
break
if not is_valid:
# 标记为未启用
update_sql = """
UPDATE f_polic_file_config
SET state = 0, updated_time = NOW(), updated_by = %s
WHERE id = %s AND tenant_id = %s
"""
cursor.execute(update_sql, (UPDATED_BY, template_id, TENANT_ID))
invalid_count += 1
print(f" [WARN] 标记无效模板: {template_name} (ID: {template_id})")
conn.commit()
print(f"\n总共标记 {invalid_count} 个无效模板")
except Exception as e:
conn.rollback()
raise Exception(f"标记无效模板失败: {str(e)}")
finally:
cursor.close()
def main():
"""主函数"""
print("=" * 80)
print("重新校验和更新模板配置")
print("=" * 80)
print()
try:
# 连接数据库和MinIO
print("1. 连接数据库和MinIO...")
conn = pymysql.connect(**DB_CONFIG)
minio_client = Minio(
MINIO_CONFIG['endpoint'],
access_key=MINIO_CONFIG['access_key'],
secret_key=MINIO_CONFIG['secret_key'],
secure=MINIO_CONFIG['secure']
)
# 检查存储桶
if not minio_client.bucket_exists(BUCKET_NAME):
print(f"错误: 存储桶 '{BUCKET_NAME}' 不存在")
return
print(f"[OK] 数据库连接成功")
print(f"[OK] MinIO存储桶 '{BUCKET_NAME}' 已存在\n")
# 扫描模板文件
print("2. 扫描模板文件...")
template_files = scan_template_files(TEMPLATE_BASE_DIR)
if not template_files:
print("错误: 未找到任何模板文件")
return
# 获取数据库中的模板和字段
print("\n3. 获取数据库中的模板和字段...")
db_templates = get_database_templates(conn)
db_fields = get_database_fields(conn)
print(f" 数据库中的模板数: {len(db_templates)}")
print(f" 数据库中的字段数: {len(db_fields)}")
# 标记无效模板
print("\n4. 标记无效模板...")
valid_template_names = {info['normalized_name'] for info in template_files.values()}
mark_invalid_templates(conn, valid_template_names)
# 处理每个模板文件
print("\n5. 处理模板文件...")
print("=" * 80)
success_count = 0
failed_count = 0
failed_files = []
for relative_path, template_info in template_files.items():
file_name = template_info['file_name']
normalized_name = template_info['normalized_name']
placeholders = template_info['placeholders']
file_path = template_info['file_path']
print(f"\n处理模板: {normalized_name}")
print(f" 文件: {relative_path}")
print(f" 占位符数量: {len(placeholders)}")
try:
# 匹配占位符到字段
matched_field_ids, unmatched_placeholders = match_placeholders_to_fields(placeholders, db_fields)
if unmatched_placeholders:
print(f" [WARN] 警告: {len(unmatched_placeholders)} 个占位符未匹配到字段:")
for placeholder in unmatched_placeholders[:5]: # 只显示前5个
print(f" - {{{{ {placeholder} }}}}")
if len(unmatched_placeholders) > 5:
print(f" ... 还有 {len(unmatched_placeholders) - 5}")
if not matched_field_ids:
print(f" [WARN] 警告: 没有匹配到任何字段,但仍会上传模板")
# 即使没有字段,也继续处理(上传模板和更新数据库)
print(f" [OK] 匹配到 {len(matched_field_ids)} 个字段")
# 上传到MinIO
print(f" 正在上传到MinIO...")
minio_path = upload_to_minio(minio_client, file_path, file_name)
print(f" [OK] 上传成功: {minio_path}")
# 创建或更新模板配置
print(f" 正在更新数据库...")
template_id = create_or_update_template(conn, template_info, file_path, minio_path)
# 更新字段关联(如果有匹配的字段)
if matched_field_ids:
update_template_field_relations(conn, template_id, matched_field_ids)
else:
# 即使没有字段,也删除旧的关联关系
cursor = conn.cursor()
try:
delete_sql = """
DELETE FROM f_polic_file_field
WHERE tenant_id = %s AND file_id = %s
"""
cursor.execute(delete_sql, (TENANT_ID, template_id))
conn.commit()
print(f" [OK] 清理旧的字段关联: 删除 {cursor.rowcount}")
finally:
cursor.close()
success_count += 1
except Exception as e:
failed_count += 1
failed_files.append((file_name, str(e)))
print(f" [ERROR] 处理失败: {str(e)}")
# 打印汇总
print("\n" + "=" * 80)
print("处理汇总")
print("=" * 80)
print(f"总文件数: {len(template_files)}")
print(f"成功: {success_count}")
print(f"失败: {failed_count}")
if failed_files:
print("\n失败的文件:")
for file_name, error in failed_files:
print(f" - {file_name}: {error}")
print("\n" + "=" * 80)
print("处理完成!")
print("=" * 80)
except Exception as e:
print(f"\n[ERROR] 发生错误: {e}")
import traceback
traceback.print_exc()
if 'conn' in locals():
conn.rollback()
finally:
if 'conn' in locals():
conn.close()
print("\n数据库连接已关闭")
if __name__ == '__main__':
main()