first commit

This commit is contained in:
Ubuntu 2026-01-10 11:44:31 +08:00
commit a6adce6ea5
8 changed files with 3020 additions and 0 deletions

596
docs/01-parse-document.md Normal file
View File

@ -0,0 +1,596 @@
# 接口开发说明 - 文档解析接口
## 📋 接口基本信息
- **接口路径**: `/api/v1/inventory/parse-document`
- **请求方法**: `POST`
- **接口功能**: 解析上传的数据字典文档Excel/Word/PDF提取表结构信息
- **涉及页面**: `InventoryStep.vue` - 方案一(已有文档导入)
- **是否涉及大模型**: ❌ 否
- **工作量评估**: 5 人日
- **优先级**: 中
---
## 🎯 功能描述
该接口用于解析用户上传的数据字典文档,支持以下格式:
- **Excel**: `.xlsx`, `.xls`
- **Word**: `.doc`, `.docx`
- **PDF**: `.pdf`(可选)
从文档中提取以下信息:
- 表名(英文)
- 字段名(英文)
- 字段类型
- 字段注释/描述(中文)
---
## 🔧 技术实现方案
### 技术栈
```python
# 核心依赖
fastapi>=0.104.0 # Web 框架
pydantic>=2.0.0 # 数据验证
python-multipart>=0.0.6 # 文件上传支持
# 文档处理
pandas>=2.0.0 # 数据处理
openpyxl>=3.1.0 # Excel 处理
python-docx>=1.1.0 # Word 处理
pdfplumber>=0.10.0 # PDF 处理(可选)
```
### 实现思路
1. **文件上传**: 使用 FastAPI 的 `UploadFile` 接收文件
2. **文件类型识别**: 根据文件扩展名或 MIME 类型识别文件格式
3. **文档解析**:
- Excel: 使用 `pandas``openpyxl` 读取
- Word: 使用 `python-docx` 解析表格和文本
- PDF: 使用 `pdfplumber` 提取表格和文本
4. **表结构提取**: 识别文档中的表结构信息,提取表名、字段名、类型、注释
5. **数据验证**: 验证提取的数据格式是否正确
6. **结果返回**: 返回标准化的表结构数据
---
## 📥 请求格式
### 请求方式
**Content-Type**: `multipart/form-data``application/json`
### 方式一:文件上传(推荐)
```http
POST /api/v1/inventory/parse-document
Content-Type: multipart/form-data
file: [二进制文件]
project_id: string
file_type: excel | word | pdf (可选,自动识别)
```
### 方式二:文件路径(如果文件已上传到服务器)
```json
{
"file_path": "/path/to/document.xlsx",
"file_type": "excel | word | pdf",
"project_id": "project_001"
}
```
### 请求参数说明
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| `file` | File | 是 | 上传的文件(方式一) |
| `file_path` | string | 是 | 文件路径(方式二) |
| `file_type` | string | 否 | 文件类型:`excel` / `word` / `pdf`,如果不传则根据文件扩展名自动识别 |
| `project_id` | string | 是 | 项目ID |
---
## 📤 响应格式
### 成功响应
```json
{
"success": true,
"code": 200,
"message": "文档解析成功",
"data": {
"tables": [
{
"raw_name": "t_user_base_01",
"display_name": "用户基础信息表",
"description": "存储用户基本信息的表",
"fields": [
{
"raw_name": "user_id",
"display_name": "用户ID",
"type": "varchar(64)",
"comment": "用户的唯一标识符",
"is_primary_key": true,
"is_nullable": false,
"default_value": null
},
{
"raw_name": "user_name",
"display_name": "用户名",
"type": "varchar(50)",
"comment": "用户登录名",
"is_primary_key": false,
"is_nullable": true,
"default_value": null
}
],
"field_count": 2
}
],
"total_tables": 10,
"total_fields": 245,
"parse_time": 1.23,
"file_info": {
"file_name": "数据字典.xlsx",
"file_size": 1024000,
"file_type": "excel"
}
}
}
```
### 失败响应
```json
{
"success": false,
"code": 400,
"message": "文件格式不支持",
"error": {
"error_code": "UNSUPPORTED_FILE_TYPE",
"error_detail": "仅支持 Excel (.xlsx, .xls), Word (.doc, .docx), PDF (.pdf) 格式"
}
}
```
### 响应字段说明
| 字段名 | 类型 | 说明 |
|--------|------|------|
| `success` | boolean | 请求是否成功 |
| `code` | integer | HTTP 状态码 |
| `message` | string | 响应消息 |
| `data` | object | 响应数据 |
| `data.tables` | array | 解析出的表列表 |
| `data.tables[].raw_name` | string | 表名(英文/原始名称) |
| `data.tables[].display_name` | string | 表显示名称(中文,如果文档中有) |
| `data.tables[].description` | string | 表描述 |
| `data.tables[].fields` | array | 字段列表 |
| `data.tables[].fields[].raw_name` | string | 字段名(英文) |
| `data.tables[].fields[].display_name` | string | 字段显示名称(中文) |
| `data.tables[].fields[].type` | string | 字段类型 |
| `data.tables[].fields[].comment` | string | 字段注释 |
| `data.total_tables` | integer | 总表数 |
| `data.total_fields` | integer | 总字段数 |
| `data.parse_time` | float | 解析耗时(秒) |
---
## 💻 代码实现示例
### FastAPI 实现
```python
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Optional, List
import pandas as pd
from docx import Document
import pdfplumber
import os
from pathlib import Path
import time
app = FastAPI()
class FieldInfo(BaseModel):
raw_name: str
display_name: Optional[str] = None
type: str
comment: Optional[str] = None
is_primary_key: bool = False
is_nullable: bool = True
default_value: Optional[str] = None
class TableInfo(BaseModel):
raw_name: str
display_name: Optional[str] = None
description: Optional[str] = None
fields: List[FieldInfo]
field_count: int
class ParseDocumentResponse(BaseModel):
success: bool
code: int
message: str
data: Optional[dict] = None
error: Optional[dict] = None
def parse_excel(file_path: str) -> List[TableInfo]:
"""解析 Excel 文件"""
tables = []
try:
# 读取 Excel 文件
df = pd.read_excel(file_path, sheet_name=None) # 读取所有 sheet
for sheet_name, df_sheet in df.items():
# 识别表结构(根据 Excel 格式约定)
# 假设第一列是字段名,第二列是类型,第三列是注释
fields = []
for _, row in df_sheet.iterrows():
if pd.notna(row.iloc[0]): # 字段名不为空
field = FieldInfo(
raw_name=str(row.iloc[0]).strip(),
display_name=str(row.iloc[2]).strip() if len(row) > 2 and pd.notna(row.iloc[2]) else None,
type=str(row.iloc[1]).strip() if len(row) > 1 and pd.notna(row.iloc[1]) else "varchar(255)",
comment=str(row.iloc[2]).strip() if len(row) > 2 and pd.notna(row.iloc[2]) else None
)
fields.append(field)
if fields:
table = TableInfo(
raw_name=sheet_name,
display_name=sheet_name,
fields=fields,
field_count=len(fields)
)
tables.append(table)
except Exception as e:
raise Exception(f"Excel 解析失败: {str(e)}")
return tables
def parse_word(file_path: str) -> List[TableInfo]:
"""解析 Word 文件"""
tables = []
try:
doc = Document(file_path)
# 遍历文档中的表格
for table_idx, table in enumerate(doc.tables):
fields = []
# 假设第一行是表头,后续行是字段信息
# 约定:第一列字段名,第二列类型,第三列注释
for row in table.rows[1:]: # 跳过表头
if len(row.cells) >= 3:
field_name = row.cells[0].text.strip()
if field_name: # 字段名不为空
field = FieldInfo(
raw_name=field_name,
display_name=row.cells[2].text.strip() if len(row.cells) > 2 and row.cells[2].text.strip() else None,
type=row.cells[1].text.strip() if len(row.cells) > 1 and row.cells[1].text.strip() else "varchar(255)",
comment=row.cells[2].text.strip() if len(row.cells) > 2 and row.cells[2].text.strip() else None
)
fields.append(field)
if fields:
table_info = TableInfo(
raw_name=f"table_{table_idx + 1}",
display_name=f"表{table_idx + 1}",
fields=fields,
field_count=len(fields)
)
tables.append(table_info)
except Exception as e:
raise Exception(f"Word 解析失败: {str(e)}")
return tables
def parse_pdf(file_path: str) -> List[TableInfo]:
"""解析 PDF 文件"""
tables = []
try:
with pdfplumber.open(file_path) as pdf:
for page_idx, page in enumerate(pdf.pages):
# 提取表格
page_tables = page.extract_tables()
for table_idx, table in enumerate(page_tables):
if table and len(table) > 1:
fields = []
# 假设第一行是表头,后续行是字段信息
for row in table[1:]:
if len(row) >= 3 and row[0]:
field = FieldInfo(
raw_name=str(row[0]).strip(),
display_name=str(row[2]).strip() if len(row) > 2 and row[2] else None,
type=str(row[1]).strip() if len(row) > 1 and row[1] else "varchar(255)",
comment=str(row[2]).strip() if len(row) > 2 and row[2] else None
)
fields.append(field)
if fields:
table_info = TableInfo(
raw_name=f"table_{page_idx + 1}_{table_idx + 1}",
display_name=f"表{page_idx + 1}-{table_idx + 1}",
fields=fields,
field_count=len(fields)
)
tables.append(table_info)
except Exception as e:
raise Exception(f"PDF 解析失败: {str(e)}")
return tables
def detect_file_type(file_name: str) -> str:
"""根据文件扩展名检测文件类型"""
ext = Path(file_name).suffix.lower()
if ext in ['.xlsx', '.xls']:
return 'excel'
elif ext in ['.docx', '.doc']:
return 'word'
elif ext == '.pdf':
return 'pdf'
else:
raise ValueError(f"不支持的文件类型: {ext}")
@app.post("/api/v1/inventory/parse-document", response_model=ParseDocumentResponse)
async def parse_document(
file: Optional[UploadFile] = File(None),
file_path: Optional[str] = Form(None),
file_type: Optional[str] = Form(None),
project_id: str = Form(...)
):
"""
文档解析接口
支持解析 Excel、Word、PDF 格式的数据字典文档,提取表结构信息
"""
start_time = time.time()
try:
# 验证参数
if not file and not file_path:
raise HTTPException(
status_code=400,
detail="必须提供文件或文件路径"
)
# 处理文件上传
if file:
# 保存上传的文件到临时目录
upload_dir = Path("/tmp/uploads")
upload_dir.mkdir(exist_ok=True)
file_path = str(upload_dir / file.filename)
with open(file_path, "wb") as f:
content = await file.read()
f.write(content)
file_name = file.filename
file_size = len(content)
# 自动检测文件类型
if not file_type:
file_type = detect_file_type(file_name)
else:
# 使用提供的文件路径
if not os.path.exists(file_path):
raise HTTPException(
status_code=404,
detail=f"文件不存在: {file_path}"
)
file_name = Path(file_path).name
file_size = os.path.getsize(file_path)
# 自动检测文件类型
if not file_type:
file_type = detect_file_type(file_name)
# 根据文件类型选择解析方法
if file_type == 'excel':
tables = parse_excel(file_path)
elif file_type == 'word':
tables = parse_word(file_path)
elif file_type == 'pdf':
tables = parse_pdf(file_path)
else:
raise HTTPException(
status_code=400,
detail=f"不支持的文件类型: {file_type}"
)
# 计算统计信息
total_fields = sum(table.field_count for table in tables)
parse_time = time.time() - start_time
# 构建响应数据
response_data = {
"tables": [table.dict() for table in tables],
"total_tables": len(tables),
"total_fields": total_fields,
"parse_time": round(parse_time, 2),
"file_info": {
"file_name": file_name,
"file_size": file_size,
"file_type": file_type
}
}
return ParseDocumentResponse(
success=True,
code=200,
message="文档解析成功",
data=response_data
)
except HTTPException:
raise
except Exception as e:
return ParseDocumentResponse(
success=False,
code=500,
message="文档解析失败",
error={
"error_code": "PARSE_ERROR",
"error_detail": str(e)
}
)
```
---
## 🧪 测试用例
### 单元测试示例
```python
import pytest
from fastapi.testclient import TestClient
from pathlib import Path
import tempfile
client = TestClient(app)
def test_parse_excel_document():
"""测试解析 Excel 文档"""
# 创建测试 Excel 文件
test_data = {
'字段名': ['user_id', 'user_name', 'email'],
'类型': ['varchar(64)', 'varchar(50)', 'varchar(100)'],
'注释': ['用户ID', '用户名', '邮箱']
}
df = pd.DataFrame(test_data)
with tempfile.NamedTemporaryFile(suffix='.xlsx', delete=False) as tmp:
df.to_excel(tmp.name, index=False)
with open(tmp.name, 'rb') as f:
response = client.post(
"/api/v1/inventory/parse-document",
files={"file": ("test.xlsx", f, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")},
data={"project_id": "test_project"}
)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert len(data["data"]["tables"]) > 0
assert data["data"]["total_tables"] > 0
def test_unsupported_file_type():
"""测试不支持的文件类型"""
with tempfile.NamedTemporaryFile(suffix='.txt', delete=False) as tmp:
tmp.write(b"test content")
tmp.flush()
with open(tmp.name, 'rb') as f:
response = client.post(
"/api/v1/inventory/parse-document",
files={"file": ("test.txt", f, "text/plain")},
data={"project_id": "test_project"}
)
assert response.status_code == 400
data = response.json()
assert data["success"] is False
```
---
## ⚠️ 注意事项
### 1. 文件大小限制
- **Excel**: 建议限制为 50MB
- **Word**: 建议限制为 50MB
- **PDF**: 建议限制为 50MB
在 FastAPI 中设置:
```python
from fastapi import File, UploadFile
from fastapi.exceptions import RequestEntityTooLarge
@app.exception_handler(RequestEntityTooLarge)
async def handle_upload_limit(exc):
return JSONResponse(
status_code=413,
content={
"success": False,
"message": "文件大小超过限制(最大 50MB"
}
)
```
### 2. 文件格式约定
由于不同用户的数据字典文档格式可能不同,建议:
- **Excel**: 约定格式为第一列字段名,第二列类型,第三列注释
- **Word**: 约定使用表格格式,第一行表头,后续行字段信息
- **PDF**: 约定使用表格格式
如果格式不统一,需要增加更智能的识别逻辑。
### 3. 错误处理
- 文件读取失败:返回 400 错误
- 文件格式错误:返回 400 错误,提示正确的格式
- 解析失败:返回 500 错误,记录详细错误日志
- 文件过大:返回 413 错误
### 4. 性能优化
- 对于大文件,考虑使用异步处理
- 使用临时文件,处理完成后删除
- 考虑添加缓存机制(相同文件解析结果缓存)
### 5. 安全性
- 文件上传路径验证,防止路径遍历攻击
- 文件类型验证,防止恶意文件上传
- 文件大小限制,防止 DoS 攻击
- 临时文件及时清理
---
## 📝 开发检查清单
- [ ] 支持 Excel (.xlsx, .xls) 格式解析
- [ ] 支持 Word (.doc, .docx) 格式解析
- [ ] 支持 PDF (.pdf) 格式解析(可选)
- [ ] 文件类型自动识别
- [ ] 文件大小限制50MB
- [ ] 错误处理和异常捕获
- [ ] 单元测试覆盖
- [ ] 日志记录
- [ ] 临时文件清理
- [ ] API 文档生成Swagger
---
## 🔗 相关文档
- [接口清单表格](../Python接口清单表格.md)
- [Python技术人员工作量文档](../Python技术人员工作量文档.md)
- [FastAPI 官方文档](https://fastapi.tiangolo.com/)
- [pandas 文档](https://pandas.pydata.org/docs/)
- [python-docx 文档](https://python-docx.readthedocs.io/)
---
## 📞 联系方式
如有开发问题,请联系:
- **接口负责人**: [待填写]
- **技术顾问**: [待填写]

595
docs/02-parse-sql-result.md Normal file
View File

@ -0,0 +1,595 @@
# 接口开发说明 - SQL 结果解析接口
## 📋 接口基本信息
- **接口路径**: `/api/v1/inventory/parse-sql-result`
- **请求方法**: `POST`
- **接口功能**: 解析 IT 执行 SQL 脚本后导出的 Excel/CSV 结果文件,提取表名、字段名、字段类型等信息
- **涉及页面**: `InventoryStep.vue` - 方案二IT 脚本提取)
- **是否涉及大模型**: ❌ 否
- **工作量评估**: 2 人日
- **优先级**: 低
---
## 🎯 功能描述
该接口用于解析 IT 部门执行标准 SQL 脚本后导出的结果文件。SQL 脚本通常查询 `information_schema.COLUMNS` 表,导出的结果文件包含以下列:
- 表英文名 (TABLE_NAME)
- 表中文名/描述 (TABLE_COMMENT)
- 字段英文名 (COLUMN_NAME)
- 字段中文名 (COLUMN_COMMENT)
- 字段类型 (COLUMN_TYPE)
支持的文件格式:
- **Excel**: `.xlsx`, `.xls`
- **CSV**: `.csv`
---
## 🔧 技术实现方案
### 技术栈
```python
# 核心依赖
fastapi>=0.104.0 # Web 框架
pydantic>=2.0.0 # 数据验证
# 数据处理
pandas>=2.0.0 # CSV/Excel 解析
openpyxl>=3.1.0 # Excel 处理(如果使用 openpyxl
```
### 实现思路
1. **文件上传/路径**: 接收 Excel 或 CSV 文件
2. **文件解析**: 使用 `pandas` 读取文件
3. **数据清洗**: 清理空行、空值,标准化数据格式
4. **表结构提取**: 根据列名提取表名、字段名、类型等信息
5. **数据验证**: 验证数据完整性和格式正确性
6. **结果返回**: 返回标准化的表结构数据
---
## 📥 请求格式
### 请求方式
**Content-Type**: `multipart/form-data``application/json`
### 请求参数
```http
POST /api/v1/inventory/parse-sql-result
Content-Type: multipart/form-data
file: [二进制文件]
project_id: string
file_type: excel | csv (可选,自动识别)
```
```json
{
"file_path": "/path/to/result.xlsx",
"file_type": "excel | csv",
"project_id": "project_001"
}
```
### 请求参数说明
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| `file` | File | 是 | 上传的文件(方式一) |
| `file_path` | string | 是 | 文件路径(方式二) |
| `file_type` | string | 否 | 文件类型:`excel` / `csv`,如果不传则根据文件扩展名自动识别 |
| `project_id` | string | 是 | 项目ID |
### 标准 SQL 脚本示例
IT 部门需要执行的 SQL 脚本:
```sql
SELECT
TABLE_NAME AS '表英文名',
TABLE_COMMENT AS '表中文名/描述',
COLUMN_NAME AS '字段英文名',
COLUMN_COMMENT AS '字段中文名',
COLUMN_TYPE AS '字段类型'
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = '您的数据库名';
```
---
## 📤 响应格式
### 成功响应
```json
{
"success": true,
"code": 200,
"message": "SQL 结果解析成功",
"data": {
"tables": [
{
"raw_name": "t_user_base_01",
"display_name": "用户基础信息表",
"description": "存储用户基本信息的表",
"fields": [
{
"raw_name": "user_id",
"display_name": "用户ID",
"type": "varchar(64)",
"comment": "用户的唯一标识符"
}
],
"field_count": 10
}
],
"total_tables": 5,
"total_fields": 245,
"parse_time": 0.45,
"file_info": {
"file_name": "schema_export.xlsx",
"file_size": 512000,
"file_type": "excel"
}
}
}
```
### 失败响应
```json
{
"success": false,
"code": 400,
"message": "文件格式错误或缺少必要列",
"error": {
"error_code": "INVALID_FILE_FORMAT",
"error_detail": "文件缺少必要列:表英文名、字段英文名、字段类型"
}
}
```
---
## 💻 代码实现示例
### FastAPI 实现
```python
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Optional, List, Dict
import pandas as pd
import os
from pathlib import Path
import time
app = FastAPI()
class FieldInfo(BaseModel):
raw_name: str
display_name: Optional[str] = None
type: str
comment: Optional[str] = None
class TableInfo(BaseModel):
raw_name: str
display_name: Optional[str] = None
description: Optional[str] = None
fields: List[FieldInfo]
field_count: int
def parse_sql_result_excel(file_path: str) -> List[TableInfo]:
"""解析 Excel 格式的 SQL 结果"""
try:
# 读取 Excel 文件
df = pd.read_excel(file_path)
# 标准化列名(支持多种可能的列名)
column_mapping = {
'表英文名': 'table_name',
'TABLE_NAME': 'table_name',
'table_name': 'table_name',
'表中文名/描述': 'table_comment',
'TABLE_COMMENT': 'table_comment',
'table_comment': 'table_comment',
'字段英文名': 'column_name',
'COLUMN_NAME': 'column_name',
'column_name': 'column_name',
'字段中文名': 'column_comment',
'COLUMN_COMMENT': 'column_comment',
'column_comment': 'column_comment',
'字段类型': 'column_type',
'COLUMN_TYPE': 'column_type',
'column_type': 'column_type'
}
# 重命名列
df.columns = df.columns.str.strip()
df = df.rename(columns=column_mapping)
# 验证必要列是否存在
required_columns = ['table_name', 'column_name', 'column_type']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
raise ValueError(f"缺少必要列: {', '.join(missing_columns)}")
# 清理数据(去除空值)
df = df.dropna(subset=['table_name', 'column_name'])
# 按表名分组
tables_dict: Dict[str, List[FieldInfo]] = {}
for _, row in df.iterrows():
table_name = str(row['table_name']).strip()
column_name = str(row['column_name']).strip()
if not table_name or not column_name:
continue
# 获取字段信息
field = FieldInfo(
raw_name=column_name,
display_name=str(row.get('column_comment', '')).strip() if pd.notna(row.get('column_comment')) else None,
type=str(row.get('column_type', 'varchar(255)')).strip() if pd.notna(row.get('column_type')) else 'varchar(255)',
comment=str(row.get('column_comment', '')).strip() if pd.notna(row.get('column_comment')) else None
)
# 按表分组
if table_name not in tables_dict:
tables_dict[table_name] = []
tables_dict[table_name].append(field)
# 构建表信息
tables = []
for table_name, fields in tables_dict.items():
# 获取表的描述信息(取第一个字段的表描述,或使用表名)
table_comment = None
if 'table_comment' in df.columns:
table_comment_row = df[df['table_name'] == table_name].iloc[0]
if pd.notna(table_comment_row.get('table_comment')):
table_comment = str(table_comment_row['table_comment']).strip()
table = TableInfo(
raw_name=table_name,
display_name=table_comment if table_comment else table_name,
description=table_comment,
fields=fields,
field_count=len(fields)
)
tables.append(table)
return tables
except Exception as e:
raise Exception(f"Excel 解析失败: {str(e)}")
def parse_sql_result_csv(file_path: str) -> List[TableInfo]:
"""解析 CSV 格式的 SQL 结果"""
try:
# 读取 CSV 文件(尝试不同的编码)
encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1']
df = None
for encoding in encodings:
try:
df = pd.read_csv(file_path, encoding=encoding)
break
except UnicodeDecodeError:
continue
if df is None:
raise ValueError("无法解析 CSV 文件,请检查文件编码")
# 后续处理与 Excel 相同
return parse_sql_result_excel_dataframe(df)
except Exception as e:
raise Exception(f"CSV 解析失败: {str(e)}")
def parse_sql_result_excel_dataframe(df: pd.DataFrame) -> List[TableInfo]:
"""从 DataFrame 解析 SQL 结果(共用逻辑)"""
# 标准化列名
column_mapping = {
'表英文名': 'table_name',
'TABLE_NAME': 'table_name',
'table_name': 'table_name',
'表中文名/描述': 'table_comment',
'TABLE_COMMENT': 'table_comment',
'table_comment': 'table_comment',
'字段英文名': 'column_name',
'COLUMN_NAME': 'column_name',
'column_name': 'column_name',
'字段中文名': 'column_comment',
'COLUMN_COMMENT': 'column_comment',
'column_comment': 'column_comment',
'字段类型': 'column_type',
'COLUMN_TYPE': 'column_type',
'column_type': 'column_type'
}
df.columns = df.columns.str.strip()
df = df.rename(columns=column_mapping)
# 验证必要列
required_columns = ['table_name', 'column_name', 'column_type']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
raise ValueError(f"缺少必要列: {', '.join(missing_columns)}")
# 清理数据
df = df.dropna(subset=['table_name', 'column_name'])
# 按表分组
tables_dict = {}
for _, row in df.iterrows():
table_name = str(row['table_name']).strip()
column_name = str(row['column_name']).strip()
if not table_name or not column_name:
continue
field = FieldInfo(
raw_name=column_name,
display_name=str(row.get('column_comment', '')).strip() if pd.notna(row.get('column_comment')) else None,
type=str(row.get('column_type', 'varchar(255)')).strip() if pd.notna(row.get('column_type')) else 'varchar(255)',
comment=str(row.get('column_comment', '')).strip() if pd.notna(row.get('column_comment')) else None
)
if table_name not in tables_dict:
tables_dict[table_name] = []
tables_dict[table_name].append(field)
# 构建表信息
tables = []
for table_name, fields in tables_dict.items():
table_comment = None
if 'table_comment' in df.columns:
table_comment_row = df[df['table_name'] == table_name].iloc[0]
if pd.notna(table_comment_row.get('table_comment')):
table_comment = str(table_comment_row['table_comment']).strip()
table = TableInfo(
raw_name=table_name,
display_name=table_comment if table_comment else table_name,
description=table_comment,
fields=fields,
field_count=len(fields)
)
tables.append(table)
return tables
@app.post("/api/v1/inventory/parse-sql-result")
async def parse_sql_result(
file: Optional[UploadFile] = File(None),
file_path: Optional[str] = Form(None),
file_type: Optional[str] = Form(None),
project_id: str = Form(...)
):
"""
SQL 结果解析接口
解析 IT 执行 SQL 脚本后导出的 Excel/CSV 结果文件
"""
start_time = time.time()
try:
# 验证参数
if not file and not file_path:
raise HTTPException(
status_code=400,
detail="必须提供文件或文件路径"
)
# 处理文件上传
if file:
upload_dir = Path("/tmp/uploads")
upload_dir.mkdir(exist_ok=True)
file_path = str(upload_dir / file.filename)
with open(file_path, "wb") as f:
content = await file.read()
f.write(content)
file_name = file.filename
file_size = len(content)
if not file_type:
ext = Path(file_name).suffix.lower()
if ext in ['.xlsx', '.xls']:
file_type = 'excel'
elif ext == '.csv':
file_type = 'csv'
else:
raise HTTPException(
status_code=400,
detail=f"不支持的文件类型: {ext}"
)
else:
if not os.path.exists(file_path):
raise HTTPException(
status_code=404,
detail=f"文件不存在: {file_path}"
)
file_name = Path(file_path).name
file_size = os.path.getsize(file_path)
if not file_type:
ext = Path(file_name).suffix.lower()
if ext in ['.xlsx', '.xls']:
file_type = 'excel'
elif ext == '.csv':
file_type = 'csv'
else:
raise HTTPException(
status_code=400,
detail=f"不支持的文件类型: {ext}"
)
# 根据文件类型解析
if file_type == 'excel':
tables = parse_sql_result_excel(file_path)
elif file_type == 'csv':
tables = parse_sql_result_csv(file_path)
else:
raise HTTPException(
status_code=400,
detail=f"不支持的文件类型: {file_type}"
)
# 计算统计信息
total_fields = sum(table.field_count for table in tables)
parse_time = time.time() - start_time
# 构建响应
response_data = {
"tables": [table.dict() for table in tables],
"total_tables": len(tables),
"total_fields": total_fields,
"parse_time": round(parse_time, 2),
"file_info": {
"file_name": file_name,
"file_size": file_size,
"file_type": file_type
}
}
return {
"success": True,
"code": 200,
"message": "SQL 结果解析成功",
"data": response_data
}
except HTTPException:
raise
except Exception as e:
return JSONResponse(
status_code=500,
content={
"success": False,
"code": 500,
"message": "SQL 结果解析失败",
"error": {
"error_code": "PARSE_ERROR",
"error_detail": str(e)
}
}
)
```
---
## 🧪 测试用例
### 单元测试示例
```python
import pytest
from fastapi.testclient import TestClient
import pandas as pd
import tempfile
client = TestClient(app)
def test_parse_sql_result_excel():
"""测试解析 Excel 格式的 SQL 结果"""
# 创建测试数据
test_data = {
'表英文名': ['t_user', 't_user', 't_order'],
'表中文名/描述': ['用户表', '用户表', '订单表'],
'字段英文名': ['user_id', 'user_name', 'order_id'],
'字段中文名': ['用户ID', '用户名', '订单ID'],
'字段类型': ['varchar(64)', 'varchar(50)', 'bigint']
}
df = pd.DataFrame(test_data)
with tempfile.NamedTemporaryFile(suffix='.xlsx', delete=False) as tmp:
df.to_excel(tmp.name, index=False)
with open(tmp.name, 'rb') as f:
response = client.post(
"/api/v1/inventory/parse-sql-result",
files={"file": ("test.xlsx", f, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")},
data={"project_id": "test_project"}
)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert data["data"]["total_tables"] == 2 # t_user 和 t_order
assert data["data"]["total_fields"] == 3
def test_invalid_file_format():
"""测试无效文件格式"""
response = client.post(
"/api/v1/inventory/parse-sql-result",
files={"file": ("test.txt", b"invalid content", "text/plain")},
data={"project_id": "test_project"}
)
assert response.status_code == 400
data = response.json()
assert data["success"] is False
```
---
## ⚠️ 注意事项
### 1. 列名映射
由于不同数据库导出的列名可能不同,需要支持多种列名映射:
- `表英文名` / `TABLE_NAME` / `table_name`
- `字段英文名` / `COLUMN_NAME` / `column_name`
- `字段类型` / `COLUMN_TYPE` / `column_type`
### 2. CSV 编码问题
CSV 文件可能存在编码问题GBK、UTF-8 等),需要尝试多种编码:
- UTF-8优先
- GBK
- GB2312
- Latin-1
### 3. 数据清洗
- 去除空行和空值
- 标准化表名和字段名(去除前后空格)
- 处理特殊字符
### 4. 错误处理
- 文件格式验证
- 必要列验证
- 数据完整性验证
- 异常捕获和日志记录
---
## 📝 开发检查清单
- [ ] 支持 Excel (.xlsx, .xls) 格式解析
- [ ] 支持 CSV (.csv) 格式解析
- [ ] 支持多种列名映射
- [ ] CSV 编码自动检测
- [ ] 数据清洗和验证
- [ ] 错误处理和异常捕获
- [ ] 单元测试覆盖
- [ ] 日志记录
---
## 🔗 相关文档
- [接口清单表格](../Python接口清单表格.md)
- [Python技术人员工作量文档](../Python技术人员工作量文档.md)

View File

@ -0,0 +1,547 @@
# 接口开发说明 - 业务表解析接口
## 📋 接口基本信息
- **接口路径**: `/api/v1/inventory/parse-business-tables`
- **请求方法**: `POST`
- **接口功能**: 解析业务人员手动导出的核心业务表Excel/CSV支持批量文件解析和表结构识别
- **涉及页面**: `InventoryStep.vue` - 方案三(业务关键表导入)
- **是否涉及大模型**: ❌ 否
- **工作量评估**: 3 人日
- **优先级**: 中
---
## 🎯 功能描述
该接口用于解析业务人员手动导出的核心业务表文件,支持:
- **批量文件上传**: 一次可上传多个文件
- **格式支持**: Excel (.xlsx, .xls)、CSV (.csv)
- **表结构识别**: 自动识别 Excel 中的表结构(通过 Sheet 名称或文件名)
- **进度反馈**: 支持批量处理时的进度反馈
适用场景:
- SaaS 系统(如 Salesforce、金蝶、有赞无法直接连接数据库
- 业务人员手动导出核心业务表
- 需要批量处理多个文件
---
## 🔧 技术实现方案
### 技术栈
```python
# 核心依赖
fastapi>=0.104.0 # Web 框架
pydantic>=2.0.0 # 数据验证
celery>=5.3.0 # 异步任务(可选)
# 数据处理
pandas>=2.0.0 # 批量文件处理
openpyxl>=3.1.0 # Excel 处理
```
### 实现思路
1. **批量文件上传**: 接收多个文件
2. **文件解析**: 使用 `pandas` 批量读取文件
3. **表结构识别**: 根据文件名或 Sheet 名称识别表名
4. **字段识别**: 从 Excel/CSV 的表头识别字段名和类型
5. **进度反馈**: 使用异步任务或进度回调
6. **结果汇总**: 汇总所有文件的解析结果
---
## 📥 请求格式
### 请求方式
**Content-Type**: `multipart/form-data`
### 请求参数
```http
POST /api/v1/inventory/parse-business-tables
Content-Type: multipart/form-data
files: [文件1, 文件2, ...] # 多个文件
project_id: string
```
```json
{
"file_paths": ["/path/to/file1.xlsx", "/path/to/file2.csv", ...],
"project_id": "project_001"
}
```
### 请求参数说明
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| `files` | File[] | 是 | 上传的文件列表(方式一,支持多个) |
| `file_paths` | string[] | 是 | 文件路径列表(方式二) |
| `project_id` | string | 是 | 项目ID |
---
## 📤 响应格式
### 成功响应
```json
{
"success": true,
"code": 200,
"message": "业务表解析成功",
"data": {
"tables": [
{
"raw_name": "orders",
"display_name": "订单流水明细表",
"description": "从文件 orders.xlsx 解析",
"source_file": "orders.xlsx",
"fields": [
{
"raw_name": "order_id",
"display_name": "订单ID",
"type": "string",
"comment": null,
"inferred_type": "varchar(64)"
}
],
"field_count": 10,
"row_count": 10000
}
],
"total_tables": 5,
"total_fields": 150,
"total_files": 5,
"success_files": 5,
"failed_files": [],
"parse_time": 3.45,
"file_info": {
"processed_files": [
{
"file_name": "orders.xlsx",
"file_size": 1024000,
"tables_extracted": 1,
"status": "success"
}
]
}
}
}
```
### 异步任务响应(如果使用异步处理)
```json
{
"success": true,
"code": 202,
"message": "任务已提交,正在处理中",
"data": {
"task_id": "task_123456",
"total_files": 5,
"status": "processing",
"estimated_time": 30
}
}
```
---
## 💻 代码实现示例
### FastAPI 实现(同步版本)
```python
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Optional, List, Dict
import pandas as pd
import os
from pathlib import Path
import time
from collections import defaultdict
app = FastAPI()
class FieldInfo(BaseModel):
raw_name: str
display_name: Optional[str] = None
type: str
comment: Optional[str] = None
inferred_type: Optional[str] = None
class TableInfo(BaseModel):
raw_name: str
display_name: Optional[str] = None
description: Optional[str] = None
source_file: str
fields: List[FieldInfo]
field_count: int
row_count: Optional[int] = None
def infer_field_type(pd_type: str) -> str:
"""根据 pandas 类型推断数据库字段类型"""
type_mapping = {
'object': 'varchar(255)',
'int64': 'bigint',
'int32': 'int',
'float64': 'double',
'float32': 'float',
'bool': 'tinyint',
'datetime64[ns]': 'datetime',
'date': 'date'
}
return type_mapping.get(str(pd_type), 'varchar(255)')
def parse_excel_file(file_path: str, file_name: str) -> List[TableInfo]:
"""解析单个 Excel 文件"""
tables = []
try:
# 读取所有 Sheet
excel_file = pd.ExcelFile(file_path)
for sheet_name in excel_file.sheet_names:
df = pd.read_excel(file_path, sheet_name=sheet_name)
# 跳过空 Sheet
if df.empty:
continue
# 识别字段
fields = []
for col in df.columns:
# 推断字段类型
col_type = str(df[col].dtype)
inferred_type = infer_field_type(col_type)
field = FieldInfo(
raw_name=str(col).strip(),
display_name=str(col).strip(),
type=inferred_type,
comment=None,
inferred_type=inferred_type
)
fields.append(field)
if fields:
# 使用 Sheet 名称或文件名作为表名
table_name = sheet_name.lower().replace(' ', '_').replace('-', '_')
if not table_name:
table_name = Path(file_name).stem.lower().replace(' ', '_').replace('-', '_')
table = TableInfo(
raw_name=table_name,
display_name=sheet_name,
description=f"从文件 {file_name} 的 Sheet '{sheet_name}' 解析",
source_file=file_name,
fields=fields,
field_count=len(fields),
row_count=len(df)
)
tables.append(table)
except Exception as e:
raise Exception(f"解析文件 {file_name} 失败: {str(e)}")
return tables
def parse_csv_file(file_path: str, file_name: str) -> List[TableInfo]:
"""解析单个 CSV 文件"""
tables = []
try:
# 尝试多种编码
encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1']
df = None
for encoding in encodings:
try:
df = pd.read_csv(file_path, encoding=encoding)
break
except UnicodeDecodeError:
continue
if df is None:
raise ValueError("无法解析 CSV 文件,请检查文件编码")
if df.empty:
return tables
# 识别字段
fields = []
for col in df.columns:
col_type = str(df[col].dtype)
inferred_type = infer_field_type(col_type)
field = FieldInfo(
raw_name=str(col).strip(),
display_name=str(col).strip(),
type=inferred_type,
comment=None,
inferred_type=inferred_type
)
fields.append(field)
if fields:
# 使用文件名作为表名
table_name = Path(file_name).stem.lower().replace(' ', '_').replace('-', '_')
table = TableInfo(
raw_name=table_name,
display_name=Path(file_name).stem,
description=f"从文件 {file_name} 解析",
source_file=file_name,
fields=fields,
field_count=len(fields),
row_count=len(df)
)
tables.append(table)
except Exception as e:
raise Exception(f"解析文件 {file_name} 失败: {str(e)}")
return tables
@app.post("/api/v1/inventory/parse-business-tables")
async def parse_business_tables(
files: List[UploadFile] = File(...),
project_id: str = Form(...)
):
"""
业务表解析接口
批量解析业务人员导出的核心业务表文件
"""
start_time = time.time()
upload_dir = Path("/tmp/uploads")
upload_dir.mkdir(exist_ok=True)
all_tables = []
processed_files = []
failed_files = []
try:
# 处理每个文件
for file in files:
file_name = file.filename
file_path = str(upload_dir / file_name)
try:
# 保存文件
with open(file_path, "wb") as f:
content = await file.read()
f.write(content)
file_size = len(content)
# 根据文件扩展名选择解析方法
ext = Path(file_name).suffix.lower()
if ext in ['.xlsx', '.xls']:
tables = parse_excel_file(file_path, file_name)
elif ext == '.csv':
tables = parse_csv_file(file_path, file_name)
else:
failed_files.append({
"file_name": file_name,
"error": f"不支持的文件类型: {ext}"
})
continue
all_tables.extend(tables)
processed_files.append({
"file_name": file_name,
"file_size": file_size,
"tables_extracted": len(tables),
"status": "success"
})
# 清理临时文件
os.remove(file_path)
except Exception as e:
failed_files.append({
"file_name": file_name,
"error": str(e)
})
# 清理临时文件
if os.path.exists(file_path):
os.remove(file_path)
# 计算统计信息
total_fields = sum(table.field_count for table in all_tables)
parse_time = time.time() - start_time
# 构建响应
response_data = {
"tables": [table.dict() for table in all_tables],
"total_tables": len(all_tables),
"total_fields": total_fields,
"total_files": len(files),
"success_files": len(processed_files),
"failed_files": failed_files,
"parse_time": round(parse_time, 2),
"file_info": {
"processed_files": processed_files
}
}
return {
"success": True,
"code": 200,
"message": f"成功解析 {len(processed_files)} 个文件,提取 {len(all_tables)} 个表",
"data": response_data
}
except Exception as e:
return JSONResponse(
status_code=500,
content={
"success": False,
"code": 500,
"message": "业务表解析失败",
"error": {
"error_code": "PARSE_ERROR",
"error_detail": str(e)
}
}
)
```
### 异步版本(使用 Celery可选
```python
from celery import Celery
celery_app = Celery('tasks', broker='redis://localhost:6379')
@celery_app.task
def parse_business_tables_async(file_paths: List[str], project_id: str):
"""异步解析业务表"""
# 解析逻辑同上
pass
@app.post("/api/v1/inventory/parse-business-tables-async")
async def parse_business_tables_async_endpoint(
files: List[UploadFile] = File(...),
project_id: str = Form(...)
):
"""异步业务表解析接口"""
# 保存文件
file_paths = []
for file in files:
file_path = f"/tmp/uploads/{file.filename}"
with open(file_path, "wb") as f:
content = await file.read()
f.write(content)
file_paths.append(file_path)
# 提交异步任务
task = parse_business_tables_async.delay(file_paths, project_id)
return {
"success": True,
"code": 202,
"message": "任务已提交,正在处理中",
"data": {
"task_id": task.id,
"total_files": len(files),
"status": "processing",
"estimated_time": len(files) * 10 # 估算时间(秒)
}
}
@app.get("/api/v1/inventory/parse-business-tables-status/{task_id}")
async def get_parse_status(task_id: str):
"""查询解析任务状态"""
task = celery_app.AsyncResult(task_id)
if task.ready():
return {
"success": True,
"code": 200,
"data": {
"task_id": task_id,
"status": "completed",
"result": task.result
}
}
else:
return {
"success": True,
"code": 200,
"data": {
"task_id": task_id,
"status": "processing",
"progress": task.info.get('progress', 0) if task.info else 0
}
}
```
---
## ⚠️ 注意事项
### 1. 批量处理性能
- 对于大量文件,建议使用异步处理
- 设置合理的文件大小限制
- 考虑并行处理以提高性能
### 2. 表名识别
由于是业务人员手动导出,表名识别可能不准确:
- 优先使用 Excel Sheet 名称
- 其次使用文件名
- 提供手动修正功能(可选)
### 3. 字段类型推断
- 基于 pandas 类型推断,可能不够准确
- 后续可通过 AI 识别接口进一步优化
- 记录推断类型,便于后续验证
### 4. 错误处理
- 单个文件失败不应影响其他文件处理
- 记录详细的错误信息
- 提供失败文件列表
### 5. 资源管理
- 及时清理临时文件
- 控制并发文件数量
- 限制单个文件大小
---
## 📝 开发检查清单
- [ ] 支持批量文件上传
- [ ] 支持 Excel (.xlsx, .xls) 格式
- [ ] 支持 CSV (.csv) 格式
- [ ] Excel 多 Sheet 支持
- [ ] CSV 编码自动检测
- [ ] 字段类型推断
- [ ] 进度反馈(异步版本)
- [ ] 错误处理(单个文件失败不影响其他)
- [ ] 临时文件清理
- [ ] 单元测试覆盖
---
## 🔗 相关文档
- [接口清单表格](../Python接口清单表格.md)
- [接口 1.1 - 文档解析接口](./01-parse-document.md)
- [接口 1.2 - SQL 结果解析接口](./02-parse-sql-result.md)
- [接口 1.4 - 数据资产智能识别接口](./04-ai-analyze.md) - 可进一步优化识别结果

751
docs/04-ai-analyze.md Normal file
View File

@ -0,0 +1,751 @@
# 接口开发说明 - 数据资产智能识别接口 ⭐⭐⭐
## 📋 接口基本信息
- **接口路径**: `/api/v1/inventory/ai-analyze`
- **请求方法**: `POST`
- **接口功能**: 使用大模型识别数据资产的中文名称、业务含义、PII 敏感信息、重要数据特征,并提供置信度评分
- **涉及页面**: `InventoryStep.vue` - AI 盘点处理阶段
- **是否涉及大模型**: ✅ **是**(核心功能)
- **工作量评估**: **15 人日**
- **优先级**: **高**
---
## 🎯 功能描述
该接口是数据资产盘点系统的核心功能,使用大模型技术智能识别和标注数据资产,具体功能包括:
1. **表名和字段名中文命名识别**
- 将英文表名/字段名转换为中文名称
- 识别业务含义
2. **业务含义描述生成**
- 自动生成表的中文描述
- 自动生成字段的中文描述
3. **PII个人信息识别**
- 识别敏感个人信息SPI
- 符合《个人信息保护法》(PIPL) 要求
- 识别类型:手机号、身份证、姓名、邮箱、地址等
4. **重要数据识别**
- 识别《数据安全法》定义的重要数据
- 涉及国家安全、公共利益的数据
5. **置信度评分**
- 评估识别结果的可靠性0-100%
- 考虑字段命名规范度、注释完整性等因素
---
## 🔧 技术实现方案
### 技术栈
```python
# 核心依赖
fastapi>=0.104.0 # Web 框架
pydantic>=2.0.0 # 数据验证
httpx>=0.24.0 # HTTP 客户端(用于调用大模型 API
# 大模型 SDK
openai>=1.0.0 # OpenAI API (如果使用 GPT-4)
dashscope>=1.14.0 # 通义千问 API
qianfan>=0.1.0 # 文心一言 API
# 工具库
python-dotenv>=1.0.0 # 环境变量管理
loguru>=0.7.0 # 日志管理
redis>=5.0.0 # 缓存(可选)
```
### 大模型选择建议
| 场景 | 推荐模型 | 理由 |
|------|---------|------|
| 数据资产识别 | 通义千问 / GPT-4 | 需要准确理解表结构和业务含义 |
### 实现思路
1. **输入数据准备**: 整理表结构信息、行业背景、业务上下文
2. **提示词构建**: 根据输入数据构建专业的提示词
3. **大模型调用**: 调用大模型 API 进行识别
4. **结果解析**: 解析大模型返回的 JSON 结果
5. **规则引擎验证**: 使用规则引擎验证和补充识别结果
6. **置信度评分**: 计算识别结果的置信度
7. **结果验证**: 验证数据格式和逻辑正确性
---
## 📥 请求格式
### 请求方式
**Content-Type**: `application/json`
### 请求参数
```json
{
"tables": [
{
"raw_name": "t_user_base_01",
"fields": [
{
"raw_name": "user_id",
"type": "varchar(64)",
"comment": "用户ID"
},
{
"raw_name": "phone",
"type": "varchar(11)",
"comment": "手机号"
},
{
"raw_name": "id_card",
"type": "varchar(18)",
"comment": "身份证号"
}
]
}
],
"project_id": "project_001",
"industry": "retail-fresh",
"context": "某连锁生鲜零售企业,主营水果、蔬菜等生鲜产品",
"options": {
"model": "qwen-max",
"temperature": 0.3,
"enable_pii_detection": true,
"enable_important_data_detection": true
}
}
```
### 请求参数说明
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| `tables` | array | 是 | 表列表,每个表包含表名和字段列表 |
| `tables[].raw_name` | string | 是 | 表名(英文/原始名称) |
| `tables[].fields` | array | 是 | 字段列表 |
| `tables[].fields[].raw_name` | string | 是 | 字段名(英文) |
| `tables[].fields[].type` | string | 是 | 字段类型 |
| `tables[].fields[].comment` | string | 否 | 字段注释(如果有) |
| `project_id` | string | 是 | 项目ID |
| `industry` | string | 否 | 行业信息retail-fresh |
| `context` | string | 否 | 业务背景信息 |
| `options` | object | 否 | 可选配置 |
| `options.model` | string | 否 | 大模型选择qwen-max/gpt-4/ernie-bot |
| `options.temperature` | float | 否 | 温度参数0.0-1.0),默认 0.3 |
| `options.enable_pii_detection` | boolean | 否 | 是否启用 PII 识别,默认 true |
| `options.enable_important_data_detection` | boolean | 否 | 是否启用重要数据识别,默认 true |
---
## 📤 响应格式
### 成功响应
```json
{
"success": true,
"code": 200,
"message": "数据资产识别成功",
"data": {
"tables": [
{
"raw_name": "t_user_base_01",
"ai_name": "会员基础信息表",
"desc": "存储C端注册用户的核心身份信息",
"confidence": 98,
"ai_completed": true,
"fields": [
{
"raw_name": "user_id",
"ai_name": "用户ID",
"desc": "用户的唯一标识符",
"type": "varchar(64)",
"pii": [],
"pii_type": null,
"is_important_data": false,
"confidence": 95
},
{
"raw_name": "phone",
"ai_name": "手机号",
"desc": "用户的联系电话",
"type": "varchar(11)",
"pii": ["手机号"],
"pii_type": "contact",
"is_important_data": false,
"confidence": 98
},
{
"raw_name": "id_card",
"ai_name": "身份证号",
"desc": "用户的身份证号码",
"type": "varchar(18)",
"pii": ["身份证号"],
"pii_type": "identity",
"is_important_data": false,
"confidence": 99
}
],
"pii": ["手机号", "身份证号"],
"important": false,
"important_data_types": []
}
],
"statistics": {
"total_tables": 1,
"total_fields": 3,
"pii_fields_count": 2,
"important_data_fields_count": 0,
"average_confidence": 97.3
},
"processing_time": 5.2,
"model_used": "qwen-max",
"token_usage": {
"prompt_tokens": 1200,
"completion_tokens": 800,
"total_tokens": 2000
}
}
}
```
### 失败响应
```json
{
"success": false,
"code": 500,
"message": "数据资产识别失败",
"error": {
"error_code": "AI_ANALYZE_ERROR",
"error_detail": "大模型 API 调用失败: Rate limit exceeded",
"retryable": true
}
}
```
---
## 💻 代码实现示例
### 提示词模板设计
```python
SYSTEM_PROMPT = """你是一位专业的数据资产管理专家,擅长识别数据资产的中文名称、业务含义、敏感信息和重要数据特征。
## 你的专业能力
- 深入理解数据资产管理、数据合规PIPL、数据安全法等法规要求
- 熟悉各种业务场景下的数据资产命名规范
- 能够准确识别敏感个人信息SPI和重要数据
- 具备优秀的文本理解和生成能力
## 输出要求
1. **准确性**: 中文命名必须准确反映业务含义
2. **合规性**: PII 识别必须符合《个人信息保护法》(PIPL)
3. **完整性**: 重要数据识别必须符合《数据安全法》
4. **专业性**: 使用专业术语,符合行业标准
5. **结构化**: 严格按照JSON格式输出
"""
USER_PROMPT_TEMPLATE = """请基于以下信息识别数据资产:
## 行业背景
{industry_info}
## 业务背景
{context_info}
## 表结构信息
{tables_info}
## 识别要求
1. 为每个表生成中文名称ai_name和业务描述desc
2. 为每个字段生成中文名称ai_name和业务描述desc
3. 识别敏感个人信息PII
- 手机号、身份证号、姓名、邮箱、地址等
- 生物识别信息(人脸、指纹等)
- 医疗健康信息
- 金融账户信息
- 行踪轨迹信息
4. 识别重要数据(符合《数据安全法》):
- 涉及国家安全的数据
- 涉及公共利益的数据
- 高精度地理信息(军事禁区周边)
- 关键物资流向(稀土、芯片等)
5. 计算置信度评分0-100
- 字段命名规范度
- 注释完整性
- 业务含义明确度
## 输出格式JSON
{json_schema}
"""
JSON_SCHEMA = """
{
"tables": [
{
"raw_name": "string",
"ai_name": "string",
"desc": "string",
"confidence": "integer (0-100)",
"fields": [
{
"raw_name": "string",
"ai_name": "string",
"desc": "string",
"pii": ["string"],
"pii_type": "string | null",
"is_important_data": "boolean",
"confidence": "integer (0-100)"
}
],
"pii": ["string"],
"important": "boolean",
"important_data_types": ["string"]
}
]
}
"""
```
### FastAPI 实现
```python
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from typing import Optional, List, Dict
import json
import os
from dotenv import load_dotenv
import httpx
import time
from loguru import logger
load_dotenv()
app = FastAPI()
# 大模型配置
MODEL_CONFIG = {
"qwen-max": {
"api_key": os.getenv("DASHSCOPE_API_KEY"),
"base_url": "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation",
"model_name": "qwen-max"
},
"gpt-4": {
"api_key": os.getenv("OPENAI_API_KEY"),
"base_url": "https://api.openai.com/v1/chat/completions",
"model_name": "gpt-4"
}
}
class FieldInput(BaseModel):
raw_name: str
type: str
comment: Optional[str] = None
class TableInput(BaseModel):
raw_name: str
fields: List[FieldInput]
class AnalyzeRequest(BaseModel):
tables: List[TableInput]
project_id: str
industry: Optional[str] = None
context: Optional[str] = None
options: Optional[Dict] = None
class FieldOutput(BaseModel):
raw_name: str
ai_name: str
desc: str
type: str
pii: List[str] = []
pii_type: Optional[str] = None
is_important_data: bool = False
confidence: int = Field(ge=0, le=100)
class TableOutput(BaseModel):
raw_name: str
ai_name: str
desc: str
confidence: int = Field(ge=0, le=100)
ai_completed: bool = True
fields: List[FieldOutput]
pii: List[str] = []
important: bool = False
important_data_types: List[str] = []
def build_prompt(tables: List[TableInput], industry: str = None, context: str = None) -> str:
"""构建提示词"""
# 格式化表信息
tables_info = []
for table in tables:
table_info = f"表名: {table.raw_name}\n字段列表:\n"
for field in table.fields:
field_info = f" - {field.raw_name} ({field.type})"
if field.comment:
field_info += f" - {field.comment}"
table_info += field_info + "\n"
tables_info.append(table_info)
tables_info_str = "\n\n".join(tables_info)
# 行业信息
industry_info = industry if industry else "未指定"
# 业务背景
context_info = context if context else "未提供业务背景信息"
# 构建用户提示词
user_prompt = USER_PROMPT_TEMPLATE.format(
industry_info=industry_info,
context_info=context_info,
tables_info=tables_info_str,
json_schema=JSON_SCHEMA
)
return user_prompt
async def call_llm_api(prompt: str, model: str = "qwen-max", temperature: float = 0.3) -> str:
"""调用大模型 API"""
config = MODEL_CONFIG.get(model)
if not config:
raise ValueError(f"不支持的大模型: {model}")
headers = {
"Authorization": f"Bearer {config['api_key']}",
"Content-Type": "application/json"
}
if model == "qwen-max":
# 通义千问 API
payload = {
"model": config["model_name"],
"input": {
"messages": [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt}
]
},
"parameters": {
"temperature": temperature,
"result_format": "message"
}
}
elif model == "gpt-4":
# OpenAI API
payload = {
"model": config["model_name"],
"messages": [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt}
],
"temperature": temperature,
"response_format": {"type": "json_object"}
}
async with httpx.AsyncClient(timeout=60.0) as client:
try:
response = await client.post(
config["base_url"],
headers=headers,
json=payload
)
response.raise_for_status()
result = response.json()
# 解析响应(根据不同的 API 格式)
if model == "qwen-max":
content = result["output"]["choices"][0]["message"]["content"]
elif model == "gpt-4":
content = result["choices"][0]["message"]["content"]
return content
except httpx.HTTPError as e:
logger.error(f"大模型 API 调用失败: {str(e)}")
raise Exception(f"大模型 API 调用失败: {str(e)}")
def parse_llm_response(response_text: str) -> Dict:
"""解析大模型返回的 JSON 结果"""
try:
# 提取 JSON 部分(如果返回的是 Markdown 格式)
if "```json" in response_text:
json_text = response_text.split("```json")[1].split("```")[0].strip()
elif "```" in response_text:
json_text = response_text.split("```")[1].split("```")[0].strip()
else:
json_text = response_text.strip()
# 解析 JSON
result = json.loads(json_text)
return result
except json.JSONDecodeError as e:
logger.error(f"JSON 解析失败: {str(e)}")
logger.error(f"原始响应: {response_text}")
raise Exception(f"大模型返回的 JSON 格式错误: {str(e)}")
def validate_pii_detection(field: FieldOutput, field_input: FieldInput) -> FieldOutput:
"""使用规则引擎验证和补充 PII 识别"""
# PII 关键词规则
pii_keywords = {
"phone": ["手机", "phone", "mobile", "tel", "telephone"],
"id_card": ["身份证", "id_card", "idcard", "identity"],
"name": ["姓名", "name", "real_name"],
"email": ["邮箱", "email", "mail"],
"address": ["地址", "address", "addr"]
}
field_name_lower = field.raw_name.lower()
# 如果 AI 未识别,使用规则引擎识别
if not field.pii:
for pii_type, keywords in pii_keywords.items():
if any(keyword in field_name_lower for keyword in keywords):
field.pii = [pii_type]
field.pii_type = pii_type
break
return field
def calculate_confidence(field: FieldInput, field_output: FieldOutput) -> int:
"""计算置信度评分"""
score = 50 # 基础分
# 命名规范度30分
if field.raw_name.islower() and '_' in field.raw_name:
score += 15 # 蛇形命名
elif field.raw_name.islower() and field.raw_name.isalnum():
score += 10 # 小写字母数字
# 注释完整性20分
if field.comment:
score += 20
# AI 识别结果50分
if field_output.ai_name and field_output.ai_name != field.raw_name:
score += 25
if field_output.desc:
score += 25
return min(score, 100)
@app.post("/api/v1/inventory/ai-analyze")
async def ai_analyze(request: AnalyzeRequest):
"""
数据资产智能识别接口
使用大模型识别数据资产的中文名称、业务含义、PII 敏感信息、重要数据特征
"""
start_time = time.time()
try:
# 获取配置
model = request.options.get("model", "qwen-max") if request.options else "qwen-max"
temperature = request.options.get("temperature", 0.3) if request.options else 0.3
enable_pii = request.options.get("enable_pii_detection", True) if request.options else True
enable_important = request.options.get("enable_important_data_detection", True) if request.options else True
# 构建提示词
prompt = build_prompt(
tables=request.tables,
industry=request.industry,
context=request.context
)
logger.info(f"调用大模型 {model} 进行数据资产识别")
# 调用大模型
response_text = await call_llm_api(prompt, model=model, temperature=temperature)
# 解析结果
llm_result = parse_llm_response(response_text)
# 转换为标准格式并验证
tables_output = []
total_pii_fields = 0
total_important_fields = 0
total_confidence = 0
for table_result, table_input in zip(llm_result.get("tables", []), request.tables):
fields_output = []
table_pii = []
table_important = False
for field_result, field_input in zip(table_result.get("fields", []), table_input.fields):
field_output = FieldOutput(
raw_name=field_result.get("raw_name", field_input.raw_name),
ai_name=field_result.get("ai_name", field_input.raw_name),
desc=field_result.get("desc", ""),
type=field_input.type,
pii=field_result.get("pii", []),
pii_type=field_result.get("pii_type"),
is_important_data=field_result.get("is_important_data", False),
confidence=field_result.get("confidence", 80)
)
# 规则引擎验证和补充
if enable_pii:
field_output = validate_pii_detection(field_output, field_input)
# 重新计算置信度
field_output.confidence = calculate_confidence(field_input, field_output)
# 收集 PII 信息
if field_output.pii:
table_pii.extend(field_output.pii)
total_pii_fields += 1
# 收集重要数据信息
if field_output.is_important_data:
table_important = True
total_important_fields += 1
fields_output.append(field_output)
total_confidence += field_output.confidence
table_output = TableOutput(
raw_name=table_result.get("raw_name", table_input.raw_name),
ai_name=table_result.get("ai_name", table_input.raw_name),
desc=table_result.get("desc", ""),
confidence=table_result.get("confidence", 80),
ai_completed=True,
fields=fields_output,
pii=list(set(table_pii)), # 去重
important=table_important,
important_data_types=table_result.get("important_data_types", [])
)
tables_output.append(table_output)
# 计算统计信息
total_fields = sum(len(table.fields) for table in tables_output)
avg_confidence = total_confidence / total_fields if total_fields > 0 else 0
processing_time = time.time() - start_time
# 构建响应
response_data = {
"tables": [table.dict() for table in tables_output],
"statistics": {
"total_tables": len(tables_output),
"total_fields": total_fields,
"pii_fields_count": total_pii_fields,
"important_data_fields_count": total_important_fields,
"average_confidence": round(avg_confidence, 2)
},
"processing_time": round(processing_time, 2),
"model_used": model,
"token_usage": {
"prompt_tokens": len(prompt) // 4, # 粗略估算
"completion_tokens": len(response_text) // 4,
"total_tokens": (len(prompt) + len(response_text)) // 4
}
}
return {
"success": True,
"code": 200,
"message": "数据资产识别成功",
"data": response_data
}
except Exception as e:
logger.error(f"数据资产识别失败: {str(e)}")
return JSONResponse(
status_code=500,
content={
"success": False,
"code": 500,
"message": "数据资产识别失败",
"error": {
"error_code": "AI_ANALYZE_ERROR",
"error_detail": str(e),
"retryable": "Rate limit" in str(e) or "timeout" in str(e).lower()
}
}
)
```
---
## ⚠️ 注意事项
### 1. 提示词工程
- **系统提示词**: 定义 AI 角色为"数据资产管理专家"
- **少样本学习**: 提供 5-10 个典型示例
- **约束条件**: 明确 PII 和重要数据的识别标准
- **输出格式**: 使用 JSON Schema 确保输出格式正确
### 2. PII 识别规则
必须符合《个人信息保护法》(PIPL),识别以下类型:
- **身份信息**: 姓名、身份证号、护照号
- **联系信息**: 手机号、邮箱、地址
- **生物识别**: 人脸、指纹、声纹
- **医疗健康**: 体检报告、疾病信息
- **金融账户**: 银行卡号、账户信息
- **行踪轨迹**: GPS 位置、行程记录
### 3. 重要数据识别规则
必须符合《数据安全法》,识别以下类型:
- **国家安全**: 军事信息、国家秘密
- **公共利益**: 关键基础设施信息
- **高精度地理**: 军事禁区周边位置
- **关键物资**: 稀土、芯片等关键物资流向
### 4. 错误处理和重试
- **API 限流**: 实现指数退避重试策略
- **超时处理**: 设置合理的超时时间60秒
- **降级策略**: API 失败时使用规则引擎作为降级方案
- **日志记录**: 详细记录每次 API 调用的请求和响应
### 5. 性能优化
- **批量处理**: 对于大量表,考虑批量调用 API
- **缓存机制**: 相同输入缓存结果,减少 API 调用
- **异步处理**: 对于大量数据,考虑异步处理
### 6. 成本控制
- **Token 优化**: 优化提示词,减少 Token 消耗
- **模型选择**: 根据需求选择合适的模型(平衡成本和质量)
- **缓存策略**: 对相同输入进行缓存
---
## 📝 开发检查清单
- [ ] 大模型 API 集成(通义千问/GPT-4
- [ ] 提示词工程设计和优化
- [ ] PII 识别规则引擎
- [ ] 重要数据识别规则引擎
- [ ] 置信度评分算法
- [ ] JSON 解析和验证
- [ ] 错误处理和重试机制
- [ ] 缓存机制(可选)
- [ ] 日志记录
- [ ] 单元测试覆盖
- [ ] 性能测试
---
## 🔗 相关文档
- [接口清单表格](../Python接口清单表格.md)
- [Python技术人员工作量文档](../Python技术人员工作量文档.md)
- [数据资产盘点报告-大模型接口设计文档](../数据资产盘点报告-大模型接口设计文档.md)
- [通义千问 API 文档](https://help.aliyun.com/zh/model-studio/)
- [OpenAI API 文档](https://platform.openai.com/docs)

View File

@ -0,0 +1,145 @@
# 接口开发说明 - 潜在场景推荐接口 ⭐⭐
## 📋 接口基本信息
- **接口路径**: `/api/v1/value/scenario-recommendation`
- **请求方法**: `POST`
- **接口功能**: 基于企业背景、数据资产清单和存量场景,使用 AI 推荐潜在的数据应用场景
- **涉及页面**: `ValueStep.vue` - AI 推荐潜在场景清单
- **是否涉及大模型**: ✅ **是**
- **工作量评估**: **12 人日**
- **优先级**: **高**
---
## 🎯 功能描述
该接口使用大模型技术,基于企业背景、数据资产清单和存量场景,智能推荐潜在的数据应用场景,包括:
1. **场景分类**: 降本增效、营销增长、金融服务、决策支持等
2. **推荐指数评分**: 1-5星评分
3. **场景依赖分析**: 分析场景依赖哪些数据资产
4. **商业价值评估**: 评估场景的商业价值和实施难度
---
## 📥 请求格式
### 请求参数
```json
{
"project_id": "project_001",
"company_info": {
"industry": ["retail-fresh"],
"description": "某连锁生鲜零售企业主营水果、蔬菜等生鲜产品拥有线下门店500家",
"data_scale": "100TB",
"data_sources": ["self-generated"]
},
"data_assets": [
{
"name": "会员基础信息表",
"core_tables": ["Dim_Customer"],
"description": "存储C端注册用户的核心身份信息"
},
{
"name": "订单流水记录表",
"core_tables": ["Fact_Sales"],
"description": "全渠道销售交易明细"
}
],
"existing_scenarios": [
{
"name": "月度销售经营报表",
"description": "统计各区域门店的月度GMV维度单一"
}
],
"options": {
"model": "qwen-max",
"recommendation_count": 10,
"exclude_types": []
}
}
```
---
## 📤 响应格式
### 成功响应
```json
{
"success": true,
"code": 200,
"message": "场景推荐成功",
"data": {
"recommended_scenarios": [
{
"id": 1,
"name": "精准会员营销",
"type": "营销增长",
"recommendation_index": 5,
"desc": "基于用户画像与历史交易行为,实现千人千面的优惠券发放。",
"dependencies": ["会员基础信息表", "订单流水记录表"],
"business_value": "提升复购率 15-20%",
"implementation_difficulty": "中等",
"estimated_roi": "高",
"technical_requirements": ["用户画像引擎", "推荐算法"],
"data_requirements": ["会员基础信息", "交易历史", "行为数据"]
}
],
"total_count": 10,
"generation_time": 8.5,
"model_used": "qwen-max"
}
}
```
---
## 💻 提示词模板
```python
SCENARIO_RECOMMENDATION_PROMPT = """基于以下企业信息,推荐潜在的数据应用场景:
## 企业信息
行业: {industry}
企业描述: {company_description}
数据规模: {data_scale}
数据来源: {data_sources}
## 可用数据资产
{data_assets_info}
## 存量场景(避免重复推荐)
{existing_scenarios_info}
## 推荐要求
1. 推荐 {count} 个潜在数据应用场景
2. 场景分类:降本增效、营销增长、金融服务、决策支持、风险控制等
3. 推荐指数评分1-5星综合考虑业务价值、实施难度、数据准备度
4. 分析场景依赖的数据资产
5. 评估商业价值和实施难度
6. 避免与存量场景重复
## 输出格式JSON
{json_schema}
"""
```
---
## ⚠️ 注意事项
1. **场景分类**: 需要明确定义场景分类标准
2. **推荐指数算法**: 综合考虑业务价值、实施难度、数据准备度
3. **依赖分析**: 准确识别场景依赖的数据资产
4. **避免重复**: 与存量场景对比,避免重复推荐
---
## 🔗 相关文档
- [接口清单表格](../Python接口清单表格.md)
- [Python技术人员工作量文档](../Python技术人员工作量文档.md)

View File

@ -0,0 +1,86 @@
# 接口开发说明 - 存量场景优化建议接口
## 📋 接口基本信息
- **接口路径**: `/api/v1/value/scenario-optimization`
- **请求方法**: `POST`
- **接口功能**: 基于存量场景信息和截图,分析场景不足,提供优化建议和改进方向
- **涉及页面**: `ContextStep.vue` - 生成场景挖掘与优化建议按钮
- **是否涉及大模型**: ✅ **是**
- **工作量评估**: 8 人日
- **优先级**: 中
---
## 🎯 功能描述
该接口使用大模型技术分析存量场景的不足,并提供优化建议,支持:
1. **图片识别OCR**: 如果上传了场景截图,使用 OCR 识别内容
2. **场景分析**: 分析现有场景的功能和不足
3. **优化建议**: 提供具体的优化建议和改进方向
4. **价值提升**: 识别可提升的价值点
---
## 📥 请求格式
```json
{
"existing_scenarios": [
{
"name": "月度销售经营报表",
"description": "统计各区域门店的月度GMV维度单一",
"image_url": "https://example.com/screenshot.png" // 可选
}
],
"data_assets": [...],
"company_info": {...}
}
```
---
## 📤 响应格式
```json
{
"success": true,
"data": {
"optimization_suggestions": [
{
"scenario_name": "月度销售经营报表",
"current_status": "维度单一仅统计GMV",
"suggestions": [
"增加时间维度分析(同比、环比)",
"增加商品类别维度分析",
"增加区域对比分析"
],
"potential_value": "提升决策支持能力 30%"
}
]
}
}
```
---
## 💻 技术实现要点
1. **OCR 集成**: 使用 PaddleOCR 识别场景截图
2. **大模型分析**: 调用大模型分析场景不足
3. **建议生成**: 基于分析结果生成优化建议
---
## ⚠️ 注意事项
1. **图片处理**: 支持常见图片格式PNG、JPG、JPEG
2. **OCR 准确性**: 需要处理 OCR 识别错误的情况
3. **建议可操作性**: 优化建议必须具体、可执行
---
## 🔗 相关文档
- [接口清单表格](../Python接口清单表格.md)
- [Python技术人员工作量文档](../Python技术人员工作量文档.md)

150
docs/07-generate-report.md Normal file
View File

@ -0,0 +1,150 @@
# 接口开发说明 - 完整报告生成接口 ⭐⭐⭐
## 📋 接口基本信息
- **接口路径**: `/api/v1/delivery/generate-report`
- **请求方法**: `POST`
- **接口功能**: 基于数据盘点结果、背景调研信息和价值挖掘场景,使用大模型生成完整的数据资产盘点工作总结报告(四个章节)
- **涉及页面**: `DeliveryStep.vue` - 成果交付页面
- **是否涉及大模型**: ✅ **是**(核心功能)
- **工作量评估**: **20 人日**
- **优先级**: **高**
---
## 🎯 功能描述
该接口是数据资产盘点系统的核心输出功能,使用大模型生成完整的工作总结报告,包含四个章节:
1. **章节一**: 企业数字化情况简介(企业背景、信息化建设现状、业务流与数据流)
2. **章节二**: 数据资源统计(数据总量、存储分布、数据来源结构)
3. **章节三**: 数据资产情况盘点(资产构成、应用场景、合规风险提示)
4. **章节四**: 专家建议与下一步计划(合规整改、技术演进、价值深化)
支持功能:
- **分阶段生成**: 支持分阶段生成,提高质量和可控性
- **内容验证**: 验证统计数据逻辑正确性如百分比总和为100%
- **合规性检查**: 验证合规风险分析的完整性
- **格式化输出**: 返回结构化的 JSON 格式
---
## 📥 请求格式
```json
{
"project_id": "project_001",
"project_info": {
"project_name": "数据资产盘点项目",
"industry": "retail-fresh",
"company_name": "某连锁生鲜零售企业"
},
"inventory_data": {
"total_tables": 14582,
"total_fields": 245000,
"total_data_volume": "58 PB",
"storage_distribution": [...],
"data_source_structure": {
"structured": 35,
"semi_structured": 65
},
"identified_assets": [...]
},
"context_data": {
"enterprise_background": "...",
"informatization_status": "...",
"business_flow": "..."
},
"value_data": {
"selected_scenarios": [...]
},
"options": {
"language": "zh-CN",
"detail_level": "standard",
"generation_mode": "full | staged"
}
}
```
---
## 📤 响应格式
详见《数据资产盘点报告-大模型接口设计文档.md》中的详细响应格式定义。
---
## 💻 实现要点
### 1. 分阶段生成策略(推荐)
```python
# 阶段一:生成章节一和章节二
stage1_result = await generate_sections_1_2(inventory_data, context_data)
# 阶段二:生成章节三(重点合规风险分析)
stage2_result = await generate_section_3(identified_assets, stage1_result)
# 阶段三:生成章节四(基于前面章节的分析结果)
stage3_result = await generate_section_4(stage1_result, stage2_result, value_data)
```
### 2. 数据验证
```python
def validate_report_data(report_data: dict) -> bool:
"""验证报告数据"""
# 验证百分比总和为100%
section2 = report_data.get("section2", {})
structured = section2.get("data_source_structure", {}).get("structured", {}).get("percentage", 0)
semi_structured = section2.get("data_source_structure", {}).get("semi_structured", {}).get("percentage", 0)
if structured + semi_structured != 100:
raise ValueError("数据来源结构百分比总和必须为100%")
# 验证合规风险分析完整性
section3 = report_data.get("section3", {})
assets = section3.get("assets", [])
for asset in assets:
if not asset.get("compliance_risks", {}).get("warnings"):
logger.warning(f"资产 {asset.get('title')} 缺少合规风险分析")
return True
```
### 3. 提示词模板
详见《数据资产盘点报告-大模型接口设计文档.md》中的提示词工程设计方案。
---
## ⚠️ 注意事项
1. **长文本生成**: 需要使用支持长文本的模型GPT-4 / 通义千问 Max
2. **Token 消耗**: 报告生成会消耗大量 Token需要优化提示词
3. **数据准确性**: 统计数据必须准确,基于输入数据
4. **合规性**: 合规风险分析必须符合 PIPL、数据安全法等法规
5. **建议可操作性**: 专家建议必须具体、可执行
---
## 📝 开发检查清单
- [ ] 大模型集成GPT-4 / 通义千问 Max
- [ ] 分阶段生成策略实现
- [ ] 四个章节的提示词工程
- [ ] 数据验证引擎
- [ ] 合规性验证
- [ ] 错误处理和重试机制
- [ ] 缓存机制(可选)
- [ ] 日志记录
- [ ] 单元测试
---
## 🔗 相关文档
- [数据资产盘点报告-大模型接口设计文档](../数据资产盘点报告-大模型接口设计文档.md) - **详细设计文档**
- [接口清单表格](../Python接口清单表格.md)
- [Python技术人员工作量文档](../Python技术人员工作量文档.md)

150
docs/README.md Normal file
View File

@ -0,0 +1,150 @@
# API 接口开发文档索引
## 📋 文档说明
本目录包含数据资源盘点系统中所有需要 Python 开发的接口的详细开发说明文档。每个接口都有独立的文档,包含完整的开发指导信息。
---
## 📚 接口文档列表
### 模块一:数据盘点智能分析服务
| 序号 | 接口名称 | 文档路径 | 是否大模型 | 优先级 | 工作量 |
|------|---------|---------|-----------|--------|--------|
| 1.1 | [文档解析接口](./01-parse-document.md) | `01-parse-document.md` | ❌ | 中 | 5 人日 |
| 1.2 | [SQL 结果解析接口](./02-parse-sql-result.md) | `02-parse-sql-result.md` | ❌ | 低 | 2 人日 |
| 1.3 | [业务表解析接口](./03-parse-business-tables.md) | `03-parse-business-tables.md` | ❌ | 中 | 3 人日 |
| 1.4 | [数据资产智能识别接口 ⭐⭐⭐](./04-ai-analyze.md) | `04-ai-analyze.md` | ✅ **是** | **高** | **15 人日** |
### 模块二:场景挖掘智能推荐服务
| 序号 | 接口名称 | 文档路径 | 是否大模型 | 优先级 | 工作量 |
|------|---------|---------|-----------|--------|--------|
| 2.1 | [潜在场景推荐接口 ⭐⭐](./05-scenario-recommendation.md) | `05-scenario-recommendation.md` | ✅ **是** | **高** | **12 人日** |
| 2.2 | [存量场景优化建议接口](./06-scenario-optimization.md) | `06-scenario-optimization.md` | ✅ **是** | 中 | 8 人日 |
### 模块三:数据资产盘点报告生成服务
| 序号 | 接口名称 | 文档路径 | 是否大模型 | 优先级 | 工作量 |
|------|---------|---------|-----------|--------|--------|
| 3.1 | [完整报告生成接口 ⭐⭐⭐](./07-generate-report.md) | `07-generate-report.md` | ✅ **是** | **高** | **20 人日** |
---
## 📊 文档内容结构
每个接口文档包含以下内容:
1. **接口基本信息**
- 接口路径、请求方法
- 功能描述、涉及页面
- 工作量评估、优先级
2. **功能描述**
- 详细的功能说明
- 适用场景
3. **技术实现方案**
- 技术栈推荐
- 实现思路
- 架构设计
4. **请求/响应格式**
- 详细的请求参数说明
- 响应格式定义
- 字段说明
5. **代码实现示例**
- FastAPI 实现代码
- 关键逻辑示例
- 最佳实践
6. **测试用例**
- 单元测试示例
- 集成测试建议
7. **注意事项**
- 常见问题和解决方案
- 性能优化建议
- 安全注意事项
8. **开发检查清单**
- 开发任务清单
- 验收标准
---
## 🎯 快速导航
### 按优先级排序
**高优先级(核心功能)**:
1. [数据资产智能识别接口](./04-ai-analyze.md) - 15 人日 ⭐⭐⭐
2. [完整报告生成接口](./07-generate-report.md) - 20 人日 ⭐⭐⭐
3. [潜在场景推荐接口](./05-scenario-recommendation.md) - 12 人日 ⭐⭐
**中优先级**:
4. [文档解析接口](./01-parse-document.md) - 5 人日
5. [业务表解析接口](./03-parse-business-tables.md) - 3 人日
6. [存量场景优化建议接口](./06-scenario-optimization.md) - 8 人日
**低优先级**:
7. [SQL 结果解析接口](./02-parse-sql-result.md) - 2 人日
### 按功能分类
**大模型接口4个**:
- [数据资产智能识别接口](./04-ai-analyze.md)
- [潜在场景推荐接口](./05-scenario-recommendation.md)
- [存量场景优化建议接口](./06-scenario-optimization.md)
- [完整报告生成接口](./07-generate-report.md)
**数据解析接口3个**:
- [文档解析接口](./01-parse-document.md)
- [SQL 结果解析接口](./02-parse-sql-result.md)
- [业务表解析接口](./03-parse-business-tables.md)
---
## 📈 开发建议
### 第一阶段MVP 版本)- 4 周
**推荐顺序**:
1. [数据资产智能识别接口](./04-ai-analyze.md) - 核心功能
2. [完整报告生成接口](./07-generate-report.md) - 核心功能(简化版)
3. [文档解析接口](./01-parse-document.md) - 基础功能
### 第二阶段(完善版本)- 3 周
**推荐顺序**:
1. [潜在场景推荐接口](./05-scenario-recommendation.md)
2. [存量场景优化建议接口](./06-scenario-optimization.md)
3. [业务表解析接口](./03-parse-business-tables.md)
4. [SQL 结果解析接口](./02-parse-sql-result.md)
---
## 🔗 相关文档
- [接口清单表格](../Python接口清单表格.md) - 接口总览和统计
- [Python技术人员工作量文档](../Python技术人员工作量文档.md) - 详细工作量评估
- [数据资产盘点报告-大模型接口设计文档](../数据资产盘点报告-大模型接口设计文档.md) - 报告生成接口详细设计
---
## 📞 联系方式
如有接口开发相关问题,请联系:
- **Python 技术负责人**: [待填写]
- **大模型技术顾问**: [待填写]
- **接口对接负责人**: [待填写]
---
## 📅 更新记录
| 版本 | 日期 | 更新内容 | 作者 |
|------|------|---------|------|
| v1.0 | 2025-01-XX | 初始版本,包含 7 个接口的完整开发说明文档 | AI Assistant |