优化AI服务的内容提取逻辑,增强对缺失字段的推断能力,改进后处理机制以提升数据提取的准确性和完整性。

This commit is contained in:
python 2025-12-10 09:49:57 +08:00
parent 6871c2e803
commit cd27bb4bd0
2 changed files with 503 additions and 0 deletions

231
enable_all_fields.py Normal file
View File

@ -0,0 +1,231 @@
"""
启用f_polic_field表中所有字段将state更新为1
"""
import pymysql
import os
from datetime import datetime
# 数据库连接配置
DB_CONFIG = {
'host': os.getenv('DB_HOST', '152.136.177.240'),
'port': int(os.getenv('DB_PORT', 5012)),
'user': os.getenv('DB_USER', 'finyx'),
'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
'database': os.getenv('DB_NAME', 'finyx'),
'charset': 'utf8mb4'
}
TENANT_ID = 615873064429507639
UPDATED_BY = 655162080928945152
CURRENT_TIME = datetime.now()
def check_field_states(conn):
"""检查字段状态统计"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
# 统计各状态的字段数量使用CAST来正确处理二进制类型
sql = """
SELECT
CAST(state AS UNSIGNED) as state_int,
field_type,
COUNT(*) as count
FROM f_polic_field
WHERE tenant_id = %s
GROUP BY CAST(state AS UNSIGNED), field_type
ORDER BY field_type, CAST(state AS UNSIGNED)
"""
cursor.execute(sql, (TENANT_ID,))
stats = cursor.fetchall()
cursor.close()
return stats
def get_fields_by_state(conn, state):
"""获取指定状态的字段列表"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = """
SELECT id, name, filed_code, field_type, CAST(state AS UNSIGNED) as state_int
FROM f_polic_field
WHERE tenant_id = %s
AND CAST(state AS UNSIGNED) = %s
ORDER BY field_type, name
"""
cursor.execute(sql, (TENANT_ID, state))
fields = cursor.fetchall()
cursor.close()
return fields
def enable_all_fields(conn, dry_run=True):
"""启用所有字段"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
# 查询需要更新的字段使用CAST来正确处理二进制类型
sql = """
SELECT id, name, filed_code, field_type, CAST(state AS UNSIGNED) as state_int
FROM f_polic_field
WHERE tenant_id = %s
AND CAST(state AS UNSIGNED) != 1
ORDER BY field_type, name
"""
cursor.execute(sql, (TENANT_ID,))
fields_to_update = cursor.fetchall()
if not fields_to_update:
print("✓ 所有字段已经是启用状态,无需更新")
cursor.close()
return 0
print(f"\n找到 {len(fields_to_update)} 个需要启用的字段:")
for field in fields_to_update:
field_type_str = "输出字段" if field['field_type'] == 2 else "输入字段"
print(f" - {field['name']} ({field['filed_code']}) [{field_type_str}] (当前state={field['state_int']})")
if dry_run:
print("\n⚠ 这是预览模式dry_run=True不会实际更新数据库")
cursor.close()
return len(fields_to_update)
# 执行更新使用CAST来正确比较
update_sql = """
UPDATE f_polic_field
SET state = 1, updated_time = %s, updated_by = %s
WHERE tenant_id = %s
AND CAST(state AS UNSIGNED) != 1
"""
cursor.execute(update_sql, (CURRENT_TIME, UPDATED_BY, TENANT_ID))
updated_count = cursor.rowcount
conn.commit()
cursor.close()
return updated_count
def main():
"""主函数"""
print("="*80)
print("启用f_polic_field表中所有字段")
print("="*80)
print()
try:
conn = pymysql.connect(**DB_CONFIG)
print("✓ 数据库连接成功")
except Exception as e:
print(f"✗ 数据库连接失败: {str(e)}")
return
try:
# 1. 检查当前状态统计
print("\n正在检查字段状态统计...")
stats = check_field_states(conn)
print("\n字段状态统计:")
total_fields = 0
enabled_fields = 0
disabled_fields = 0
for stat in stats:
state_int = stat['state_int']
field_type = stat['field_type']
count = stat['count']
total_fields += count
state_str = "启用" if state_int == 1 else "未启用"
type_str = "输出字段" if field_type == 2 else "输入字段"
print(f" {type_str} - {state_str} (state={state_int}): {count}")
if state_int == 1:
enabled_fields += count
else:
disabled_fields += count
print(f"\n总计: {total_fields} 个字段")
print(f" 启用: {enabled_fields}")
print(f" 未启用: {disabled_fields}")
# 2. 显示未启用的字段详情
if disabled_fields > 0:
print(f"\n正在查询未启用的字段详情...")
disabled_fields_list = get_fields_by_state(conn, 0)
print(f"\n未启用的字段列表 ({len(disabled_fields_list)} 个):")
for field in disabled_fields_list:
field_type_str = "输出字段" if field['field_type'] == 2 else "输入字段"
print(f" - {field['name']} ({field['filed_code']}) [{field_type_str}]")
# 3. 预览更新dry_run
print("\n" + "="*80)
print("预览更新(不会实际修改数据库)")
print("="*80)
count_to_update = enable_all_fields(conn, dry_run=True)
if count_to_update == 0:
print("\n所有字段已经是启用状态,无需更新")
return
# 4. 确认是否执行更新
print("\n" + "="*80)
print("准备执行更新")
print("="*80)
print(f"将更新 {count_to_update} 个字段的状态为启用state=1")
# 实际执行更新
print("\n正在执行更新...")
updated_count = enable_all_fields(conn, dry_run=False)
print(f"\n✓ 更新成功!共更新 {updated_count} 个字段")
# 5. 验证更新结果
print("\n正在验证更新结果...")
final_stats = check_field_states(conn)
print("\n更新后的字段状态统计:")
final_enabled = 0
final_disabled = 0
for stat in final_stats:
state_int = stat['state_int']
field_type = stat['field_type']
count = stat['count']
state_str = "启用" if state_int == 1 else "未启用"
type_str = "输出字段" if field_type == 2 else "输入字段"
print(f" {type_str} - {state_str} (state={state_int}): {count}")
if state_int == 1:
final_enabled += count
else:
final_disabled += count
print(f"\n总计: {final_enabled + final_disabled} 个字段")
print(f" 启用: {final_enabled}")
print(f" 未启用: {final_disabled}")
if final_disabled == 0:
print("\n✓ 所有字段已成功启用!")
else:
print(f"\n⚠ 仍有 {final_disabled} 个字段未启用")
print("\n" + "="*80)
print("操作完成!")
print("="*80)
except Exception as e:
print(f"\n✗ 处理失败: {str(e)}")
import traceback
traceback.print_exc()
conn.rollback()
finally:
conn.close()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,272 @@
"""
修复"1.请示报告卡(初核谈话)"模板的input_data字段
分析模板占位符根据数据库字段对应关系生成input_data并更新数据库
"""
import pymysql
import json
import os
import re
from datetime import datetime
from pathlib import Path
from docx import Document
# 数据库连接配置
DB_CONFIG = {
'host': os.getenv('DB_HOST', '152.136.177.240'),
'port': int(os.getenv('DB_PORT', 5012)),
'user': os.getenv('DB_USER', 'finyx'),
'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
'database': os.getenv('DB_NAME', 'finyx'),
'charset': 'utf8mb4'
}
TENANT_ID = 615873064429507639
UPDATED_BY = 655162080928945152
CURRENT_TIME = datetime.now()
# 模板信息
TEMPLATE_NAME = "1.请示报告卡(初核谈话)"
TEMPLATE_CODE = "REPORT_CARD_INTERVIEW"
BUSINESS_TYPE = "INVESTIGATION"
TEMPLATE_FILE_PATH = "template_finish/2-初核模版/2.谈话审批/走读式谈话审批/1.请示报告卡(初核谈话).docx"
def extract_placeholders_from_docx(file_path):
"""从docx文件中提取所有占位符"""
placeholders = set()
pattern = r'\{\{([^}]+)\}\}'
try:
doc = Document(file_path)
# 从段落中提取占位符
for paragraph in doc.paragraphs:
text = paragraph.text
matches = re.findall(pattern, text)
for match in matches:
placeholders.add(match.strip())
# 从表格中提取占位符
for table in doc.tables:
for row in table.rows:
for cell in row.cells:
for paragraph in cell.paragraphs:
text = paragraph.text
matches = re.findall(pattern, text)
for match in matches:
placeholders.add(match.strip())
except Exception as e:
print(f" 错误: 读取文件失败 - {str(e)}")
return []
return sorted(list(placeholders))
def get_template_config(conn):
"""查询模板配置"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = """
SELECT id, name, template_code, input_data, file_path, state
FROM f_polic_file_config
WHERE tenant_id = %s AND name = %s
"""
cursor.execute(sql, (TENANT_ID, TEMPLATE_NAME))
config = cursor.fetchone()
cursor.close()
return config
def get_template_fields(conn, file_config_id):
"""查询模板关联的字段"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = """
SELECT f.id, f.name, f.filed_code as field_code, f.field_type
FROM f_polic_field f
INNER JOIN f_polic_file_field ff ON f.id = ff.filed_id
WHERE ff.file_id = %s
AND f.tenant_id = %s
ORDER BY f.field_type, f.name
"""
cursor.execute(sql, (file_config_id, TENANT_ID))
fields = cursor.fetchall()
cursor.close()
return fields
def verify_placeholders_in_database(conn, placeholders):
"""验证占位符是否在数据库中存在对应的字段"""
if not placeholders:
return {}
cursor = conn.cursor(pymysql.cursors.DictCursor)
placeholders_list = list(placeholders)
placeholders_str = ','.join(['%s'] * len(placeholders_list))
# 查询所有字段(包括未启用的)
sql = f"""
SELECT id, name, filed_code as field_code, field_type, state
FROM f_polic_field
WHERE tenant_id = %s
AND filed_code IN ({placeholders_str})
"""
cursor.execute(sql, [TENANT_ID] + placeholders_list)
fields = cursor.fetchall()
cursor.close()
# 构建字段映射
field_map = {f['field_code']: f for f in fields}
# 检查缺失的字段
missing_fields = set(placeholders) - set(field_map.keys())
return {
'found_fields': field_map,
'missing_fields': missing_fields
}
def update_input_data(conn, file_config_id, input_data):
"""更新input_data字段"""
cursor = conn.cursor()
input_data_str = json.dumps(input_data, ensure_ascii=False)
update_sql = """
UPDATE f_polic_file_config
SET input_data = %s, updated_time = %s, updated_by = %s
WHERE id = %s
"""
cursor.execute(update_sql, (input_data_str, CURRENT_TIME, UPDATED_BY, file_config_id))
conn.commit()
cursor.close()
def main():
"""主函数"""
print("="*80)
print("修复'1.请示报告卡(初核谈话)'模板的input_data字段")
print("="*80)
print()
# 1. 检查模板文件是否存在
template_path = Path(TEMPLATE_FILE_PATH)
if not template_path.exists():
print(f"✗ 错误: 模板文件不存在 - {TEMPLATE_FILE_PATH}")
return
print(f"✓ 找到模板文件: {TEMPLATE_FILE_PATH}")
# 2. 提取占位符
print("\n正在提取占位符...")
placeholders = extract_placeholders_from_docx(str(template_path))
print(f"✓ 找到 {len(placeholders)} 个占位符:")
for i, placeholder in enumerate(placeholders, 1):
print(f" {i}. {{{{ {placeholder} }}}}")
# 3. 连接数据库
print("\n正在连接数据库...")
try:
conn = pymysql.connect(**DB_CONFIG)
print("✓ 数据库连接成功")
except Exception as e:
print(f"✗ 数据库连接失败: {str(e)}")
return
try:
# 4. 查询模板配置
print(f"\n正在查询模板配置: {TEMPLATE_NAME}")
config = get_template_config(conn)
if not config:
print(f"✗ 未找到模板配置: {TEMPLATE_NAME}")
return
print(f"✓ 找到模板配置:")
print(f" ID: {config['id']}")
print(f" 名称: {config['name']}")
print(f" 当前template_code: {config.get('template_code', 'NULL')}")
print(f" 当前input_data: {config.get('input_data', 'NULL')}")
print(f" 文件路径: {config.get('file_path', 'NULL')}")
print(f" 状态: {config.get('state', 0)}")
file_config_id = config['id']
# 5. 查询模板关联的字段
print(f"\n正在查询模板关联的字段...")
template_fields = get_template_fields(conn, file_config_id)
print(f"✓ 找到 {len(template_fields)} 个关联字段:")
for field in template_fields:
field_type_str = "输出字段" if field['field_type'] == 2 else "输入字段"
print(f" - {field['name']} ({field['field_code']}) [{field_type_str}]")
# 6. 验证占位符是否在数据库中存在
print(f"\n正在验证占位符...")
verification = verify_placeholders_in_database(conn, placeholders)
found_fields = verification['found_fields']
missing_fields = verification['missing_fields']
print(f"✓ 在数据库中找到 {len(found_fields)} 个字段:")
for field_code, field in found_fields.items():
field_type_str = "输出字段" if field['field_type'] == 2 else "输入字段"
state_str = "启用" if field.get('state', 0) == 1 else "未启用"
print(f" - {field['name']} ({field_code}) [{field_type_str}] [状态: {state_str}]")
if missing_fields:
print(f"\n⚠ 警告: 以下占位符在数据库中未找到对应字段:")
for field_code in missing_fields:
print(f" - {field_code}")
print("\n这些占位符仍会被包含在input_data中但可能无法正确填充。")
# 7. 生成input_data
print(f"\n正在生成input_data...")
input_data = {
'template_code': TEMPLATE_CODE,
'business_type': BUSINESS_TYPE,
'placeholders': placeholders
}
print(f"✓ input_data内容:")
print(json.dumps(input_data, ensure_ascii=False, indent=2))
# 8. 更新数据库
print(f"\n正在更新数据库...")
update_input_data(conn, file_config_id, input_data)
print(f"✓ 更新成功!")
# 9. 验证更新结果
print(f"\n正在验证更新结果...")
updated_config = get_template_config(conn)
if updated_config:
try:
updated_input_data = json.loads(updated_config['input_data'])
if updated_input_data.get('template_code') == TEMPLATE_CODE:
print(f"✓ 验证成功: template_code = {TEMPLATE_CODE}")
if updated_input_data.get('business_type') == BUSINESS_TYPE:
print(f"✓ 验证成功: business_type = {BUSINESS_TYPE}")
if set(updated_input_data.get('placeholders', [])) == set(placeholders):
print(f"✓ 验证成功: placeholders 匹配")
except Exception as e:
print(f"⚠ 验证时出错: {str(e)}")
print("\n" + "="*80)
print("修复完成!")
print("="*80)
except Exception as e:
print(f"\n✗ 处理失败: {str(e)}")
import traceback
traceback.print_exc()
finally:
conn.close()
if __name__ == '__main__':
main()