# 接口开发说明 - 数据资产智能识别接口 ⭐⭐⭐ ## 📋 接口基本信息 - **接口路径**: `/api/v1/inventory/ai-analyze` - **请求方法**: `POST` - **接口功能**: 使用大模型识别数据资产的中文名称、业务含义、PII 敏感信息、重要数据特征,并提供置信度评分 - **涉及页面**: `InventoryStep.vue` - AI 盘点处理阶段 - **是否涉及大模型**: ✅ **是**(核心功能) - **工作量评估**: **15 人日** - **优先级**: **高** --- ## 🎯 功能描述 该接口是数据资产盘点系统的核心功能,使用大模型技术智能识别和标注数据资产,具体功能包括: 1. **表名和字段名中文命名识别** - 将英文表名/字段名转换为中文名称 - 识别业务含义 2. **业务含义描述生成** - 自动生成表的中文描述 - 自动生成字段的中文描述 3. **PII(个人信息)识别** - 识别敏感个人信息(SPI) - 符合《个人信息保护法》(PIPL) 要求 - 识别类型:手机号、身份证、姓名、邮箱、地址等 4. **重要数据识别** - 识别《数据安全法》定义的重要数据 - 涉及国家安全、公共利益的数据 5. **置信度评分** - 评估识别结果的可靠性(0-100%) - 考虑字段命名规范度、注释完整性等因素 --- ## 🔧 技术实现方案 ### 技术栈 ```python # 核心依赖 fastapi>=0.104.0 # Web 框架 pydantic>=2.0.0 # 数据验证 httpx>=0.24.0 # HTTP 客户端(用于调用大模型 API) # 大模型 SDK openai>=1.0.0 # OpenAI API (如果使用 GPT-4) dashscope>=1.14.0 # 通义千问 API qianfan>=0.1.0 # 文心一言 API # 工具库 python-dotenv>=1.0.0 # 环境变量管理 loguru>=0.7.0 # 日志管理 redis>=5.0.0 # 缓存(可选) ``` ### 大模型选择建议 | 场景 | 推荐模型 | 理由 | |------|---------|------| | 数据资产识别 | 通义千问 / GPT-4 | 需要准确理解表结构和业务含义 | ### 实现思路 1. **输入数据准备**: 整理表结构信息、行业背景、业务上下文 2. **提示词构建**: 根据输入数据构建专业的提示词 3. **大模型调用**: 调用大模型 API 进行识别 4. **结果解析**: 解析大模型返回的 JSON 结果 5. **规则引擎验证**: 使用规则引擎验证和补充识别结果 6. **置信度评分**: 计算识别结果的置信度 7. **结果验证**: 验证数据格式和逻辑正确性 --- ## 📥 请求格式 ### 请求方式 **Content-Type**: `application/json` ### 请求参数 ```json { "tables": [ { "raw_name": "t_user_base_01", "fields": [ { "raw_name": "user_id", "type": "varchar(64)", "comment": "用户ID" }, { "raw_name": "phone", "type": "varchar(11)", "comment": "手机号" }, { "raw_name": "id_card", "type": "varchar(18)", "comment": "身份证号" } ] } ], "project_id": "project_001", "industry": "retail-fresh", "context": "某连锁生鲜零售企业,主营水果、蔬菜等生鲜产品", "options": { "model": "qwen-max", "temperature": 0.3, "enable_pii_detection": true, "enable_important_data_detection": true } } ``` ### 请求参数说明 | 参数名 | 类型 | 必填 | 说明 | |--------|------|------|------| | `tables` | array | 是 | 表列表,每个表包含表名和字段列表 | | `tables[].raw_name` | string | 是 | 表名(英文/原始名称) | | `tables[].fields` | array | 是 | 字段列表 | | `tables[].fields[].raw_name` | string | 是 | 字段名(英文) | | `tables[].fields[].type` | string | 是 | 字段类型 | | `tables[].fields[].comment` | string | 否 | 字段注释(如果有) | | `project_id` | string | 是 | 项目ID | | `industry` | string | 否 | 行业信息(如:retail-fresh) | | `context` | string | 否 | 业务背景信息 | | `options` | object | 否 | 可选配置 | | `options.model` | string | 否 | 大模型选择(qwen-max/gpt-4/ernie-bot) | | `options.temperature` | float | 否 | 温度参数(0.0-1.0),默认 0.3 | | `options.enable_pii_detection` | boolean | 否 | 是否启用 PII 识别,默认 true | | `options.enable_important_data_detection` | boolean | 否 | 是否启用重要数据识别,默认 true | --- ## 📤 响应格式 ### 成功响应 ```json { "success": true, "code": 200, "message": "数据资产识别成功", "data": { "tables": [ { "raw_name": "t_user_base_01", "ai_name": "会员基础信息表", "desc": "存储C端注册用户的核心身份信息", "confidence": 98, "ai_completed": true, "fields": [ { "raw_name": "user_id", "ai_name": "用户ID", "desc": "用户的唯一标识符", "type": "varchar(64)", "pii": [], "pii_type": null, "is_important_data": false, "confidence": 95 }, { "raw_name": "phone", "ai_name": "手机号", "desc": "用户的联系电话", "type": "varchar(11)", "pii": ["手机号"], "pii_type": "contact", "is_important_data": false, "confidence": 98 }, { "raw_name": "id_card", "ai_name": "身份证号", "desc": "用户的身份证号码", "type": "varchar(18)", "pii": ["身份证号"], "pii_type": "identity", "is_important_data": false, "confidence": 99 } ], "pii": ["手机号", "身份证号"], "important": false, "important_data_types": [] } ], "statistics": { "total_tables": 1, "total_fields": 3, "pii_fields_count": 2, "important_data_fields_count": 0, "average_confidence": 97.3 }, "processing_time": 5.2, "model_used": "qwen-max", "token_usage": { "prompt_tokens": 1200, "completion_tokens": 800, "total_tokens": 2000 } } } ``` ### 失败响应 ```json { "success": false, "code": 500, "message": "数据资产识别失败", "error": { "error_code": "AI_ANALYZE_ERROR", "error_detail": "大模型 API 调用失败: Rate limit exceeded", "retryable": true } } ``` --- ## 💻 代码实现示例 ### 提示词模板设计 ```python SYSTEM_PROMPT = """你是一位专业的数据资产管理专家,擅长识别数据资产的中文名称、业务含义、敏感信息和重要数据特征。 ## 你的专业能力 - 深入理解数据资产管理、数据合规(PIPL、数据安全法)等法规要求 - 熟悉各种业务场景下的数据资产命名规范 - 能够准确识别敏感个人信息(SPI)和重要数据 - 具备优秀的文本理解和生成能力 ## 输出要求 1. **准确性**: 中文命名必须准确反映业务含义 2. **合规性**: PII 识别必须符合《个人信息保护法》(PIPL) 3. **完整性**: 重要数据识别必须符合《数据安全法》 4. **专业性**: 使用专业术语,符合行业标准 5. **结构化**: 严格按照JSON格式输出 """ USER_PROMPT_TEMPLATE = """请基于以下信息识别数据资产: ## 行业背景 {industry_info} ## 业务背景 {context_info} ## 表结构信息 {tables_info} ## 识别要求 1. 为每个表生成中文名称(ai_name)和业务描述(desc) 2. 为每个字段生成中文名称(ai_name)和业务描述(desc) 3. 识别敏感个人信息(PII): - 手机号、身份证号、姓名、邮箱、地址等 - 生物识别信息(人脸、指纹等) - 医疗健康信息 - 金融账户信息 - 行踪轨迹信息 4. 识别重要数据(符合《数据安全法》): - 涉及国家安全的数据 - 涉及公共利益的数据 - 高精度地理信息(军事禁区周边) - 关键物资流向(稀土、芯片等) 5. 计算置信度评分(0-100): - 字段命名规范度 - 注释完整性 - 业务含义明确度 ## 输出格式(JSON) {json_schema} """ JSON_SCHEMA = """ { "tables": [ { "raw_name": "string", "ai_name": "string", "desc": "string", "confidence": "integer (0-100)", "fields": [ { "raw_name": "string", "ai_name": "string", "desc": "string", "pii": ["string"], "pii_type": "string | null", "is_important_data": "boolean", "confidence": "integer (0-100)" } ], "pii": ["string"], "important": "boolean", "important_data_types": ["string"] } ] } """ ``` ### FastAPI 实现 ```python from fastapi import FastAPI, HTTPException from fastapi.responses import JSONResponse from pydantic import BaseModel, Field from typing import Optional, List, Dict import json import os from dotenv import load_dotenv import httpx import time from loguru import logger load_dotenv() app = FastAPI() # 大模型配置 MODEL_CONFIG = { "qwen-max": { "api_key": os.getenv("DASHSCOPE_API_KEY"), "base_url": "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation", "model_name": "qwen-max" }, "gpt-4": { "api_key": os.getenv("OPENAI_API_KEY"), "base_url": "https://api.openai.com/v1/chat/completions", "model_name": "gpt-4" } } class FieldInput(BaseModel): raw_name: str type: str comment: Optional[str] = None class TableInput(BaseModel): raw_name: str fields: List[FieldInput] class AnalyzeRequest(BaseModel): tables: List[TableInput] project_id: str industry: Optional[str] = None context: Optional[str] = None options: Optional[Dict] = None class FieldOutput(BaseModel): raw_name: str ai_name: str desc: str type: str pii: List[str] = [] pii_type: Optional[str] = None is_important_data: bool = False confidence: int = Field(ge=0, le=100) class TableOutput(BaseModel): raw_name: str ai_name: str desc: str confidence: int = Field(ge=0, le=100) ai_completed: bool = True fields: List[FieldOutput] pii: List[str] = [] important: bool = False important_data_types: List[str] = [] def build_prompt(tables: List[TableInput], industry: str = None, context: str = None) -> str: """构建提示词""" # 格式化表信息 tables_info = [] for table in tables: table_info = f"表名: {table.raw_name}\n字段列表:\n" for field in table.fields: field_info = f" - {field.raw_name} ({field.type})" if field.comment: field_info += f" - {field.comment}" table_info += field_info + "\n" tables_info.append(table_info) tables_info_str = "\n\n".join(tables_info) # 行业信息 industry_info = industry if industry else "未指定" # 业务背景 context_info = context if context else "未提供业务背景信息" # 构建用户提示词 user_prompt = USER_PROMPT_TEMPLATE.format( industry_info=industry_info, context_info=context_info, tables_info=tables_info_str, json_schema=JSON_SCHEMA ) return user_prompt async def call_llm_api(prompt: str, model: str = "qwen-max", temperature: float = 0.3) -> str: """调用大模型 API""" config = MODEL_CONFIG.get(model) if not config: raise ValueError(f"不支持的大模型: {model}") headers = { "Authorization": f"Bearer {config['api_key']}", "Content-Type": "application/json" } if model == "qwen-max": # 通义千问 API payload = { "model": config["model_name"], "input": { "messages": [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": prompt} ] }, "parameters": { "temperature": temperature, "result_format": "message" } } elif model == "gpt-4": # OpenAI API payload = { "model": config["model_name"], "messages": [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": prompt} ], "temperature": temperature, "response_format": {"type": "json_object"} } async with httpx.AsyncClient(timeout=60.0) as client: try: response = await client.post( config["base_url"], headers=headers, json=payload ) response.raise_for_status() result = response.json() # 解析响应(根据不同的 API 格式) if model == "qwen-max": content = result["output"]["choices"][0]["message"]["content"] elif model == "gpt-4": content = result["choices"][0]["message"]["content"] return content except httpx.HTTPError as e: logger.error(f"大模型 API 调用失败: {str(e)}") raise Exception(f"大模型 API 调用失败: {str(e)}") def parse_llm_response(response_text: str) -> Dict: """解析大模型返回的 JSON 结果""" try: # 提取 JSON 部分(如果返回的是 Markdown 格式) if "```json" in response_text: json_text = response_text.split("```json")[1].split("```")[0].strip() elif "```" in response_text: json_text = response_text.split("```")[1].split("```")[0].strip() else: json_text = response_text.strip() # 解析 JSON result = json.loads(json_text) return result except json.JSONDecodeError as e: logger.error(f"JSON 解析失败: {str(e)}") logger.error(f"原始响应: {response_text}") raise Exception(f"大模型返回的 JSON 格式错误: {str(e)}") def validate_pii_detection(field: FieldOutput, field_input: FieldInput) -> FieldOutput: """使用规则引擎验证和补充 PII 识别""" # PII 关键词规则 pii_keywords = { "phone": ["手机", "phone", "mobile", "tel", "telephone"], "id_card": ["身份证", "id_card", "idcard", "identity"], "name": ["姓名", "name", "real_name"], "email": ["邮箱", "email", "mail"], "address": ["地址", "address", "addr"] } field_name_lower = field.raw_name.lower() # 如果 AI 未识别,使用规则引擎识别 if not field.pii: for pii_type, keywords in pii_keywords.items(): if any(keyword in field_name_lower for keyword in keywords): field.pii = [pii_type] field.pii_type = pii_type break return field def calculate_confidence(field: FieldInput, field_output: FieldOutput) -> int: """计算置信度评分""" score = 50 # 基础分 # 命名规范度(30分) if field.raw_name.islower() and '_' in field.raw_name: score += 15 # 蛇形命名 elif field.raw_name.islower() and field.raw_name.isalnum(): score += 10 # 小写字母数字 # 注释完整性(20分) if field.comment: score += 20 # AI 识别结果(50分) if field_output.ai_name and field_output.ai_name != field.raw_name: score += 25 if field_output.desc: score += 25 return min(score, 100) @app.post("/api/v1/inventory/ai-analyze") async def ai_analyze(request: AnalyzeRequest): """ 数据资产智能识别接口 使用大模型识别数据资产的中文名称、业务含义、PII 敏感信息、重要数据特征 """ start_time = time.time() try: # 获取配置 model = request.options.get("model", "qwen-max") if request.options else "qwen-max" temperature = request.options.get("temperature", 0.3) if request.options else 0.3 enable_pii = request.options.get("enable_pii_detection", True) if request.options else True enable_important = request.options.get("enable_important_data_detection", True) if request.options else True # 构建提示词 prompt = build_prompt( tables=request.tables, industry=request.industry, context=request.context ) logger.info(f"调用大模型 {model} 进行数据资产识别") # 调用大模型 response_text = await call_llm_api(prompt, model=model, temperature=temperature) # 解析结果 llm_result = parse_llm_response(response_text) # 转换为标准格式并验证 tables_output = [] total_pii_fields = 0 total_important_fields = 0 total_confidence = 0 for table_result, table_input in zip(llm_result.get("tables", []), request.tables): fields_output = [] table_pii = [] table_important = False for field_result, field_input in zip(table_result.get("fields", []), table_input.fields): field_output = FieldOutput( raw_name=field_result.get("raw_name", field_input.raw_name), ai_name=field_result.get("ai_name", field_input.raw_name), desc=field_result.get("desc", ""), type=field_input.type, pii=field_result.get("pii", []), pii_type=field_result.get("pii_type"), is_important_data=field_result.get("is_important_data", False), confidence=field_result.get("confidence", 80) ) # 规则引擎验证和补充 if enable_pii: field_output = validate_pii_detection(field_output, field_input) # 重新计算置信度 field_output.confidence = calculate_confidence(field_input, field_output) # 收集 PII 信息 if field_output.pii: table_pii.extend(field_output.pii) total_pii_fields += 1 # 收集重要数据信息 if field_output.is_important_data: table_important = True total_important_fields += 1 fields_output.append(field_output) total_confidence += field_output.confidence table_output = TableOutput( raw_name=table_result.get("raw_name", table_input.raw_name), ai_name=table_result.get("ai_name", table_input.raw_name), desc=table_result.get("desc", ""), confidence=table_result.get("confidence", 80), ai_completed=True, fields=fields_output, pii=list(set(table_pii)), # 去重 important=table_important, important_data_types=table_result.get("important_data_types", []) ) tables_output.append(table_output) # 计算统计信息 total_fields = sum(len(table.fields) for table in tables_output) avg_confidence = total_confidence / total_fields if total_fields > 0 else 0 processing_time = time.time() - start_time # 构建响应 response_data = { "tables": [table.dict() for table in tables_output], "statistics": { "total_tables": len(tables_output), "total_fields": total_fields, "pii_fields_count": total_pii_fields, "important_data_fields_count": total_important_fields, "average_confidence": round(avg_confidence, 2) }, "processing_time": round(processing_time, 2), "model_used": model, "token_usage": { "prompt_tokens": len(prompt) // 4, # 粗略估算 "completion_tokens": len(response_text) // 4, "total_tokens": (len(prompt) + len(response_text)) // 4 } } return { "success": True, "code": 200, "message": "数据资产识别成功", "data": response_data } except Exception as e: logger.error(f"数据资产识别失败: {str(e)}") return JSONResponse( status_code=500, content={ "success": False, "code": 500, "message": "数据资产识别失败", "error": { "error_code": "AI_ANALYZE_ERROR", "error_detail": str(e), "retryable": "Rate limit" in str(e) or "timeout" in str(e).lower() } } ) ``` --- ## ⚠️ 注意事项 ### 1. 提示词工程 - **系统提示词**: 定义 AI 角色为"数据资产管理专家" - **少样本学习**: 提供 5-10 个典型示例 - **约束条件**: 明确 PII 和重要数据的识别标准 - **输出格式**: 使用 JSON Schema 确保输出格式正确 ### 2. PII 识别规则 必须符合《个人信息保护法》(PIPL),识别以下类型: - **身份信息**: 姓名、身份证号、护照号 - **联系信息**: 手机号、邮箱、地址 - **生物识别**: 人脸、指纹、声纹 - **医疗健康**: 体检报告、疾病信息 - **金融账户**: 银行卡号、账户信息 - **行踪轨迹**: GPS 位置、行程记录 ### 3. 重要数据识别规则 必须符合《数据安全法》,识别以下类型: - **国家安全**: 军事信息、国家秘密 - **公共利益**: 关键基础设施信息 - **高精度地理**: 军事禁区周边位置 - **关键物资**: 稀土、芯片等关键物资流向 ### 4. 错误处理和重试 - **API 限流**: 实现指数退避重试策略 - **超时处理**: 设置合理的超时时间(60秒) - **降级策略**: API 失败时使用规则引擎作为降级方案 - **日志记录**: 详细记录每次 API 调用的请求和响应 ### 5. 性能优化 - **批量处理**: 对于大量表,考虑批量调用 API - **缓存机制**: 相同输入缓存结果,减少 API 调用 - **异步处理**: 对于大量数据,考虑异步处理 ### 6. 成本控制 - **Token 优化**: 优化提示词,减少 Token 消耗 - **模型选择**: 根据需求选择合适的模型(平衡成本和质量) - **缓存策略**: 对相同输入进行缓存 --- ## 📝 开发检查清单 - [ ] 大模型 API 集成(通义千问/GPT-4) - [ ] 提示词工程设计和优化 - [ ] PII 识别规则引擎 - [ ] 重要数据识别规则引擎 - [ ] 置信度评分算法 - [ ] JSON 解析和验证 - [ ] 错误处理和重试机制 - [ ] 缓存机制(可选) - [ ] 日志记录 - [ ] 单元测试覆盖 - [ ] 性能测试 --- ## 🔗 相关文档 - [接口清单表格](../Python接口清单表格.md) - [Python技术人员工作量文档](../Python技术人员工作量文档.md) - [数据资产盘点报告-大模型接口设计文档](../数据资产盘点报告-大模型接口设计文档.md) - [通义千问 API 文档](https://help.aliyun.com/zh/model-studio/) - [OpenAI API 文档](https://platform.openai.com/docs)