finyx_data_frontend/docs/api/04-ai-analyze.md

752 lines
23 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 接口开发说明 - 数据资产智能识别接口 ⭐⭐⭐
## 📋 接口基本信息
- **接口路径**: `/api/v1/inventory/ai-analyze`
- **请求方法**: `POST`
- **接口功能**: 使用大模型识别数据资产的中文名称、业务含义、PII 敏感信息、重要数据特征,并提供置信度评分
- **涉及页面**: `InventoryStep.vue` - AI 盘点处理阶段
- **是否涉及大模型**: ✅ **是**(核心功能)
- **工作量评估**: **15 人日**
- **优先级**: **高**
---
## 🎯 功能描述
该接口是数据资产盘点系统的核心功能,使用大模型技术智能识别和标注数据资产,具体功能包括:
1. **表名和字段名中文命名识别**
- 将英文表名/字段名转换为中文名称
- 识别业务含义
2. **业务含义描述生成**
- 自动生成表的中文描述
- 自动生成字段的中文描述
3. **PII个人信息识别**
- 识别敏感个人信息SPI
- 符合《个人信息保护法》(PIPL) 要求
- 识别类型:手机号、身份证、姓名、邮箱、地址等
4. **重要数据识别**
- 识别《数据安全法》定义的重要数据
- 涉及国家安全、公共利益的数据
5. **置信度评分**
- 评估识别结果的可靠性0-100%
- 考虑字段命名规范度、注释完整性等因素
---
## 🔧 技术实现方案
### 技术栈
```python
# 核心依赖
fastapi>=0.104.0 # Web 框架
pydantic>=2.0.0 # 数据验证
httpx>=0.24.0 # HTTP 客户端(用于调用大模型 API
# 大模型 SDK
openai>=1.0.0 # OpenAI API (如果使用 GPT-4)
dashscope>=1.14.0 # 通义千问 API
qianfan>=0.1.0 # 文心一言 API
# 工具库
python-dotenv>=1.0.0 # 环境变量管理
loguru>=0.7.0 # 日志管理
redis>=5.0.0 # 缓存(可选)
```
### 大模型选择建议
| 场景 | 推荐模型 | 理由 |
|------|---------|------|
| 数据资产识别 | 通义千问 / GPT-4 | 需要准确理解表结构和业务含义 |
### 实现思路
1. **输入数据准备**: 整理表结构信息、行业背景、业务上下文
2. **提示词构建**: 根据输入数据构建专业的提示词
3. **大模型调用**: 调用大模型 API 进行识别
4. **结果解析**: 解析大模型返回的 JSON 结果
5. **规则引擎验证**: 使用规则引擎验证和补充识别结果
6. **置信度评分**: 计算识别结果的置信度
7. **结果验证**: 验证数据格式和逻辑正确性
---
## 📥 请求格式
### 请求方式
**Content-Type**: `application/json`
### 请求参数
```json
{
"tables": [
{
"raw_name": "t_user_base_01",
"fields": [
{
"raw_name": "user_id",
"type": "varchar(64)",
"comment": "用户ID"
},
{
"raw_name": "phone",
"type": "varchar(11)",
"comment": "手机号"
},
{
"raw_name": "id_card",
"type": "varchar(18)",
"comment": "身份证号"
}
]
}
],
"project_id": "project_001",
"industry": "retail-fresh",
"context": "某连锁生鲜零售企业,主营水果、蔬菜等生鲜产品",
"options": {
"model": "qwen-max",
"temperature": 0.3,
"enable_pii_detection": true,
"enable_important_data_detection": true
}
}
```
### 请求参数说明
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| `tables` | array | 是 | 表列表,每个表包含表名和字段列表 |
| `tables[].raw_name` | string | 是 | 表名(英文/原始名称) |
| `tables[].fields` | array | 是 | 字段列表 |
| `tables[].fields[].raw_name` | string | 是 | 字段名(英文) |
| `tables[].fields[].type` | string | 是 | 字段类型 |
| `tables[].fields[].comment` | string | 否 | 字段注释(如果有) |
| `project_id` | string | 是 | 项目ID |
| `industry` | string | 否 | 行业信息retail-fresh |
| `context` | string | 否 | 业务背景信息 |
| `options` | object | 否 | 可选配置 |
| `options.model` | string | 否 | 大模型选择qwen-max/gpt-4/ernie-bot |
| `options.temperature` | float | 否 | 温度参数0.0-1.0),默认 0.3 |
| `options.enable_pii_detection` | boolean | 否 | 是否启用 PII 识别,默认 true |
| `options.enable_important_data_detection` | boolean | 否 | 是否启用重要数据识别,默认 true |
---
## 📤 响应格式
### 成功响应
```json
{
"success": true,
"code": 200,
"message": "数据资产识别成功",
"data": {
"tables": [
{
"raw_name": "t_user_base_01",
"ai_name": "会员基础信息表",
"desc": "存储C端注册用户的核心身份信息",
"confidence": 98,
"ai_completed": true,
"fields": [
{
"raw_name": "user_id",
"ai_name": "用户ID",
"desc": "用户的唯一标识符",
"type": "varchar(64)",
"pii": [],
"pii_type": null,
"is_important_data": false,
"confidence": 95
},
{
"raw_name": "phone",
"ai_name": "手机号",
"desc": "用户的联系电话",
"type": "varchar(11)",
"pii": ["手机号"],
"pii_type": "contact",
"is_important_data": false,
"confidence": 98
},
{
"raw_name": "id_card",
"ai_name": "身份证号",
"desc": "用户的身份证号码",
"type": "varchar(18)",
"pii": ["身份证号"],
"pii_type": "identity",
"is_important_data": false,
"confidence": 99
}
],
"pii": ["手机号", "身份证号"],
"important": false,
"important_data_types": []
}
],
"statistics": {
"total_tables": 1,
"total_fields": 3,
"pii_fields_count": 2,
"important_data_fields_count": 0,
"average_confidence": 97.3
},
"processing_time": 5.2,
"model_used": "qwen-max",
"token_usage": {
"prompt_tokens": 1200,
"completion_tokens": 800,
"total_tokens": 2000
}
}
}
```
### 失败响应
```json
{
"success": false,
"code": 500,
"message": "数据资产识别失败",
"error": {
"error_code": "AI_ANALYZE_ERROR",
"error_detail": "大模型 API 调用失败: Rate limit exceeded",
"retryable": true
}
}
```
---
## 💻 代码实现示例
### 提示词模板设计
```python
SYSTEM_PROMPT = """你是一位专业的数据资产管理专家,擅长识别数据资产的中文名称、业务含义、敏感信息和重要数据特征。
## 你的专业能力
- 深入理解数据资产管理、数据合规PIPL、数据安全法等法规要求
- 熟悉各种业务场景下的数据资产命名规范
- 能够准确识别敏感个人信息SPI和重要数据
- 具备优秀的文本理解和生成能力
## 输出要求
1. **准确性**: 中文命名必须准确反映业务含义
2. **合规性**: PII 识别必须符合《个人信息保护法》(PIPL)
3. **完整性**: 重要数据识别必须符合《数据安全法》
4. **专业性**: 使用专业术语,符合行业标准
5. **结构化**: 严格按照JSON格式输出
"""
USER_PROMPT_TEMPLATE = """请基于以下信息识别数据资产:
## 行业背景
{industry_info}
## 业务背景
{context_info}
## 表结构信息
{tables_info}
## 识别要求
1. 为每个表生成中文名称ai_name和业务描述desc
2. 为每个字段生成中文名称ai_name和业务描述desc
3. 识别敏感个人信息PII
- 手机号、身份证号、姓名、邮箱、地址等
- 生物识别信息(人脸、指纹等)
- 医疗健康信息
- 金融账户信息
- 行踪轨迹信息
4. 识别重要数据(符合《数据安全法》):
- 涉及国家安全的数据
- 涉及公共利益的数据
- 高精度地理信息(军事禁区周边)
- 关键物资流向(稀土、芯片等)
5. 计算置信度评分0-100
- 字段命名规范度
- 注释完整性
- 业务含义明确度
## 输出格式JSON
{json_schema}
"""
JSON_SCHEMA = """
{
"tables": [
{
"raw_name": "string",
"ai_name": "string",
"desc": "string",
"confidence": "integer (0-100)",
"fields": [
{
"raw_name": "string",
"ai_name": "string",
"desc": "string",
"pii": ["string"],
"pii_type": "string | null",
"is_important_data": "boolean",
"confidence": "integer (0-100)"
}
],
"pii": ["string"],
"important": "boolean",
"important_data_types": ["string"]
}
]
}
"""
```
### FastAPI 实现
```python
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from typing import Optional, List, Dict
import json
import os
from dotenv import load_dotenv
import httpx
import time
from loguru import logger
load_dotenv()
app = FastAPI()
# 大模型配置
MODEL_CONFIG = {
"qwen-max": {
"api_key": os.getenv("DASHSCOPE_API_KEY"),
"base_url": "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation",
"model_name": "qwen-max"
},
"gpt-4": {
"api_key": os.getenv("OPENAI_API_KEY"),
"base_url": "https://api.openai.com/v1/chat/completions",
"model_name": "gpt-4"
}
}
class FieldInput(BaseModel):
raw_name: str
type: str
comment: Optional[str] = None
class TableInput(BaseModel):
raw_name: str
fields: List[FieldInput]
class AnalyzeRequest(BaseModel):
tables: List[TableInput]
project_id: str
industry: Optional[str] = None
context: Optional[str] = None
options: Optional[Dict] = None
class FieldOutput(BaseModel):
raw_name: str
ai_name: str
desc: str
type: str
pii: List[str] = []
pii_type: Optional[str] = None
is_important_data: bool = False
confidence: int = Field(ge=0, le=100)
class TableOutput(BaseModel):
raw_name: str
ai_name: str
desc: str
confidence: int = Field(ge=0, le=100)
ai_completed: bool = True
fields: List[FieldOutput]
pii: List[str] = []
important: bool = False
important_data_types: List[str] = []
def build_prompt(tables: List[TableInput], industry: str = None, context: str = None) -> str:
"""构建提示词"""
# 格式化表信息
tables_info = []
for table in tables:
table_info = f"表名: {table.raw_name}\n字段列表:\n"
for field in table.fields:
field_info = f" - {field.raw_name} ({field.type})"
if field.comment:
field_info += f" - {field.comment}"
table_info += field_info + "\n"
tables_info.append(table_info)
tables_info_str = "\n\n".join(tables_info)
# 行业信息
industry_info = industry if industry else "未指定"
# 业务背景
context_info = context if context else "未提供业务背景信息"
# 构建用户提示词
user_prompt = USER_PROMPT_TEMPLATE.format(
industry_info=industry_info,
context_info=context_info,
tables_info=tables_info_str,
json_schema=JSON_SCHEMA
)
return user_prompt
async def call_llm_api(prompt: str, model: str = "qwen-max", temperature: float = 0.3) -> str:
"""调用大模型 API"""
config = MODEL_CONFIG.get(model)
if not config:
raise ValueError(f"不支持的大模型: {model}")
headers = {
"Authorization": f"Bearer {config['api_key']}",
"Content-Type": "application/json"
}
if model == "qwen-max":
# 通义千问 API
payload = {
"model": config["model_name"],
"input": {
"messages": [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt}
]
},
"parameters": {
"temperature": temperature,
"result_format": "message"
}
}
elif model == "gpt-4":
# OpenAI API
payload = {
"model": config["model_name"],
"messages": [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt}
],
"temperature": temperature,
"response_format": {"type": "json_object"}
}
async with httpx.AsyncClient(timeout=60.0) as client:
try:
response = await client.post(
config["base_url"],
headers=headers,
json=payload
)
response.raise_for_status()
result = response.json()
# 解析响应(根据不同的 API 格式)
if model == "qwen-max":
content = result["output"]["choices"][0]["message"]["content"]
elif model == "gpt-4":
content = result["choices"][0]["message"]["content"]
return content
except httpx.HTTPError as e:
logger.error(f"大模型 API 调用失败: {str(e)}")
raise Exception(f"大模型 API 调用失败: {str(e)}")
def parse_llm_response(response_text: str) -> Dict:
"""解析大模型返回的 JSON 结果"""
try:
# 提取 JSON 部分(如果返回的是 Markdown 格式)
if "```json" in response_text:
json_text = response_text.split("```json")[1].split("```")[0].strip()
elif "```" in response_text:
json_text = response_text.split("```")[1].split("```")[0].strip()
else:
json_text = response_text.strip()
# 解析 JSON
result = json.loads(json_text)
return result
except json.JSONDecodeError as e:
logger.error(f"JSON 解析失败: {str(e)}")
logger.error(f"原始响应: {response_text}")
raise Exception(f"大模型返回的 JSON 格式错误: {str(e)}")
def validate_pii_detection(field: FieldOutput, field_input: FieldInput) -> FieldOutput:
"""使用规则引擎验证和补充 PII 识别"""
# PII 关键词规则
pii_keywords = {
"phone": ["手机", "phone", "mobile", "tel", "telephone"],
"id_card": ["身份证", "id_card", "idcard", "identity"],
"name": ["姓名", "name", "real_name"],
"email": ["邮箱", "email", "mail"],
"address": ["地址", "address", "addr"]
}
field_name_lower = field.raw_name.lower()
# 如果 AI 未识别,使用规则引擎识别
if not field.pii:
for pii_type, keywords in pii_keywords.items():
if any(keyword in field_name_lower for keyword in keywords):
field.pii = [pii_type]
field.pii_type = pii_type
break
return field
def calculate_confidence(field: FieldInput, field_output: FieldOutput) -> int:
"""计算置信度评分"""
score = 50 # 基础分
# 命名规范度30分
if field.raw_name.islower() and '_' in field.raw_name:
score += 15 # 蛇形命名
elif field.raw_name.islower() and field.raw_name.isalnum():
score += 10 # 小写字母数字
# 注释完整性20分
if field.comment:
score += 20
# AI 识别结果50分
if field_output.ai_name and field_output.ai_name != field.raw_name:
score += 25
if field_output.desc:
score += 25
return min(score, 100)
@app.post("/api/v1/inventory/ai-analyze")
async def ai_analyze(request: AnalyzeRequest):
"""
数据资产智能识别接口
使用大模型识别数据资产的中文名称、业务含义、PII 敏感信息、重要数据特征
"""
start_time = time.time()
try:
# 获取配置
model = request.options.get("model", "qwen-max") if request.options else "qwen-max"
temperature = request.options.get("temperature", 0.3) if request.options else 0.3
enable_pii = request.options.get("enable_pii_detection", True) if request.options else True
enable_important = request.options.get("enable_important_data_detection", True) if request.options else True
# 构建提示词
prompt = build_prompt(
tables=request.tables,
industry=request.industry,
context=request.context
)
logger.info(f"调用大模型 {model} 进行数据资产识别")
# 调用大模型
response_text = await call_llm_api(prompt, model=model, temperature=temperature)
# 解析结果
llm_result = parse_llm_response(response_text)
# 转换为标准格式并验证
tables_output = []
total_pii_fields = 0
total_important_fields = 0
total_confidence = 0
for table_result, table_input in zip(llm_result.get("tables", []), request.tables):
fields_output = []
table_pii = []
table_important = False
for field_result, field_input in zip(table_result.get("fields", []), table_input.fields):
field_output = FieldOutput(
raw_name=field_result.get("raw_name", field_input.raw_name),
ai_name=field_result.get("ai_name", field_input.raw_name),
desc=field_result.get("desc", ""),
type=field_input.type,
pii=field_result.get("pii", []),
pii_type=field_result.get("pii_type"),
is_important_data=field_result.get("is_important_data", False),
confidence=field_result.get("confidence", 80)
)
# 规则引擎验证和补充
if enable_pii:
field_output = validate_pii_detection(field_output, field_input)
# 重新计算置信度
field_output.confidence = calculate_confidence(field_input, field_output)
# 收集 PII 信息
if field_output.pii:
table_pii.extend(field_output.pii)
total_pii_fields += 1
# 收集重要数据信息
if field_output.is_important_data:
table_important = True
total_important_fields += 1
fields_output.append(field_output)
total_confidence += field_output.confidence
table_output = TableOutput(
raw_name=table_result.get("raw_name", table_input.raw_name),
ai_name=table_result.get("ai_name", table_input.raw_name),
desc=table_result.get("desc", ""),
confidence=table_result.get("confidence", 80),
ai_completed=True,
fields=fields_output,
pii=list(set(table_pii)), # 去重
important=table_important,
important_data_types=table_result.get("important_data_types", [])
)
tables_output.append(table_output)
# 计算统计信息
total_fields = sum(len(table.fields) for table in tables_output)
avg_confidence = total_confidence / total_fields if total_fields > 0 else 0
processing_time = time.time() - start_time
# 构建响应
response_data = {
"tables": [table.dict() for table in tables_output],
"statistics": {
"total_tables": len(tables_output),
"total_fields": total_fields,
"pii_fields_count": total_pii_fields,
"important_data_fields_count": total_important_fields,
"average_confidence": round(avg_confidence, 2)
},
"processing_time": round(processing_time, 2),
"model_used": model,
"token_usage": {
"prompt_tokens": len(prompt) // 4, # 粗略估算
"completion_tokens": len(response_text) // 4,
"total_tokens": (len(prompt) + len(response_text)) // 4
}
}
return {
"success": True,
"code": 200,
"message": "数据资产识别成功",
"data": response_data
}
except Exception as e:
logger.error(f"数据资产识别失败: {str(e)}")
return JSONResponse(
status_code=500,
content={
"success": False,
"code": 500,
"message": "数据资产识别失败",
"error": {
"error_code": "AI_ANALYZE_ERROR",
"error_detail": str(e),
"retryable": "Rate limit" in str(e) or "timeout" in str(e).lower()
}
}
)
```
---
## ⚠️ 注意事项
### 1. 提示词工程
- **系统提示词**: 定义 AI 角色为"数据资产管理专家"
- **少样本学习**: 提供 5-10 个典型示例
- **约束条件**: 明确 PII 和重要数据的识别标准
- **输出格式**: 使用 JSON Schema 确保输出格式正确
### 2. PII 识别规则
必须符合《个人信息保护法》(PIPL),识别以下类型:
- **身份信息**: 姓名、身份证号、护照号
- **联系信息**: 手机号、邮箱、地址
- **生物识别**: 人脸、指纹、声纹
- **医疗健康**: 体检报告、疾病信息
- **金融账户**: 银行卡号、账户信息
- **行踪轨迹**: GPS 位置、行程记录
### 3. 重要数据识别规则
必须符合《数据安全法》,识别以下类型:
- **国家安全**: 军事信息、国家秘密
- **公共利益**: 关键基础设施信息
- **高精度地理**: 军事禁区周边位置
- **关键物资**: 稀土、芯片等关键物资流向
### 4. 错误处理和重试
- **API 限流**: 实现指数退避重试策略
- **超时处理**: 设置合理的超时时间60秒
- **降级策略**: API 失败时使用规则引擎作为降级方案
- **日志记录**: 详细记录每次 API 调用的请求和响应
### 5. 性能优化
- **批量处理**: 对于大量表,考虑批量调用 API
- **缓存机制**: 相同输入缓存结果,减少 API 调用
- **异步处理**: 对于大量数据,考虑异步处理
### 6. 成本控制
- **Token 优化**: 优化提示词,减少 Token 消耗
- **模型选择**: 根据需求选择合适的模型(平衡成本和质量)
- **缓存策略**: 对相同输入进行缓存
---
## 📝 开发检查清单
- [ ] 大模型 API 集成(通义千问/GPT-4
- [ ] 提示词工程设计和优化
- [ ] PII 识别规则引擎
- [ ] 重要数据识别规则引擎
- [ ] 置信度评分算法
- [ ] JSON 解析和验证
- [ ] 错误处理和重试机制
- [ ] 缓存机制(可选)
- [ ] 日志记录
- [ ] 单元测试覆盖
- [ ] 性能测试
---
## 🔗 相关文档
- [接口清单表格](../Python接口清单表格.md)
- [Python技术人员工作量文档](../Python技术人员工作量文档.md)
- [数据资产盘点报告-大模型接口设计文档](../数据资产盘点报告-大模型接口设计文档.md)
- [通义千问 API 文档](https://help.aliyun.com/zh/model-studio/)
- [OpenAI API 文档](https://platform.openai.com/docs)