""" 数据资产智能识别服务 """ import json import time from typing import List, Dict, Any, Optional from app.schemas.inventory import ( TableInput, FieldInput, TableOutput, FieldOutput, Statistics, TokenUsage, AnalyzeOptions, ) from app.utils.llm_client import llm_client from app.utils.logger import logger from app.core.config import settings from app.core.exceptions import LLMAPIException, ValidationException # ==================== 提示词模板 ==================== SYSTEM_PROMPT = """你是一位专业的数据资产管理专家,擅长识别数据资产的中文名称、业务含义、敏感信息和重要数据特征。 ## 你的专业能力 - 深入理解数据资产管理、数据合规(PIPL、数据安全法)等法规要求 - 熟悉各种业务场景下的数据资产命名规范 - 能够准确识别敏感个人信息(SPI)和重要数据 - 具备优秀的文本理解和生成能力 ## 输出要求 1. **准确性**: 中文命名必须准确反映业务含义 2. **合规性**: PII 识别必须符合《个人信息保护法》(PIPL) 3. **完整性**: 重要数据识别必须符合《数据安全法》 4. **专业性**: 使用专业术语,符合行业标准 5. **结构化**: 严格按照JSON格式输出 """ USER_PROMPT_TEMPLATE = """请基于以下信息识别数据资产: ## 行业背景 {industry_info} ## 业务背景 {context_info} ## 表结构信息 {tables_info} ## 识别要求 1. 为每个表生成中文名称(ai_name)和业务描述(desc) 2. 为每个字段生成中文名称(ai_name)和业务描述(desc) 3. 识别敏感个人信息(PII): - 手机号、身份证号、姓名、邮箱、地址等 - 生物识别信息(人脸、指纹等) - 医疗健康信息 - 金融账户信息 - 行踪轨迹信息 4. 识别重要数据(符合《数据安全法》): - 涉及国家安全的数据 - 涉及公共利益的数据 - 高精度地理信息(军事禁区周边) - 关键物资流向(稀土、芯片等) 5. 计算置信度评分(0-100): - 字段命名规范度 - 注释完整性 - 业务含义明确度 ## 输出格式(JSON) {json_schema} 请严格按照以上JSON Schema格式输出,确保所有字段都存在。 """ JSON_SCHEMA = """ { "type": "object", "required": ["tables"], "properties": { "tables": { "type": "array", "items": { "type": "object", "required": ["raw_name", "ai_name", "desc", "confidence", "fields"], "properties": { "raw_name": {"type": "string"}, "ai_name": {"type": "string"}, "desc": {"type": "string"}, "confidence": {"type": "integer", "minimum": 0, "maximum": 100}, "fields": { "type": "array", "items": { "type": "object", "required": ["raw_name", "ai_name", "desc", "pii", "pii_type", "is_important_data", "confidence"], "properties": { "raw_name": {"type": "string"}, "ai_name": {"type": "string"}, "desc": {"type": "string"}, "pii": {"type": "array", "items": {"type": "string"}}, "pii_type": {"type": ["string", "null"]}, "is_important_data": {"type": "boolean"}, "confidence": {"type": "integer", "minimum": 0, "maximum": 100} } } }, "pii": {"type": "array", "items": {"type": "string"}}, "important": {"type": "boolean"}, "important_data_types": {"type": "array", "items": {"type": "string"}} } } } } } """ # ==================== PII 识别规则引擎 ==================== PII_KEYWORDS = { "phone": { "keywords": ["phone", "mobile", "tel", "telephone", "手机", "电话", "联系方式"], "type": "contact", "label": "手机号" }, "id_card": { "keywords": ["id_card", "idcard", "identity", "身份证", "证件号", "身份证明"], "type": "identity", "label": "身份证号" }, "name": { "keywords": ["name", "real_name", "姓名", "名字", "用户名"], "type": "name", "label": "姓名" }, "email": { "keywords": ["email", "mail", "邮箱", "电子邮箱", "邮件"], "type": "email", "label": "邮箱" }, "address": { "keywords": ["address", "addr", "地址", "住址", "居住地址"], "type": "address", "label": "地址" }, "bank_card": { "keywords": ["bank_card", "card_no", "银行卡", "卡号", "账户"], "type": "financial", "label": "银行卡号" }, } def validate_pii_detection(field: FieldOutput, field_input: FieldInput) -> FieldOutput: """ 使用规则引擎验证和补充 PII 识别 Args: field: AI 识别的字段结果 field_input: 原始字段输入 Returns: 验证后的字段结果 """ field_name_lower = field.raw_name.lower() field_comment_lower = (field_input.comment or "").lower() # 如果 AI 未识别 PII,使用规则引擎识别 if not field.pii or not field.pii_type: for pii_key, pii_info in PII_KEYWORDS.items(): keywords = pii_info["keywords"] # 检查字段名和注释中是否包含关键词 if any(keyword.lower() in field_name_lower or keyword.lower() in field_comment_lower for keyword in keywords): if not field.pii: field.pii = [pii_info["label"]] if not field.pii_type: field.pii_type = pii_info["type"] break return field # ==================== 置信度评分算法 ==================== def calculate_confidence(field_input: FieldInput, field_output: FieldOutput) -> int: """ 计算字段识别结果的置信度评分 Args: field_input: 原始字段输入 field_output: AI 识别的字段结果 Returns: 置信度评分(0-100) """ score = 50 # 基础分 # 命名规范度(30分) field_name = field_input.raw_name if field_name.islower() and '_' in field_name: score += 15 # 蛇形命名 elif field_name.islower() and field_name.isalnum(): score += 10 # 小写字母数字 elif field_name.isalnum(): score += 5 # 字母数字组合 # 注释完整性(20分) if field_input.comment and len(field_input.comment.strip()) > 0: score += 20 # AI 识别结果质量(30分) if field_output.ai_name and field_output.ai_name != field_input.raw_name: score += 15 # AI 生成了中文名称 if field_output.desc and len(field_output.desc.strip()) > 0: score += 15 # AI 生成了描述 return min(score, 100) # ==================== 提示词构建 ==================== def build_prompt( tables: List[TableInput], industry: Optional[str] = None, context: Optional[str] = None ) -> str: """ 构建大模型提示词 Args: tables: 表列表 industry: 行业信息 context: 业务背景 Returns: 构建好的提示词 """ # 格式化表信息 tables_info = [] for table in tables: table_info = f"表名: {table.raw_name}\n字段列表:\n" for field in table.fields: field_info = f" - {field.raw_name} ({field.type})" if field.comment: field_info += f" - 注释: {field.comment}" table_info += field_info + "\n" tables_info.append(table_info) tables_info_str = "\n\n".join(tables_info) # 行业信息 industry_info = industry if industry else "未指定" # 业务背景 context_info = context if context else "未提供业务背景信息" # 构建用户提示词 user_prompt = USER_PROMPT_TEMPLATE.format( industry_info=industry_info, context_info=context_info, tables_info=tables_info_str, json_schema=JSON_SCHEMA ) return user_prompt # ==================== 主要服务类 ==================== class AIAnalyzeService: """数据资产智能识别服务""" @staticmethod async def analyze( tables: List[TableInput], project_id: str, industry: Optional[str] = None, context: Optional[str] = None, options: Optional[AnalyzeOptions] = None ) -> Dict[str, Any]: """ 执行 AI 分析 Args: tables: 表列表 project_id: 项目ID industry: 行业信息 context: 业务背景 options: 分析选项 Returns: 分析结果字典 """ start_time = time.time() # 获取配置 analyze_options = options or AnalyzeOptions() model = analyze_options.model or settings.DEFAULT_LLM_MODEL temperature = analyze_options.temperature or settings.DEFAULT_TEMPERATURE enable_pii = analyze_options.enable_pii_detection enable_important = analyze_options.enable_important_data_detection logger.info(f"开始 AI 分析 - 项目ID: {project_id}, 表数量: {len(tables)}, 模型: {model}") try: # 构建提示词 prompt = build_prompt(tables, industry, context) logger.debug(f"提示词长度: {len(prompt)} 字符") # 调用大模型 response_text = await llm_client.call( prompt=prompt, system_prompt=SYSTEM_PROMPT, temperature=temperature, model=model ) # 解析结果 llm_result = llm_client.parse_json_response(response_text) logger.info("大模型返回结果解析成功") # 验证和转换结果 tables_output = [] total_pii_fields = 0 total_important_fields = 0 total_confidence = 0 total_fields = 0 # 验证返回的表数量 llm_tables = llm_result.get("tables", []) if len(llm_tables) != len(tables): logger.warning( f"返回的表数量不匹配: 期望 {len(tables)}, 实际 {len(llm_tables)}" ) for idx, (table_result, table_input) in enumerate( zip(llm_tables, tables) ): fields_output = [] table_pii = [] table_important = False table_important_types = [] # 处理字段 llm_fields = table_result.get("fields", []) for field_idx, (field_result, field_input) in enumerate( zip(llm_fields, table_input.fields) ): field_output = FieldOutput( raw_name=field_result.get("raw_name", field_input.raw_name), ai_name=field_result.get("ai_name", field_input.raw_name), desc=field_result.get("desc", ""), type=field_input.type, pii=field_result.get("pii", []), pii_type=field_result.get("pii_type"), is_important_data=field_result.get("is_important_data", False), confidence=field_result.get("confidence", 80) ) # 规则引擎验证和补充 PII 识别 if enable_pii: field_output = validate_pii_detection(field_output, field_input) # 重新计算置信度 field_output.confidence = calculate_confidence( field_input, field_output ) # 收集 PII 信息 if field_output.pii: table_pii.extend(field_output.pii) total_pii_fields += 1 # 收集重要数据信息 if field_output.is_important_data: table_important = True table_important_types.append(field_output.raw_name) total_important_fields += 1 fields_output.append(field_output) total_confidence += field_output.confidence total_fields += 1 # 构建表输出 table_output = TableOutput( raw_name=table_result.get("raw_name", table_input.raw_name), ai_name=table_result.get("ai_name", table_input.raw_name), desc=table_result.get("desc", ""), confidence=table_result.get("confidence", 80), ai_completed=True, fields=fields_output, pii=list(set(table_pii)), # 去重 important=table_important, important_data_types=table_important_types ) tables_output.append(table_output) # 计算统计信息 avg_confidence = ( total_confidence / total_fields if total_fields > 0 else 0 ) processing_time = time.time() - start_time # 构建响应数据 response_data = { "tables": [table.dict() for table in tables_output], "statistics": Statistics( total_tables=len(tables_output), total_fields=total_fields, pii_fields_count=total_pii_fields, important_data_fields_count=total_important_fields, average_confidence=round(avg_confidence, 2) ).dict(), "processing_time": round(processing_time, 2), "model_used": model, "token_usage": TokenUsage( prompt_tokens=len(prompt) // 4, # 粗略估算 completion_tokens=len(response_text) // 4, total_tokens=(len(prompt) + len(response_text)) // 4 ).dict() } logger.info( f"AI 分析完成 - 处理时间: {processing_time:.2f}秒, " f"识别表数: {len(tables_output)}, PII字段数: {total_pii_fields}" ) return response_data except Exception as e: logger.exception(f"AI 分析失败: {str(e)}") raise LLMAPIException( f"数据资产识别失败: {str(e)}", error_detail=str(e), retryable="Rate limit" in str(e) or "timeout" in str(e).lower() )