ai-business-write/services/field_service.py

393 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
字段服务 - 从数据库获取字段配置
"""
import pymysql
import os
import json
from typing import List, Dict, Optional
from pathlib import Path
class FieldService:
"""字段服务类"""
def __init__(self):
self.db_config = {
'host': os.getenv('DB_HOST', '152.136.177.240'),
'port': int(os.getenv('DB_PORT', 5012)),
'user': os.getenv('DB_USER', 'finyx'),
'password': os.getenv('DB_PASSWORD', '6QsGK6MpePZDE57Z'),
'database': os.getenv('DB_NAME', 'finyx'),
'charset': 'utf8mb4'
}
self.tenant_id = 615873064429507639
# 加载提示词配置文件
self.prompt_config = self._load_prompt_config()
# 加载字段默认值配置
self.field_defaults = self._load_field_defaults()
def _load_prompt_config(self) -> Dict:
"""
加载提示词配置文件
Returns:
配置字典
"""
# 获取项目根目录
current_dir = Path(__file__).parent
project_root = current_dir.parent
config_path = project_root / 'config' / 'prompt_config.json'
try:
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
return config
except FileNotFoundError:
# 如果配置文件不存在,使用默认配置
print(f"警告: 配置文件 {config_path} 不存在,使用默认配置")
return self._get_default_config()
except json.JSONDecodeError as e:
print(f"错误: 配置文件 {config_path} JSON格式错误: {e}")
return self._get_default_config()
def _get_default_config(self) -> Dict:
"""获取默认配置(作为后备方案)"""
return {
"prompt_template": {
"intro": "请从以下输入文本中提取结构化信息。",
"input_text_label": "输入文本:",
"output_fields_label": "需要提取的字段:",
"json_format_label": "请严格按照以下JSON格式返回结果只返回JSON不要包含其他文字说明",
"requirements_label": "要求:",
"requirements": [
"仔细分析输入文本,准确提取每个字段的值",
"如果某个字段在输入文本中找不到对应信息,该字段值设为空字符串\"\"",
"日期格式统一为YYYYMM198005表示1980年5月",
"性别统一为\"\"\"\"",
"政治面貌使用标准表述(如:中共党员、群众等)",
"只返回JSON对象不要包含markdown代码块标记"
]
},
"field_formatting": {
"input_field_format": "{field_code}: {field_value}",
"output_field_format": "- {field_name} (字段编码: {field_code})"
},
"business_type_rules": {
"INVESTIGATION": {
"description": "调查核实业务类型的特殊规则",
"additional_requirements": []
}
}
}
def _load_field_defaults(self) -> Dict:
"""
加载字段默认值配置文件
Returns:
字段默认值字典
"""
current_dir = Path(__file__).parent
project_root = current_dir.parent
config_path = project_root / 'config' / 'field_defaults.json'
try:
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
return config.get('field_defaults', {})
except FileNotFoundError:
print(f"警告: 默认值配置文件 {config_path} 不存在,使用空默认值")
return {}
except json.JSONDecodeError as e:
print(f"错误: 默认值配置文件 {config_path} JSON格式错误: {e}")
return {}
def get_field_default_value(self, field_code: str) -> Optional[str]:
"""
获取字段的默认值
Args:
field_code: 字段编码
Returns:
默认值字符串如果不存在则返回None
"""
return self.field_defaults.get(field_code)
def get_connection(self):
"""获取数据库连接"""
return pymysql.connect(**self.db_config)
def get_output_fields_by_field_codes(self, field_codes: List[str]) -> List[Dict]:
"""
根据字段编码列表获取输出字段列表
Args:
field_codes: 字段编码列表,如 ['userName', 'userAge']
Returns:
字段列表,每个字段包含: id, name, field_code, field_type
"""
if not field_codes:
return []
conn = self.get_connection()
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
# 根据字段编码查询字段信息
placeholders = ','.join(['%s'] * len(field_codes))
sql = f"""
SELECT f.id, f.name, f.filed_code as field_code, f.field_type
FROM f_polic_field f
WHERE f.tenant_id = %s
AND f.filed_code IN ({placeholders})
AND f.field_type = 2
ORDER BY f.id
"""
cursor.execute(sql, [self.tenant_id] + field_codes)
fields = cursor.fetchall()
# 转换为字典列表
result = []
for field in fields:
result.append({
'id': field['id'],
'name': field['name'],
'field_code': field['field_code'],
'field_type': field['field_type']
})
return result
finally:
cursor.close()
conn.close()
def get_input_field_by_field_code(self, field_code: str) -> Optional[Dict]:
"""
根据字段编码获取输入字段信息
Args:
field_code: 字段编码
Returns:
字段信息字典如果不存在返回None
"""
conn = self.get_connection()
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
sql = """
SELECT f.id, f.name, f.filed_code as field_code, f.field_type
FROM f_polic_field f
WHERE f.tenant_id = %s
AND f.filed_code = %s
AND f.field_type = 1
LIMIT 1
"""
cursor.execute(sql, (self.tenant_id, field_code))
field = cursor.fetchone()
if field:
return {
'id': field['id'],
'name': field['name'],
'field_code': field['field_code'],
'field_type': field['field_type']
}
return None
finally:
cursor.close()
conn.close()
def get_fields_by_business_type(self, business_type: str) -> Dict:
"""
获取业务类型的所有字段(包括输入和输出字段)
用于测试页面展示
Args:
business_type: 业务类型,如 'INVESTIGATION'
Returns:
包含input_fields和output_fields的字典
"""
import json
conn = self.get_connection()
cursor = conn.cursor(pymysql.cursors.DictCursor)
try:
# 获取输入字段field_type=1
sql_input = """
SELECT f.id, f.name, f.filed_code as field_code, f.field_type
FROM f_polic_field f
WHERE f.tenant_id = %s
AND f.field_type = 1
AND (f.filed_code = 'clue_info' OR f.filed_code = 'target_basic_info_clue')
ORDER BY f.id
"""
cursor.execute(sql_input, (self.tenant_id,))
input_fields = cursor.fetchall()
# 获取输出字段field_type=2
# 根据business_type从input_data的JSON中查找匹配的文件配置
sql_output = """
SELECT f.id, f.name, f.filed_code as field_code, f.field_type
FROM f_polic_field f
INNER JOIN f_polic_file_field ff ON f.id = ff.filed_id
INNER JOIN f_polic_file_config fc ON ff.file_id = fc.id
WHERE f.tenant_id = %s
AND f.field_type = 2
AND fc.state = 1
ORDER BY f.id
"""
cursor.execute(sql_output, (self.tenant_id,))
all_output_fields = cursor.fetchall()
# 根据business_type过滤输出字段
# 需要查询文件配置的input_data来匹配business_type
sql_file_configs = """
SELECT id, name, input_data
FROM f_polic_file_config
WHERE tenant_id = %s
AND state = 1
"""
cursor.execute(sql_file_configs, (self.tenant_id,))
file_configs = cursor.fetchall()
# 找到匹配business_type的文件配置ID列表
matching_file_ids = []
for fc in file_configs:
try:
input_data = json.loads(fc['input_data']) if fc['input_data'] else {}
if input_data.get('business_type') == business_type:
matching_file_ids.append(fc['id'])
except (json.JSONDecodeError, TypeError):
continue
# 过滤输出字段:只返回匹配的文件配置关联的字段
output_fields = []
if matching_file_ids:
# 获取这些文件配置关联的字段
placeholders = ','.join(['%s'] * len(matching_file_ids))
sql_filtered = f"""
SELECT DISTINCT f.id, f.name, f.filed_code as field_code, f.field_type
FROM f_polic_field f
INNER JOIN f_polic_file_field ff ON f.id = ff.filed_id
WHERE f.tenant_id = %s
AND f.field_type = 2
AND ff.file_id IN ({placeholders})
ORDER BY f.id
"""
cursor.execute(sql_filtered, [self.tenant_id] + matching_file_ids)
output_fields = cursor.fetchall()
return {
'input_fields': [
{
'id': f['id'],
'name': f['name'],
'field_code': f['field_code'],
'field_type': f['field_type']
}
for f in input_fields
],
'output_fields': [
{
'id': f['id'],
'name': f['name'],
'field_code': f['field_code'],
'field_type': f['field_type']
}
for f in output_fields
]
}
finally:
cursor.close()
conn.close()
def build_extract_prompt(self, input_data: List[Dict], output_fields: List[Dict]) -> str:
"""
构建AI提取提示词
Args:
input_data: 输入数据列表,格式: [{'fieldCode': 'xxx', 'fieldValue': 'xxx'}]
output_fields: 输出字段列表
Returns:
构建好的提示词
"""
# 获取配置
template = self.prompt_config.get('prompt_template', {})
formatting = self.prompt_config.get('field_formatting', {})
# 构建输入文本
input_field_format = formatting.get('input_field_format', '{field_code}: {field_value}')
input_text = ""
for item in input_data:
field_code = item.get('fieldCode', '')
field_value = item.get('fieldValue', '')
input_text += input_field_format.format(
field_code=field_code,
field_value=field_value
) + "\n"
# 构建输出字段说明(包含字段特定规则)
output_field_format = formatting.get('output_field_format', '- {field_name} (字段编码: {field_code})')
field_specific_rules = self.prompt_config.get('field_specific_rules', {})
output_fields_desc = ""
for field in output_fields:
field_name = field['name']
field_code = field['field_code']
field_desc = output_field_format.format(
field_name=field_name,
field_code=field_code
)
# 如果字段有特定规则,添加到说明中
if field_code in field_specific_rules:
field_rule = field_specific_rules[field_code]
field_desc += f"\n 说明:{field_rule.get('description', '')}"
if 'rules' in field_rule and field_rule['rules']:
field_desc += "\n 特殊要求:"
for rule in field_rule['rules']:
field_desc += f"\n - {rule}"
output_fields_desc += field_desc + "\n"
# 构建JSON格式示例
json_example = {}
for field in output_fields:
json_example[field['field_code']] = ""
# 获取要求列表
requirements = template.get('requirements', [])
# 构建要求文本
requirements_text = ""
for i, req in enumerate(requirements, 1):
requirements_text += f"{i}. {req}\n"
# 构建完整提示词
prompt = f"""{template.get('intro', '请从以下输入文本中提取结构化信息。')}
{template.get('input_text_label', '输入文本:')}
{input_text.strip()}
{template.get('output_fields_label', '需要提取的字段:')}
{output_fields_desc.strip()}
{template.get('json_format_label', '请严格按照以下JSON格式返回结果只返回JSON不要包含其他文字说明')}
{json.dumps(json_example, ensure_ascii=False, indent=2)}
{template.get('requirements_label', '要求:')}
{requirements_text.strip()}
"""
return prompt