更新下载链接生成逻辑,修改文件路径以反映最新的文档版本。同时,在文档服务中添加文件路径有效性检查,确保模板文件路径不为空,提升错误处理能力。

This commit is contained in:
python 2025-12-10 19:02:20 +08:00
parent 0563ff5346
commit ebc1154beb
7 changed files with 2138 additions and 3 deletions

View File

@ -0,0 +1,551 @@
"""
检查并修复 f_polic_file_field 表的关联关系
1. 检查无效的关联关联到不存在的 file_id filed_id
2. 检查重复的关联关系
3. 检查关联到已删除或未启用的字段/文件
4. 根据其他表的数据更新关联关系
"""
import pymysql
import os
from typing import Dict, List, Tuple
from collections import defaultdict
# 数据库连接配置
DB_CONFIG = {
'host': os.getenv('DB_HOST', '152.136.177.240'),
'port': int(os.getenv('DB_PORT', 5012)),
'user': os.getenv('DB_USER', 'finyx'),
'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
'database': os.getenv('DB_NAME', 'finyx'),
'charset': 'utf8mb4'
}
TENANT_ID = 615873064429507639
def check_invalid_relations(conn) -> Dict:
"""检查无效的关联关系(关联到不存在的 file_id 或 filed_id"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
print("\n" + "="*80)
print("1. 检查无效的关联关系")
print("="*80)
# 检查关联到不存在的 file_id
cursor.execute("""
SELECT fff.id, fff.file_id, fff.filed_id, fff.tenant_id
FROM f_polic_file_field fff
LEFT JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id
WHERE fff.tenant_id = %s AND fc.id IS NULL
""", (TENANT_ID,))
invalid_file_relations = cursor.fetchall()
# 检查关联到不存在的 filed_id
cursor.execute("""
SELECT fff.id, fff.file_id, fff.filed_id, fff.tenant_id
FROM f_polic_file_field fff
LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
WHERE fff.tenant_id = %s AND f.id IS NULL
""", (TENANT_ID,))
invalid_field_relations = cursor.fetchall()
print(f"\n关联到不存在的 file_id: {len(invalid_file_relations)}")
if invalid_file_relations:
print(" 详情:")
for rel in invalid_file_relations[:10]:
print(f" - 关联ID: {rel['id']}, file_id: {rel['file_id']}, filed_id: {rel['filed_id']}")
if len(invalid_file_relations) > 10:
print(f" ... 还有 {len(invalid_file_relations) - 10}")
print(f"\n关联到不存在的 filed_id: {len(invalid_field_relations)}")
if invalid_field_relations:
print(" 详情:")
for rel in invalid_field_relations[:10]:
print(f" - 关联ID: {rel['id']}, file_id: {rel['file_id']}, filed_id: {rel['filed_id']}")
if len(invalid_field_relations) > 10:
print(f" ... 还有 {len(invalid_field_relations) - 10}")
return {
'invalid_file_relations': invalid_file_relations,
'invalid_field_relations': invalid_field_relations
}
def check_duplicate_relations(conn) -> Dict:
"""检查重复的关联关系(相同的 file_id 和 filed_id"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
print("\n" + "="*80)
print("2. 检查重复的关联关系")
print("="*80)
# 查找重复的关联关系
cursor.execute("""
SELECT file_id, filed_id, COUNT(*) as count, GROUP_CONCAT(id ORDER BY id) as ids
FROM f_polic_file_field
WHERE tenant_id = %s
GROUP BY file_id, filed_id
HAVING COUNT(*) > 1
ORDER BY count DESC
""", (TENANT_ID,))
duplicates = cursor.fetchall()
print(f"\n发现 {len(duplicates)} 个重复的关联关系:")
duplicate_details = []
for dup in duplicates:
ids = [int(id_str) for id_str in dup['ids'].split(',')]
duplicate_details.append({
'file_id': dup['file_id'],
'filed_id': dup['filed_id'],
'count': dup['count'],
'ids': ids
})
print(f"\n 文件ID: {dup['file_id']}, 字段ID: {dup['filed_id']} (共 {dup['count']} 条)")
print(f" 关联ID列表: {ids}")
return {
'duplicates': duplicate_details
}
def check_disabled_relations(conn) -> Dict:
"""检查关联到已删除或未启用的字段/文件"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
print("\n" + "="*80)
print("3. 检查关联到已删除或未启用的字段/文件")
print("="*80)
# 检查关联到未启用的文件
cursor.execute("""
SELECT fff.id, fff.file_id, fff.filed_id, fc.name as file_name, fc.state as file_state
FROM f_polic_file_field fff
INNER JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id
WHERE fff.tenant_id = %s AND fc.state = 0
""", (TENANT_ID,))
disabled_file_relations = cursor.fetchall()
# 检查关联到未启用的字段
cursor.execute("""
SELECT fff.id, fff.file_id, fff.filed_id, f.name as field_name, f.filed_code, f.state as field_state
FROM f_polic_file_field fff
INNER JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
WHERE fff.tenant_id = %s AND f.state = 0
""", (TENANT_ID,))
disabled_field_relations = cursor.fetchall()
print(f"\n关联到未启用的文件: {len(disabled_file_relations)}")
if disabled_file_relations:
print(" 详情:")
for rel in disabled_file_relations[:10]:
print(f" - 关联ID: {rel['id']}, 文件: {rel['file_name']} (ID: {rel['file_id']})")
if len(disabled_file_relations) > 10:
print(f" ... 还有 {len(disabled_file_relations) - 10}")
print(f"\n关联到未启用的字段: {len(disabled_field_relations)}")
if disabled_field_relations:
print(" 详情:")
for rel in disabled_field_relations[:10]:
print(f" - 关联ID: {rel['id']}, 字段: {rel['field_name']} ({rel['filed_code']}, ID: {rel['filed_id']})")
if len(disabled_field_relations) > 10:
print(f" ... 还有 {len(disabled_field_relations) - 10}")
return {
'disabled_file_relations': disabled_file_relations,
'disabled_field_relations': disabled_field_relations
}
def check_missing_relations(conn) -> Dict:
"""检查应该存在但缺失的关联关系(文件节点应该有输出字段关联)"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
print("\n" + "="*80)
print("4. 检查缺失的关联关系")
print("="*80)
# 获取所有有 template_code 的文件节点(这些应该是文件,不是目录)
cursor.execute("""
SELECT fc.id, fc.name, fc.template_code
FROM f_polic_file_config fc
WHERE fc.tenant_id = %s AND fc.template_code IS NOT NULL AND fc.state = 1
""", (TENANT_ID,))
file_configs = cursor.fetchall()
# 获取所有启用的输出字段
cursor.execute("""
SELECT id, name, filed_code
FROM f_polic_field
WHERE tenant_id = %s AND field_type = 2 AND state = 1
""", (TENANT_ID,))
output_fields = cursor.fetchall()
# 获取现有的关联关系
cursor.execute("""
SELECT file_id, filed_id
FROM f_polic_file_field
WHERE tenant_id = %s
""", (TENANT_ID,))
existing_relations = {(rel['file_id'], rel['filed_id']) for rel in cursor.fetchall()}
print(f"\n文件节点总数: {len(file_configs)}")
print(f"输出字段总数: {len(output_fields)}")
print(f"现有关联关系总数: {len(existing_relations)}")
# 这里不自动创建缺失的关联,因为不是所有文件都需要所有字段
# 只显示统计信息
print("\n注意: 缺失的关联关系需要根据业务逻辑手动创建")
return {
'file_configs': file_configs,
'output_fields': output_fields,
'existing_relations': existing_relations
}
def check_field_type_consistency(conn) -> Dict:
"""检查关联关系的字段类型一致性f_polic_file_field 应该只关联输出字段)"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
print("\n" + "="*80)
print("5. 检查字段类型一致性")
print("="*80)
# 检查是否关联了输入字段field_type=1
cursor.execute("""
SELECT fff.id, fff.file_id, fff.filed_id,
fc.name as file_name, fc.template_code, f.name as field_name, f.filed_code, f.field_type
FROM f_polic_file_field fff
INNER JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id
INNER JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id
WHERE fff.tenant_id = %s AND f.field_type = 1
ORDER BY fc.name, f.name
""", (TENANT_ID,))
input_field_relations = cursor.fetchall()
print(f"\n关联到输入字段 (field_type=1) 的记录: {len(input_field_relations)}")
if input_field_relations:
print(" 注意: f_polic_file_field 表通常只应该关联输出字段 (field_type=2)")
print(" 根据业务逻辑,输入字段不需要通过此表关联")
print(" 详情:")
for rel in input_field_relations:
print(f" - 关联ID: {rel['id']}, 文件: {rel['file_name']} (code: {rel['template_code']}), "
f"字段: {rel['field_name']} ({rel['filed_code']}, type={rel['field_type']})")
else:
print(" ✓ 所有关联都是输出字段")
return {
'input_field_relations': input_field_relations
}
def fix_invalid_relations(conn, dry_run: bool = True) -> Dict:
"""修复无效的关联关系"""
cursor = conn.cursor()
print("\n" + "="*80)
print("修复无效的关联关系")
print("="*80)
if dry_run:
print("\n[DRY RUN模式 - 不会实际修改数据库]")
# 获取无效的关联
invalid_file_relations = check_invalid_relations(conn)['invalid_file_relations']
invalid_field_relations = check_invalid_relations(conn)['invalid_field_relations']
all_invalid_ids = set()
for rel in invalid_file_relations:
all_invalid_ids.add(rel['id'])
for rel in invalid_field_relations:
all_invalid_ids.add(rel['id'])
if not all_invalid_ids:
print("\n✓ 没有无效的关联关系需要删除")
return {'deleted': 0}
print(f"\n准备删除 {len(all_invalid_ids)} 条无效的关联关系")
if not dry_run:
placeholders = ','.join(['%s'] * len(all_invalid_ids))
cursor.execute(f"""
DELETE FROM f_polic_file_field
WHERE id IN ({placeholders})
""", list(all_invalid_ids))
conn.commit()
print(f"✓ 已删除 {cursor.rowcount} 条无效的关联关系")
else:
print(f"[DRY RUN] 将删除以下关联ID: {sorted(all_invalid_ids)}")
return {'deleted': len(all_invalid_ids) if not dry_run else 0}
def fix_input_field_relations(conn, dry_run: bool = True) -> Dict:
"""删除关联到输入字段的记录f_polic_file_field 应该只关联输出字段)"""
cursor = conn.cursor()
print("\n" + "="*80)
print("删除关联到输入字段的记录")
print("="*80)
if dry_run:
print("\n[DRY RUN模式 - 不会实际修改数据库]")
# 获取关联到输入字段的记录
input_field_relations = check_field_type_consistency(conn)['input_field_relations']
if not input_field_relations:
print("\n✓ 没有关联到输入字段的记录需要删除")
return {'deleted': 0}
ids_to_delete = [rel['id'] for rel in input_field_relations]
print(f"\n准备删除 {len(ids_to_delete)} 条关联到输入字段的记录")
if not dry_run:
placeholders = ','.join(['%s'] * len(ids_to_delete))
cursor.execute(f"""
DELETE FROM f_polic_file_field
WHERE id IN ({placeholders})
""", ids_to_delete)
conn.commit()
print(f"✓ 已删除 {cursor.rowcount} 条关联到输入字段的记录")
else:
print(f"[DRY RUN] 将删除以下关联ID: {sorted(ids_to_delete)}")
return {'deleted': len(ids_to_delete) if not dry_run else 0}
def fix_duplicate_relations(conn, dry_run: bool = True) -> Dict:
"""修复重复的关联关系(保留第一条,删除其他)"""
cursor = conn.cursor()
print("\n" + "="*80)
print("修复重复的关联关系")
print("="*80)
if dry_run:
print("\n[DRY RUN模式 - 不会实际修改数据库]")
duplicates = check_duplicate_relations(conn)['duplicates']
if not duplicates:
print("\n✓ 没有重复的关联关系需要修复")
return {'deleted': 0}
ids_to_delete = []
for dup in duplicates:
# 保留第一条ID最小的删除其他的
ids_to_delete.extend(dup['ids'][1:])
print(f"\n准备删除 {len(ids_to_delete)} 条重复的关联关系")
if not dry_run:
placeholders = ','.join(['%s'] * len(ids_to_delete))
cursor.execute(f"""
DELETE FROM f_polic_file_field
WHERE id IN ({placeholders})
""", ids_to_delete)
conn.commit()
print(f"✓ 已删除 {cursor.rowcount} 条重复的关联关系")
else:
print(f"[DRY RUN] 将删除以下关联ID: {sorted(ids_to_delete)}")
return {'deleted': len(ids_to_delete) if not dry_run else 0}
def get_statistics(conn) -> Dict:
"""获取统计信息"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
print("\n" + "="*80)
print("统计信息")
print("="*80)
# 总关联数
cursor.execute("""
SELECT COUNT(*) as total
FROM f_polic_file_field
WHERE tenant_id = %s
""", (TENANT_ID,))
total_relations = cursor.fetchone()['total']
# 有效的关联数(关联到存在的、启用的文件和字段)
cursor.execute("""
SELECT COUNT(*) as total
FROM f_polic_file_field fff
INNER JOIN f_polic_file_config fc ON fff.file_id = fc.id AND fff.tenant_id = fc.tenant_id AND fc.state = 1
INNER JOIN f_polic_field f ON fff.filed_id = f.id AND fff.tenant_id = f.tenant_id AND f.state = 1
WHERE fff.tenant_id = %s
""", (TENANT_ID,))
valid_relations = cursor.fetchone()['total']
# 关联的文件数
cursor.execute("""
SELECT COUNT(DISTINCT file_id) as total
FROM f_polic_file_field
WHERE tenant_id = %s
""", (TENANT_ID,))
related_files = cursor.fetchone()['total']
# 关联的字段数
cursor.execute("""
SELECT COUNT(DISTINCT filed_id) as total
FROM f_polic_file_field
WHERE tenant_id = %s
""", (TENANT_ID,))
related_fields = cursor.fetchone()['total']
print(f"\n总关联数: {total_relations}")
print(f"有效关联数: {valid_relations}")
print(f"关联的文件数: {related_files}")
print(f"关联的字段数: {related_fields}")
return {
'total_relations': total_relations,
'valid_relations': valid_relations,
'related_files': related_files,
'related_fields': related_fields
}
def main():
"""主函数"""
print("="*80)
print("检查并修复 f_polic_file_field 表的关联关系")
print("="*80)
try:
conn = pymysql.connect(**DB_CONFIG)
print("✓ 数据库连接成功\n")
except Exception as e:
print(f"✗ 数据库连接失败: {e}")
return
try:
# 1. 检查无效的关联关系
invalid_result = check_invalid_relations(conn)
# 2. 检查重复的关联关系
duplicate_result = check_duplicate_relations(conn)
# 3. 检查关联到已删除或未启用的字段/文件
disabled_result = check_disabled_relations(conn)
# 4. 检查缺失的关联关系
missing_result = check_missing_relations(conn)
# 5. 检查字段类型一致性
type_result = check_field_type_consistency(conn)
# 6. 获取统计信息
stats = get_statistics(conn)
# 总结
print("\n" + "="*80)
print("检查总结")
print("="*80)
has_issues = (
len(invalid_result['invalid_file_relations']) > 0 or
len(invalid_result['invalid_field_relations']) > 0 or
len(duplicate_result['duplicates']) > 0
)
has_issues = (
len(invalid_result['invalid_file_relations']) > 0 or
len(invalid_result['invalid_field_relations']) > 0 or
len(duplicate_result['duplicates']) > 0 or
len(type_result['input_field_relations']) > 0
)
if has_issues:
print("\n⚠ 发现以下问题:")
print(f" - 无效的 file_id 关联: {len(invalid_result['invalid_file_relations'])}")
print(f" - 无效的 filed_id 关联: {len(invalid_result['invalid_field_relations'])}")
print(f" - 重复的关联关系: {len(duplicate_result['duplicates'])}")
print(f" - 关联到未启用的文件: {len(disabled_result['disabled_file_relations'])}")
print(f" - 关联到未启用的字段: {len(disabled_result['disabled_field_relations'])}")
print(f" - 关联到输入字段: {len(type_result['input_field_relations'])}")
print("\n是否要修复这些问题?")
print("运行以下命令进行修复:")
print(" python check_and_fix_file_field_relations.py --fix")
else:
print("\n✓ 未发现需要修复的问题")
print("\n" + "="*80)
except Exception as e:
print(f"\n✗ 检查过程中发生错误: {e}")
import traceback
traceback.print_exc()
finally:
conn.close()
print("\n数据库连接已关闭")
def fix_main():
"""修复主函数"""
print("="*80)
print("修复 f_polic_file_field 表的关联关系")
print("="*80)
try:
conn = pymysql.connect(**DB_CONFIG)
print("✓ 数据库连接成功\n")
except Exception as e:
print(f"✗ 数据库连接失败: {e}")
return
try:
# 先进行干运行
print("\n[第一步] 干运行检查...")
invalid_result = check_invalid_relations(conn)
duplicate_result = check_duplicate_relations(conn)
# 修复无效的关联关系
print("\n[第二步] 修复无效的关联关系...")
fix_invalid_relations(conn, dry_run=False)
# 修复重复的关联关系
print("\n[第三步] 修复重复的关联关系...")
fix_duplicate_relations(conn, dry_run=False)
# 删除关联到输入字段的记录
print("\n[第四步] 删除关联到输入字段的记录...")
fix_input_field_relations(conn, dry_run=False)
# 重新获取统计信息
print("\n[第五步] 修复后的统计信息...")
stats = get_statistics(conn)
print("\n" + "="*80)
print("修复完成")
print("="*80)
except Exception as e:
print(f"\n✗ 修复过程中发生错误: {e}")
import traceback
traceback.print_exc()
conn.rollback()
finally:
conn.close()
print("\n数据库连接已关闭")
if __name__ == '__main__':
import sys
if '--fix' in sys.argv:
# 确认操作
print("\n⚠ 警告: 这将修改数据库!")
response = input("确认要继续吗? (yes/no): ")
if response.lower() == 'yes':
fix_main()
else:
print("操作已取消")
else:
main()

