""" AI服务 - 封装大模型调用 仅支持华为大模型 """ import os import re import time import requests import json from typing import Dict, List, Optional class AIService: """AI服务类""" def __init__(self): # 华为大模型配置(必需) self.huawei_api_endpoint = os.getenv('HUAWEI_API_ENDPOINT', 'http://10.100.31.26:3001/v1/chat/completions') self.huawei_api_key = os.getenv('HUAWEI_API_KEY', 'sk-PoeiV3qwyTIRqcVc84E8E24cD2904872859a87922e0d9186') self.huawei_model = os.getenv('HUAWEI_MODEL', 'DeepSeek-R1-Distill-Llama-70B') # API超时配置(秒) # 开启思考模式时,响应时间会显著增加,需要更长的超时时间 # 可以通过环境变量 HUAWEI_API_TIMEOUT 自定义,默认180秒(3分钟) self.api_timeout = int(os.getenv('HUAWEI_API_TIMEOUT', '180')) # API最大token数配置 # 开启思考模式时,模型可能生成更长的响应,需要更多的token # 可以通过环境变量 HUAWEI_API_MAX_TOKENS 自定义,默认12000 self.api_max_tokens = int(os.getenv('HUAWEI_API_MAX_TOKENS', '12000')) # 确定使用的AI服务 self.ai_provider = self._determine_ai_provider() def _determine_ai_provider(self) -> str: """确定使用的AI服务提供商(仅支持华为大模型)""" if self.huawei_api_endpoint and self.huawei_api_key: return 'huawei' else: return 'none' def extract_fields(self, prompt: str, output_fields: List[Dict]) -> Optional[Dict]: """ 从提示词中提取结构化字段 Args: prompt: AI提示词 output_fields: 输出字段列表 Returns: 提取的字段字典,格式: {field_code: field_value} """ if self.ai_provider == 'none': raise Exception("未配置华为大模型服务,请设置 HUAWEI_API_KEY 和 HUAWEI_API_ENDPOINT") if self.ai_provider == 'huawei': return self._extract_with_huawei(prompt, output_fields) else: raise Exception(f"未知的AI服务提供商: {self.ai_provider}") def _extract_with_siliconflow(self, prompt: str, output_fields: List[Dict]) -> Optional[Dict]: """ 使用硅基流动API提取字段(已不再使用,仅保留用于参考) 系统仅支持华为大模型,不再支持自动回退 """ try: payload = { "model": self.siliconflow_model, "messages": [ { "role": "system", "content": "你是一个专业的数据提取助手,能够从文本中准确提取结构化信息。请严格按照JSON格式返回结果。" }, { "role": "user", "content": prompt } ], "temperature": 0.3, "max_tokens": 2000 } headers = { "Authorization": f"Bearer {self.siliconflow_api_key}", "Content-Type": "application/json" } response = requests.post( self.siliconflow_url, json=payload, headers=headers, timeout=30 ) if response.status_code != 200: raise Exception(f"API调用失败: {response.status_code} - {response.text}") result = response.json() # 提取AI返回的内容 if 'choices' in result and len(result['choices']) > 0: content = result['choices'][0]['message']['content'] # 尝试解析JSON try: # 如果返回的是代码块,提取JSON部分 if '```json' in content: json_start = content.find('```json') + 7 json_end = content.find('```', json_start) content = content[json_start:json_end].strip() elif '```' in content: json_start = content.find('```') + 3 json_end = content.find('```', json_start) content = content[json_start:json_end].strip() extracted_data = json.loads(content) return extracted_data except json.JSONDecodeError: # 如果不是JSON,尝试从文本中提取 return self._parse_text_response(content, output_fields) else: raise Exception("API返回格式异常") except requests.exceptions.Timeout: raise Exception("AI服务调用超时") except Exception as e: raise Exception(f"AI服务调用失败: {str(e)}") def _extract_with_huawei(self, prompt: str, output_fields: List[Dict]) -> Optional[Dict]: """ 使用华为大模型API提取字段(带重试机制) 至少重试3次,总共最多尝试4次 """ max_retries = 3 # 最多重试3次,总共4次尝试 retry_delay = 2 # 重试延迟(秒),每次重试延迟递增(从2秒开始) last_exception = None for attempt in range(max_retries + 1): # 0, 1, 2, 3 (总共4次) try: if attempt > 0: # 重试前等待,延迟时间递增(2秒、4秒、6秒) wait_time = retry_delay * attempt print(f"[AI服务] 第 {attempt} 次重试,等待 {wait_time} 秒后重试...") time.sleep(wait_time) print(f"[AI服务] 正在调用华为大模型API (尝试 {attempt + 1}/{max_retries + 1})...") result = self._call_huawei_api_once(prompt, output_fields) if result is not None: if attempt > 0: print(f"[AI服务] 重试成功!") return result except requests.exceptions.Timeout as e: last_exception = e error_msg = f"AI服务调用超时 (尝试 {attempt + 1}/{max_retries + 1})" print(f"[AI服务] {error_msg}") if attempt < max_retries: continue else: raise Exception(f"{error_msg}: {str(e)}") except requests.exceptions.ConnectionError as e: last_exception = e error_msg = f"连接错误 (尝试 {attempt + 1}/{max_retries + 1})" print(f"[AI服务] {error_msg}: {str(e)}") if attempt < max_retries: continue else: raise Exception(f"{error_msg}: {str(e)}") except requests.exceptions.RequestException as e: last_exception = e error_msg = f"请求异常 (尝试 {attempt + 1}/{max_retries + 1})" print(f"[AI服务] {error_msg}: {str(e)}") if attempt < max_retries: continue else: raise Exception(f"{error_msg}: {str(e)}") except Exception as e: last_exception = e error_msg = f"AI服务调用失败 (尝试 {attempt + 1}/{max_retries + 1})" print(f"[AI服务] {error_msg}: {str(e)}") # 对于其他类型的错误,也进行重试 if attempt < max_retries: continue else: raise Exception(f"{error_msg}: {str(e)}") # 如果所有重试都失败了 if last_exception: raise Exception(f"AI服务调用失败,已重试 {max_retries} 次: {str(last_exception)}") else: raise Exception(f"AI服务调用失败,已重试 {max_retries} 次") def _call_huawei_api_once(self, prompt: str, output_fields: List[Dict]) -> Optional[Dict]: """ 单次调用华为大模型API(不包含重试逻辑) """ payload = { "model": self.huawei_model, "messages": [ { "role": "system", "content": "你是一个专业的数据提取助手。请仔细分析用户提供的输入文本,提取所有相关信息,并严格按照指定的JSON格式返回结果。\n\n重要要求:\n1. 必须仔细阅读输入文本的每一个字,不要遗漏任何信息\n2. 对于每个字段,请从多个角度思考:直接提及、同义词、隐含信息、可推断信息\n3. 如果文本中明确提到某个信息(如性别、年龄、职务等),必须提取出来,不能设为空\n4. 如果可以通过已有信息合理推断(如根据出生年月推算年龄,从单位及职务中拆分单位和职务),请进行推断并填写\n5. 只返回JSON对象,不要包含任何其他文字说明、思考过程或markdown代码块标记" }, { "role": "user", "content": prompt } ], "stream": False, "presence_penalty": 1.05, # 提高presence_penalty,鼓励模型提取更多不同字段 "frequency_penalty": 1.02, # 提高frequency_penalty,减少重复 "repetition_penalty": 1.05, # 提高repetition_penalty,避免重复 "temperature": 0.2, # 降低temperature,提高确定性 "top_p": 0.9, # 降低top_p,更聚焦 "top_k": 40, # 增加top_k,允许更多选择 "seed": 1, "max_tokens": self.api_max_tokens, "n": 1, "enable_thinking": True } headers = { "Authorization": f"Bearer {self.huawei_api_key}", "Content-Type": "application/json" } # 根据是否开启思考模式动态调整超时时间 # 开启思考模式时,模型需要更多时间进行推理,超时时间需要更长 enable_thinking = payload.get('enable_thinking', False) if enable_thinking: # 思考模式:使用配置的超时时间(默认180秒) timeout = self.api_timeout print(f"[AI服务] 思考模式已开启,使用超时时间: {timeout}秒") else: # 非思考模式:使用较短的超时时间 timeout = min(self.api_timeout, 120) # 最多120秒 print(f"[AI服务] 思考模式未开启,使用超时时间: {timeout}秒") response = requests.post( self.huawei_api_endpoint, json=payload, headers=headers, timeout=timeout ) if response.status_code != 200: raise Exception(f"API调用失败: {response.status_code} - {response.text}") result = response.json() # 提取AI返回的内容 if 'choices' in result and len(result['choices']) > 0: raw_content = result['choices'][0]['message']['content'] # 调试:打印原始返回内容(前500字符) print(f"[AI服务] API返回的原始内容(前500字符): {raw_content[:500]}") # 处理思考过程标签(支持多种可能的标签格式) content = raw_content # 处理 标签(DeepSeek-R1常用格式) if '' in content: parts = content.split('') if len(parts) > 1: content = parts[-1].strip() print(f"[AI服务] 检测到 标签,提取标签后的内容") # 处理 标签 elif '' in content: parts = content.split('') if len(parts) > 1: content = parts[-1].strip() print(f"[AI服务] 检测到 标签,提取标签后的内容") # 处理 ... 标签 elif '' in content and '' in content: reasoning_start = content.find('') if reasoning_start != -1: content = content[reasoning_start + 11:].strip() print(f"[AI服务] 检测到 标签,提取标签后的内容") # 清理后的内容(前500字符) print(f"[AI服务] 清理后的内容(前500字符): {content[:500]}") # 尝试解析JSON extracted_data = self._extract_json_from_text(content) if extracted_data: print(f"[AI服务] JSON解析成功,提取到 {len(extracted_data)} 个字段") print(f"[AI服务] 原始字段名: {list(extracted_data.keys())}") # 规范化字段名并映射到正确的字段编码 normalized_data = self._normalize_field_names(extracted_data, output_fields) print(f"[AI服务] 规范化后的字段名: {list(normalized_data.keys())}") # 打印关键字段的值用于调试 for key in ['target_name', 'target_gender', 'target_age', 'target_date_of_birth']: if key in normalized_data: print(f"[AI服务] 规范化后 {key} = '{normalized_data[key]}'") # 规范化日期格式 normalized_data = self._normalize_date_formats(normalized_data, output_fields) # 再次打印关键字段的值用于调试 for key in ['target_name', 'target_gender', 'target_age', 'target_date_of_birth']: if key in normalized_data: print(f"[AI服务] 日期格式化后 {key} = '{normalized_data[key]}'") # 后处理:从已有信息推断缺失字段 normalized_data = self._post_process_inferred_fields(normalized_data, output_fields) # 打印后处理后的关键字段 for key in ['target_name', 'target_gender', 'target_age', 'target_date_of_birth', 'target_organization', 'target_position']: if key in normalized_data: print(f"[AI服务] 后处理后 {key} = '{normalized_data[key]}'") return normalized_data # 如果无法提取JSON,记录错误 print(f"[AI服务] 警告:无法从内容中提取JSON,尝试备用解析方法") print(f"[AI服务] 完整内容: {content}") # 尝试从文本中提取 parsed_data = self._parse_text_response(content, output_fields) if parsed_data and any(v for v in parsed_data.values() if v): # 至少有一个非空字段 print(f"[AI服务] 使用备用方法解析成功,提取到 {len(parsed_data)} 个字段") return parsed_data # 如果所有方法都失败,抛出异常 raise Exception(f"无法从API返回内容中提取JSON数据。原始内容长度: {len(raw_content)}, 清理后内容长度: {len(content)}。请检查API返回的内容格式是否正确。") else: raise Exception("API返回格式异常:未找到choices字段或choices为空") def _extract_json_from_text(self, text: str) -> Optional[Dict]: """ 从文本中提取JSON对象 支持多种格式: 1. 纯JSON对象 2. 包裹在 ```json 代码块中的JSON 3. 包裹在 ``` 代码块中的JSON 4. 文本中包含的JSON对象 """ # 方法1: 尝试提取代码块中的JSON if '```json' in text: json_start = text.find('```json') + 7 json_end = text.find('```', json_start) if json_end != -1: json_str = text[json_start:json_end].strip() # 尝试清理和修复JSON json_str = self._clean_json_string(json_str) try: return json.loads(json_str) except json.JSONDecodeError as e: print(f"[AI服务] JSON解析失败(代码块): {e}") # 尝试修复后再次解析 json_str = self._fix_json_string(json_str) try: return json.loads(json_str) except json.JSONDecodeError: pass if '```' in text: json_start = text.find('```') + 3 json_end = text.find('```', json_start) if json_end != -1: json_str = text[json_start:json_end].strip() # 尝试清理和修复JSON json_str = self._clean_json_string(json_str) try: return json.loads(json_str) except json.JSONDecodeError as e: print(f"[AI服务] JSON解析失败(代码块): {e}") # 尝试修复后再次解析 json_str = self._fix_json_string(json_str) try: return json.loads(json_str) except json.JSONDecodeError: pass # 方法2: 尝试直接解析整个文本 cleaned_text = self._clean_json_string(text.strip()) try: return json.loads(cleaned_text) except json.JSONDecodeError as e: print(f"[AI服务] JSON解析失败(直接解析): {e}") # 尝试修复后再次解析 fixed_text = self._fix_json_string(cleaned_text) try: return json.loads(fixed_text) except json.JSONDecodeError: pass # 方法3: 尝试查找文本中的JSON对象(以 { 开始,以 } 结束) # 使用正则表达式找到最外层的JSON对象 json_pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}' matches = re.finditer(json_pattern, text, re.DOTALL) for match in matches: json_str = match.group(0) try: data = json.loads(json_str) # 验证是否包含预期的字段(至少有一个输出字段的key) if isinstance(data, dict) and len(data) > 0: return data except json.JSONDecodeError: continue # 方法4: 尝试查找嵌套的JSON对象(更复杂的匹配) # 找到第一个 { 和最后一个匹配的 } start_idx = text.find('{') if start_idx != -1: brace_count = 0 end_idx = start_idx for i in range(start_idx, len(text)): if text[i] == '{': brace_count += 1 elif text[i] == '}': brace_count -= 1 if brace_count == 0: end_idx = i break if end_idx > start_idx: json_str = text[start_idx:end_idx + 1] try: return json.loads(json_str) except json.JSONDecodeError: pass return None def _clean_json_string(self, json_str: str) -> str: """ 清理JSON字符串,移除常见的格式问题 """ # 移除前导/尾随空白 json_str = json_str.strip() # 移除可能的BOM标记 if json_str.startswith('\ufeff'): json_str = json_str[1:] # 移除可能的XML/HTML标签残留 json_str = re.sub(r'<[^>]+>', '', json_str) return json_str def _fix_json_string(self, json_str: str) -> str: """ 尝试修复常见的JSON格式错误 """ # 移除末尾的逗号(在 } 或 ] 之前) json_str = re.sub(r',\s*}', '}', json_str) json_str = re.sub(r',\s*]', ']', json_str) # 修复字段名中的错误(如 .target_gender -> target_gender) # 处理前导点和尾随空格 json_str = re.sub(r'["\']\s*\.([^"\']+?)\s*["\']\s*:', r'"\1":', json_str) json_str = re.sub(r'["\']\.([^"\']+?)["\']\s*:', r'"\1":', json_str) # 修复字段名中的空格(如 "target name" -> "target_name") json_str = re.sub(r'["\']([^"\']+?)\s+([^"\']+?)["\']\s*:', r'"\1_\2":', json_str) # 修复字段名中的尾随空格(如 "target_gender " -> "target_gender") json_str = re.sub(r'["\']([^"\']+?)\s+["\']\s*:', r'"\1":', json_str) # 修复字段名中的前导空格(如 " target_gender" -> "target_gender") json_str = re.sub(r'["\']\s+([^"\']+?)["\']\s*:', r'"\1":', json_str) # 尝试修复未加引号的字段名(但要避免破坏字符串值) # 只修复在冒号前的未加引号的标识符 json_str = re.sub(r'([{,]\s*)([a-zA-Z_][a-zA-Z0-9_]*)\s*:', r'\1"\2":', json_str) return json_str def _normalize_field_names(self, extracted_data: Dict, output_fields: List[Dict]) -> Dict: """ 规范化字段名,将模型返回的各种字段名格式映射到正确的字段编码 Args: extracted_data: 模型返回的原始数据字典 output_fields: 输出字段列表,包含正确的字段编码 Returns: 规范化后的字段字典,使用正确的字段编码作为key """ # 创建字段编码到字段信息的映射 field_code_map = {field['field_code']: field for field in output_fields} # 创建字段名到字段编码的映射(支持多种变体) name_to_code_map = {} for field in output_fields: field_code = field['field_code'] field_name = field.get('name', '') # 添加标准字段编码 name_to_code_map[field_code] = field_code # 添加字段名(如果有) if field_name: name_to_code_map[field_name] = field_code # 处理驼峰命名变体(如 politicalStatus -> target_political_status) # 将 target_political_status 转换为可能的驼峰形式 if '_' in field_code: parts = field_code.split('_') # 生成驼峰形式:targetPoliticalStatus camel_case = parts[0] + ''.join(word.capitalize() for word in parts[1:]) name_to_code_map[camel_case] = field_code # 生成首字母大写的驼峰形式:TargetPoliticalStatus pascal_case = ''.join(word.capitalize() for word in parts) name_to_code_map[pascal_case] = field_code # 处理去掉前缀的变体(如 name -> target_name) if field_code.startswith('target_'): short_name = field_code.replace('target_', '') name_to_code_map[short_name] = field_code # 驼峰形式:name -> target_name camel_short = short_name.split('_')[0] + ''.join(word.capitalize() for word in short_name.split('_')[1:]) if '_' in short_name else short_name name_to_code_map[camel_short] = field_code # 添加常见的Schema.org格式字段名映射 schema_mapping = { 'name': 'target_name', 'gender': 'target_gender', 'dateOfBirth': 'target_date_of_birth', 'date_of_birth': 'target_date_of_birth', 'politicalStatus': 'target_political_status', 'political_status': 'target_political_status', 'organizationAndPosition': 'target_organization_and_position', 'organization_and_position': 'target_organization_and_position', 'organization': 'target_organization', 'position': 'target_position', 'educationLevel': 'target_education_level', 'education_level': 'target_education_level', 'professionalRank': 'target_professional_rank', 'professional_rank': 'target_professional_rank', 'clueSource': 'clue_source', 'clue_source': 'clue_source', 'issueDescription': 'target_issue_description', 'issue_description': 'target_issue_description', 'description': 'target_issue_description', # description可能是问题描述 'age': 'target_age', } # 添加Schema.org格式的映射(仅当字段编码存在时) for schema_key, code in schema_mapping.items(): if code in field_code_map: name_to_code_map[schema_key] = code # 添加常见拼写错误的映射(如 targetsProfessionalRank -> target_professional_rank) typo_mapping = { 'targetsProfessionalRank': 'target_professional_rank', 'targetProfessionalRank': 'target_professional_rank', 'targets_professional_rank': 'target_professional_rank', 'targetsProfessional': 'target_professional_rank', 'professionalRank': 'target_professional_rank', 'targetGender': 'target_gender', 'targetsGender': 'target_gender', 'targetDateOfBirth': 'target_date_of_birth', 'targetsDateOfBirth': 'target_date_of_birth', 'targetPoliticalStatus': 'target_political_status', 'targetsPoliticalStatus': 'target_political_status', 'targetOrganizationAndPosition': 'target_organization_and_position', 'targetsOrganizationAndPosition': 'target_organization_and_position', 'targetOrganization': 'target_organization', 'targetsOrganization': 'target_organization', 'targetPosition': 'target_position', 'targetsPosition': 'target_position', 'targetEducationLevel': 'target_education_level', 'targetsEducationLevel': 'target_education_level', 'targetAge': 'target_age', 'targetsAge': 'target_age', 'targetIssueDescription': 'target_issue_description', 'targetsIssueDescription': 'target_issue_description', } # 添加拼写错误映射(仅当字段编码存在时) for typo_key, code in typo_mapping.items(): if code in field_code_map: name_to_code_map[typo_key] = code # 规范化数据 normalized_data = {} for key, value in extracted_data.items(): # 跳过特殊字段(如 @context) if key.startswith('@'): continue # 跳过空字段名 if not key or not key.strip(): print(f"[AI服务] 跳过空字段名,值为: '{value}'") continue # 处理嵌套对象(如 description: {violationOfFamilyPlanningPolicies: "..."}) if isinstance(value, dict): # 尝试从嵌套对象中提取值 # 通常嵌套对象中只有一个值,取第一个非空值 nested_values = [v for v in value.values() if v and isinstance(v, str)] if nested_values: value = nested_values[0] else: # 如果嵌套对象中没有字符串值,尝试转换为字符串 value = str(value) if value else '' # 清理字段名:去掉前导点、空格等 cleaned_key = key.strip().lstrip('.').rstrip() # 如果清理后字段名为空,跳过 if not cleaned_key: print(f"[AI服务] 跳过清理后为空字段名,原始key: '{key}', 值为: '{value}'") continue # 尝试直接匹配 if cleaned_key in name_to_code_map: correct_code = name_to_code_map[cleaned_key] normalized_data[correct_code] = value continue # 尝试不区分大小写匹配 matched = False for name, code in name_to_code_map.items(): if cleaned_key.lower() == name.lower(): normalized_data[code] = value matched = True break if not matched: # 尝试模糊匹配:处理拼写错误(如 targetsProfessionalRank -> target_professional_rank) # 移除常见的前缀/后缀错误(如 targets -> target) normalized_key = cleaned_key if normalized_key.startswith('targets'): normalized_key = 'target' + normalized_key[7:] # targets -> target elif normalized_key.startswith('targets_'): normalized_key = 'target_' + normalized_key[8:] # targets_ -> target_ # 尝试匹配规范化后的key if normalized_key in name_to_code_map: correct_code = name_to_code_map[normalized_key] normalized_data[correct_code] = value matched = True print(f"[AI服务] 拼写修正: '{cleaned_key}' -> '{normalized_key}' -> '{correct_code}'") elif normalized_key.lower() in [k.lower() for k in name_to_code_map.keys()]: for name, code in name_to_code_map.items(): if normalized_key.lower() == name.lower(): normalized_data[code] = value matched = True print(f"[AI服务] 拼写修正(不区分大小写): '{cleaned_key}' -> '{normalized_key}' -> '{code}'") break if not matched: # 如果找不到匹配,尝试模糊匹配 # 但跳过空字段名,避免错误匹配 if cleaned_key: # 检查是否包含字段编码的关键部分 for field_code in field_code_map.keys(): # 如果清理后的key包含字段编码的关键部分,或者字段编码包含key的关键部分 key_parts = cleaned_key.lower().replace('_', '').replace('-', '').replace('targets', 'target') code_parts = field_code.lower().replace('_', '').replace('-', '') # 检查相似度(简单匹配),但要求key_parts不为空 if key_parts and (key_parts in code_parts or code_parts in key_parts): # 如果该字段已经有值,且新值为空,则不覆盖 if field_code in normalized_data and normalized_data[field_code] and not value: print(f"[AI服务] 跳过模糊匹配(已有非空值): '{cleaned_key}' -> '{field_code}' (已有值: '{normalized_data[field_code]}')") matched = True break normalized_data[field_code] = value matched = True print(f"[AI服务] 模糊匹配: '{cleaned_key}' -> '{field_code}'") break if not matched: # 如果仍然找不到匹配,保留原字段名(可能模型返回了意外的字段) # 但跳过空字段名 if cleaned_key: print(f"[AI服务] 警告:无法匹配字段名 '{cleaned_key}',保留原字段名") normalized_data[cleaned_key] = value else: print(f"[AI服务] 跳过空字段名,无法匹配") # 确保所有输出字段都有对应的值(即使为空字符串) for field_code in field_code_map.keys(): if field_code not in normalized_data: normalized_data[field_code] = '' return normalized_data def _normalize_date_formats(self, data: Dict, output_fields: List[Dict]) -> Dict: """ 规范化日期格式,确保日期格式正确 输出格式:YYYY年MM月 或 YYYY年MM月DD日 Args: data: 提取的数据字典 output_fields: 输出字段列表 Returns: 规范化后的数据字典 """ # 创建字段编码到字段信息的映射 field_code_map = {field['field_code']: field for field in output_fields} # 处理出生年月字段 (target_date_of_birth) if 'target_date_of_birth' in data and data['target_date_of_birth']: date_value = str(data['target_date_of_birth']).strip() if date_value: # 尝试规范化日期格式为 YYYY年MM月 normalized_date = self._normalize_date_to_chinese_yyyymm(date_value) if normalized_date and normalized_date != date_value: print(f"[AI服务] 日期格式规范化: '{date_value}' -> '{normalized_date}'") data['target_date_of_birth'] = normalized_date # 处理出生年月日字段 (target_date_of_birth_full) if 'target_date_of_birth_full' in data and data['target_date_of_birth_full']: date_value = str(data['target_date_of_birth_full']).strip() if date_value: # 尝试规范化日期格式为 YYYY年MM月DD日 normalized_date = self._normalize_date_to_chinese_yyyymmdd(date_value) if normalized_date and normalized_date != date_value: print(f"[AI服务] 日期格式规范化: '{date_value}' -> '{normalized_date}'") data['target_date_of_birth_full'] = normalized_date return data def _normalize_date_to_chinese_yyyymm(self, date_str: str) -> Optional[str]: """ 将日期字符串规范化为 YYYY年MM月 格式(中文格式) Args: date_str: 日期字符串,可能是各种格式 Returns: 规范化后的日期字符串(YYYY年MM月格式),如果无法解析则返回原值 """ if not date_str: return None date_str = date_str.strip() # 如果已经是中文格式(YYYY年MM月),检查并规范化 match = re.search(r'(\d{4})年(\d{1,2})月', date_str) if match: year = match.group(1) month = match.group(2).zfill(2) # 补零到2位 if 1 <= int(month) <= 12: return f"{year}年{month}月" # 如果是6位数字格式(YYYYMM),转换为中文格式 if re.match(r'^\d{6}$', date_str): year = date_str[:4] month = date_str[4:].lstrip('0') or '01' # 去掉前导零,但如果全是0则设为01 month = month.zfill(2) # 补零到2位 if 1 <= int(month) <= 12: return f"{year}年{month}月" # 如果是5位数字(如19805),尝试修复 if re.match(r'^\d{5}$', date_str): year = date_str[:4] month = date_str[4:].zfill(2) if 1 <= int(month) <= 12: return f"{year}年{month}月" # 格式2: "1980-5" 或 "1980-05" match = re.search(r'(\d{4})-(\d{1,2})', date_str) if match: year = match.group(1) month = match.group(2).zfill(2) if 1 <= int(month) <= 12: return f"{year}年{month}月" # 格式3: "1980/5" 或 "1980/05" match = re.search(r'(\d{4})/(\d{1,2})', date_str) if match: year = match.group(1) month = match.group(2).zfill(2) if 1 <= int(month) <= 12: return f"{year}年{month}月" # 如果只有年份,补充月份为01 if re.match(r'^\d{4}$', date_str): return f"{date_str}年01月" # 如果无法解析,返回原值 return date_str def _normalize_date_to_chinese_yyyymmdd(self, date_str: str) -> Optional[str]: """ 将日期字符串规范化为 YYYY年MM月DD日 格式(中文格式) Args: date_str: 日期字符串,可能是各种格式 Returns: 规范化后的日期字符串(YYYY年MM月DD日格式),如果无法解析则返回原值 """ if not date_str: return None date_str = date_str.strip() # 如果已经是中文格式(YYYY年MM月DD日),检查并规范化 match = re.search(r'(\d{4})年(\d{1,2})月(\d{1,2})日', date_str) if match: year = match.group(1) month = match.group(2).zfill(2) # 补零到2位 day = match.group(3).zfill(2) # 补零到2位 if 1 <= int(month) <= 12 and 1 <= int(day) <= 31: return f"{year}年{month}月{day}日" # 如果是8位数字格式(YYYYMMDD),转换为中文格式 if re.match(r'^\d{8}$', date_str): year = date_str[:4] month = date_str[4:6].lstrip('0') or '01' month = month.zfill(2) day = date_str[6:8].lstrip('0') or '01' day = day.zfill(2) if 1 <= int(month) <= 12 and 1 <= int(day) <= 31: return f"{year}年{month}月{day}日" # 尝试解析各种日期格式 # 格式2: "1980-5-15" 或 "1980-05-15" match = re.search(r'(\d{4})-(\d{1,2})-(\d{1,2})', date_str) if match: year = match.group(1) month = match.group(2).zfill(2) day = match.group(3).zfill(2) if 1 <= int(month) <= 12 and 1 <= int(day) <= 31: return f"{year}年{month}月{day}日" # 格式3: "1980/5/15" 或 "1980/05/15" match = re.search(r'(\d{4})/(\d{1,2})/(\d{1,2})', date_str) if match: year = match.group(1) month = match.group(2).zfill(2) day = match.group(3).zfill(2) if 1 <= int(month) <= 12 and 1 <= int(day) <= 31: return f"{year}年{month}月{day}日" # 如果只有年月,补充日期为01日 normalized_yyyymm = self._normalize_date_to_chinese_yyyymm(date_str) if normalized_yyyymm and '年' in normalized_yyyymm and '月' in normalized_yyyymm: # 从"YYYY年MM月"中提取年月,补充日期 match = re.search(r'(\d{4})年(\d{2})月', normalized_yyyymm) if match: year = match.group(1) month = match.group(2) return f"{year}年{month}月01日" # 如果无法解析,返回原值 return date_str def _post_process_inferred_fields(self, data: Dict, output_fields: List[Dict]) -> Dict: """ 后处理:从已有信息推断缺失字段 Args: data: 提取的数据字典 output_fields: 输出字段列表 Returns: 后处理后的数据字典 """ # 创建字段编码到字段信息的映射 field_code_map = {field['field_code']: field for field in output_fields} # 1. 从出生年月计算年龄 if 'target_age' in field_code_map and (not data.get('target_age') or data.get('target_age') == ''): if 'target_date_of_birth' in data and data.get('target_date_of_birth'): age = self._calculate_age_from_birth_date(data['target_date_of_birth']) if age: data['target_age'] = str(age) print(f"[AI服务] 后处理:从出生年月 '{data['target_date_of_birth']}' 计算年龄: {age}岁") # 2. 从单位及职务中拆分单位和职务 if 'target_organization_and_position' in data and data.get('target_organization_and_position'): org_pos = data['target_organization_and_position'] # 拆分单位 if 'target_organization' in field_code_map and (not data.get('target_organization') or data.get('target_organization') == ''): org = self._extract_organization_from_org_pos(org_pos) if org: data['target_organization'] = org print(f"[AI服务] 后处理:从单位及职务 '{org_pos}' 提取单位: {org}") # 拆分职务 if 'target_position' in field_code_map and (not data.get('target_position') or data.get('target_position') == ''): pos = self._extract_position_from_org_pos(org_pos) if pos: data['target_position'] = pos print(f"[AI服务] 后处理:从单位及职务 '{org_pos}' 提取职务: {pos}") return data def _calculate_age_from_birth_date(self, birth_date: str) -> Optional[int]: """ 从出生年月计算年龄 Args: birth_date: 出生年月,格式如 "1980年05月" 或 "198005" Returns: 年龄(整数),如果无法计算则返回None """ if not birth_date: return None birth_date = str(birth_date).strip() # 提取年份 year_match = re.search(r'(\d{4})', birth_date) if not year_match: return None birth_year = int(year_match.group(1)) current_year = 2024 # 当前年份 # 计算年龄 age = current_year - birth_year # 验证年龄合理性(0-150岁) if 0 <= age <= 150: return age return None def _extract_organization_from_org_pos(self, org_pos: str) -> Optional[str]: """ 从单位及职务中提取单位名称 Args: org_pos: 单位及职务,如 "某公司总经理" Returns: 单位名称,如 "某公司" """ if not org_pos: return None org_pos = str(org_pos).strip() # 常见职务关键词 position_keywords = [ '总经理', '经理', '局长', '处长', '科长', '主任', '书记', '部长', '副部长', '副经理', '副局长', '副处长', '副科长', '副主任', '副书记', '董事长', '副董事长', '总裁', '副总裁', '总监', '副总监', '部长', '副部长', '司长', '副司长', '厅长', '副厅长', '市长', '副市长', '县长', '副县长', '乡长', '副乡长', '镇长', '副镇长', '村长', '副村长' ] # 尝试匹配:单位 + 职务 for pos_keyword in position_keywords: if pos_keyword in org_pos: # 找到职务位置,提取前面的单位部分 pos_index = org_pos.find(pos_keyword) if pos_index > 0: org = org_pos[:pos_index].strip() if org: return org # 如果没有找到明确的职务关键词,尝试其他模式 # 例如:"XX公司XX部门XX职务" # 这里简单返回,可能需要更复杂的逻辑 return None def _extract_position_from_org_pos(self, org_pos: str) -> Optional[str]: """ 从单位及职务中提取职务名称 Args: org_pos: 单位及职务,如 "某公司总经理" Returns: 职务名称,如 "总经理" """ if not org_pos: return None org_pos = str(org_pos).strip() # 常见职务关键词 position_keywords = [ '总经理', '经理', '局长', '处长', '科长', '主任', '书记', '部长', '副部长', '副经理', '副局长', '副处长', '副科长', '副主任', '副书记', '董事长', '副董事长', '总裁', '副总裁', '总监', '副总监', '部长', '副部长', '司长', '副司长', '厅长', '副厅长', '市长', '副市长', '县长', '副县长', '乡长', '副乡长', '镇长', '副镇长', '村长', '副村长' ] # 按长度从长到短排序,优先匹配长关键词(如"副经理"优先于"经理") position_keywords.sort(key=len, reverse=True) # 尝试匹配职务关键词 for pos_keyword in position_keywords: if pos_keyword in org_pos: return pos_keyword return None def _parse_text_response(self, text: str, output_fields: List[Dict]) -> Dict: """ 从文本响应中解析字段值(备用方案) """ result = {} for field in output_fields: field_code = field['field_code'] field_name = field['name'] # 尝试在文本中查找字段值 # 这里使用简单的关键词匹配,实际可以更复杂 if field_name in text: # 提取字段值(简单实现) start_idx = text.find(field_name) if start_idx != -1: # 查找冒号后的内容 colon_idx = text.find(':', start_idx) if colon_idx != -1: value_start = colon_idx + 1 value_end = text.find('\n', value_start) if value_end == -1: value_end = len(text) value = text[value_start:value_end].strip() result[field_code] = value else: result[field_code] = '' else: result[field_code] = '' else: result[field_code] = '' return result