ai-business-write/process_confidentiality_commitment_non_party.py

373 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
处理"6.1保密承诺书(谈话对象使用-非中共党员用).docx"
- 解析占位符
- 上传到MinIO
- 更新数据库
"""
import os
import sys
import re
import json
import pymysql
from minio import Minio
from minio.error import S3Error
from datetime import datetime
from pathlib import Path
from docx import Document
from typing import Dict, List, Optional, Tuple
# On Windows, re-wrap stdout/stderr as UTF-8 text streams; characters that
# cannot be encoded are replaced instead of raising.
if sys.platform == 'win32':
    import io

    _utf8_options = dict(encoding='utf-8', errors='replace')
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, **_utf8_options)
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, **_utf8_options)
# MinIO connection settings.
# Each value can be overridden through a FINYX_MINIO_* environment variable.
# NOTE(security): the literals below are kept only as backward-compatible
# fallbacks; credentials committed to source control should be rotated and
# then supplied exclusively through the environment.
MINIO_CONFIG = {
    'endpoint': os.environ.get('FINYX_MINIO_ENDPOINT', 'minio.datacubeworld.com:9000'),
    'access_key': os.environ.get('FINYX_MINIO_ACCESS_KEY', 'JOLXFXny3avFSzB0uRA5'),
    'secret_key': os.environ.get('FINYX_MINIO_SECRET_KEY', 'G1BR8jStNfovkfH5ou39EmPl34E4l7dGrnd3Cz0I'),
    # Anything other than 1/true/yes (case-insensitive) disables TLS.
    'secure': os.environ.get('FINYX_MINIO_SECURE', 'true').strip().lower() in ('1', 'true', 'yes'),
}
# Database connection settings; the dict is passed as keyword arguments to
# pymysql.connect().
# Each value can be overridden through a FINYX_DB_* environment variable.
# NOTE(security): the literals below are kept only as backward-compatible
# fallbacks; credentials committed to source control should be rotated and
# then supplied exclusively through the environment.
DB_CONFIG = {
    'host': os.environ.get('FINYX_DB_HOST', '152.136.177.240'),
    # The port must stay an int, so the environment value is converted.
    'port': int(os.environ.get('FINYX_DB_PORT', '5012')),
    'user': os.environ.get('FINYX_DB_USER', 'finyx'),
    'password': os.environ.get('FINYX_DB_PASSWORD', '6QsGK6MpePZDE57Z'),
    'database': os.environ.get('FINYX_DB_NAME', 'finyx'),
    'charset': 'utf8mb4',
}
# Fixed values used by the database writes and the MinIO object key.
TENANT_ID = 615873064429507639
CREATED_BY = 655162080928945152
UPDATED_BY = CREATED_BY  # the same account is recorded for updates

BUCKET_NAME = 'finyx'

# The template handled by this script and its parent entry.
TEMPLATE_NAME = '6.1保密承诺书(谈话对象使用-非中共党员用)'
TEMPLATE_FILE = 'template_finish/2-初核模版/2.谈话审批/走读式谈话流程/6.1保密承诺书(谈话对象使用-非中共党员用).docx'
PARENT_ID = 1765273962716807  # ID of the "走读式谈话流程" parent entry
def generate_id():
    """
    Build a numeric ID from the current time and a random suffix.

    The value is ``milliseconds_since_epoch * 1000`` plus a random integer
    in [100000, 999999]. The suffix is six digits wide, so it overlaps the
    low-order digits of the scaled timestamp; uniqueness is not guaranteed.

    Returns:
        The generated integer ID.
    """
    import random
    import time

    epoch_millis = int(time.time() * 1000)
    suffix = random.randint(100000, 999999)
    return epoch_millis * 1000 + suffix
def extract_placeholders_from_docx(file_path: str) -> List[str]:
    """
    Collect every ``{{field_code}}`` placeholder found in a .docx file.

    Body paragraphs and tables are scanned, including tables nested inside
    table cells. Each placeholder is stripped of surrounding whitespace and
    reported once.

    Args:
        file_path: path of the .docx file to read.

    Returns:
        Sorted list of placeholder names without the braces, e.g.
        ['field_code1', 'field_code2']. If the file cannot be read or parsed,
        an error line is printed and an empty list is returned.
    """
    # Compiled once and shared by every paragraph scan.
    pattern = re.compile(r'\{\{([^}]+)\}\}')  # matches {{field_code}}
    placeholders = set()

    def scan_paragraphs(paragraphs):
        # Add the placeholders of each paragraph's full text to the result set.
        for paragraph in paragraphs:
            for match in pattern.findall(paragraph.text):
                placeholders.add(match.strip())

    def scan_tables(tables):
        # Walk every cell; recurse so tables nested inside a cell are covered.
        for table in tables:
            for row in table.rows:
                for cell in row.cells:
                    scan_paragraphs(cell.paragraphs)
                    scan_tables(cell.tables)

    try:
        doc = Document(file_path)
        scan_paragraphs(doc.paragraphs)
        scan_tables(doc.tables)
    except Exception as e:
        # Best-effort contract: report the failure and return no placeholders.
        print(f" 错误: 读取文件失败 - {str(e)}")
        return []
    return sorted(placeholders)
def upload_to_minio(file_path: str, minio_client: Minio) -> str:
    """
    Upload a local file into the BUCKET_NAME bucket.

    The object key is ``<TENANT_ID>/TEMPLATE/<year>/<two-digit month>/<file name>``,
    using the current local date.

    Args:
        file_path: path of the local file to upload.
        minio_client: MinIO client instance used for the upload.

    Returns:
        The object key prefixed with "/".

    Raises:
        Exception: on any failure. S3Error is reported with a MinIO-specific
            message; every other error, including the missing-bucket check
            below, is wrapped by the generic handler.
    """
    docx_content_type = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
    try:
        # The bucket must already exist; this function never creates it.
        if not minio_client.bucket_exists(BUCKET_NAME):
            raise Exception(f"存储桶 '{BUCKET_NAME}' 不存在,请先创建")

        # Build the object key from the current date and the file's base name.
        now = datetime.now()
        file_name = Path(file_path).name
        object_name = f'{TENANT_ID}/TEMPLATE/{now.year}/{now.month:02d}/{file_name}'

        minio_client.fput_object(
            BUCKET_NAME,
            object_name,
            file_path,
            content_type=docx_content_type,
        )
    except S3Error as e:
        raise Exception(f"MinIO错误: {e}")
    except Exception as e:
        raise Exception(f"上传文件时发生错误: {e}")

    # Relative path, starting with "/".
    return f"/{object_name}"
def get_db_fields(conn) -> Dict[str, Dict]:
    """
    Load the tenant's rows from f_polic_field that have field_type = 2.

    Args:
        conn: open database connection; its cursor() is called with
            pymysql.cursors.DictCursor.

    Returns:
        Dict keyed by the row's ``filed_code`` value (the column name is
        spelled that way in the query). Each value is a dict with the keys
        'id', 'name', 'filed_code' and 'field_type'.
    """
    sql = """
        SELECT id, name, filed_code, field_type
        FROM f_polic_field
        WHERE tenant_id = %s AND field_type = 2
    """
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        cursor.execute(sql, (TENANT_ID,))
        rows = cursor.fetchall()
    finally:
        # Close the cursor even when the query fails.
        cursor.close()

    return {
        row['filed_code']: {
            'id': row['id'],
            'name': row['name'],
            'filed_code': row['filed_code'],
            'field_type': row['field_type'],
        }
        for row in rows
    }
def match_placeholders_to_fields(placeholders: List[str], fields: Dict[str, Dict]) -> Tuple[List[int], List[str]]:
    """
    Split placeholders into those that have a known field and those that do not.

    Args:
        placeholders: placeholder names to look up.
        fields: mapping from field code to a field-info dict with an 'id' key.

    Returns:
        A tuple ``(matched field IDs, unmatched placeholders)``; both lists
        keep the order of ``placeholders``.
    """
    found_ids = []
    missing = []
    for code in placeholders:
        if code not in fields:
            missing.append(code)
            continue
        found_ids.append(fields[code]['id'])
    return found_ids, missing
def create_or_update_template(conn, template_name: str, file_path: str, minio_path: str, parent_id: Optional[int]) -> int:
    """
    Insert a template row into f_polic_file_config, or refresh the existing one.

    An existing row is looked up by tenant, name and parent_id. When found,
    its file_path, updated_time, updated_by and state are updated; otherwise
    a new row is inserted with an ID from generate_id(). The change is
    committed before returning.

    Args:
        conn: open database connection; its cursor() is called with
            pymysql.cursors.DictCursor.
        template_name: template name stored in the ``name`` column.
        file_path: local path of the template; kept for interface
            compatibility and not used by this function.
        minio_path: object path stored in the ``file_path`` column.
        parent_id: ID of the parent row, or None for no parent.

    Returns:
        ID of the updated or newly created template row.

    Raises:
        Exception: if any database step fails; the transaction is rolled
            back first and the original error is chained as the cause.
    """
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        # `<=>` is MySQL's NULL-safe equality. A plain `=` never matches when
        # parent_id is NULL, which would insert a duplicate on every run with
        # parent_id=None.
        select_sql = """
            SELECT id, name, file_path, parent_id
            FROM f_polic_file_config
            WHERE tenant_id = %s AND name = %s AND parent_id <=> %s
        """
        cursor.execute(select_sql, (TENANT_ID, template_name, parent_id))
        existing = cursor.fetchone()

        if existing:
            # Refresh the existing row and set state back to 1.
            template_id = existing['id']
            update_sql = """
                UPDATE f_polic_file_config
                SET file_path = %s, updated_time = NOW(), updated_by = %s, state = 1
                WHERE id = %s AND tenant_id = %s
            """
            cursor.execute(update_sql, (minio_path, UPDATED_BY, template_id, TENANT_ID))
            conn.commit()
            print(f" [UPDATE] 更新模板记录 (ID: {template_id})")
            return template_id

        # No matching row: create one.
        template_id = generate_id()
        insert_sql = """
            INSERT INTO f_polic_file_config
            (id, tenant_id, parent_id, name, input_data, file_path, created_time, created_by, updated_time, updated_by, state)
            VALUES (%s, %s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, %s)
        """
        cursor.execute(insert_sql, (
            template_id,
            TENANT_ID,
            parent_id,
            template_name,
            None,  # input_data
            minio_path,
            CREATED_BY,
            CREATED_BY,
            1,  # state: 1 means enabled
        ))
        conn.commit()
        print(f" [CREATE] 创建模板记录 (ID: {template_id})")
        return template_id
    except Exception as e:
        conn.rollback()
        raise Exception(f"创建或更新模板失败: {str(e)}") from e
    finally:
        cursor.close()
def update_template_field_relations(conn, template_id: int, field_ids: List[int]):
    """
    Replace the template's rows in f_polic_file_field.

    All existing rows for the tenant and template are deleted, then one row
    is inserted per field ID. Both steps are committed together.

    Args:
        conn: open database connection.
        template_id: template whose relations are rewritten (``file_id`` column).
        field_ids: field IDs to link; an empty list leaves the template with
            no relations.

    Raises:
        Exception: if any database step fails; the transaction is rolled
            back first.
    """
    delete_sql = """
        DELETE FROM f_polic_file_field
        WHERE tenant_id = %s AND file_id = %s
    """
    insert_sql = """
        INSERT INTO f_polic_file_field
        (tenant_id, file_id, filed_id, created_time, created_by, updated_time, updated_by)
        VALUES (%s, %s, %s, NOW(), %s, NOW(), %s)
    """
    cursor = conn.cursor()
    try:
        # Remove the old relations first.
        cursor.execute(delete_sql, (TENANT_ID, template_id))

        # Insert one relation row per field.
        if field_ids:
            for field_id in field_ids:
                row = (TENANT_ID, template_id, field_id, CREATED_BY, CREATED_BY)
                cursor.execute(insert_sql, row)

        conn.commit()
        print(f" [UPDATE] 更新字段关联关系: {len(field_ids)} 个字段")
    except Exception as e:
        conn.rollback()
        raise Exception(f"更新字段关联关系失败: {str(e)}")
    finally:
        cursor.close()
def main():
    """
    Run the whole workflow for the template in TEMPLATE_FILE.

    Steps: extract placeholders, connect to the database and MinIO, load the
    database fields, match placeholders to fields, upload the file, create or
    update the template row, then rewrite the template-field relations.
    Errors are printed with a traceback instead of being re-raised, and the
    database connection is closed on the way out.
    """
    banner = "=" * 80
    print(banner)
    print("处理保密承诺书(非中共党员用)模板")
    print(banner)
    print()

    # Stop early when the template file is missing.
    if not os.path.exists(TEMPLATE_FILE):
        print(f"错误: 文件不存在 - {TEMPLATE_FILE}")
        return
    print(f"文件路径: {TEMPLATE_FILE}")
    print()

    conn = None  # stays None until the database connection succeeds
    try:
        # 1. Extract placeholders
        print("1. 提取占位符...")
        placeholders = extract_placeholders_from_docx(TEMPLATE_FILE)
        print(f" 找到 {len(placeholders)} 个占位符:")
        for i, placeholder in enumerate(placeholders, 1):
            print(f" {i}. {{{{ {placeholder} }}}}")
        print()

        # 2. Connect to the database and MinIO
        print("2. 连接数据库和MinIO...")
        conn = pymysql.connect(**DB_CONFIG)
        minio_client = Minio(
            MINIO_CONFIG['endpoint'],
            access_key=MINIO_CONFIG['access_key'],
            secret_key=MINIO_CONFIG['secret_key'],
            secure=MINIO_CONFIG['secure'],
        )
        print(" [OK] 连接成功\n")

        # 3. Load the database fields
        print("3. 获取数据库字段...")
        db_fields = get_db_fields(conn)
        print(f" [OK] 找到 {len(db_fields)} 个输出字段\n")

        # 4. Match placeholders to fields
        print("4. 匹配占位符到字段...")
        matched_field_ids, unmatched_placeholders = match_placeholders_to_fields(placeholders, db_fields)
        print(f" 匹配成功: {len(matched_field_ids)}")
        print(f" 未匹配: {len(unmatched_placeholders)}")
        if unmatched_placeholders:
            print(f" 未匹配的占位符: {', '.join(unmatched_placeholders)}")
        print()

        # 5. Upload to MinIO
        print("5. 上传到MinIO...")
        minio_path = upload_to_minio(TEMPLATE_FILE, minio_client)
        print(f" [OK] MinIO路径: {minio_path}\n")

        # 6. Create or update the template row
        print("6. 创建或更新数据库记录...")
        template_id = create_or_update_template(conn, TEMPLATE_NAME, TEMPLATE_FILE, minio_path, PARENT_ID)
        print(f" [OK] 模板ID: {template_id}\n")

        # 7. Rewrite the template-field relations
        print("7. 更新字段关联关系...")
        update_template_field_relations(conn, template_id, matched_field_ids)
        print()

        print(banner)
        print("处理完成!")
        print(banner)
        print(f"模板ID: {template_id}")
        print(f"MinIO路径: {minio_path}")
        print(f"关联字段数: {len(matched_field_ids)}")
    except Exception as e:
        print(f"\n[ERROR] 发生错误: {e}")
        import traceback
        traceback.print_exc()
        if conn is not None:
            conn.rollback()
    finally:
        if conn is not None:
            conn.close()
            print("\n数据库连接已关闭")
# Run the workflow only when this file is executed as a script.
if __name__ == "__main__":
    main()