View File

@ -0,0 +1,191 @@
"""
修复缺失的 target_education_level 字段
检查并创建被核查人员文化程度字段
"""
import pymysql
import os
from datetime import datetime
# 数据库连接配置
DB_CONFIG = {
'host': os.getenv('DB_HOST', '152.136.177.240'),
'port': int(os.getenv('DB_PORT', 5012)),
'user': os.getenv('DB_USER', 'finyx'),
'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
'database': os.getenv('DB_NAME', 'finyx'),
'charset': 'utf8mb4'
}
TENANT_ID = 615873064429507639
CREATED_BY = 655162080928945152
UPDATED_BY = 655162080928945152
CURRENT_TIME = datetime.now()
# 字段定义
FIELD_DEFINITION = {
'name': '被核查人员文化程度',
'field_code': 'target_education_level',
'field_type': 2, # 输出字段
'description': '被核查人员文化程度(如:本科、大专、高中等)'
}
def generate_id():
"""生成ID使用时间戳+随机数的方式,模拟雪花算法)"""
import time
import random
timestamp = int(time.time() * 1000)
random_part = random.randint(100000, 999999)
return timestamp * 1000 + random_part
def check_field_exists(conn):
"""检查字段是否存在"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = """
SELECT id, name, filed_code, field_type, state
FROM f_polic_field
WHERE tenant_id = %s AND filed_code = %s
"""
cursor.execute(sql, (TENANT_ID, FIELD_DEFINITION['field_code']))
field = cursor.fetchone()
cursor.close()
return field
def create_field(conn, dry_run: bool = True):
"""创建字段"""
cursor = conn.cursor()
field_id = generate_id()
insert_sql = """
INSERT INTO f_polic_field
(id, tenant_id, name, filed_code, field_type, created_time, created_by, updated_time, updated_by, state)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
if dry_run:
print(f"[DRY RUN] 将创建字段:")
print(f" ID: {field_id}")
print(f" 名称: {FIELD_DEFINITION['name']}")
print(f" 编码: {FIELD_DEFINITION['field_code']}")
print(f" 类型: {FIELD_DEFINITION['field_type']} (输出字段)")
print(f" 状态: 1 (启用)")
else:
cursor.execute(insert_sql, (
field_id,
TENANT_ID,
FIELD_DEFINITION['name'],
FIELD_DEFINITION['field_code'],
FIELD_DEFINITION['field_type'],
CURRENT_TIME,
CREATED_BY,
CURRENT_TIME,
UPDATED_BY,
1 # state: 1表示启用
))
conn.commit()
print(f"✓ 成功创建字段: {FIELD_DEFINITION['name']} ({FIELD_DEFINITION['field_code']}), ID: {field_id}")
cursor.close()
return field_id
def update_field_state(conn, field_id, dry_run: bool = True):
"""更新字段状态为启用"""
cursor = conn.cursor()
update_sql = """
UPDATE f_polic_field
SET state = 1, updated_time = NOW(), updated_by = %s
WHERE id = %s AND tenant_id = %s
"""
if dry_run:
print(f"[DRY RUN] 将更新字段状态为启用: ID={field_id}")
else:
cursor.execute(update_sql, (UPDATED_BY, field_id, TENANT_ID))
conn.commit()
print(f"✓ 成功更新字段状态为启用: ID={field_id}")
cursor.close()
def main(dry_run: bool = True):
"""主函数"""
print("="*80)
print("修复缺失的 target_education_level 字段")
print("="*80)
if dry_run:
print("\n[DRY RUN模式 - 不会实际修改数据库]")
else:
print("\n[实际执行模式 - 将修改数据库]")
try:
conn = pymysql.connect(**DB_CONFIG)
print("✓ 数据库连接成功\n")
# 检查字段是否存在
print("1. 检查字段是否存在...")
existing_field = check_field_exists(conn)
if existing_field:
print(f" ✓ 字段已存在:")
print(f" ID: {existing_field['id']}")
print(f" 名称: {existing_field['name']}")
print(f" 编码: {existing_field['filed_code']}")
print(f" 类型: {existing_field['field_type']} ({'输出字段' if existing_field['field_type'] == 2 else '输入字段'})")
print(f" 状态: {existing_field['state']} ({'启用' if existing_field['state'] == 1 else '未启用'})")
# 如果字段存在但未启用,启用它
if existing_field['state'] != 1:
print(f"\n2. 字段存在但未启用,将更新状态...")
update_field_state(conn, existing_field['id'], dry_run=dry_run)
else:
print(f"\n✓ 字段已存在且已启用,无需操作")
else:
print(f" ✗ 字段不存在,需要创建")
print(f"\n2. 创建字段...")
field_id = create_field(conn, dry_run=dry_run)
if not dry_run:
print(f"\n✓ 字段创建完成")
print("\n" + "="*80)
if dry_run:
print("\n这是DRY RUN模式未实际修改数据库。")
print("要实际执行,请运行: python fix_missing_education_level_field.py --execute")
else:
print("\n✓ 字段修复完成")
except Exception as e:
print(f"\n✗ 发生错误: {e}")
import traceback
traceback.print_exc()
if not dry_run:
conn.rollback()
finally:
conn.close()
print("\n数据库连接已关闭")
if __name__ == '__main__':
import sys
dry_run = '--execute' not in sys.argv
if not dry_run:
print("\n⚠ 警告: 这将修改数据库!")
response = input("确认要继续吗? (yes/no): ")
if response.lower() != 'yes':
print("操作已取消")
sys.exit(0)
main(dry_run=dry_run)

