添加API最大token数配置,增强JSON解析功能,新增清理和修复JSON字符串的方法,改进字段名规范化逻辑以提高数据提取准确性。

This commit is contained in:
python 2025-12-09 12:14:34 +08:00
parent d8fa4c3d7e
commit e31cd0b764

View File

@ -24,6 +24,11 @@ class AIService:
# 可以通过环境变量 HUAWEI_API_TIMEOUT 自定义默认180秒3分钟
self.api_timeout = int(os.getenv('HUAWEI_API_TIMEOUT', '180'))
# API最大token数配置
# 开启思考模式时模型可能生成更长的响应需要更多的token
# 可以通过环境变量 HUAWEI_API_MAX_TOKENS 自定义默认12000
self.api_max_tokens = int(os.getenv('HUAWEI_API_MAX_TOKENS', '12000'))
# 确定使用的AI服务
self.ai_provider = self._determine_ai_provider()
@ -214,7 +219,7 @@ class AIService:
"top_p": 0.95,
"top_k": 1,
"seed": 1,
"max_tokens": 8192,
"max_tokens": self.api_max_tokens,
"n": 1,
"enable_thinking": True
}
@ -286,7 +291,11 @@ class AIService:
extracted_data = self._extract_json_from_text(content)
if extracted_data:
print(f"[AI服务] JSON解析成功提取到 {len(extracted_data)} 个字段")
return extracted_data
print(f"[AI服务] 原始字段名: {list(extracted_data.keys())}")
# 规范化字段名并映射到正确的字段编码
normalized_data = self._normalize_field_names(extracted_data, output_fields)
print(f"[AI服务] 规范化后的字段名: {list(normalized_data.keys())}")
return normalized_data
# 如果无法提取JSON记录错误
print(f"[AI服务] 警告无法从内容中提取JSON尝试备用解析方法")
@ -318,27 +327,49 @@ class AIService:
json_end = text.find('```', json_start)
if json_end != -1:
json_str = text[json_start:json_end].strip()
# 尝试清理和修复JSON
json_str = self._clean_json_string(json_str)
try:
return json.loads(json_str)
except json.JSONDecodeError:
pass
except json.JSONDecodeError as e:
print(f"[AI服务] JSON解析失败代码块: {e}")
# 尝试修复后再次解析
json_str = self._fix_json_string(json_str)
try:
return json.loads(json_str)
except json.JSONDecodeError:
pass
if '```' in text:
json_start = text.find('```') + 3
json_end = text.find('```', json_start)
if json_end != -1:
json_str = text[json_start:json_end].strip()
# 如果不是json标记尝试解析
# 尝试清理和修复JSON
json_str = self._clean_json_string(json_str)
try:
return json.loads(json_str)
except json.JSONDecodeError:
pass
except json.JSONDecodeError as e:
print(f"[AI服务] JSON解析失败代码块: {e}")
# 尝试修复后再次解析
json_str = self._fix_json_string(json_str)
try:
return json.loads(json_str)
except json.JSONDecodeError:
pass
# 方法2: 尝试直接解析整个文本
cleaned_text = self._clean_json_string(text.strip())
try:
return json.loads(text.strip())
except json.JSONDecodeError:
pass
return json.loads(cleaned_text)
except json.JSONDecodeError as e:
print(f"[AI服务] JSON解析失败直接解析: {e}")
# 尝试修复后再次解析
fixed_text = self._fix_json_string(cleaned_text)
try:
return json.loads(fixed_text)
except json.JSONDecodeError:
pass
# 方法3: 尝试查找文本中的JSON对象以 { 开始,以 } 结束)
# 使用正则表达式找到最外层的JSON对象
@ -379,6 +410,176 @@ class AIService:
return None
def _clean_json_string(self, json_str: str) -> str:
"""
清理JSON字符串移除常见的格式问题
"""
# 移除前导/尾随空白
json_str = json_str.strip()
# 移除可能的BOM标记
if json_str.startswith('\ufeff'):
json_str = json_str[1:]
# 移除可能的XML/HTML标签残留
json_str = re.sub(r'<[^>]+>', '', json_str)
return json_str
def _fix_json_string(self, json_str: str) -> str:
"""
尝试修复常见的JSON格式错误
"""
# 移除末尾的逗号(在 } 或 ] 之前)
json_str = re.sub(r',\s*}', '}', json_str)
json_str = re.sub(r',\s*]', ']', json_str)
# 修复字段名中的错误(如 .target_gender -> target_gender
json_str = re.sub(r'["\']\.([^"\']+)["\']\s*:', r'"\1":', json_str)
# 修复字段名中的空格(如 "target name" -> "target_name"
json_str = re.sub(r'["\']([^"\']+)\s+([^"\']+)["\']\s*:', r'"\1_\2":', json_str)
# 尝试修复未加引号的字段名
json_str = re.sub(r'(\w+)\s*:', r'"\1":', json_str)
return json_str
def _normalize_field_names(self, extracted_data: Dict, output_fields: List[Dict]) -> Dict:
    """
    Normalize field names: map the various field-name formats the model may
    return onto the canonical field codes.

    Args:
        extracted_data: raw data dict as returned by the model
        output_fields: list of output field definitions; each item must have
            a 'field_code' key and may have a human-readable 'name' key

    Returns:
        Dict keyed by the canonical field codes. Every field code from
        output_fields is present (empty string when nothing matched);
        unmatched model keys are kept under their cleaned original name.
    """
    # Map: field code -> field definition
    field_code_map = {field['field_code']: field for field in output_fields}
    # Map: candidate name -> field code (supports several naming variants)
    name_to_code_map = {}
    for field in output_fields:
        field_code = field['field_code']
        field_name = field.get('name', '')
        # The canonical field code maps to itself
        name_to_code_map[field_code] = field_code
        # Human-readable field name (if present)
        if field_name:
            name_to_code_map[field_name] = field_code
        # Camel-case variants (e.g. politicalStatus -> target_political_status):
        # derive camel forms from snake_case field codes
        if '_' in field_code:
            parts = field_code.split('_')
            # lowerCamelCase form, e.g. targetPoliticalStatus
            camel_case = parts[0] + ''.join(word.capitalize() for word in parts[1:])
            name_to_code_map[camel_case] = field_code
            # PascalCase form, e.g. TargetPoliticalStatus
            pascal_case = ''.join(word.capitalize() for word in parts)
            name_to_code_map[pascal_case] = field_code
        # Prefix-stripped variants (e.g. name -> target_name)
        if field_code.startswith('target_'):
            short_name = field_code.replace('target_', '')
            name_to_code_map[short_name] = field_code
            # Camel-case form of the stripped name (e.g. name -> target_name)
            camel_short = short_name.split('_')[0] + ''.join(word.capitalize() for word in short_name.split('_')[1:]) if '_' in short_name else short_name
            name_to_code_map[camel_short] = field_code
    # Common Schema.org-style alias -> field-code mappings
    schema_mapping = {
        'name': 'target_name',
        'gender': 'target_gender',
        'dateOfBirth': 'target_date_of_birth',
        'date_of_birth': 'target_date_of_birth',
        'politicalStatus': 'target_political_status',
        'political_status': 'target_political_status',
        'organizationAndPosition': 'target_organization_and_position',
        'organization_and_position': 'target_organization_and_position',
        'organization': 'target_organization',
        'position': 'target_position',
        'educationLevel': 'target_education_level',
        'education_level': 'target_education_level',
        'professionalRank': 'target_professional_rank',
        'professional_rank': 'target_professional_rank',
        'clueSource': 'clue_source',
        'clue_source': 'clue_source',
        'issueDescription': 'target_issue_description',
        'issue_description': 'target_issue_description',
        'description': 'target_issue_description',  # 'description' likely means the issue description
        'age': 'target_age',
    }
    # Register Schema.org aliases only when their target field code exists
    for schema_key, code in schema_mapping.items():
        if code in field_code_map:
            name_to_code_map[schema_key] = code
    # Normalize the extracted data
    normalized_data = {}
    for key, value in extracted_data.items():
        # Skip special metadata keys (e.g. @context)
        if key.startswith('@'):
            continue
        # Flatten nested objects (e.g. description: {violationOfFamilyPlanningPolicies: "..."})
        if isinstance(value, dict):
            # Usually the nested object holds a single value;
            # take the first non-empty string value
            nested_values = [v for v in value.values() if v and isinstance(v, str)]
            if nested_values:
                value = nested_values[0]
            else:
                # No string values inside — fall back to str() conversion
                value = str(value) if value else ''
        # Clean the key: strip surrounding whitespace and leading dots
        cleaned_key = key.strip().lstrip('.')
        # 1) Exact match against the variant map
        if cleaned_key in name_to_code_map:
            correct_code = name_to_code_map[cleaned_key]
            normalized_data[correct_code] = value
            continue
        # 2) Case-insensitive match
        for name, code in name_to_code_map.items():
            if cleaned_key.lower() == name.lower():
                normalized_data[code] = value
                break
        else:
            # 3) Fuzzy match: compare keys with '_' and '-' separators removed
            matched = False
            for field_code in field_code_map.keys():
                # Containment in either direction serves as a simple similarity check
                key_parts = cleaned_key.lower().replace('_', '').replace('-', '')
                code_parts = field_code.lower().replace('_', '').replace('-', '')
                if key_parts in code_parts or code_parts in key_parts:
                    normalized_data[field_code] = value
                    matched = True
                    print(f"[AI服务] 模糊匹配: '{cleaned_key}' -> '{field_code}'")
                    break
            if not matched:
                # No mapping found — keep the cleaned original key
                # (the model may have returned an unexpected field)
                print(f"[AI服务] 警告:无法匹配字段名 '{cleaned_key}',保留原字段名")
                normalized_data[cleaned_key] = value
    # Ensure every declared output field is present (empty string when missing)
    for field_code in field_code_map.keys():
        if field_code not in normalized_data:
            normalized_data[field_code] = ''
    return normalized_data
def _parse_text_response(self, text: str, output_fields: List[Dict]) -> Dict:
"""
从文本响应中解析字段值备用方案