""" 智慧监督AI文书写作服务 - 主应用 """ from flask import Flask, request, jsonify, send_from_directory from flask_cors import CORS from flasgger import Swagger import os import pymysql import json import tempfile import zipfile import re from datetime import datetime from pathlib import Path from dotenv import load_dotenv from flask import send_file from docx import Document from minio import Minio from werkzeug.utils import secure_filename from services.ai_service import AIService from services.field_service import FieldService from services.document_service import DocumentService from utils.response import success_response, error_response # 加载环境变量 load_dotenv() def clean_query_result(data): """ 清理查询结果,将 bytes 类型转换为字符串 用于处理数据库查询结果中的 BLOB 等字段 支持处理:bytes, datetime, Decimal, 以及其他不可序列化的类型 """ if isinstance(data, bytes): # 如果是单个字节(如 TINYINT(1) 的 state 字段),转换为整数 if len(data) == 1: return int.from_bytes(data, byteorder='big') # 如果是多个字节(如字符串),解码为 UTF-8 try: return data.decode('utf-8') except UnicodeDecodeError: return data.decode('utf-8', errors='ignore') elif isinstance(data, dict): return {key: clean_query_result(value) for key, value in data.items()} elif isinstance(data, list): return [clean_query_result(item) for item in data] elif isinstance(data, datetime): return data.isoformat() elif isinstance(data, (int, float, str, bool, type(None))): # 保持原始类型,但确保数字类型不会被意外转换 return data elif hasattr(data, '__int__'): # 处理 Decimal 等数值类型,转换为 int 或 float try: if isinstance(data, float) or (hasattr(data, 'as_tuple') and data.as_tuple()[2] < 0): return float(data) else: return int(data) except: return str(data) else: # 对于其他类型(如 Decimal, date, time 等),尝试转换为字符串或 JSON 兼容类型 try: # 尝试使用 JSON 默认处理 import json json.dumps(data, default=str) # 测试是否可以序列化 return data except (TypeError, ValueError): # 如果无法序列化,转换为字符串 try: return str(data) except: return None app = Flask(__name__) CORS(app) # 允许跨域请求 # 配置Swagger swagger_config = { "headers": [], "specs": [ { "endpoint": "apispec", "route": "/apispec.json", "rule_filter": lambda rule: True, "model_filter": lambda tag: True, } ], "static_url_path": "/flasgger_static", "swagger_ui": True, "specs_route": "/api-docs" } swagger_template = { "swagger": "2.0", "info": { "title": "智慧监督AI文书写作服务 API", "description": "基于大模型的智能文书生成服务,支持从非结构化文本中提取结构化字段数据", "version": "1.0.0", "contact": { "name": "API支持" } }, "basePath": "/", "schemes": ["http", "https"], "tags": [ { "name": "AI解析", "description": "AI字段提取相关接口" }, { "name": "文档生成", "description": "文档生成相关接口" }, { "name": "字段配置", "description": "字段配置查询接口" } ] } swagger = Swagger(app, config=swagger_config, template=swagger_template) # 初始化服务 ai_service = AIService() field_service = FieldService() document_service = DocumentService() @app.route('/') def index(): """返回测试页面""" return send_from_directory('static', 'index.html') @app.route('/ai/extract', methods=['POST']) @app.route('/api/ai/extract', methods=['POST']) # 保留旧路径以兼容 def extract(): """ AI字段提取接口 从输入的非结构化文本中提取结构化字段数据 --- tags: - AI解析 summary: 从输入数据中提取结构化字段 description: 使用AI大模型从输入文本中提取结构化字段,根据fieldCode从数据库查询字段配置 consumes: - application/json produces: - application/json parameters: - in: body name: body description: 请求参数 required: true schema: type: object required: - inputData - outputData properties: inputData: type: array description: 输入数据列表 items: type: object properties: fieldCode: type: string description: 字段编码 example: clue_info fieldValue: type: string description: 字段值(原始文本) example: 被举报用户名称是张三,年龄30岁,某公司总经理 outputData: type: array description: 需要提取的输出字段列表 items: type: object properties: fieldCode: type: string description: 字段编码 example: userName responses: 200: description: 解析成功 schema: type: object properties: code: type: integer description: 响应码,0表示成功 example: 0 data: type: object properties: outData: type: array description: 提取的字段列表 items: type: object properties: fieldCode: type: string description: 字段编码 example: userName fieldValue: type: string description: 提取的字段值 example: 张三 msg: type: string description: 响应消息 example: ok isSuccess: type: boolean description: 是否成功 example: true timestamp: type: string description: 时间戳 errorMsg: type: string description: 错误信息(成功时为空) 400: description: 请求参数错误 schema: type: object properties: code: type: integer example: 400 errorMsg: type: string example: 请求参数不能为空 isSuccess: type: boolean example: false 2001: description: AI解析超时或发生错误 schema: type: object properties: code: type: integer example: 2001 errorMsg: type: string example: AI解析超时或发生错误 isSuccess: type: boolean example: false 2002: description: AI解析失败 schema: type: object properties: code: type: integer example: 2002 errorMsg: type: string example: AI解析失败,请检查输入文本质量 isSuccess: type: boolean example: false """ try: data = request.get_json() # 验证请求参数 if not data: return error_response(400, "请求参数不能为空") input_data = data.get('inputData', []) output_data = data.get('outputData', []) if not input_data or not isinstance(input_data, list): return error_response(400, "inputData参数必须是非空数组") if not output_data or not isinstance(output_data, list): return error_response(400, "outputData参数必须是非空数组") # 提取outputData中的fieldCode列表 output_field_codes = [] for item in output_data: if isinstance(item, dict) and 'fieldCode' in item: output_field_codes.append(item['fieldCode']) elif isinstance(item, str): output_field_codes.append(item) if not output_field_codes: return error_response(400, "outputData中必须包含至少一个fieldCode") # 根据fieldCode从数据库查询输出字段配置 output_fields = field_service.get_output_fields_by_field_codes(output_field_codes) if not output_fields: return error_response(2002, f"未找到字段编码 {output_field_codes} 对应的字段配置") # 构建AI提示词(不再需要business_type) prompt = field_service.build_extract_prompt(input_data, output_fields) # 调用AI服务进行解析 ai_result = ai_service.extract_fields(prompt, output_fields) if not ai_result: return error_response(2002, "AI解析失败,请检查输入文本质量") # 调试:打印AI返回的结果 print(f"[API] AI返回结果包含 {len(ai_result)} 个字段") for key in ['target_name', 'target_gender', 'target_age', 'target_date_of_birth']: if key in ai_result: print(f"[API] AI返回 {key} = '{ai_result[key]}'") # 构建返回数据(按照outputData中的字段顺序返回) out_data = [] # 创建一个字段编码到字段信息的映射 field_map = {field['field_code']: field for field in output_fields} # 按照outputData的顺序构建返回数据 # 注意:如果AI未提取到值,返回空字符串,不自动应用默认值 # 默认值信息在文档中说明,由前端根据业务需求决定是否应用 for field_code in output_field_codes: field_value = ai_result.get(field_code, '') # 调试:打印关键字段的映射 if field_code in ['target_name', 'target_gender', 'target_age']: print(f"[API] 构建返回数据: {field_code} = '{field_value}' (从ai_result获取)") out_data.append({ 'fieldCode': field_code, 'fieldValue': field_value }) return success_response({'outData': out_data}) except Exception as e: return error_response(2001, f"AI解析超时或发生错误: {str(e)}") @app.route('/api/file-configs', methods=['GET']) def get_file_configs(): """ 获取可用的文件配置列表 用于查询可用的fileId,供文档生成接口使用 --- tags: - 字段配置 summary: 获取文件配置列表 description: 返回所有启用的文件配置,包含fileId和文件名称 responses: 200: description: 成功 schema: type: object properties: code: type: integer example: 0 data: type: object properties: fileConfigs: type: array items: type: object properties: fileId: type: integer description: 文件配置ID example: 1765273961563507 fileName: type: string description: 文件名称 example: 1.请示报告卡(XXX) filePath: type: string description: MinIO文件路径 example: /615873064429507639/TEMPLATE/2025/12/1.请示报告卡(XXX).docx isSuccess: type: boolean example: true """ try: conn = document_service.get_connection() cursor = conn.cursor(pymysql.cursors.DictCursor) try: sql = """ SELECT id, name, file_path FROM f_polic_file_config WHERE state = 1 ORDER BY name """ cursor.execute(sql) configs = cursor.fetchall() file_configs = [] for config in configs: file_configs.append({ 'fileId': config['id'], 'fileName': config['name'], 'filePath': config['file_path'] or '' }) return success_response({ 'fileConfigs': file_configs }) finally: cursor.close() conn.close() except Exception as e: return error_response(500, f"查询文件配置失败: {str(e)}") @app.route('/api/fields', methods=['GET']) def get_fields(): """ 获取字段配置接口 获取指定业务类型的输入和输出字段配置 --- tags: - 字段配置 summary: 获取字段配置 description: 获取指定业务类型的输入字段和输出字段配置,用于测试页面展示 produces: - application/json parameters: - in: query name: businessType type: string required: false default: INVESTIGATION description: 业务类型 example: INVESTIGATION responses: 200: description: 获取成功 schema: type: object properties: code: type: integer description: 响应码,0表示成功 example: 0 data: type: object properties: fields: type: object properties: input_fields: type: array description: 输入字段列表 items: type: object properties: id: type: integer description: 字段ID name: type: string description: 字段名称 example: 线索信息 field_code: type: string description: 字段编码 example: clue_info field_type: type: integer description: 字段类型(1=输入字段,2=输出字段) example: 1 output_fields: type: array description: 输出字段列表 items: type: object properties: id: type: integer description: 字段ID name: type: string description: 字段名称 example: 被核查人姓名 field_code: type: string description: 字段编码 example: target_name field_type: type: integer description: 字段类型(1=输入字段,2=输出字段) example: 2 msg: type: string description: 响应消息 example: ok isSuccess: type: boolean description: 是否成功 example: true 500: description: 服务器错误 schema: type: object properties: code: type: integer example: 500 errorMsg: type: string example: 获取字段配置失败 isSuccess: type: boolean example: false """ try: business_type = request.args.get('businessType', 'INVESTIGATION') fields = field_service.get_fields_by_business_type(business_type) return success_response({'fields': fields}) except Exception as e: return error_response(500, f"获取字段配置失败: {str(e)}") @app.route('/ai/generate-document', methods=['POST']) @app.route('/api/ai/generate-document', methods=['POST']) # 保留旧路径以兼容 def generate_document(): """ 文档生成接口 根据输入数据填充Word模板并生成文档 --- tags: - 文档生成 summary: 生成填充后的文档 description: 根据输入数据填充Word模板,上传到MinIO并返回文件路径 consumes: - application/json produces: - application/json parameters: - in: body name: body description: 请求参数 required: true schema: type: object required: - inputData - fpolicFieldParamFileList properties: inputData: type: array description: 输入数据列表 items: type: object properties: fieldCode: type: string description: 字段编码 example: userName fieldValue: type: string description: 字段值 example: 张三 fpolicFieldParamFileList: type: array description: 文件列表 items: type: object required: - fileId properties: fileId: type: integer description: 文件配置ID(从f_polic_file_config表获取) example: 1765273961563507 fileName: type: string description: 文件名称(可选,用于生成文档名称) example: 请示报告卡.doc responses: 200: description: 生成成功 schema: type: object properties: code: type: integer description: 响应码,0表示成功 example: 0 data: type: object properties: documentId: type: string description: 文档ID example: DOC202411260001 documentName: type: string description: 文档名称(第一个生成的文档名称) example: 初步核实审批表_张三.docx fpolicFieldParamFileList: type: array description: 生成的文档列表(数量与请求一致) items: type: object properties: fileId: type: integer description: 文件ID(与请求中的fileId一致) example: 1 fileName: type: string description: 实际生成的文档名称(.docx格式),与请求中的fileName可能不同 example: 初步核实审批表_张三.docx filePath: type: string description: MinIO相对路径(指向生成的文档文件) example: /615873064429507639/20251205090700/初步核实审批表_张三.docx downloadUrl: type: string description: MinIO预签名下载URL(完整链接,7天有效,可直接下载) example: https://minio.datacubeworld.com:9000/finyx/615873064429507639/20251205090700/初步核实审批表_张三.docx?X-Amz-Algorithm=... msg: type: string example: ok isSuccess: type: boolean example: true 1001: description: 模板不存在或参数错误 schema: type: object properties: code: type: integer example: 1001 errorMsg: type: string example: 文件ID对应的模板不存在或未启用 isSuccess: type: boolean example: false 3001: description: 文件生成失败 schema: type: object properties: code: type: integer example: 3001 errorMsg: type: string example: 文件生成失败 isSuccess: type: boolean example: false 3002: description: 文件保存失败 schema: type: object properties: code: type: integer example: 3002 errorMsg: type: string example: 文件保存失败 isSuccess: type: boolean example: false """ try: data = request.get_json() # 验证请求参数 if not data: return error_response(400, "请求参数不能为空") # 获取tenant_id(从请求参数或请求体中获取) tenant_id = request.args.get('tenant_id') or data.get('tenant_id') if tenant_id: try: tenant_id = int(tenant_id) except (ValueError, TypeError): return error_response(400, "tenant_id必须是整数") else: # 如果未提供tenant_id,尝试从环境变量获取,默认使用1 import os tenant_id_str = os.getenv('TENANT_ID', '1') try: tenant_id = int(tenant_id_str) except (ValueError, TypeError): tenant_id = 1 input_data = data.get('inputData', []) file_list = data.get('fpolicFieldParamFileList', []) if not input_data or not isinstance(input_data, list): return error_response(400, "inputData参数必须是非空数组") if not file_list or not isinstance(file_list, list): return error_response(400, "fpolicFieldParamFileList参数必须是非空数组") # 将input_data转换为字典格式(用于生成文档名称) field_data = {} for item in input_data: field_code = item.get('fieldCode', '') field_value = item.get('fieldValue', '') if field_code: field_data[field_code] = field_value or '' # 生成文档ID document_id = document_service.generate_document_id() # 处理每个文件 result_file_list = [] first_document_name = None # 用于存储第一个生成的文档名 for file_info in file_list: # 兼容 id 和 fileId 两种字段 file_id = file_info.get('fileId') or file_info.get('id') file_name = file_info.get('fileName') or file_info.get('name', '') if not file_id: return error_response(1001, f"文件 {file_name} 缺少fileId或id参数") try: # 生成文档(使用fileId而不是templateCode) result = document_service.generate_document( file_id=file_id, input_data=input_data, file_info=file_info, tenant_id=tenant_id ) # 使用生成的文档名称(.docx格式),而不是原始文件名 generated_file_name = result.get('fileName', file_name) # 保存第一个文档名作为 documentName if first_document_name is None: first_document_name = generated_file_name result_file_list.append({ 'fileId': file_id, 'fileName': generated_file_name, # 使用生成的文档名 'filePath': result['filePath'], # MinIO相对路径 'downloadUrl': result.get('downloadUrl') # MinIO预签名下载URL(完整链接) }) except Exception as e: error_msg = str(e) if '不存在' in error_msg or '模板' in error_msg: return error_response(1001, error_msg) elif '生成' in error_msg or '填充' in error_msg: return error_response(3001, error_msg) elif '上传' in error_msg or '保存' in error_msg: return error_response(3002, error_msg) else: return error_response(3001, f"文件生成失败: {error_msg}") # 构建返回数据(不包含inputData,只返回生成的文档信息) return success_response({ 'documentId': document_id, 'documentName': first_document_name or 'generated.docx', # 使用第一个生成的文档名 'fpolicFieldParamFileList': result_file_list }) except Exception as e: return error_response(3001, f"文档生成失败: {str(e)}") @app.route('/fPolicTask/getDocument', methods=['POST']) def get_document_by_task(): """ 通过taskId获取文档(兼容接口) 支持通过taskId查询关联的文件列表,或直接使用提供的文件列表 """ try: data = request.get_json() # 验证请求参数 if not data: return error_response(400, "请求参数不能为空") task_id = data.get('taskId') input_data = data.get('inputData', []) file_list = data.get('fpolicFieldParamFileList', []) # 如果没有提供file_list,尝试通过taskId查询 if not file_list and task_id: try: conn = document_service.get_connection() cursor = conn.cursor(pymysql.cursors.DictCursor) try: # 尝试从f_polic_task表查询关联的文件列表 # 注意:这里需要根据实际表结构调整SQL sql = """ SELECT file_id, file_name FROM f_polic_task_file WHERE task_id = %s AND state = 1 """ cursor.execute(sql, (task_id,)) task_files = cursor.fetchall() if task_files: file_list = [] for tf in task_files: file_list.append({ 'fileId': tf['file_id'], 'fileName': tf.get('file_name', '') }) except Exception as e: # 如果表不存在或查询失败,记录日志但不报错 print(f"[WARN] 无法通过taskId查询文件列表: {str(e)}") finally: cursor.close() conn.close() except Exception as e: print(f"[WARN] 查询taskId关联文件时出错: {str(e)}") # 如果仍然没有file_list,返回错误 if not file_list: return error_response(400, "缺少fpolicFieldParamFileList参数,且无法通过taskId查询到关联文件。请提供fpolicFieldParamFileList参数,格式: [{'fileId': 文件ID, 'fileName': '文件名'}]") if not input_data or not isinstance(input_data, list): return error_response(400, "inputData参数必须是非空数组") if not file_list or not isinstance(file_list, list): return error_response(400, "fpolicFieldParamFileList参数必须是非空数组") # 将input_data转换为字典格式(用于生成文档名称) field_data = {} for item in input_data: field_code = item.get('fieldCode', '') field_value = item.get('fieldValue', '') if field_code: field_data[field_code] = field_value or '' # 生成文档ID document_id = document_service.generate_document_id() # 获取tenant_id(从请求参数或请求体中获取) tenant_id = request.args.get('tenant_id') or data.get('tenant_id') if tenant_id: try: tenant_id = int(tenant_id) except (ValueError, TypeError): return error_response(400, "tenant_id必须是整数") else: # 如果未提供tenant_id,尝试从环境变量获取,默认使用1 import os tenant_id_str = os.getenv('TENANT_ID', '1') try: tenant_id = int(tenant_id_str) except (ValueError, TypeError): tenant_id = 1 # 处理每个文件 result_file_list = [] first_document_name = None # 用于存储第一个生成的文档名 for file_info in file_list: # 兼容 id 和 fileId 两种字段 file_id = file_info.get('fileId') or file_info.get('id') file_name = file_info.get('fileName') or file_info.get('name', '') if not file_id: return error_response(1001, f"文件 {file_name} 缺少fileId或id参数") try: # 生成文档(使用fileId而不是templateCode) result = document_service.generate_document( file_id=file_id, input_data=input_data, file_info=file_info, tenant_id=tenant_id ) # 使用生成的文档名称(.docx格式),而不是原始文件名 generated_file_name = result.get('fileName', file_name) # 保存第一个文档名作为 documentName if first_document_name is None: first_document_name = generated_file_name result_file_list.append({ 'fileId': file_id, 'fileName': generated_file_name, # 使用生成的文档名 'filePath': result['filePath'], # MinIO相对路径 'downloadUrl': result.get('downloadUrl') # MinIO预签名下载URL(完整链接) }) except Exception as e: error_msg = str(e) if '不存在' in error_msg or '模板' in error_msg: return error_response(1001, error_msg) elif '生成' in error_msg or '填充' in error_msg: return error_response(3001, error_msg) elif '上传' in error_msg or '保存' in error_msg: return error_response(3002, error_msg) else: return error_response(3001, f"文件生成失败: {error_msg}") # 构建返回数据(不包含inputData,只返回生成的文档信息) return success_response({ 'documentId': document_id, 'documentName': first_document_name or 'generated.docx', # 使用第一个生成的文档名 'fpolicFieldParamFileList': result_file_list }) except Exception as e: return error_response(3001, f"文档生成失败: {str(e)}") @app.route('/template-field-manager') def template_field_manager(): """返回模板字段关联管理页面""" return send_from_directory('static', 'template_field_manager.html') def generate_id(): """生成ID(使用时间戳+随机数的方式,模拟雪花算法)""" import time import random timestamp = int(time.time() * 1000) random_part = random.randint(100000, 999999) return timestamp * 1000 + random_part @app.route('/api/tenant-ids', methods=['GET']) def get_tenant_ids(): """ 获取数据库中所有已存在的 tenant_id 用于模板字段关联管理页面选择租户 从三个表中查询所有不同的 tenant_id(字段表、模板表、关联表) """ try: conn = document_service.get_connection() cursor = conn.cursor(pymysql.cursors.DictCursor) try: # 从三个表中获取所有不同的 tenant_id(合并去重) cursor.execute(""" SELECT DISTINCT tenant_id FROM ( SELECT tenant_id FROM f_polic_field WHERE tenant_id IS NOT NULL UNION SELECT tenant_id FROM f_polic_file_config WHERE tenant_id IS NOT NULL UNION SELECT tenant_id FROM f_polic_file_field WHERE tenant_id IS NOT NULL ) AS all_tenants ORDER BY tenant_id """) # 将 tenant_id 转换为字符串,避免 JavaScript 大整数精度问题 tenant_ids = [str(row['tenant_id']) for row in cursor.fetchall()] return success_response({'tenant_ids': tenant_ids}) finally: cursor.close() conn.close() except Exception as e: return error_response(500, f"获取租户ID列表失败: {str(e)}") @app.route('/api/template-field-relations', methods=['GET']) def get_template_field_relations(): """ 获取指定 tenant_id 下的所有模板和字段的关联关系 用于模板字段关联管理页面 查询参数: tenant_id (必填) """ try: tenant_id = request.args.get('tenant_id') if not tenant_id: return error_response(400, "tenant_id参数不能为空") try: tenant_id = int(tenant_id) except ValueError: return error_response(400, "tenant_id必须是数字") conn = document_service.get_connection() cursor = conn.cursor(pymysql.cursors.DictCursor) try: # 获取指定 tenant_id 下所有启用的模板 cursor.execute(""" SELECT id, name, template_code FROM f_polic_file_config WHERE tenant_id = %s AND state = 1 ORDER BY name """, (tenant_id,)) templates = cursor.fetchall() or [] templates = [clean_query_result(t) for t in templates] # 获取指定 tenant_id 下所有启用的输入字段 # 注意:这里查询的是 state=1 的字段,但为了字段管理页面能显示所有字段的状态,应该查询所有字段 cursor.execute(""" SELECT id, name, filed_code, field_type, state FROM f_polic_field WHERE tenant_id = %s AND field_type = 1 ORDER BY name """, (tenant_id,)) input_fields = cursor.fetchall() or [] input_fields = [clean_query_result(f) for f in input_fields] # 确保 state 字段是整数类型(虽然这里查询的是 state=1,但为了统一处理) for field in input_fields: if 'state' in field: try: field['state'] = int(field['state']) except (ValueError, TypeError): field['state'] = 1 # 获取指定 tenant_id 下所有启用的输出字段 # 注意:这里查询的是 state=1 的字段,但为了字段管理页面能显示所有字段的状态,应该查询所有字段 cursor.execute(""" SELECT id, name, filed_code, field_type, state FROM f_polic_field WHERE tenant_id = %s AND field_type = 2 ORDER BY name """, (tenant_id,)) output_fields = cursor.fetchall() or [] output_fields = [clean_query_result(f) for f in output_fields] # 确保 state 字段是整数类型 for field in output_fields: if 'state' in field: try: field['state'] = int(field['state']) except (ValueError, TypeError): field['state'] = 1 # 获取指定 tenant_id 下现有的关联关系 # 关联关系:f_polic_file_field.file_id -> f_polic_file_config.id # f_polic_file_field.filed_id -> f_polic_field.id # 注意:只查询关联关系表中 state=1 的记录,不检查模板的 state # 因为模板可能被禁用,但关联关系仍然有效 cursor.execute(""" SELECT fff.file_id, fff.filed_id FROM f_polic_file_field fff WHERE fff.tenant_id = %s AND fff.state = 1 """, (tenant_id,)) relations = cursor.fetchall() or [] relations = [clean_query_result(r) for r in relations] # 构建关联关系映射 (file_id -> list of filed_id) # 注意:JSON 序列化时,字典的整数 key 会变成字符串 # 所以这里使用字符串 key,前端需要处理类型转换 relation_map = {} for rel in relations: file_id = rel['file_id'] filed_id = rel['filed_id'] # 确保 ID 是整数类型 try: file_id = int(file_id) filed_id = int(filed_id) except (ValueError, TypeError): continue # 跳过无效的关联关系 # 使用字符串 key,因为 JSON 序列化会将数字 key 转为字符串 file_id_str = str(file_id) if file_id_str not in relation_map: relation_map[file_id_str] = [] relation_map[file_id_str].append(filed_id) # 确保 relation_map 的 key 是整数类型(JSON 序列化时 key 会变成字符串) # 但为了前端能正确匹配,我们保持 key 为整数类型 # JSON 会自动将数字 key 转换为字符串,所以前端需要处理这种情况 return success_response({ 'tenant_id': tenant_id, 'templates': templates, 'input_fields': input_fields, 'output_fields': output_fields, 'relations': relation_map }) finally: cursor.close() conn.close() except Exception as e: return error_response(500, f"获取关联关系失败: {str(e)}") @app.route('/api/template-field-relations', methods=['POST']) def save_template_field_relations(): """ 保存模板和字段的关联关系 请求体格式: { "tenant_id": 123, "template_id": 123, "input_field_ids": [1, 2, 3], "output_field_ids": [4, 5, 6] } """ try: data = request.get_json() if not data: return error_response(400, "请求参数不能为空") tenant_id = data.get('tenant_id') template_id = data.get('template_id') input_field_ids = data.get('input_field_ids', []) output_field_ids = data.get('output_field_ids', []) if not tenant_id: return error_response(400, "tenant_id参数不能为空") if not template_id: return error_response(400, "template_id参数不能为空") try: tenant_id = int(tenant_id) except ValueError: return error_response(400, "tenant_id必须是数字") conn = document_service.get_connection() cursor = conn.cursor() try: # 验证模板是否存在且属于该 tenant_id cursor.execute(""" SELECT id FROM f_polic_file_config WHERE id = %s AND tenant_id = %s AND state = 1 """, (template_id, tenant_id)) if not cursor.fetchone(): return error_response(400, f"模板ID {template_id} 不存在或不属于该租户") # 合并所有字段ID all_field_ids = set(input_field_ids + output_field_ids) # 验证字段是否存在且属于该 tenant_id if all_field_ids: placeholders = ','.join(['%s'] * len(all_field_ids)) cursor.execute(f""" SELECT id FROM f_polic_field WHERE id IN ({placeholders}) AND tenant_id = %s AND state = 1 """, list(all_field_ids) + [tenant_id]) existing_field_ids = {row[0] for row in cursor.fetchall()} invalid_field_ids = all_field_ids - existing_field_ids if invalid_field_ids: return error_response(400, f"字段ID {list(invalid_field_ids)} 不存在或不属于该租户") # 删除该模板的所有现有关联关系(仅限该 tenant_id) cursor.execute(""" DELETE FROM f_polic_file_field WHERE file_id = %s AND tenant_id = %s """, (template_id, tenant_id)) # 插入新的关联关系 current_time = datetime.now() created_by = 655162080928945152 # 默认创建者ID if all_field_ids: insert_sql = """ INSERT INTO f_polic_file_field (tenant_id, file_id, filed_id, created_time, created_by, updated_time, updated_by, state) VALUES (%s, %s, %s, %s, %s, %s, %s, 1) """ for field_id in all_field_ids: cursor.execute(insert_sql, ( tenant_id, template_id, field_id, current_time, created_by, current_time, created_by )) conn.commit() return success_response({ 'template_id': template_id, 'input_field_count': len(input_field_ids), 'output_field_count': len(output_field_ids), 'total_field_count': len(all_field_ids) }, "保存成功") except Exception as e: conn.rollback() raise e finally: cursor.close() conn.close() except Exception as e: return error_response(500, f"保存关联关系失败: {str(e)}") # 字段管理 API @app.route('/api/field-management/fields', methods=['GET']) def get_field_management_fields(): """ 获取字段列表(用于字段管理页面) 查询参数: tenant_id (必填), field_type (可选: 1=输入字段, 2=输出字段) """ try: tenant_id = request.args.get('tenant_id') field_type = request.args.get('field_type') if not tenant_id: return error_response(400, "tenant_id参数不能为空") try: tenant_id = int(tenant_id) except ValueError: return error_response(400, "tenant_id必须是数字") conn = document_service.get_connection() cursor = conn.cursor(pymysql.cursors.DictCursor) try: if field_type: try: field_type = int(field_type) except ValueError: return error_response(400, "field_type必须是数字") cursor.execute(""" SELECT id, name, filed_code, field_type, state FROM f_polic_field WHERE tenant_id = %s AND field_type = %s ORDER BY field_type, name """, (tenant_id, field_type)) else: cursor.execute(""" SELECT id, name, filed_code, field_type, state FROM f_polic_field WHERE tenant_id = %s ORDER BY field_type, name """, (tenant_id,)) fields = cursor.fetchall() # 清理查询结果,将 bytes 类型转换为字符串 fields = [clean_query_result(field) for field in fields] if fields else [] # 确保 state 字段是整数类型(数据库可能返回 Decimal 或其他类型) for field in fields: if 'state' in field: try: field['state'] = int(field['state']) except (ValueError, TypeError): field['state'] = 1 # 默认启用 # 即使没有数据也返回空数组,而不是错误 return success_response({'fields': fields}) finally: cursor.close() conn.close() except Exception as e: return error_response(500, f"获取字段列表失败: {str(e)}") @app.route('/api/field-management/fields', methods=['POST']) def create_field(): """ 创建新字段 请求体格式: { "tenant_id": 123, "name": "字段名称", "filed_code": "field_code", "field_type": 1 // 1=输入字段, 2=输出字段 } """ try: data = request.get_json() if not data: return error_response(400, "请求参数不能为空") tenant_id = data.get('tenant_id') name = data.get('name') filed_code = data.get('filed_code') field_type = data.get('field_type') if not all([tenant_id, name, filed_code, field_type]): return error_response(400, "tenant_id, name, filed_code, field_type 参数不能为空") try: tenant_id = int(tenant_id) field_type = int(field_type) except ValueError: return error_response(400, "tenant_id 和 field_type 必须是数字") if field_type not in [1, 2]: return error_response(400, "field_type 必须是 1(输入字段)或 2(输出字段)") conn = document_service.get_connection() cursor = conn.cursor() try: # 检查字段编码是否已存在(同一 tenant_id 下) cursor.execute(""" SELECT id FROM f_polic_field WHERE tenant_id = %s AND filed_code = %s """, (tenant_id, filed_code)) if cursor.fetchone(): return error_response(400, f"字段编码 {filed_code} 已存在") # 创建新字段 field_id = generate_id() current_time = datetime.now() created_by = 655162080928945152 cursor.execute(""" INSERT INTO f_polic_field (id, tenant_id, name, filed_code, field_type, created_time, created_by, updated_time, updated_by, state) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, 1) """, (field_id, tenant_id, name, filed_code, field_type, current_time, created_by, current_time, created_by)) conn.commit() return success_response({'field_id': field_id}, "字段创建成功") except Exception as e: conn.rollback() raise e finally: cursor.close() conn.close() except Exception as e: return error_response(500, f"创建字段失败: {str(e)}") @app.route('/api/field-management/fields/', methods=['PUT']) def update_field(field_id): """ 更新字段 请求体格式: { "tenant_id": 123, "name": "字段名称", "filed_code": "field_code", "field_type": 1, "state": 1 // 0=未启用, 1=启用 } """ try: data = request.get_json() if not data: return error_response(400, "请求参数不能为空") tenant_id = data.get('tenant_id') name = data.get('name') filed_code = data.get('filed_code') field_type = data.get('field_type') state = data.get('state') if not tenant_id: return error_response(400, "tenant_id参数不能为空") try: tenant_id = int(tenant_id) except ValueError: return error_response(400, "tenant_id必须是数字") conn = document_service.get_connection() cursor = conn.cursor() try: # 验证字段是否存在且属于该 tenant_id cursor.execute(""" SELECT id FROM f_polic_field WHERE id = %s AND tenant_id = %s """, (field_id, tenant_id)) if not cursor.fetchone(): return error_response(404, "字段不存在或不属于该租户") # 如果更新 filed_code,检查是否与其他字段冲突 if filed_code: cursor.execute(""" SELECT id FROM f_polic_field WHERE tenant_id = %s AND filed_code = %s AND id != %s """, (tenant_id, filed_code, field_id)) if cursor.fetchone(): return error_response(400, f"字段编码 {filed_code} 已被其他字段使用") # 构建更新语句 update_fields = [] update_values = [] if name is not None: update_fields.append("name = %s") update_values.append(name) if filed_code is not None: update_fields.append("filed_code = %s") update_values.append(filed_code) if field_type is not None: if field_type not in [1, 2]: return error_response(400, "field_type 必须是 1(输入字段)或 2(输出字段)") update_fields.append("field_type = %s") update_values.append(field_type) if state is not None: update_fields.append("state = %s") update_values.append(state) if not update_fields: return error_response(400, "没有需要更新的字段") update_fields.append("updated_time = %s") update_values.append(datetime.now()) update_fields.append("updated_by = %s") update_values.append(655162080928945152) update_values.append(field_id) update_values.append(tenant_id) update_sql = f""" UPDATE f_polic_field SET {', '.join(update_fields)} WHERE id = %s AND tenant_id = %s """ cursor.execute(update_sql, update_values) conn.commit() return success_response({'field_id': field_id}, "字段更新成功") except Exception as e: conn.rollback() raise e finally: cursor.close() conn.close() except Exception as e: return error_response(500, f"更新字段失败: {str(e)}") @app.route('/api/field-management/fields/', methods=['DELETE']) def delete_field(field_id): """ 删除字段(软删除,将 state 设置为 0) 查询参数: tenant_id (必填) """ try: tenant_id = request.args.get('tenant_id') if not tenant_id: return error_response(400, "tenant_id参数不能为空") try: tenant_id = int(tenant_id) except ValueError: return error_response(400, "tenant_id必须是数字") conn = document_service.get_connection() cursor = conn.cursor() try: # 验证字段是否存在且属于该 tenant_id cursor.execute(""" SELECT id FROM f_polic_field WHERE id = %s AND tenant_id = %s """, (field_id, tenant_id)) if not cursor.fetchone(): return error_response(404, "字段不存在或不属于该租户") # 检查字段是否被模板关联 cursor.execute(""" SELECT COUNT(*) as count FROM f_polic_file_field WHERE filed_id = %s AND tenant_id = %s AND state = 1 """, (field_id, tenant_id)) result = cursor.fetchone() if result and result[0] > 0: return error_response(400, f"字段正在被 {result[0]} 个模板使用,无法删除") # 软删除字段 cursor.execute(""" UPDATE f_polic_field SET state = 0, updated_time = %s, updated_by = %s WHERE id = %s AND tenant_id = %s """, (datetime.now(), 655162080928945152, field_id, tenant_id)) conn.commit() return success_response({'field_id': field_id}, "字段删除成功") except Exception as e: conn.rollback() raise e finally: cursor.close() conn.close() except Exception as e: return error_response(500, f"删除字段失败: {str(e)}") # 数据库备份和恢复 API @app.route('/api/database/backup', methods=['POST']) def backup_database(): """ 备份数据库相关表格 请求体格式: { "tenant_id": 123 // 可选,如果提供则只备份该 tenant_id 的数据 } """ try: data = request.get_json() or {} tenant_id = data.get('tenant_id') conn = document_service.get_connection() cursor = conn.cursor(pymysql.cursors.DictCursor) try: # 要备份的表 tables = ['f_polic_file_config', 'f_polic_field', 'f_polic_file_field'] backup_data = {} for table in tables: if tenant_id: # 根据表结构决定如何过滤 tenant_id if table == 'f_polic_file_config': cursor.execute(f"SELECT * FROM {table} WHERE tenant_id = %s", (tenant_id,)) elif table == 'f_polic_field': cursor.execute(f"SELECT * FROM {table} WHERE tenant_id = %s", (tenant_id,)) elif table == 'f_polic_file_field': cursor.execute(f"SELECT * FROM {table} WHERE tenant_id = %s", (tenant_id,)) else: cursor.execute(f"SELECT * FROM {table}") rows = cursor.fetchall() # 清理查询结果,将 bytes 类型转换为字符串 backup_data[table] = [clean_query_result(row) for row in rows] if rows else [] # 创建临时文件保存备份数据 # 确保所有数据都已清理,可以 JSON 序列化 temp_file = tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False, encoding='utf-8') try: json.dump({ 'backup_time': datetime.now().isoformat(), 'tenant_id': tenant_id, 'tables': backup_data }, temp_file, ensure_ascii=False, indent=2, default=str) except (TypeError, ValueError) as e: temp_file.close() return error_response(500, f"备份数据序列化失败: {str(e)}") temp_file.close() return send_file( temp_file.name, as_attachment=True, download_name=f'db_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json', mimetype='application/json' ) finally: cursor.close() conn.close() except Exception as e: return error_response(500, f"备份数据库失败: {str(e)}") @app.route('/api/database/restore', methods=['POST']) def restore_database(): """ 恢复数据库相关表格 通过文件上传: file (multipart/form-data) 查询参数: tenant_id (必填,恢复数据到该 tenant_id) """ try: # 从查询参数获取 tenant_id tenant_id = request.args.get('tenant_id') if not tenant_id: # 尝试从表单数据获取 tenant_id = request.form.get('tenant_id') if not tenant_id: return error_response(400, "tenant_id参数不能为空(可通过查询参数或表单数据提供)") try: tenant_id = int(tenant_id) except (ValueError, TypeError): return error_response(400, "tenant_id必须是数字") backup_data = None # 检查是否有文件上传 if 'file' in request.files: file = request.files['file'] if file.filename: try: backup_data = json.load(file) except json.JSONDecodeError as e: return error_response(400, f"备份文件格式错误: {str(e)}") else: return error_response(400, "请上传备份文件") if not backup_data: return error_response(400, "备份数据不能为空") conn = document_service.get_connection() cursor = conn.cursor() try: # 验证备份数据格式 if 'tables' not in backup_data: return error_response(400, "备份数据格式错误:缺少 tables 字段") tables_data = backup_data['tables'] required_tables = ['f_polic_file_config', 'f_polic_field', 'f_polic_file_field'] for table in required_tables: if table not in tables_data: return error_response(400, f"备份数据格式错误:缺少表 {table} 的数据") # 开始恢复(注意:这里只恢复指定 tenant_id 的数据) # 先删除该 tenant_id 的现有数据 cursor.execute("DELETE FROM f_polic_file_field WHERE tenant_id = %s", (tenant_id,)) cursor.execute("DELETE FROM f_polic_field WHERE tenant_id = %s", (tenant_id,)) cursor.execute("DELETE FROM f_polic_file_config WHERE tenant_id = %s", (tenant_id,)) # 恢复 f_polic_file_config if tables_data['f_polic_file_config']: insert_sql = """ INSERT INTO f_polic_file_config (id, tenant_id, parent_id, name, template_code, input_data, file_path, created_time, created_by, updated_time, updated_by, state) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ for row in tables_data['f_polic_file_config']: # 确保 tenant_id 正确 row['tenant_id'] = tenant_id cursor.execute(insert_sql, ( row.get('id'), row.get('tenant_id'), row.get('parent_id'), row.get('name'), row.get('template_code'), row.get('input_data'), row.get('file_path'), row.get('created_time'), row.get('created_by'), row.get('updated_time'), row.get('updated_by'), row.get('state', 1) )) # 恢复 f_polic_field if tables_data['f_polic_field']: insert_sql = """ INSERT INTO f_polic_field (id, tenant_id, name, filed_code, field_type, created_time, created_by, updated_time, updated_by, state) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ for row in tables_data['f_polic_field']: # 确保 tenant_id 正确 row['tenant_id'] = tenant_id cursor.execute(insert_sql, ( row.get('id'), row.get('tenant_id'), row.get('name'), row.get('filed_code'), row.get('field_type'), row.get('created_time'), row.get('created_by'), row.get('updated_time'), row.get('updated_by'), row.get('state', 1) )) # 恢复 f_polic_file_field if tables_data['f_polic_file_field']: insert_sql = """ INSERT INTO f_polic_file_field (tenant_id, file_id, filed_id, created_time, created_by, updated_time, updated_by, state) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) """ for row in tables_data['f_polic_file_field']: # 确保 tenant_id 正确 row['tenant_id'] = tenant_id cursor.execute(insert_sql, ( row.get('tenant_id'), row.get('file_id'), row.get('filed_id'), row.get('created_time'), row.get('created_by'), row.get('updated_time'), row.get('updated_by'), row.get('state', 1) )) conn.commit() return success_response({ 'tenant_id': tenant_id, 'restored_tables': required_tables }, "数据库恢复成功") except Exception as e: conn.rollback() raise e finally: cursor.close() conn.close() except Exception as e: return error_response(500, f"恢复数据库失败: {str(e)}") def get_minio_client(): """获取 MinIO 客户端""" minio_endpoint = os.getenv('MINIO_ENDPOINT') minio_access_key = os.getenv('MINIO_ACCESS_KEY') minio_secret_key = os.getenv('MINIO_SECRET_KEY') minio_secure = os.getenv('MINIO_SECURE', 'false').lower() == 'true' if not all([minio_endpoint, minio_access_key, minio_secret_key]): raise ValueError("MinIO配置不完整") return Minio( minio_endpoint, access_key=minio_access_key, secret_key=minio_secret_key, secure=minio_secure ) def extract_placeholders_from_docx(file_path: str) -> list: """ 从docx文件中提取所有占位符 Args: file_path: docx文件路径 Returns: 占位符列表,格式: ['field_code1', 'field_code2', ...] """ placeholders = set() pattern = r'\{\{([^}]+)\}\}' # 匹配 {{field_code}} 格式 try: doc = Document(file_path) # 从段落中提取占位符 for paragraph in doc.paragraphs: text = paragraph.text matches = re.findall(pattern, text) for match in matches: cleaned = match.strip() # 过滤掉不完整的占位符(包含 { 或 } 的) if cleaned and '{' not in cleaned and '}' not in cleaned: placeholders.add(cleaned) # 从表格中提取占位符 for table in doc.tables: for row in table.rows: for cell in row.cells: for paragraph in cell.paragraphs: text = paragraph.text matches = re.findall(pattern, text) for match in matches: cleaned = match.strip() # 过滤掉不完整的占位符(包含 { 或 } 的) if cleaned and '{' not in cleaned and '}' not in cleaned: placeholders.add(cleaned) except Exception as e: raise Exception(f"读取文件失败: {str(e)}") return sorted(list(placeholders)) @app.route('/api/template/upload', methods=['POST']) def upload_template(): """ 上传新模板文件 1. 接收上传的 Word 文档 2. 提取占位符 3. 匹配数据库中的字段 4. 返回未匹配的占位符列表,等待用户输入字段名称 """ try: # 检查是否有文件 if 'file' not in request.files: return error_response(400, "未找到上传的文件") file = request.files['file'] tenant_id = request.form.get('tenant_id', type=int) if not tenant_id: return error_response(400, "缺少 tenant_id 参数") if file.filename == '': return error_response(400, "文件名为空") # 检查文件扩展名 if not file.filename.lower().endswith('.docx'): return error_response(400, "只支持 .docx 格式的文件") # 保存临时文件 temp_dir = tempfile.mkdtemp() temp_file_path = os.path.join(temp_dir, secure_filename(file.filename)) file.save(temp_file_path) try: # 提取占位符 placeholders = extract_placeholders_from_docx(temp_file_path) # 查询数据库中的字段 conn = pymysql.connect( host=os.getenv('DB_HOST'), port=int(os.getenv('DB_PORT', 3306)), user=os.getenv('DB_USER'), password=os.getenv('DB_PASSWORD'), database=os.getenv('DB_NAME'), charset='utf8mb4' ) cursor = conn.cursor(pymysql.cursors.DictCursor) # 查询所有字段 cursor.execute(""" SELECT id, name, filed_code, field_type, state FROM f_polic_field WHERE tenant_id = %s """, (tenant_id,)) fields = cursor.fetchall() cursor.close() conn.close() # 构建字段映射 field_map = {} for field in fields: state = field['state'] if isinstance(state, bytes): state = int.from_bytes(state, byteorder='big') if len(state) == 1 else 1 if state == 1: # 只使用启用的字段 field_map[field['filed_code']] = { 'id': field['id'], 'name': field['name'], 'field_type': field['field_type'] } # 匹配占位符 matched_fields = [] unmatched_placeholders = [] for placeholder in placeholders: if placeholder in field_map: matched_fields.append({ 'placeholder': placeholder, 'field_id': field_map[placeholder]['id'], 'field_name': field_map[placeholder]['name'], 'field_type': field_map[placeholder]['field_type'] }) else: unmatched_placeholders.append(placeholder) # 返回结果 return success_response({ 'filename': file.filename, 'placeholders': placeholders, 'matched_fields': matched_fields, 'unmatched_placeholders': unmatched_placeholders, 'temp_file_path': temp_file_path # 临时保存路径,用于后续保存 }, "模板解析成功") except Exception as e: # 清理临时文件 try: os.remove(temp_file_path) os.rmdir(temp_dir) except: pass raise e except Exception as e: return error_response(500, f"上传模板失败: {str(e)}") @app.route('/api/template/save', methods=['POST']) def save_template(): """ 保存模板 1. 接收模板信息(文件名、tenant_id、字段关联关系、新字段信息) 2. 创建新字段(如果有) 3. 上传文件到 MinIO 4. 保存模板到数据库 5. 保存字段关联关系 """ try: data = request.get_json() tenant_id = data.get('tenant_id') filename = data.get('filename') temp_file_path = data.get('temp_file_path') field_relations = data.get('field_relations', []) # [{field_id, field_type}, ...] new_fields = data.get('new_fields', []) # [{placeholder, name, field_type}, ...] template_name = data.get('template_name') # 用户输入的模板名称 if not all([tenant_id, filename, temp_file_path]): return error_response(400, "缺少必要参数") if not os.path.exists(temp_file_path): return error_response(400, "临时文件不存在") # 连接数据库 conn = pymysql.connect( host=os.getenv('DB_HOST'), port=int(os.getenv('DB_PORT', 3306)), user=os.getenv('DB_USER'), password=os.getenv('DB_PASSWORD'), database=os.getenv('DB_NAME'), charset='utf8mb4' ) cursor = conn.cursor() created_by = 655162080928945152 # 默认创建者ID updated_by = 655162080928945152 try: # 1. 创建新字段(如果有) new_field_map = {} # placeholder -> field_id if new_fields: for new_field in new_fields: placeholder = new_field.get('placeholder') field_name = new_field.get('name') field_type = new_field.get('field_type', 2) # 默认为输出字段 if not placeholder or not field_name: continue # 生成字段ID field_id = generate_id() # 插入新字段 cursor.execute(""" INSERT INTO f_polic_field (id, tenant_id, name, filed_code, field_type, created_time, created_by, updated_time, updated_by, state) VALUES (%s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1) """, (field_id, tenant_id, field_name, placeholder, field_type, created_by, updated_by)) new_field_map[placeholder] = field_id # 2. 保存文件到本地 template_finish 文件夹 # 获取项目根目录 project_root = Path(__file__).parent templates_dir = project_root / "template_finish" # 创建 uploaded 子目录用于存放新上传的模板 uploaded_dir = templates_dir / "uploaded" uploaded_dir.mkdir(parents=True, exist_ok=True) # 生成本地文件路径(使用时间戳确保文件名唯一) now = datetime.now() timestamp = now.strftime('%Y%m%d_%H%M%S') # 处理文件名,确保安全 safe_filename = secure_filename(filename) # 如果文件名已存在,添加时间戳前缀 local_file_path = uploaded_dir / f"{timestamp}_{safe_filename}" # 复制文件到本地 import shutil shutil.copy2(temp_file_path, local_file_path) # 生成相对路径(相对于项目根目录,使用正斜杠) local_path = local_file_path.relative_to(project_root) local_path_str = str(local_path).replace('\\', '/') # 3. 保存模板到数据库 template_id = generate_id() template_name_final = template_name or filename.replace('.docx', '') cursor.execute(""" INSERT INTO f_polic_file_config (id, tenant_id, parent_id, name, input_data, file_path, created_time, created_by, updated_time, updated_by, state) VALUES (%s, %s, %s, %s, %s, %s, NOW(), %s, NOW(), %s, 1) """, ( template_id, tenant_id, None, # parent_id template_name_final, json.dumps({'template_name': template_name_final}, ensure_ascii=False), local_path_str, created_by, updated_by )) # 4. 保存字段关联关系 all_field_ids = [] # 添加用户选择的字段关联 for relation in field_relations: field_id = relation.get('field_id') if field_id: all_field_ids.append(field_id) # 添加新创建的字段关联 for placeholder, field_id in new_field_map.items(): if field_id not in all_field_ids: all_field_ids.append(field_id) # 添加必需的输入字段 cursor.execute(""" SELECT id FROM f_polic_field WHERE tenant_id = %s AND filed_code IN ('clue_info', 'target_basic_info_clue') AND state = 1 """, (tenant_id,)) required_fields = cursor.fetchall() for row in required_fields: if row[0] not in all_field_ids: all_field_ids.append(row[0]) # 插入关联关系 for field_id in all_field_ids: cursor.execute(""" INSERT INTO f_polic_file_field (tenant_id, file_id, filed_id, created_time, created_by, updated_time, updated_by, state) VALUES (%s, %s, %s, NOW(), %s, NOW(), %s, 1) """, (tenant_id, template_id, field_id, created_by, updated_by)) conn.commit() # 清理临时文件 try: os.remove(temp_file_path) os.rmdir(os.path.dirname(temp_file_path)) except: pass return success_response({ 'template_id': template_id, 'template_name': template_name_final, 'file_path': local_path_str, 'field_count': len(all_field_ids) }, "模板保存成功") except Exception as e: conn.rollback() raise e finally: cursor.close() conn.close() except Exception as e: return error_response(500, f"保存模板失败: {str(e)}") if __name__ == '__main__': # 确保static目录存在 os.makedirs('static', exist_ok=True) port = int(os.getenv('PORT', 7500)) debug = os.getenv('DEBUG', 'False').lower() == 'true' print(f"服务启动在 http://localhost:{port}") print(f"测试页面: http://localhost:{port}/") print(f"模板字段管理页面: http://localhost:{port}/template-field-manager") print(f"Swagger API文档: http://localhost:{port}/api-docs") app.run(host='0.0.0.0', port=port, debug=debug)