View File

@ -16,8 +16,8 @@ BUCKET_NAME = 'finyx'
# 文件相对路径列表 # 文件相对路径列表
FILE_PATHS = [ FILE_PATHS = [
'/615873064429507639/20251209170434/初步核实审批表_张三.docx', '/615873064429507639/20251210155041/初步核实审批表_张三.docx',
'/615873064429507639/20251209170434/请示报告卡_张三.docx' '/615873064429507639/20251210155041/请示报告卡_张三.docx'
] ]
def generate_download_urls(): def generate_download_urls():

View File

@ -0,0 +1,318 @@
"""
模板字段关联查询示例脚本
演示如何查询模板关联的输入和输出字段
"""
import pymysql
import os
from typing import Dict, List, Optional
# 数据库连接配置
DB_CONFIG = {
'host': os.getenv('DB_HOST', '152.136.177.240'),
'port': int(os.getenv('DB_PORT', 5012)),
'user': os.getenv('DB_USER', 'finyx'),
'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
'database': os.getenv('DB_NAME', 'finyx'),
'charset': 'utf8mb4'
}
TENANT_ID = 615873064429507639
def get_template_fields_by_name(template_name: str) -> Optional[Dict]:
"""
根据模板名称获取关联的字段
Args:
template_name: 模板名称 '初步核实审批表'
Returns:
dict: 包含 template_id, template_name, input_fields output_fields 的字典
"""
conn = pymysql.connect(**DB_CONFIG)
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
sql = """
SELECT
fc.id AS template_id,
fc.name AS template_name,
f.id AS field_id,
f.name AS field_name,
f.filed_code AS field_code,
f.field_type
FROM f_polic_file_config fc
INNER JOIN f_polic_file_field fff ON fc.id = fff.file_id
INNER JOIN f_polic_field f ON fff.filed_id = f.id
WHERE fc.tenant_id = %s
AND fc.name = %s
AND fc.state = 1
AND fff.state = 1
AND f.state = 1
ORDER BY f.field_type, f.name
"""
cursor.execute(sql, (TENANT_ID, template_name))
rows = cursor.fetchall()
if not rows:
return None
result = {
'template_id': rows[0]['template_id'],
'template_name': rows[0]['template_name'],
'input_fields': [],
'output_fields': []
}
for row in rows:
field_info = {
'id': row['field_id'],
'name': row['field_name'],
'field_code': row['field_code'],
'field_type': row['field_type']
}
if row['field_type'] == 1:
result['input_fields'].append(field_info)
elif row['field_type'] == 2:
result['output_fields'].append(field_info)
return result
finally:
cursor.close()
conn.close()
def get_template_fields_by_id(template_id: int) -> Optional[Dict]:
"""
根据模板ID获取关联的字段
Args:
template_id: 模板ID
Returns:
dict: 包含 template_id, template_name, input_fields output_fields 的字典
"""
conn = pymysql.connect(**DB_CONFIG)
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
# 先获取模板名称
sql_template = """
SELECT id, name
FROM f_polic_file_config
WHERE id = %s AND tenant_id = %s AND state = 1
"""
cursor.execute(sql_template, (template_id, TENANT_ID))
template = cursor.fetchone()
if not template:
return None
# 获取字段
sql_fields = """
SELECT
f.id AS field_id,
f.name AS field_name,
f.filed_code AS field_code,
f.field_type
FROM f_polic_file_field fff
INNER JOIN f_polic_field f ON fff.filed_id = f.id
WHERE fff.file_id = %s
AND fff.tenant_id = %s
AND fff.state = 1
AND f.state = 1
ORDER BY f.field_type, f.name
"""
cursor.execute(sql_fields, (template_id, TENANT_ID))
rows = cursor.fetchall()
result = {
'template_id': template['id'],
'template_name': template['name'],
'input_fields': [],
'output_fields': []
}
for row in rows:
field_info = {
'id': row['field_id'],
'name': row['field_name'],
'field_code': row['field_code'],
'field_type': row['field_type']
}
if row['field_type'] == 1:
result['input_fields'].append(field_info)
elif row['field_type'] == 2:
result['output_fields'].append(field_info)
return result
finally:
cursor.close()
conn.close()
def get_all_templates_with_field_stats() -> List[Dict]:
"""
获取所有模板及其字段统计信息
Returns:
list: 模板列表每个模板包含字段统计
"""
conn = pymysql.connect(**DB_CONFIG)
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
sql = """
SELECT
fc.id AS template_id,
fc.name AS template_name,
COUNT(DISTINCT CASE WHEN f.field_type = 1 THEN f.id END) AS input_field_count,
COUNT(DISTINCT CASE WHEN f.field_type = 2 THEN f.id END) AS output_field_count,
COUNT(DISTINCT f.id) AS total_field_count
FROM f_polic_file_config fc
LEFT JOIN f_polic_file_field fff ON fc.id = fff.file_id AND fff.state = 1
LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND f.state = 1
WHERE fc.tenant_id = %s
AND fc.state = 1
GROUP BY fc.id, fc.name
ORDER BY fc.name
"""
cursor.execute(sql, (TENANT_ID,))
templates = cursor.fetchall()
return [
{
'template_id': t['template_id'],
'template_name': t['template_name'],
'input_field_count': t['input_field_count'] or 0,
'output_field_count': t['output_field_count'] or 0,
'total_field_count': t['total_field_count'] or 0
}
for t in templates
]
finally:
cursor.close()
conn.close()
def find_templates_using_field(field_code: str) -> List[Dict]:
"""
查找使用特定字段的所有模板
Args:
field_code: 字段编码 'target_name'
Returns:
list: 使用该字段的模板列表
"""
conn = pymysql.connect(**DB_CONFIG)
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
sql = """
SELECT DISTINCT
fc.id AS template_id,
fc.name AS template_name
FROM f_polic_file_config fc
INNER JOIN f_polic_file_field fff ON fc.id = fff.file_id
INNER JOIN f_polic_field f ON fff.filed_id = f.id
WHERE fc.tenant_id = %s
AND f.tenant_id = %s
AND f.filed_code = %s
AND fc.state = 1
AND fff.state = 1
AND f.state = 1
ORDER BY fc.name
"""
cursor.execute(sql, (TENANT_ID, TENANT_ID, field_code))
templates = cursor.fetchall()
return [
{
'template_id': t['template_id'],
'template_name': t['template_name']
}
for t in templates
]
finally:
cursor.close()
conn.close()
def print_template_fields(result: Dict):
"""打印模板字段信息"""
if not result:
print("未找到模板")
return
print("="*80)
print(f"模板: {result['template_name']} (ID: {result['template_id']})")
print("="*80)
print(f"\n输入字段 ({len(result['input_fields'])} 个):")
if result['input_fields']:
for field in result['input_fields']:
print(f" - {field['name']} ({field['field_code']})")
else:
print(" (无)")
print(f"\n输出字段 ({len(result['output_fields'])} 个):")
if result['output_fields']:
for field in result['output_fields']:
print(f" - {field['name']} ({field['field_code']})")
else:
print(" (无)")
def main():
"""主函数 - 演示各种查询方式"""
print("="*80)
print("模板字段关联查询示例")
print("="*80)
# 示例1: 根据模板名称查询
print("\n【示例1】根据模板名称查询字段")
print("-" * 80)
# 注意:模板名称需要完全匹配,如 "2.初步核实审批表XXX"
result = get_template_fields_by_name('2.初步核实审批表XXX')
if not result:
# 尝试其他可能的名称
result = get_template_fields_by_name('初步核实审批表')
print_template_fields(result)
# 示例2: 获取所有模板的字段统计
print("\n\n【示例2】获取所有模板的字段统计")
print("-" * 80)
templates = get_all_templates_with_field_stats()
print(f"共找到 {len(templates)} 个模板:\n")
for template in templates[:5]: # 只显示前5个
print(f" {template['template_name']} (ID: {template['template_id']})")
print(f" 输入字段: {template['input_field_count']}")
print(f" 输出字段: {template['output_field_count']}")
print(f" 总字段数: {template['total_field_count']}\n")
if len(templates) > 5:
print(f" ... 还有 {len(templates) - 5} 个模板")
# 示例3: 查找使用特定字段的模板
print("\n\n【示例3】查找使用 'target_name' 字段的模板")
print("-" * 80)
templates_using_field = find_templates_using_field('target_name')
print(f"共找到 {len(templates_using_field)} 个模板使用该字段:")
for template in templates_using_field:
print(f" - {template['template_name']} (ID: {template['template_id']})")
print("\n" + "="*80)
print("查询完成")
print("="*80)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,536 @@
"""
重新建立模板和字段的关联关系
根据模板名称重新建立 f_polic_file_field 表的关联关系
不再依赖 input_data template_code 字段
"""
import pymysql
import os
import json
from typing import Dict, List, Set, Optional
from datetime import datetime
from collections import defaultdict
# 数据库连接配置
DB_CONFIG = {
'host': os.getenv('DB_HOST', '152.136.177.240'),
'port': int(os.getenv('DB_PORT', 5012)),
'user': os.getenv('DB_USER', 'finyx'),
'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
'database': os.getenv('DB_NAME', 'finyx'),
'charset': 'utf8mb4'
}
TENANT_ID = 615873064429507639
CREATED_BY = 655162080928945152
UPDATED_BY = 655162080928945152
# 模板名称到字段编码的映射(根据业务逻辑定义)
# 格式:{模板名称: {'input_fields': [字段编码列表], 'output_fields': [字段编码列表]}}
TEMPLATE_FIELD_MAPPING = {
# 初步核实审批表
'初步核实审批表': {
'input_fields': ['clue_info', 'target_basic_info_clue'],
'output_fields': [
'target_name', 'target_organization_and_position', 'target_organization',
'target_position', 'target_gender', 'target_date_of_birth', 'target_age',
'target_education_level', 'target_political_status', 'target_professional_rank',
'clue_source', 'target_issue_description', 'department_opinion', 'filler_name'
]
},
# 谈话前安全风险评估表
'谈话前安全风险评估表': {
'input_fields': ['clue_info', 'target_basic_info_clue'],
'output_fields': [
'target_family_situation', 'target_social_relations', 'target_health_status',
'target_personality', 'target_tolerance', 'target_issue_severity',
'target_other_issues_possibility', 'target_previous_investigation',
'target_negative_events', 'target_other_situation', 'risk_level'
]
},
# 请示报告卡
'请示报告卡': {
'input_fields': ['clue_info'],
'output_fields': ['target_name', 'target_organization_and_position', 'report_card_request_time']
},
# 初核方案
'初核方案': {
'input_fields': ['clue_info', 'target_basic_info_clue'],
'output_fields': [
'target_name', 'target_organization_and_position', 'target_work_basic_info',
'target_issue_description', 'investigation_unit_name', 'investigation_team_leader_name',
'investigation_team_member_names', 'investigation_location'
]
},
# 谈话通知书
'谈话通知书': {
'input_fields': ['target_basic_info_clue'],
'output_fields': [
'target_name', 'target_organization_and_position', 'target_id_number',
'appointment_time', 'appointment_location', 'approval_time',
'handling_department', 'handler_name', 'notification_time', 'notification_location'
]
},
# 谈话通知书第一联
'谈话通知书第一联': {
'input_fields': ['target_basic_info_clue'],
'output_fields': [
'target_name', 'target_organization_and_position', 'target_id_number',
'appointment_time', 'appointment_location', 'approval_time',
'handling_department', 'handler_name', 'notification_time', 'notification_location'
]
},
# 谈话通知书第二联
'谈话通知书第二联': {
'input_fields': ['target_basic_info_clue'],
'output_fields': [
'target_name', 'target_organization_and_position', 'target_id_number',
'appointment_time', 'appointment_location', 'approval_time',
'handling_department', 'handler_name', 'notification_time', 'notification_location'
]
},
# 谈话通知书第三联
'谈话通知书第三联': {
'input_fields': ['target_basic_info_clue'],
'output_fields': [
'target_name', 'target_organization_and_position', 'target_id_number',
'appointment_time', 'appointment_location', 'approval_time',
'handling_department', 'handler_name', 'notification_time', 'notification_location'
]
},
# 谈话笔录
'谈话笔录': {
'input_fields': ['clue_info', 'target_basic_info_clue'],
'output_fields': [
'target_name', 'target_organization_and_position', 'target_gender',
'target_date_of_birth_full', 'target_political_status', 'target_address',
'target_registered_address', 'target_contact', 'target_place_of_origin',
'target_ethnicity', 'target_id_number', 'investigation_team_code'
]
},
# 谈话后安全风险评估表
'谈话后安全风险评估表': {
'input_fields': ['clue_info', 'target_basic_info_clue'],
'output_fields': [
'target_name', 'target_organization_and_position', 'target_gender',
'target_date_of_birth_full', 'target_political_status', 'target_address',
'target_registered_address', 'target_contact', 'target_place_of_origin',
'target_ethnicity', 'target_id_number', 'investigation_team_code'
]
},
# XXX初核情况报告
'XXX初核情况报告': {
'input_fields': ['clue_info', 'target_basic_info_clue'],
'output_fields': [
'target_name', 'target_organization_and_position', 'target_issue_description',
'target_work_basic_info', 'investigation_unit_name', 'investigation_team_leader_name'
]
},
# 走读式谈话审批
'走读式谈话审批': {
'input_fields': ['target_basic_info_clue'],
'output_fields': [
'target_name', 'target_organization_and_position', 'target_id_number',
'appointment_time', 'appointment_location', 'approval_time',
'handling_department', 'handler_name'
]
},
# 走读式谈话流程
'走读式谈话流程': {
'input_fields': ['target_basic_info_clue'],
'output_fields': [
'target_name', 'target_organization_and_position', 'target_id_number',
'appointment_time', 'appointment_location', 'approval_time',
'handling_department', 'handler_name'
]
},
# 谈话审批 / 谈话审批表
'谈话审批': {
'input_fields': ['target_basic_info_clue'],
'output_fields': [
'target_name', 'target_organization_and_position', 'target_id_number',
'appointment_time', 'appointment_location', 'approval_time',
'handling_department', 'handler_name'
]
},
'谈话审批表': {
'input_fields': ['clue_info', 'target_basic_info_clue'],
'output_fields': [
'target_name', 'target_organization_and_position', 'target_gender',
'target_date_of_birth_full', 'target_political_status', 'target_address',
'target_registered_address', 'target_contact', 'target_place_of_origin',
'target_ethnicity', 'target_id_number', 'investigation_team_code'
]
},
}
# 模板名称的标准化映射(处理不同的命名方式)
TEMPLATE_NAME_NORMALIZE = {
'1.请示报告卡XXX': '请示报告卡',
'2.初步核实审批表XXX': '初步核实审批表',
'3.附件初核方案(XXX)': '初核方案',
'8.XXX初核情况报告': 'XXX初核情况报告',
'2.谈话审批': '谈话审批',
'2谈话审批表': '谈话审批表',
}
def generate_id():
"""生成ID使用时间戳+随机数的方式,模拟雪花算法)"""
import time
import random
timestamp = int(time.time() * 1000)
random_part = random.randint(100000, 999999)
return timestamp * 1000 + random_part
def normalize_template_name(name: str) -> str:
"""标准化模板名称"""
# 先检查映射表
if name in TEMPLATE_NAME_NORMALIZE:
return TEMPLATE_NAME_NORMALIZE[name]
# 移除常见的后缀和前缀
name = name.strip()
# 移除括号内容
import re
name = re.sub(r'[(].*?[)]', '', name)
name = name.strip()
# 移除数字前缀和点号
name = re.sub(r'^\d+\.', '', name)
name = name.strip()
return name
def get_all_templates(conn) -> Dict:
"""获取所有模板配置"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = """
SELECT id, name, parent_id, state
FROM f_polic_file_config
WHERE tenant_id = %s
ORDER BY name
"""
cursor.execute(sql, (TENANT_ID,))
templates = cursor.fetchall()
result = {}
for template in templates:
name = template['name']
normalized_name = normalize_template_name(name)
# 处理state字段可能是二进制格式
state = template['state']
if isinstance(state, bytes):
state = int.from_bytes(state, byteorder='big')
elif isinstance(state, (int, str)):
state = int(state)
else:
state = 0
result[template['id']] = {
'id': template['id'],
'name': name,
'normalized_name': normalized_name,
'parent_id': template['parent_id'],
'state': state
}
cursor.close()
return result
def get_all_fields(conn) -> Dict:
"""获取所有字段定义"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = """
SELECT id, name, filed_code, field_type, state
FROM f_polic_field
WHERE tenant_id = %s
ORDER BY field_type, filed_code
"""
cursor.execute(sql, (TENANT_ID,))
fields = cursor.fetchall()
result = {
'by_code': {},
'by_name': {},
'input_fields': [],
'output_fields': []
}
for field in fields:
field_code = field['filed_code']
field_name = field['name']
field_type = field['field_type']
result['by_code'][field_code] = field
result['by_name'][field_name] = field
if field_type == 1:
result['input_fields'].append(field)
elif field_type == 2:
result['output_fields'].append(field)
cursor.close()
return result
def get_existing_relations(conn) -> Set[tuple]:
"""获取现有的关联关系"""
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = """
SELECT file_id, filed_id
FROM f_polic_file_field
WHERE tenant_id = %s
"""
cursor.execute(sql, (TENANT_ID,))
relations = cursor.fetchall()
result = {(rel['file_id'], rel['filed_id']) for rel in relations}
cursor.close()
return result
def rebuild_template_relations(conn, template_id: int, template_name: str,
normalized_name: str, field_mapping: Dict,
dry_run: bool = True) -> Dict:
"""重建单个模板的关联关系"""
cursor = conn.cursor()
# 查找模板对应的字段配置
template_config = None
# 优先精确匹配标准化名称
if normalized_name in TEMPLATE_FIELD_MAPPING:
template_config = TEMPLATE_FIELD_MAPPING[normalized_name]
else:
# 尝试模糊匹配
for name, config in TEMPLATE_FIELD_MAPPING.items():
if name == normalized_name or name in normalized_name or normalized_name in name:
template_config = config
break
# 也检查原始名称
if name in template_name or template_name in name:
template_config = config
break
if not template_config:
return {
'template_id': template_id,
'template_name': template_name,
'status': 'skipped',
'reason': '未找到字段配置映射',
'input_count': 0,
'output_count': 0
}
input_field_codes = template_config.get('input_fields', [])
output_field_codes = template_config.get('output_fields', [])
# 查找字段ID
input_field_ids = []
output_field_ids = []
for field_code in input_field_codes:
field = field_mapping['by_code'].get(field_code)
if field:
if field['field_type'] == 1:
input_field_ids.append(field['id'])
else:
print(f" ⚠ 警告: 字段 {field_code} 应该是输入字段,但实际类型为 {field['field_type']}")
else:
print(f" ⚠ 警告: 字段 {field_code} 不存在")
for field_code in output_field_codes:
field = field_mapping['by_code'].get(field_code)
if field:
if field['field_type'] == 2:
output_field_ids.append(field['id'])
else:
print(f" ⚠ 警告: 字段 {field_code} 应该是输出字段,但实际类型为 {field['field_type']}")
else:
print(f" ⚠ 警告: 字段 {field_code} 不存在")
# 删除旧的关联关系
if not dry_run:
delete_sql = """
DELETE FROM f_polic_file_field
WHERE tenant_id = %s AND file_id = %s
"""
cursor.execute(delete_sql, (TENANT_ID, template_id))
deleted_count = cursor.rowcount
else:
deleted_count = 0
# 创建新的关联关系
created_count = 0
all_field_ids = input_field_ids + output_field_ids
for field_id in all_field_ids:
if not dry_run:
# 检查是否已存在(虽然已经删除了,但为了安全还是检查一下)
check_sql = """
SELECT id FROM f_polic_file_field
WHERE tenant_id = %s AND file_id = %s AND filed_id = %s
"""
cursor.execute(check_sql, (TENANT_ID, template_id, field_id))
existing = cursor.fetchone()
if not existing:
relation_id = generate_id()
insert_sql = """
INSERT INTO f_polic_file_field
(id, tenant_id, file_id, filed_id, created_time, created_by, updated_time, updated_by, state)
VALUES (%s, %s, %s, %s, NOW(), %s, NOW(), %s, %s)
"""
cursor.execute(insert_sql, (
relation_id, TENANT_ID, template_id, field_id,
CREATED_BY, UPDATED_BY, 1 # state=1 表示启用
))
created_count += 1
else:
created_count += 1
if not dry_run:
conn.commit()
return {
'template_id': template_id,
'template_name': template_name,
'normalized_name': normalized_name,
'status': 'success',
'deleted_count': deleted_count,
'input_count': len(input_field_ids),
'output_count': len(output_field_ids),
'created_count': created_count
}
def main(dry_run: bool = True):
"""主函数"""
print("="*80)
print("重新建立模板和字段的关联关系")
print("="*80)
if dry_run:
print("\n[DRY RUN模式 - 不会实际修改数据库]")
else:
print("\n[实际执行模式 - 将修改数据库]")
try:
conn = pymysql.connect(**DB_CONFIG)
print("✓ 数据库连接成功\n")
# 获取所有模板
print("1. 获取所有模板配置...")
templates = get_all_templates(conn)
print(f" 找到 {len(templates)} 个模板")
# 获取所有字段
print("\n2. 获取所有字段定义...")
field_mapping = get_all_fields(conn)
print(f" 输入字段: {len(field_mapping['input_fields'])}")
print(f" 输出字段: {len(field_mapping['output_fields'])}")
print(f" 总字段数: {len(field_mapping['by_code'])}")
# 获取现有关联关系
print("\n3. 获取现有关联关系...")
existing_relations = get_existing_relations(conn)
print(f" 现有关联关系: {len(existing_relations)}")
# 重建关联关系
print("\n4. 重建模板和字段的关联关系...")
print("="*80)
results = []
for template_id, template_info in templates.items():
template_name = template_info['name']
normalized_name = template_info['normalized_name']
state = template_info['state']
# 处理所有模板(包括未启用的,因为可能需要建立关联)
# 但可以记录状态
status_note = f" (state={state})" if state != 1 else ""
if state != 1:
print(f"\n处理未启用的模板: {template_name}{status_note}")
print(f"\n处理模板: {template_name}")
print(f" 标准化名称: {normalized_name}")
result = rebuild_template_relations(
conn, template_id, template_name, normalized_name,
field_mapping, dry_run=dry_run
)
results.append(result)
if result['status'] == 'success':
print(f" ✓ 成功: 删除 {result['deleted_count']} 条旧关联, "
f"创建 {result['created_count']} 条新关联 "
f"(输入字段: {result['input_count']}, 输出字段: {result['output_count']})")
else:
print(f"{result['status']}: {result.get('reason', '')}")
# 统计信息
print("\n" + "="*80)
print("处理结果统计")
print("="*80)
success_count = sum(1 for r in results if r['status'] == 'success')
skipped_count = sum(1 for r in results if r['status'] == 'skipped')
total_input = sum(r.get('input_count', 0) for r in results)
total_output = sum(r.get('output_count', 0) for r in results)
total_created = sum(r.get('created_count', 0) for r in results)
print(f"\n成功处理: {success_count} 个模板")
print(f"跳过: {skipped_count} 个模板")
print(f"总输入字段关联: {total_input}")
print(f"总输出字段关联: {total_output}")
print(f"总关联关系: {total_created}")
# 显示详细结果
print("\n详细结果:")
for result in results:
if result['status'] == 'success':
print(f" - {result['template_name']}: "
f"输入字段 {result['input_count']} 个, "
f"输出字段 {result['output_count']}")
else:
print(f" - {result['template_name']}: {result['status']} - {result.get('reason', '')}")
print("\n" + "="*80)
if dry_run:
print("\n这是DRY RUN模式未实际修改数据库。")
print("要实际执行,请运行: python rebuild_template_field_relations.py --execute")
else:
print("\n✓ 关联关系已更新完成")
except Exception as e:
print(f"\n✗ 发生错误: {e}")
import traceback
traceback.print_exc()
if not dry_run:
conn.rollback()
finally:
conn.close()
print("\n数据库连接已关闭")
if __name__ == '__main__':
import sys
dry_run = '--execute' not in sys.argv
if not dry_run:
print("\n⚠ 警告: 这将修改数据库!")
response = input("确认要继续吗? (yes/no): ")
if response.lower() != 'yes':
print("操作已取消")
sys.exit(0)
main(dry_run=dry_run)

View File

@ -115,6 +115,10 @@ class DocumentService:
Returns: Returns:
本地临时文件路径 本地临时文件路径
""" """
# 检查file_path是否为None或空
if not file_path:
raise Exception("模板文件路径不能为空请检查数据库中模板配置的file_path字段")
client = self.get_minio_client() client = self.get_minio_client()
# 创建临时文件 # 创建临时文件
@ -231,6 +235,11 @@ class DocumentService:
if not file_config: if not file_config:
raise Exception(f"模板编码 {template_code} 不存在") raise Exception(f"模板编码 {template_code} 不存在")
# 检查file_path是否存在
file_path = file_config.get('file_path')
if not file_path:
raise Exception(f"模板编码 {template_code} 的文件路径(file_path)为空,请检查数据库配置")
# 将input_data转换为字典格式 # 将input_data转换为字典格式
field_data = {} field_data = {}
for item in input_data: for item in input_data:
@ -243,7 +252,7 @@ class DocumentService:
template_path = None template_path = None
filled_doc_path = None filled_doc_path = None
try: try:
template_path = self.download_template_from_minio(file_config['file_path']) template_path = self.download_template_from_minio(file_path)
# 填充模板 # 填充模板
filled_doc_path = self.fill_template(template_path, field_data) filled_doc_path = self.fill_template(template_path, field_data)

View File

@ -0,0 +1,530 @@
# 模板字段关联查询说明
## 一、概述
本文档说明如何通过查询 `f_polic_file_config` 表获取每个模板关联的输入和输出字段。系统已重新建立了模板和字段的关联关系,不再依赖 `input_data``template_code` 字段。
## 二、表结构关系
### 2.1 相关表说明
1. **f_polic_file_config** - 文件模板配置表
- `id`: 文件配置ID主键
- `name`: 模板名称(如:"初步核实审批表"
- `tenant_id`: 租户ID固定值615873064429507639
- `state`: 状态0=未启用1=启用)
2. **f_polic_field** - 字段定义表
- `id`: 字段ID主键
- `name`: 字段名称(中文显示名)
- `filed_code`: 字段编码(注意:表中字段名拼写为 `filed_code`
- `field_type`: 字段类型1=输入字段2=输出字段)
- `tenant_id`: 租户ID
3. **f_polic_file_field** - 文件和字段关联表
- `file_id`: 文件配置ID关联 `f_polic_file_config.id`
- `filed_id`: 字段ID关联 `f_polic_field.id`
- `tenant_id`: 租户ID
- `state`: 状态0=未启用1=启用)
### 2.2 关联关系
```
f_polic_file_config (模板)
↓ (通过 file_id)
f_polic_file_field (关联表)
↓ (通过 filed_id)
f_polic_field (字段)
```
## 三、查询方式
### 3.1 根据模板名称查询字段
**场景**:已知模板名称,查询该模板关联的所有字段(包括输入和输出字段)
```sql
SELECT
fc.id AS template_id,
fc.name AS template_name,
f.id AS field_id,
f.name AS field_name,
f.filed_code AS field_code,
f.field_type,
CASE
WHEN f.field_type = 1 THEN '输入字段'
WHEN f.field_type = 2 THEN '输出字段'
ELSE '未知'
END AS field_type_name
FROM f_polic_file_config fc
INNER JOIN f_polic_file_field fff ON fc.id = fff.file_id
INNER JOIN f_polic_field f ON fff.filed_id = f.id
WHERE fc.tenant_id = 615873064429507639
AND fc.name = '初步核实审批表'
AND fc.state = 1
AND fff.state = 1
AND f.state = 1
ORDER BY f.field_type, f.name;
```
### 3.2 根据模板ID查询字段
**场景**已知模板ID查询该模板关联的所有字段
```sql
SELECT
f.id AS field_id,
f.name AS field_name,
f.filed_code AS field_code,
f.field_type,
CASE
WHEN f.field_type = 1 THEN '输入字段'
WHEN f.field_type = 2 THEN '输出字段'
ELSE '未知'
END AS field_type_name
FROM f_polic_file_field fff
INNER JOIN f_polic_field f ON fff.filed_id = f.id
WHERE fff.file_id = ? -- 替换为实际的模板ID
AND fff.tenant_id = 615873064429507639
AND fff.state = 1
AND f.state = 1
ORDER BY f.field_type, f.name;
```
### 3.3 分别查询输入字段和输出字段
**场景**:需要分别获取输入字段和输出字段列表
#### 查询输入字段field_type = 1
```sql
SELECT
f.id AS field_id,
f.name AS field_name,
f.filed_code AS field_code
FROM f_polic_file_config fc
INNER JOIN f_polic_file_field fff ON fc.id = fff.file_id
INNER JOIN f_polic_field f ON fff.filed_id = f.id
WHERE fc.tenant_id = 615873064429507639
AND fc.name = '初步核实审批表'
AND fc.state = 1
AND fff.state = 1
AND f.state = 1
AND f.field_type = 1 -- 输入字段
ORDER BY f.name;
```
#### 查询输出字段field_type = 2
```sql
SELECT
f.id AS field_id,
f.name AS field_name,
f.filed_code AS field_code
FROM f_polic_file_config fc
INNER JOIN f_polic_file_field fff ON fc.id = fff.file_id
INNER JOIN f_polic_field f ON fff.filed_id = f.id
WHERE fc.tenant_id = 615873064429507639
AND fc.name = '初步核实审批表'
AND fc.state = 1
AND fff.state = 1
AND f.state = 1
AND f.field_type = 2 -- 输出字段
ORDER BY f.name;
```
### 3.4 查询所有模板及其字段统计
**场景**:获取所有模板及其关联的字段数量统计
```sql
SELECT
fc.id AS template_id,
fc.name AS template_name,
COUNT(DISTINCT CASE WHEN f.field_type = 1 THEN f.id END) AS input_field_count,
COUNT(DISTINCT CASE WHEN f.field_type = 2 THEN f.id END) AS output_field_count,
COUNT(DISTINCT f.id) AS total_field_count
FROM f_polic_file_config fc
LEFT JOIN f_polic_file_field fff ON fc.id = fff.file_id AND fff.state = 1
LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND f.state = 1
WHERE fc.tenant_id = 615873064429507639
AND fc.state = 1
GROUP BY fc.id, fc.name
ORDER BY fc.name;
```
### 3.5 查询特定模板的完整字段信息JSON格式
**场景**:前端需要获取模板的完整字段信息,包括输入和输出字段的详细信息
```sql
SELECT
fc.id AS template_id,
fc.name AS template_name,
JSON_OBJECT(
'input_fields', JSON_ARRAYAGG(
CASE
WHEN f.field_type = 1 THEN JSON_OBJECT(
'id', f.id,
'name', f.name,
'field_code', f.filed_code
)
END
),
'output_fields', JSON_ARRAYAGG(
CASE
WHEN f.field_type = 2 THEN JSON_OBJECT(
'id', f.id,
'name', f.name,
'field_code', f.filed_code
)
END
)
) AS fields_info
FROM f_polic_file_config fc
LEFT JOIN f_polic_file_field fff ON fc.id = fff.file_id AND fff.state = 1
LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND f.state = 1
WHERE fc.tenant_id = 615873064429507639
AND fc.name = '初步核实审批表'
AND fc.state = 1
GROUP BY fc.id, fc.name;
```
## 四、Python代码示例
### 4.1 根据模板名称获取字段
```python
import pymysql
# 数据库配置
DB_CONFIG = {
'host': '152.136.177.240',
'port': 5012,
'user': 'finyx',
'password': '6QsGK6MpePZDE57Z',
'database': 'finyx',
'charset': 'utf8mb4'
}
TENANT_ID = 615873064429507639
def get_template_fields_by_name(template_name: str):
"""
根据模板名称获取关联的字段
Args:
template_name: 模板名称,如 '初步核实审批表'
Returns:
dict: 包含 input_fields 和 output_fields 的字典
"""
conn = pymysql.connect(**DB_CONFIG)
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
sql = """
SELECT
f.id,
f.name,
f.filed_code AS field_code,
f.field_type
FROM f_polic_file_config fc
INNER JOIN f_polic_file_field fff ON fc.id = fff.file_id
INNER JOIN f_polic_field f ON fff.filed_id = f.id
WHERE fc.tenant_id = %s
AND fc.name = %s
AND fc.state = 1
AND fff.state = 1
AND f.state = 1
ORDER BY f.field_type, f.name
"""
cursor.execute(sql, (TENANT_ID, template_name))
fields = cursor.fetchall()
# 分类为输入字段和输出字段
result = {
'template_name': template_name,
'input_fields': [],
'output_fields': []
}
for field in fields:
field_info = {
'id': field['id'],
'name': field['name'],
'field_code': field['field_code'],
'field_type': field['field_type']
}
if field['field_type'] == 1:
result['input_fields'].append(field_info)
elif field['field_type'] == 2:
result['output_fields'].append(field_info)
return result
finally:
cursor.close()
conn.close()
# 使用示例
if __name__ == '__main__':
result = get_template_fields_by_name('初步核实审批表')
print(f"模板: {result['template_name']}")
print(f"输入字段数量: {len(result['input_fields'])}")
print(f"输出字段数量: {len(result['output_fields'])}")
print("\n输入字段:")
for field in result['input_fields']:
print(f" - {field['name']} ({field['field_code']})")
print("\n输出字段:")
for field in result['output_fields']:
print(f" - {field['name']} ({field['field_code']})")
```
### 4.2 根据模板ID获取字段
```python
def get_template_fields_by_id(template_id: int):
"""
根据模板ID获取关联的字段
Args:
template_id: 模板ID
Returns:
dict: 包含 input_fields 和 output_fields 的字典
"""
conn = pymysql.connect(**DB_CONFIG)
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
# 先获取模板名称
sql_template = """
SELECT id, name
FROM f_polic_file_config
WHERE id = %s AND tenant_id = %s AND state = 1
"""
cursor.execute(sql_template, (template_id, TENANT_ID))
template = cursor.fetchone()
if not template:
return None
# 获取字段
sql_fields = """
SELECT
f.id,
f.name,
f.filed_code AS field_code,
f.field_type
FROM f_polic_file_field fff
INNER JOIN f_polic_field f ON fff.filed_id = f.id
WHERE fff.file_id = %s
AND fff.tenant_id = %s
AND fff.state = 1
AND f.state = 1
ORDER BY f.field_type, f.name
"""
cursor.execute(sql_fields, (template_id, TENANT_ID))
fields = cursor.fetchall()
result = {
'template_id': template['id'],
'template_name': template['name'],
'input_fields': [],
'output_fields': []
}
for field in fields:
field_info = {
'id': field['id'],
'name': field['name'],
'field_code': field['field_code'],
'field_type': field['field_type']
}
if field['field_type'] == 1:
result['input_fields'].append(field_info)
elif field['field_type'] == 2:
result['output_fields'].append(field_info)
return result
finally:
cursor.close()
conn.close()
```
### 4.3 获取所有模板及其字段统计
```python
def get_all_templates_with_field_stats():
"""
获取所有模板及其字段统计信息
Returns:
list: 模板列表,每个模板包含字段统计
"""
conn = pymysql.connect(**DB_CONFIG)
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
sql = """
SELECT
fc.id AS template_id,
fc.name AS template_name,
COUNT(DISTINCT CASE WHEN f.field_type = 1 THEN f.id END) AS input_field_count,
COUNT(DISTINCT CASE WHEN f.field_type = 2 THEN f.id END) AS output_field_count,
COUNT(DISTINCT f.id) AS total_field_count
FROM f_polic_file_config fc
LEFT JOIN f_polic_file_field fff ON fc.id = fff.file_id AND fff.state = 1
LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND f.state = 1
WHERE fc.tenant_id = %s
AND fc.state = 1
GROUP BY fc.id, fc.name
ORDER BY fc.name
"""
cursor.execute(sql, (TENANT_ID,))
templates = cursor.fetchall()
return [
{
'template_id': t['template_id'],
'template_name': t['template_name'],
'input_field_count': t['input_field_count'] or 0,
'output_field_count': t['output_field_count'] or 0,
'total_field_count': t['total_field_count'] or 0
}
for t in templates
]
finally:
cursor.close()
conn.close()
# 使用示例
if __name__ == '__main__':
templates = get_all_templates_with_field_stats()
print("所有模板及其字段统计:")
for template in templates:
print(f"\n模板: {template['template_name']} (ID: {template['template_id']})")
print(f" 输入字段: {template['input_field_count']} 个")
print(f" 输出字段: {template['output_field_count']} 个")
print(f" 总字段数: {template['total_field_count']} 个")
```
## 五、常见查询场景
### 5.1 前端展示模板列表
**需求**:前端需要展示所有模板,并显示每个模板的字段数量
```sql
SELECT
fc.id,
fc.name,
COUNT(DISTINCT CASE WHEN f.field_type = 1 THEN f.id END) AS input_count,
COUNT(DISTINCT CASE WHEN f.field_type = 2 THEN f.id END) AS output_count
FROM f_polic_file_config fc
LEFT JOIN f_polic_file_field fff ON fc.id = fff.file_id AND fff.state = 1
LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND f.state = 1
WHERE fc.tenant_id = 615873064429507639
AND fc.state = 1
GROUP BY fc.id, fc.name
ORDER BY fc.name;
```
### 5.2 验证模板字段完整性
**需求**:检查某个模板是否有关联字段
```sql
SELECT
fc.id,
fc.name,
CASE
WHEN COUNT(f.id) > 0 THEN '有字段关联'
ELSE '无字段关联'
END AS status,
COUNT(f.id) AS field_count
FROM f_polic_file_config fc
LEFT JOIN f_polic_file_field fff ON fc.id = fff.file_id AND fff.state = 1
LEFT JOIN f_polic_field f ON fff.filed_id = f.id AND f.state = 1
WHERE fc.tenant_id = 615873064429507639
AND fc.name = '初步核实审批表'
AND fc.state = 1
GROUP BY fc.id, fc.name;
```
### 5.3 查找使用特定字段的所有模板
**需求**:查找哪些模板使用了某个字段(如 `target_name`
```sql
SELECT
fc.id AS template_id,
fc.name AS template_name
FROM f_polic_file_config fc
INNER JOIN f_polic_file_field fff ON fc.id = fff.file_id
INNER JOIN f_polic_field f ON fff.filed_id = f.id
WHERE fc.tenant_id = 615873064429507639
AND f.tenant_id = 615873064429507639
AND f.filed_code = 'target_name'
AND fc.state = 1
AND fff.state = 1
AND f.state = 1
ORDER BY fc.name;
```
## 六、注意事项
1. **租户ID**所有查询都需要使用固定的租户ID`615873064429507639`
2. **状态过滤**:建议始终过滤 `state = 1` 的记录,确保只获取启用的模板和字段
3. **字段名拼写**:注意 `f_polic_field` 表中的字段编码字段名是 `filed_code`(不是 `field_code`),这是历史遗留问题
4. **字段类型**
- `field_type = 1`输入字段用于AI解析的原始数据
- `field_type = 2`输出字段AI解析后生成的结构化数据用于填充模板
5. **关联表状态**`f_polic_file_field` 表也有 `state` 字段,需要过滤 `fff.state = 1`
6. **性能优化**:如果经常查询,建议在以下字段上创建索引:
- `f_polic_file_config.tenant_id`
- `f_polic_file_config.name`
- `f_polic_file_field.file_id`
- `f_polic_file_field.filed_id`
- `f_polic_field.filed_code`
## 七、示例数据
### 7.1 初步核实审批表字段示例
**输入字段**2个
- `clue_info` - 线索信息
- `target_basic_info_clue` - 被核查人员工作基本情况线索
**输出字段**14个
- `target_name` - 被核查人姓名
- `target_organization_and_position` - 被核查人员单位及职务
- `target_organization` - 被核查人员单位
- `target_position` - 被核查人员职务
- `target_gender` - 被核查人员性别
- `target_date_of_birth` - 被核查人员出生年月
- `target_age` - 被核查人员年龄
- `target_education_level` - 被核查人员文化程度
- `target_political_status` - 被核查人员政治面貌
- `target_professional_rank` - 被核查人员职级
- `clue_source` - 线索来源
- `target_issue_description` - 主要问题线索
- `department_opinion` - 初步核实审批表承办部门意见
- `filler_name` - 初步核实审批表填表人
## 八、总结
通过 `f_polic_file_field` 关联表,可以方便地查询每个模板关联的输入和输出字段。这种方式比之前依赖 `input_data``template_code` 字段更加规范、可靠,也更容易维护和扩展。
其他研发人员可以根据上述SQL示例和Python代码在自己的模块中实现模板字段的查询功能。