23 KiB
23 KiB
接口开发说明 - 数据资产智能识别接口 ⭐⭐⭐
📋 接口基本信息
- 接口路径:
/api/v1/inventory/ai-analyze - 请求方法:
POST - 接口功能: 使用大模型识别数据资产的中文名称、业务含义、PII 敏感信息、重要数据特征,并提供置信度评分
- 涉及页面:
InventoryStep.vue- AI 盘点处理阶段 - 是否涉及大模型: ✅ 是(核心功能)
- 工作量评估: 15 人日
- 优先级: 高
🎯 功能描述
该接口是数据资产盘点系统的核心功能,使用大模型技术智能识别和标注数据资产,具体功能包括:
-
表名和字段名中文命名识别
- 将英文表名/字段名转换为中文名称
- 识别业务含义
-
业务含义描述生成
- 自动生成表的中文描述
- 自动生成字段的中文描述
-
PII(个人信息)识别
- 识别敏感个人信息(SPI)
- 符合《个人信息保护法》(PIPL) 要求
- 识别类型:手机号、身份证、姓名、邮箱、地址等
-
重要数据识别
- 识别《数据安全法》定义的重要数据
- 涉及国家安全、公共利益的数据
-
置信度评分
- 评估识别结果的可靠性(0-100%)
- 考虑字段命名规范度、注释完整性等因素
🔧 技术实现方案
技术栈
# 核心依赖
fastapi>=0.104.0 # Web 框架
pydantic>=2.0.0 # 数据验证
httpx>=0.24.0 # HTTP 客户端(用于调用大模型 API)
# 大模型 SDK
openai>=1.0.0 # OpenAI API (如果使用 GPT-4)
dashscope>=1.14.0 # 通义千问 API
qianfan>=0.1.0 # 文心一言 API
# 工具库
python-dotenv>=1.0.0 # 环境变量管理
loguru>=0.7.0 # 日志管理
redis>=5.0.0 # 缓存(可选)
大模型选择建议
| 场景 | 推荐模型 | 理由 |
|---|---|---|
| 数据资产识别 | 通义千问 / GPT-4 | 需要准确理解表结构和业务含义 |
实现思路
- 输入数据准备: 整理表结构信息、行业背景、业务上下文
- 提示词构建: 根据输入数据构建专业的提示词
- 大模型调用: 调用大模型 API 进行识别
- 结果解析: 解析大模型返回的 JSON 结果
- 规则引擎验证: 使用规则引擎验证和补充识别结果
- 置信度评分: 计算识别结果的置信度
- 结果验证: 验证数据格式和逻辑正确性
📥 请求格式
请求方式
Content-Type: application/json
请求参数
{
"tables": [
{
"raw_name": "t_user_base_01",
"fields": [
{
"raw_name": "user_id",
"type": "varchar(64)",
"comment": "用户ID"
},
{
"raw_name": "phone",
"type": "varchar(11)",
"comment": "手机号"
},
{
"raw_name": "id_card",
"type": "varchar(18)",
"comment": "身份证号"
}
]
}
],
"project_id": "project_001",
"industry": "retail-fresh",
"context": "某连锁生鲜零售企业,主营水果、蔬菜等生鲜产品",
"options": {
"model": "qwen-max",
"temperature": 0.3,
"enable_pii_detection": true,
"enable_important_data_detection": true
}
}
请求参数说明
| 参数名 | 类型 | 必填 | 说明 |
|---|---|---|---|
tables |
array | 是 | 表列表,每个表包含表名和字段列表 |
tables[].raw_name |
string | 是 | 表名(英文/原始名称) |
tables[].fields |
array | 是 | 字段列表 |
tables[].fields[].raw_name |
string | 是 | 字段名(英文) |
tables[].fields[].type |
string | 是 | 字段类型 |
tables[].fields[].comment |
string | 否 | 字段注释(如果有) |
project_id |
string | 是 | 项目ID |
industry |
string | 否 | 行业信息(如:retail-fresh) |
context |
string | 否 | 业务背景信息 |
options |
object | 否 | 可选配置 |
options.model |
string | 否 | 大模型选择(qwen-max/gpt-4/ernie-bot) |
options.temperature |
float | 否 | 温度参数(0.0-1.0),默认 0.3 |
options.enable_pii_detection |
boolean | 否 | 是否启用 PII 识别,默认 true |
options.enable_important_data_detection |
boolean | 否 | 是否启用重要数据识别,默认 true |
📤 响应格式
成功响应
{
"success": true,
"code": 200,
"message": "数据资产识别成功",
"data": {
"tables": [
{
"raw_name": "t_user_base_01",
"ai_name": "会员基础信息表",
"desc": "存储C端注册用户的核心身份信息",
"confidence": 98,
"ai_completed": true,
"fields": [
{
"raw_name": "user_id",
"ai_name": "用户ID",
"desc": "用户的唯一标识符",
"type": "varchar(64)",
"pii": [],
"pii_type": null,
"is_important_data": false,
"confidence": 95
},
{
"raw_name": "phone",
"ai_name": "手机号",
"desc": "用户的联系电话",
"type": "varchar(11)",
"pii": ["手机号"],
"pii_type": "contact",
"is_important_data": false,
"confidence": 98
},
{
"raw_name": "id_card",
"ai_name": "身份证号",
"desc": "用户的身份证号码",
"type": "varchar(18)",
"pii": ["身份证号"],
"pii_type": "identity",
"is_important_data": false,
"confidence": 99
}
],
"pii": ["手机号", "身份证号"],
"important": false,
"important_data_types": []
}
],
"statistics": {
"total_tables": 1,
"total_fields": 3,
"pii_fields_count": 2,
"important_data_fields_count": 0,
"average_confidence": 97.3
},
"processing_time": 5.2,
"model_used": "qwen-max",
"token_usage": {
"prompt_tokens": 1200,
"completion_tokens": 800,
"total_tokens": 2000
}
}
}
失败响应
{
"success": false,
"code": 500,
"message": "数据资产识别失败",
"error": {
"error_code": "AI_ANALYZE_ERROR",
"error_detail": "大模型 API 调用失败: Rate limit exceeded",
"retryable": true
}
}
💻 代码实现示例
提示词模板设计
SYSTEM_PROMPT = """你是一位专业的数据资产管理专家,擅长识别数据资产的中文名称、业务含义、敏感信息和重要数据特征。
## 你的专业能力
- 深入理解数据资产管理、数据合规(PIPL、数据安全法)等法规要求
- 熟悉各种业务场景下的数据资产命名规范
- 能够准确识别敏感个人信息(SPI)和重要数据
- 具备优秀的文本理解和生成能力
## 输出要求
1. **准确性**: 中文命名必须准确反映业务含义
2. **合规性**: PII 识别必须符合《个人信息保护法》(PIPL)
3. **完整性**: 重要数据识别必须符合《数据安全法》
4. **专业性**: 使用专业术语,符合行业标准
5. **结构化**: 严格按照JSON格式输出
"""
USER_PROMPT_TEMPLATE = """请基于以下信息识别数据资产:
## 行业背景
{industry_info}
## 业务背景
{context_info}
## 表结构信息
{tables_info}
## 识别要求
1. 为每个表生成中文名称(ai_name)和业务描述(desc)
2. 为每个字段生成中文名称(ai_name)和业务描述(desc)
3. 识别敏感个人信息(PII):
- 手机号、身份证号、姓名、邮箱、地址等
- 生物识别信息(人脸、指纹等)
- 医疗健康信息
- 金融账户信息
- 行踪轨迹信息
4. 识别重要数据(符合《数据安全法》):
- 涉及国家安全的数据
- 涉及公共利益的数据
- 高精度地理信息(军事禁区周边)
- 关键物资流向(稀土、芯片等)
5. 计算置信度评分(0-100):
- 字段命名规范度
- 注释完整性
- 业务含义明确度
## 输出格式(JSON)
{json_schema}
"""
JSON_SCHEMA = """
{
"tables": [
{
"raw_name": "string",
"ai_name": "string",
"desc": "string",
"confidence": "integer (0-100)",
"fields": [
{
"raw_name": "string",
"ai_name": "string",
"desc": "string",
"pii": ["string"],
"pii_type": "string | null",
"is_important_data": "boolean",
"confidence": "integer (0-100)"
}
],
"pii": ["string"],
"important": "boolean",
"important_data_types": ["string"]
}
]
}
"""
FastAPI 实现
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from typing import Optional, List, Dict
import json
import os
from dotenv import load_dotenv
import httpx
import time
from loguru import logger
load_dotenv()
app = FastAPI()
# 大模型配置
MODEL_CONFIG = {
"qwen-max": {
"api_key": os.getenv("DASHSCOPE_API_KEY"),
"base_url": "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation",
"model_name": "qwen-max"
},
"gpt-4": {
"api_key": os.getenv("OPENAI_API_KEY"),
"base_url": "https://api.openai.com/v1/chat/completions",
"model_name": "gpt-4"
}
}
class FieldInput(BaseModel):
raw_name: str
type: str
comment: Optional[str] = None
class TableInput(BaseModel):
raw_name: str
fields: List[FieldInput]
class AnalyzeRequest(BaseModel):
tables: List[TableInput]
project_id: str
industry: Optional[str] = None
context: Optional[str] = None
options: Optional[Dict] = None
class FieldOutput(BaseModel):
raw_name: str
ai_name: str
desc: str
type: str
pii: List[str] = []
pii_type: Optional[str] = None
is_important_data: bool = False
confidence: int = Field(ge=0, le=100)
class TableOutput(BaseModel):
raw_name: str
ai_name: str
desc: str
confidence: int = Field(ge=0, le=100)
ai_completed: bool = True
fields: List[FieldOutput]
pii: List[str] = []
important: bool = False
important_data_types: List[str] = []
def build_prompt(tables: List[TableInput], industry: str = None, context: str = None) -> str:
"""构建提示词"""
# 格式化表信息
tables_info = []
for table in tables:
table_info = f"表名: {table.raw_name}\n字段列表:\n"
for field in table.fields:
field_info = f" - {field.raw_name} ({field.type})"
if field.comment:
field_info += f" - {field.comment}"
table_info += field_info + "\n"
tables_info.append(table_info)
tables_info_str = "\n\n".join(tables_info)
# 行业信息
industry_info = industry if industry else "未指定"
# 业务背景
context_info = context if context else "未提供业务背景信息"
# 构建用户提示词
user_prompt = USER_PROMPT_TEMPLATE.format(
industry_info=industry_info,
context_info=context_info,
tables_info=tables_info_str,
json_schema=JSON_SCHEMA
)
return user_prompt
async def call_llm_api(prompt: str, model: str = "qwen-max", temperature: float = 0.3) -> str:
"""调用大模型 API"""
config = MODEL_CONFIG.get(model)
if not config:
raise ValueError(f"不支持的大模型: {model}")
headers = {
"Authorization": f"Bearer {config['api_key']}",
"Content-Type": "application/json"
}
if model == "qwen-max":
# 通义千问 API
payload = {
"model": config["model_name"],
"input": {
"messages": [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt}
]
},
"parameters": {
"temperature": temperature,
"result_format": "message"
}
}
elif model == "gpt-4":
# OpenAI API
payload = {
"model": config["model_name"],
"messages": [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt}
],
"temperature": temperature,
"response_format": {"type": "json_object"}
}
async with httpx.AsyncClient(timeout=60.0) as client:
try:
response = await client.post(
config["base_url"],
headers=headers,
json=payload
)
response.raise_for_status()
result = response.json()
# 解析响应(根据不同的 API 格式)
if model == "qwen-max":
content = result["output"]["choices"][0]["message"]["content"]
elif model == "gpt-4":
content = result["choices"][0]["message"]["content"]
return content
except httpx.HTTPError as e:
logger.error(f"大模型 API 调用失败: {str(e)}")
raise Exception(f"大模型 API 调用失败: {str(e)}")
def parse_llm_response(response_text: str) -> Dict:
"""解析大模型返回的 JSON 结果"""
try:
# 提取 JSON 部分(如果返回的是 Markdown 格式)
if "```json" in response_text:
json_text = response_text.split("```json")[1].split("```")[0].strip()
elif "```" in response_text:
json_text = response_text.split("```")[1].split("```")[0].strip()
else:
json_text = response_text.strip()
# 解析 JSON
result = json.loads(json_text)
return result
except json.JSONDecodeError as e:
logger.error(f"JSON 解析失败: {str(e)}")
logger.error(f"原始响应: {response_text}")
raise Exception(f"大模型返回的 JSON 格式错误: {str(e)}")
def validate_pii_detection(field: FieldOutput, field_input: FieldInput) -> FieldOutput:
"""使用规则引擎验证和补充 PII 识别"""
# PII 关键词规则
pii_keywords = {
"phone": ["手机", "phone", "mobile", "tel", "telephone"],
"id_card": ["身份证", "id_card", "idcard", "identity"],
"name": ["姓名", "name", "real_name"],
"email": ["邮箱", "email", "mail"],
"address": ["地址", "address", "addr"]
}
field_name_lower = field.raw_name.lower()
# 如果 AI 未识别,使用规则引擎识别
if not field.pii:
for pii_type, keywords in pii_keywords.items():
if any(keyword in field_name_lower for keyword in keywords):
field.pii = [pii_type]
field.pii_type = pii_type
break
return field
def calculate_confidence(field: FieldInput, field_output: FieldOutput) -> int:
"""计算置信度评分"""
score = 50 # 基础分
# 命名规范度(30分)
if field.raw_name.islower() and '_' in field.raw_name:
score += 15 # 蛇形命名
elif field.raw_name.islower() and field.raw_name.isalnum():
score += 10 # 小写字母数字
# 注释完整性(20分)
if field.comment:
score += 20
# AI 识别结果(50分)
if field_output.ai_name and field_output.ai_name != field.raw_name:
score += 25
if field_output.desc:
score += 25
return min(score, 100)
@app.post("/api/v1/inventory/ai-analyze")
async def ai_analyze(request: AnalyzeRequest):
"""
数据资产智能识别接口
使用大模型识别数据资产的中文名称、业务含义、PII 敏感信息、重要数据特征
"""
start_time = time.time()
try:
# 获取配置
model = request.options.get("model", "qwen-max") if request.options else "qwen-max"
temperature = request.options.get("temperature", 0.3) if request.options else 0.3
enable_pii = request.options.get("enable_pii_detection", True) if request.options else True
enable_important = request.options.get("enable_important_data_detection", True) if request.options else True
# 构建提示词
prompt = build_prompt(
tables=request.tables,
industry=request.industry,
context=request.context
)
logger.info(f"调用大模型 {model} 进行数据资产识别")
# 调用大模型
response_text = await call_llm_api(prompt, model=model, temperature=temperature)
# 解析结果
llm_result = parse_llm_response(response_text)
# 转换为标准格式并验证
tables_output = []
total_pii_fields = 0
total_important_fields = 0
total_confidence = 0
for table_result, table_input in zip(llm_result.get("tables", []), request.tables):
fields_output = []
table_pii = []
table_important = False
for field_result, field_input in zip(table_result.get("fields", []), table_input.fields):
field_output = FieldOutput(
raw_name=field_result.get("raw_name", field_input.raw_name),
ai_name=field_result.get("ai_name", field_input.raw_name),
desc=field_result.get("desc", ""),
type=field_input.type,
pii=field_result.get("pii", []),
pii_type=field_result.get("pii_type"),
is_important_data=field_result.get("is_important_data", False),
confidence=field_result.get("confidence", 80)
)
# 规则引擎验证和补充
if enable_pii:
field_output = validate_pii_detection(field_output, field_input)
# 重新计算置信度
field_output.confidence = calculate_confidence(field_input, field_output)
# 收集 PII 信息
if field_output.pii:
table_pii.extend(field_output.pii)
total_pii_fields += 1
# 收集重要数据信息
if field_output.is_important_data:
table_important = True
total_important_fields += 1
fields_output.append(field_output)
total_confidence += field_output.confidence
table_output = TableOutput(
raw_name=table_result.get("raw_name", table_input.raw_name),
ai_name=table_result.get("ai_name", table_input.raw_name),
desc=table_result.get("desc", ""),
confidence=table_result.get("confidence", 80),
ai_completed=True,
fields=fields_output,
pii=list(set(table_pii)), # 去重
important=table_important,
important_data_types=table_result.get("important_data_types", [])
)
tables_output.append(table_output)
# 计算统计信息
total_fields = sum(len(table.fields) for table in tables_output)
avg_confidence = total_confidence / total_fields if total_fields > 0 else 0
processing_time = time.time() - start_time
# 构建响应
response_data = {
"tables": [table.dict() for table in tables_output],
"statistics": {
"total_tables": len(tables_output),
"total_fields": total_fields,
"pii_fields_count": total_pii_fields,
"important_data_fields_count": total_important_fields,
"average_confidence": round(avg_confidence, 2)
},
"processing_time": round(processing_time, 2),
"model_used": model,
"token_usage": {
"prompt_tokens": len(prompt) // 4, # 粗略估算
"completion_tokens": len(response_text) // 4,
"total_tokens": (len(prompt) + len(response_text)) // 4
}
}
return {
"success": True,
"code": 200,
"message": "数据资产识别成功",
"data": response_data
}
except Exception as e:
logger.error(f"数据资产识别失败: {str(e)}")
return JSONResponse(
status_code=500,
content={
"success": False,
"code": 500,
"message": "数据资产识别失败",
"error": {
"error_code": "AI_ANALYZE_ERROR",
"error_detail": str(e),
"retryable": "Rate limit" in str(e) or "timeout" in str(e).lower()
}
}
)
⚠️ 注意事项
1. 提示词工程
- 系统提示词: 定义 AI 角色为"数据资产管理专家"
- 少样本学习: 提供 5-10 个典型示例
- 约束条件: 明确 PII 和重要数据的识别标准
- 输出格式: 使用 JSON Schema 确保输出格式正确
2. PII 识别规则
必须符合《个人信息保护法》(PIPL),识别以下类型:
- 身份信息: 姓名、身份证号、护照号
- 联系信息: 手机号、邮箱、地址
- 生物识别: 人脸、指纹、声纹
- 医疗健康: 体检报告、疾病信息
- 金融账户: 银行卡号、账户信息
- 行踪轨迹: GPS 位置、行程记录
3. 重要数据识别规则
必须符合《数据安全法》,识别以下类型:
- 国家安全: 军事信息、国家秘密
- 公共利益: 关键基础设施信息
- 高精度地理: 军事禁区周边位置
- 关键物资: 稀土、芯片等关键物资流向
4. 错误处理和重试
- API 限流: 实现指数退避重试策略
- 超时处理: 设置合理的超时时间(60秒)
- 降级策略: API 失败时使用规则引擎作为降级方案
- 日志记录: 详细记录每次 API 调用的请求和响应
5. 性能优化
- 批量处理: 对于大量表,考虑批量调用 API
- 缓存机制: 相同输入缓存结果,减少 API 调用
- 异步处理: 对于大量数据,考虑异步处理
6. 成本控制
- Token 优化: 优化提示词,减少 Token 消耗
- 模型选择: 根据需求选择合适的模型(平衡成本和质量)
- 缓存策略: 对相同输入进行缓存
📝 开发检查清单
- 大模型 API 集成(通义千问/GPT-4)
- 提示词工程设计和优化
- PII 识别规则引擎
- 重要数据识别规则引擎
- 置信度评分算法
- JSON 解析和验证
- 错误处理和重试机制
- 缓存机制(可选)
- 日志记录
- 单元测试覆盖
- 性能测试