From a6adce6ea5b749559e3b9597124cccd47efb029d Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sat, 10 Jan 2026 11:44:31 +0800 Subject: [PATCH] first commit --- docs/01-parse-document.md | 596 +++++++++++++++++++++++ docs/02-parse-sql-result.md | 595 +++++++++++++++++++++++ docs/03-parse-business-tables.md | 547 +++++++++++++++++++++ docs/04-ai-analyze.md | 751 +++++++++++++++++++++++++++++ docs/05-scenario-recommendation.md | 145 ++++++ docs/06-scenario-optimization.md | 86 ++++ docs/07-generate-report.md | 150 ++++++ docs/README.md | 150 ++++++ 8 files changed, 3020 insertions(+) create mode 100644 docs/01-parse-document.md create mode 100644 docs/02-parse-sql-result.md create mode 100644 docs/03-parse-business-tables.md create mode 100644 docs/04-ai-analyze.md create mode 100644 docs/05-scenario-recommendation.md create mode 100644 docs/06-scenario-optimization.md create mode 100644 docs/07-generate-report.md create mode 100644 docs/README.md diff --git a/docs/01-parse-document.md b/docs/01-parse-document.md new file mode 100644 index 0000000..75cc644 --- /dev/null +++ b/docs/01-parse-document.md @@ -0,0 +1,596 @@ +# 接口开发说明 - 文档解析接口 + +## 📋 接口基本信息 + +- **接口路径**: `/api/v1/inventory/parse-document` +- **请求方法**: `POST` +- **接口功能**: 解析上传的数据字典文档(Excel/Word/PDF),提取表结构信息 +- **涉及页面**: `InventoryStep.vue` - 方案一(已有文档导入) +- **是否涉及大模型**: ❌ 否 +- **工作量评估**: 5 人日 +- **优先级**: 中 + +--- + +## 🎯 功能描述 + +该接口用于解析用户上传的数据字典文档,支持以下格式: +- **Excel**: `.xlsx`, `.xls` +- **Word**: `.doc`, `.docx` +- **PDF**: `.pdf`(可选) + +从文档中提取以下信息: +- 表名(英文) +- 字段名(英文) +- 字段类型 +- 字段注释/描述(中文) + +--- + +## 🔧 技术实现方案 + +### 技术栈 + +```python +# 核心依赖 +fastapi>=0.104.0 # Web 框架 +pydantic>=2.0.0 # 数据验证 +python-multipart>=0.0.6 # 文件上传支持 + +# 文档处理 +pandas>=2.0.0 # 数据处理 +openpyxl>=3.1.0 # Excel 处理 +python-docx>=1.1.0 # Word 处理 +pdfplumber>=0.10.0 # PDF 处理(可选) +``` + +### 实现思路 + +1. **文件上传**: 使用 FastAPI 的 `UploadFile` 接收文件 +2. **文件类型识别**: 根据文件扩展名或 MIME 类型识别文件格式 +3. **文档解析**: + - Excel: 使用 `pandas` 或 `openpyxl` 读取 + - Word: 使用 `python-docx` 解析表格和文本 + - PDF: 使用 `pdfplumber` 提取表格和文本 +4. **表结构提取**: 识别文档中的表结构信息,提取表名、字段名、类型、注释 +5. **数据验证**: 验证提取的数据格式是否正确 +6. **结果返回**: 返回标准化的表结构数据 + +--- + +## 📥 请求格式 + +### 请求方式 + +**Content-Type**: `multipart/form-data` 或 `application/json` + +### 方式一:文件上传(推荐) + +```http +POST /api/v1/inventory/parse-document +Content-Type: multipart/form-data + +file: [二进制文件] +project_id: string +file_type: excel | word | pdf (可选,自动识别) +``` + +### 方式二:文件路径(如果文件已上传到服务器) + +```json +{ + "file_path": "/path/to/document.xlsx", + "file_type": "excel | word | pdf", + "project_id": "project_001" +} +``` + +### 请求参数说明 + +| 参数名 | 类型 | 必填 | 说明 | +|--------|------|------|------| +| `file` | File | 是 | 上传的文件(方式一) | +| `file_path` | string | 是 | 文件路径(方式二) | +| `file_type` | string | 否 | 文件类型:`excel` / `word` / `pdf`,如果不传则根据文件扩展名自动识别 | +| `project_id` | string | 是 | 项目ID | + +--- + +## 📤 响应格式 + +### 成功响应 + +```json +{ + "success": true, + "code": 200, + "message": "文档解析成功", + "data": { + "tables": [ + { + "raw_name": "t_user_base_01", + "display_name": "用户基础信息表", + "description": "存储用户基本信息的表", + "fields": [ + { + "raw_name": "user_id", + "display_name": "用户ID", + "type": "varchar(64)", + "comment": "用户的唯一标识符", + "is_primary_key": true, + "is_nullable": false, + "default_value": null + }, + { + "raw_name": "user_name", + "display_name": "用户名", + "type": "varchar(50)", + "comment": "用户登录名", + "is_primary_key": false, + "is_nullable": true, + "default_value": null + } + ], + "field_count": 2 + } + ], + "total_tables": 10, + "total_fields": 245, + "parse_time": 1.23, + "file_info": { + "file_name": "数据字典.xlsx", + "file_size": 1024000, + "file_type": "excel" + } + } +} +``` + +### 失败响应 + +```json +{ + "success": false, + "code": 400, + "message": "文件格式不支持", + "error": { + "error_code": "UNSUPPORTED_FILE_TYPE", + "error_detail": "仅支持 Excel (.xlsx, .xls), Word (.doc, .docx), PDF (.pdf) 格式" + } +} +``` + +### 响应字段说明 + +| 字段名 | 类型 | 说明 | +|--------|------|------| +| `success` | boolean | 请求是否成功 | +| `code` | integer | HTTP 状态码 | +| `message` | string | 响应消息 | +| `data` | object | 响应数据 | +| `data.tables` | array | 解析出的表列表 | +| `data.tables[].raw_name` | string | 表名(英文/原始名称) | +| `data.tables[].display_name` | string | 表显示名称(中文,如果文档中有) | +| `data.tables[].description` | string | 表描述 | +| `data.tables[].fields` | array | 字段列表 | +| `data.tables[].fields[].raw_name` | string | 字段名(英文) | +| `data.tables[].fields[].display_name` | string | 字段显示名称(中文) | +| `data.tables[].fields[].type` | string | 字段类型 | +| `data.tables[].fields[].comment` | string | 字段注释 | +| `data.total_tables` | integer | 总表数 | +| `data.total_fields` | integer | 总字段数 | +| `data.parse_time` | float | 解析耗时(秒) | + +--- + +## 💻 代码实现示例 + +### FastAPI 实现 + +```python +from fastapi import FastAPI, UploadFile, File, Form, HTTPException +from fastapi.responses import JSONResponse +from pydantic import BaseModel +from typing import Optional, List +import pandas as pd +from docx import Document +import pdfplumber +import os +from pathlib import Path +import time + +app = FastAPI() + +class FieldInfo(BaseModel): + raw_name: str + display_name: Optional[str] = None + type: str + comment: Optional[str] = None + is_primary_key: bool = False + is_nullable: bool = True + default_value: Optional[str] = None + +class TableInfo(BaseModel): + raw_name: str + display_name: Optional[str] = None + description: Optional[str] = None + fields: List[FieldInfo] + field_count: int + +class ParseDocumentResponse(BaseModel): + success: bool + code: int + message: str + data: Optional[dict] = None + error: Optional[dict] = None + +def parse_excel(file_path: str) -> List[TableInfo]: + """解析 Excel 文件""" + tables = [] + try: + # 读取 Excel 文件 + df = pd.read_excel(file_path, sheet_name=None) # 读取所有 sheet + + for sheet_name, df_sheet in df.items(): + # 识别表结构(根据 Excel 格式约定) + # 假设第一列是字段名,第二列是类型,第三列是注释 + fields = [] + for _, row in df_sheet.iterrows(): + if pd.notna(row.iloc[0]): # 字段名不为空 + field = FieldInfo( + raw_name=str(row.iloc[0]).strip(), + display_name=str(row.iloc[2]).strip() if len(row) > 2 and pd.notna(row.iloc[2]) else None, + type=str(row.iloc[1]).strip() if len(row) > 1 and pd.notna(row.iloc[1]) else "varchar(255)", + comment=str(row.iloc[2]).strip() if len(row) > 2 and pd.notna(row.iloc[2]) else None + ) + fields.append(field) + + if fields: + table = TableInfo( + raw_name=sheet_name, + display_name=sheet_name, + fields=fields, + field_count=len(fields) + ) + tables.append(table) + + except Exception as e: + raise Exception(f"Excel 解析失败: {str(e)}") + + return tables + +def parse_word(file_path: str) -> List[TableInfo]: + """解析 Word 文件""" + tables = [] + try: + doc = Document(file_path) + + # 遍历文档中的表格 + for table_idx, table in enumerate(doc.tables): + fields = [] + # 假设第一行是表头,后续行是字段信息 + # 约定:第一列字段名,第二列类型,第三列注释 + for row in table.rows[1:]: # 跳过表头 + if len(row.cells) >= 3: + field_name = row.cells[0].text.strip() + if field_name: # 字段名不为空 + field = FieldInfo( + raw_name=field_name, + display_name=row.cells[2].text.strip() if len(row.cells) > 2 and row.cells[2].text.strip() else None, + type=row.cells[1].text.strip() if len(row.cells) > 1 and row.cells[1].text.strip() else "varchar(255)", + comment=row.cells[2].text.strip() if len(row.cells) > 2 and row.cells[2].text.strip() else None + ) + fields.append(field) + + if fields: + table_info = TableInfo( + raw_name=f"table_{table_idx + 1}", + display_name=f"表{table_idx + 1}", + fields=fields, + field_count=len(fields) + ) + tables.append(table_info) + + except Exception as e: + raise Exception(f"Word 解析失败: {str(e)}") + + return tables + +def parse_pdf(file_path: str) -> List[TableInfo]: + """解析 PDF 文件""" + tables = [] + try: + with pdfplumber.open(file_path) as pdf: + for page_idx, page in enumerate(pdf.pages): + # 提取表格 + page_tables = page.extract_tables() + for table_idx, table in enumerate(page_tables): + if table and len(table) > 1: + fields = [] + # 假设第一行是表头,后续行是字段信息 + for row in table[1:]: + if len(row) >= 3 and row[0]: + field = FieldInfo( + raw_name=str(row[0]).strip(), + display_name=str(row[2]).strip() if len(row) > 2 and row[2] else None, + type=str(row[1]).strip() if len(row) > 1 and row[1] else "varchar(255)", + comment=str(row[2]).strip() if len(row) > 2 and row[2] else None + ) + fields.append(field) + + if fields: + table_info = TableInfo( + raw_name=f"table_{page_idx + 1}_{table_idx + 1}", + display_name=f"表{page_idx + 1}-{table_idx + 1}", + fields=fields, + field_count=len(fields) + ) + tables.append(table_info) + + except Exception as e: + raise Exception(f"PDF 解析失败: {str(e)}") + + return tables + +def detect_file_type(file_name: str) -> str: + """根据文件扩展名检测文件类型""" + ext = Path(file_name).suffix.lower() + if ext in ['.xlsx', '.xls']: + return 'excel' + elif ext in ['.docx', '.doc']: + return 'word' + elif ext == '.pdf': + return 'pdf' + else: + raise ValueError(f"不支持的文件类型: {ext}") + +@app.post("/api/v1/inventory/parse-document", response_model=ParseDocumentResponse) +async def parse_document( + file: Optional[UploadFile] = File(None), + file_path: Optional[str] = Form(None), + file_type: Optional[str] = Form(None), + project_id: str = Form(...) +): + """ + 文档解析接口 + + 支持解析 Excel、Word、PDF 格式的数据字典文档,提取表结构信息 + """ + start_time = time.time() + + try: + # 验证参数 + if not file and not file_path: + raise HTTPException( + status_code=400, + detail="必须提供文件或文件路径" + ) + + # 处理文件上传 + if file: + # 保存上传的文件到临时目录 + upload_dir = Path("/tmp/uploads") + upload_dir.mkdir(exist_ok=True) + file_path = str(upload_dir / file.filename) + + with open(file_path, "wb") as f: + content = await file.read() + f.write(content) + + file_name = file.filename + file_size = len(content) + + # 自动检测文件类型 + if not file_type: + file_type = detect_file_type(file_name) + else: + # 使用提供的文件路径 + if not os.path.exists(file_path): + raise HTTPException( + status_code=404, + detail=f"文件不存在: {file_path}" + ) + file_name = Path(file_path).name + file_size = os.path.getsize(file_path) + + # 自动检测文件类型 + if not file_type: + file_type = detect_file_type(file_name) + + # 根据文件类型选择解析方法 + if file_type == 'excel': + tables = parse_excel(file_path) + elif file_type == 'word': + tables = parse_word(file_path) + elif file_type == 'pdf': + tables = parse_pdf(file_path) + else: + raise HTTPException( + status_code=400, + detail=f"不支持的文件类型: {file_type}" + ) + + # 计算统计信息 + total_fields = sum(table.field_count for table in tables) + parse_time = time.time() - start_time + + # 构建响应数据 + response_data = { + "tables": [table.dict() for table in tables], + "total_tables": len(tables), + "total_fields": total_fields, + "parse_time": round(parse_time, 2), + "file_info": { + "file_name": file_name, + "file_size": file_size, + "file_type": file_type + } + } + + return ParseDocumentResponse( + success=True, + code=200, + message="文档解析成功", + data=response_data + ) + + except HTTPException: + raise + except Exception as e: + return ParseDocumentResponse( + success=False, + code=500, + message="文档解析失败", + error={ + "error_code": "PARSE_ERROR", + "error_detail": str(e) + } + ) +``` + +--- + +## 🧪 测试用例 + +### 单元测试示例 + +```python +import pytest +from fastapi.testclient import TestClient +from pathlib import Path +import tempfile + +client = TestClient(app) + +def test_parse_excel_document(): + """测试解析 Excel 文档""" + # 创建测试 Excel 文件 + test_data = { + '字段名': ['user_id', 'user_name', 'email'], + '类型': ['varchar(64)', 'varchar(50)', 'varchar(100)'], + '注释': ['用户ID', '用户名', '邮箱'] + } + df = pd.DataFrame(test_data) + + with tempfile.NamedTemporaryFile(suffix='.xlsx', delete=False) as tmp: + df.to_excel(tmp.name, index=False) + + with open(tmp.name, 'rb') as f: + response = client.post( + "/api/v1/inventory/parse-document", + files={"file": ("test.xlsx", f, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")}, + data={"project_id": "test_project"} + ) + + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + assert len(data["data"]["tables"]) > 0 + assert data["data"]["total_tables"] > 0 + +def test_unsupported_file_type(): + """测试不支持的文件类型""" + with tempfile.NamedTemporaryFile(suffix='.txt', delete=False) as tmp: + tmp.write(b"test content") + tmp.flush() + + with open(tmp.name, 'rb') as f: + response = client.post( + "/api/v1/inventory/parse-document", + files={"file": ("test.txt", f, "text/plain")}, + data={"project_id": "test_project"} + ) + + assert response.status_code == 400 + data = response.json() + assert data["success"] is False +``` + +--- + +## ⚠️ 注意事项 + +### 1. 文件大小限制 + +- **Excel**: 建议限制为 50MB +- **Word**: 建议限制为 50MB +- **PDF**: 建议限制为 50MB + +在 FastAPI 中设置: + +```python +from fastapi import File, UploadFile +from fastapi.exceptions import RequestEntityTooLarge + +@app.exception_handler(RequestEntityTooLarge) +async def handle_upload_limit(exc): + return JSONResponse( + status_code=413, + content={ + "success": False, + "message": "文件大小超过限制(最大 50MB)" + } + ) +``` + +### 2. 文件格式约定 + +由于不同用户的数据字典文档格式可能不同,建议: +- **Excel**: 约定格式为第一列字段名,第二列类型,第三列注释 +- **Word**: 约定使用表格格式,第一行表头,后续行字段信息 +- **PDF**: 约定使用表格格式 + +如果格式不统一,需要增加更智能的识别逻辑。 + +### 3. 错误处理 + +- 文件读取失败:返回 400 错误 +- 文件格式错误:返回 400 错误,提示正确的格式 +- 解析失败:返回 500 错误,记录详细错误日志 +- 文件过大:返回 413 错误 + +### 4. 性能优化 + +- 对于大文件,考虑使用异步处理 +- 使用临时文件,处理完成后删除 +- 考虑添加缓存机制(相同文件解析结果缓存) + +### 5. 安全性 + +- 文件上传路径验证,防止路径遍历攻击 +- 文件类型验证,防止恶意文件上传 +- 文件大小限制,防止 DoS 攻击 +- 临时文件及时清理 + +--- + +## 📝 开发检查清单 + +- [ ] 支持 Excel (.xlsx, .xls) 格式解析 +- [ ] 支持 Word (.doc, .docx) 格式解析 +- [ ] 支持 PDF (.pdf) 格式解析(可选) +- [ ] 文件类型自动识别 +- [ ] 文件大小限制(50MB) +- [ ] 错误处理和异常捕获 +- [ ] 单元测试覆盖 +- [ ] 日志记录 +- [ ] 临时文件清理 +- [ ] API 文档生成(Swagger) + +--- + +## 🔗 相关文档 + +- [接口清单表格](../Python接口清单表格.md) +- [Python技术人员工作量文档](../Python技术人员工作量文档.md) +- [FastAPI 官方文档](https://fastapi.tiangolo.com/) +- [pandas 文档](https://pandas.pydata.org/docs/) +- [python-docx 文档](https://python-docx.readthedocs.io/) + +--- + +## 📞 联系方式 + +如有开发问题,请联系: +- **接口负责人**: [待填写] +- **技术顾问**: [待填写] diff --git a/docs/02-parse-sql-result.md b/docs/02-parse-sql-result.md new file mode 100644 index 0000000..62d8aa2 --- /dev/null +++ b/docs/02-parse-sql-result.md @@ -0,0 +1,595 @@ +# 接口开发说明 - SQL 结果解析接口 + +## 📋 接口基本信息 + +- **接口路径**: `/api/v1/inventory/parse-sql-result` +- **请求方法**: `POST` +- **接口功能**: 解析 IT 执行 SQL 脚本后导出的 Excel/CSV 结果文件,提取表名、字段名、字段类型等信息 +- **涉及页面**: `InventoryStep.vue` - 方案二(IT 脚本提取) +- **是否涉及大模型**: ❌ 否 +- **工作量评估**: 2 人日 +- **优先级**: 低 + +--- + +## 🎯 功能描述 + +该接口用于解析 IT 部门执行标准 SQL 脚本后导出的结果文件。SQL 脚本通常查询 `information_schema.COLUMNS` 表,导出的结果文件包含以下列: +- 表英文名 (TABLE_NAME) +- 表中文名/描述 (TABLE_COMMENT) +- 字段英文名 (COLUMN_NAME) +- 字段中文名 (COLUMN_COMMENT) +- 字段类型 (COLUMN_TYPE) + +支持的文件格式: +- **Excel**: `.xlsx`, `.xls` +- **CSV**: `.csv` + +--- + +## 🔧 技术实现方案 + +### 技术栈 + +```python +# 核心依赖 +fastapi>=0.104.0 # Web 框架 +pydantic>=2.0.0 # 数据验证 + +# 数据处理 +pandas>=2.0.0 # CSV/Excel 解析 +openpyxl>=3.1.0 # Excel 处理(如果使用 openpyxl) +``` + +### 实现思路 + +1. **文件上传/路径**: 接收 Excel 或 CSV 文件 +2. **文件解析**: 使用 `pandas` 读取文件 +3. **数据清洗**: 清理空行、空值,标准化数据格式 +4. **表结构提取**: 根据列名提取表名、字段名、类型等信息 +5. **数据验证**: 验证数据完整性和格式正确性 +6. **结果返回**: 返回标准化的表结构数据 + +--- + +## 📥 请求格式 + +### 请求方式 + +**Content-Type**: `multipart/form-data` 或 `application/json` + +### 请求参数 + +```http +POST /api/v1/inventory/parse-sql-result +Content-Type: multipart/form-data + +file: [二进制文件] +project_id: string +file_type: excel | csv (可选,自动识别) +``` + +或 + +```json +{ + "file_path": "/path/to/result.xlsx", + "file_type": "excel | csv", + "project_id": "project_001" +} +``` + +### 请求参数说明 + +| 参数名 | 类型 | 必填 | 说明 | +|--------|------|------|------| +| `file` | File | 是 | 上传的文件(方式一) | +| `file_path` | string | 是 | 文件路径(方式二) | +| `file_type` | string | 否 | 文件类型:`excel` / `csv`,如果不传则根据文件扩展名自动识别 | +| `project_id` | string | 是 | 项目ID | + +### 标准 SQL 脚本示例 + +IT 部门需要执行的 SQL 脚本: + +```sql +SELECT + TABLE_NAME AS '表英文名', + TABLE_COMMENT AS '表中文名/描述', + COLUMN_NAME AS '字段英文名', + COLUMN_COMMENT AS '字段中文名', + COLUMN_TYPE AS '字段类型' +FROM information_schema.COLUMNS +WHERE TABLE_SCHEMA = '您的数据库名'; +``` + +--- + +## 📤 响应格式 + +### 成功响应 + +```json +{ + "success": true, + "code": 200, + "message": "SQL 结果解析成功", + "data": { + "tables": [ + { + "raw_name": "t_user_base_01", + "display_name": "用户基础信息表", + "description": "存储用户基本信息的表", + "fields": [ + { + "raw_name": "user_id", + "display_name": "用户ID", + "type": "varchar(64)", + "comment": "用户的唯一标识符" + } + ], + "field_count": 10 + } + ], + "total_tables": 5, + "total_fields": 245, + "parse_time": 0.45, + "file_info": { + "file_name": "schema_export.xlsx", + "file_size": 512000, + "file_type": "excel" + } + } +} +``` + +### 失败响应 + +```json +{ + "success": false, + "code": 400, + "message": "文件格式错误或缺少必要列", + "error": { + "error_code": "INVALID_FILE_FORMAT", + "error_detail": "文件缺少必要列:表英文名、字段英文名、字段类型" + } +} +``` + +--- + +## 💻 代码实现示例 + +### FastAPI 实现 + +```python +from fastapi import FastAPI, UploadFile, File, Form, HTTPException +from fastapi.responses import JSONResponse +from pydantic import BaseModel +from typing import Optional, List, Dict +import pandas as pd +import os +from pathlib import Path +import time + +app = FastAPI() + +class FieldInfo(BaseModel): + raw_name: str + display_name: Optional[str] = None + type: str + comment: Optional[str] = None + +class TableInfo(BaseModel): + raw_name: str + display_name: Optional[str] = None + description: Optional[str] = None + fields: List[FieldInfo] + field_count: int + +def parse_sql_result_excel(file_path: str) -> List[TableInfo]: + """解析 Excel 格式的 SQL 结果""" + try: + # 读取 Excel 文件 + df = pd.read_excel(file_path) + + # 标准化列名(支持多种可能的列名) + column_mapping = { + '表英文名': 'table_name', + 'TABLE_NAME': 'table_name', + 'table_name': 'table_name', + '表中文名/描述': 'table_comment', + 'TABLE_COMMENT': 'table_comment', + 'table_comment': 'table_comment', + '字段英文名': 'column_name', + 'COLUMN_NAME': 'column_name', + 'column_name': 'column_name', + '字段中文名': 'column_comment', + 'COLUMN_COMMENT': 'column_comment', + 'column_comment': 'column_comment', + '字段类型': 'column_type', + 'COLUMN_TYPE': 'column_type', + 'column_type': 'column_type' + } + + # 重命名列 + df.columns = df.columns.str.strip() + df = df.rename(columns=column_mapping) + + # 验证必要列是否存在 + required_columns = ['table_name', 'column_name', 'column_type'] + missing_columns = [col for col in required_columns if col not in df.columns] + if missing_columns: + raise ValueError(f"缺少必要列: {', '.join(missing_columns)}") + + # 清理数据(去除空值) + df = df.dropna(subset=['table_name', 'column_name']) + + # 按表名分组 + tables_dict: Dict[str, List[FieldInfo]] = {} + for _, row in df.iterrows(): + table_name = str(row['table_name']).strip() + column_name = str(row['column_name']).strip() + + if not table_name or not column_name: + continue + + # 获取字段信息 + field = FieldInfo( + raw_name=column_name, + display_name=str(row.get('column_comment', '')).strip() if pd.notna(row.get('column_comment')) else None, + type=str(row.get('column_type', 'varchar(255)')).strip() if pd.notna(row.get('column_type')) else 'varchar(255)', + comment=str(row.get('column_comment', '')).strip() if pd.notna(row.get('column_comment')) else None + ) + + # 按表分组 + if table_name not in tables_dict: + tables_dict[table_name] = [] + tables_dict[table_name].append(field) + + # 构建表信息 + tables = [] + for table_name, fields in tables_dict.items(): + # 获取表的描述信息(取第一个字段的表描述,或使用表名) + table_comment = None + if 'table_comment' in df.columns: + table_comment_row = df[df['table_name'] == table_name].iloc[0] + if pd.notna(table_comment_row.get('table_comment')): + table_comment = str(table_comment_row['table_comment']).strip() + + table = TableInfo( + raw_name=table_name, + display_name=table_comment if table_comment else table_name, + description=table_comment, + fields=fields, + field_count=len(fields) + ) + tables.append(table) + + return tables + + except Exception as e: + raise Exception(f"Excel 解析失败: {str(e)}") + +def parse_sql_result_csv(file_path: str) -> List[TableInfo]: + """解析 CSV 格式的 SQL 结果""" + try: + # 读取 CSV 文件(尝试不同的编码) + encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1'] + df = None + + for encoding in encodings: + try: + df = pd.read_csv(file_path, encoding=encoding) + break + except UnicodeDecodeError: + continue + + if df is None: + raise ValueError("无法解析 CSV 文件,请检查文件编码") + + # 后续处理与 Excel 相同 + return parse_sql_result_excel_dataframe(df) + + except Exception as e: + raise Exception(f"CSV 解析失败: {str(e)}") + +def parse_sql_result_excel_dataframe(df: pd.DataFrame) -> List[TableInfo]: + """从 DataFrame 解析 SQL 结果(共用逻辑)""" + # 标准化列名 + column_mapping = { + '表英文名': 'table_name', + 'TABLE_NAME': 'table_name', + 'table_name': 'table_name', + '表中文名/描述': 'table_comment', + 'TABLE_COMMENT': 'table_comment', + 'table_comment': 'table_comment', + '字段英文名': 'column_name', + 'COLUMN_NAME': 'column_name', + 'column_name': 'column_name', + '字段中文名': 'column_comment', + 'COLUMN_COMMENT': 'column_comment', + 'column_comment': 'column_comment', + '字段类型': 'column_type', + 'COLUMN_TYPE': 'column_type', + 'column_type': 'column_type' + } + + df.columns = df.columns.str.strip() + df = df.rename(columns=column_mapping) + + # 验证必要列 + required_columns = ['table_name', 'column_name', 'column_type'] + missing_columns = [col for col in required_columns if col not in df.columns] + if missing_columns: + raise ValueError(f"缺少必要列: {', '.join(missing_columns)}") + + # 清理数据 + df = df.dropna(subset=['table_name', 'column_name']) + + # 按表分组 + tables_dict = {} + for _, row in df.iterrows(): + table_name = str(row['table_name']).strip() + column_name = str(row['column_name']).strip() + + if not table_name or not column_name: + continue + + field = FieldInfo( + raw_name=column_name, + display_name=str(row.get('column_comment', '')).strip() if pd.notna(row.get('column_comment')) else None, + type=str(row.get('column_type', 'varchar(255)')).strip() if pd.notna(row.get('column_type')) else 'varchar(255)', + comment=str(row.get('column_comment', '')).strip() if pd.notna(row.get('column_comment')) else None + ) + + if table_name not in tables_dict: + tables_dict[table_name] = [] + tables_dict[table_name].append(field) + + # 构建表信息 + tables = [] + for table_name, fields in tables_dict.items(): + table_comment = None + if 'table_comment' in df.columns: + table_comment_row = df[df['table_name'] == table_name].iloc[0] + if pd.notna(table_comment_row.get('table_comment')): + table_comment = str(table_comment_row['table_comment']).strip() + + table = TableInfo( + raw_name=table_name, + display_name=table_comment if table_comment else table_name, + description=table_comment, + fields=fields, + field_count=len(fields) + ) + tables.append(table) + + return tables + +@app.post("/api/v1/inventory/parse-sql-result") +async def parse_sql_result( + file: Optional[UploadFile] = File(None), + file_path: Optional[str] = Form(None), + file_type: Optional[str] = Form(None), + project_id: str = Form(...) +): + """ + SQL 结果解析接口 + + 解析 IT 执行 SQL 脚本后导出的 Excel/CSV 结果文件 + """ + start_time = time.time() + + try: + # 验证参数 + if not file and not file_path: + raise HTTPException( + status_code=400, + detail="必须提供文件或文件路径" + ) + + # 处理文件上传 + if file: + upload_dir = Path("/tmp/uploads") + upload_dir.mkdir(exist_ok=True) + file_path = str(upload_dir / file.filename) + + with open(file_path, "wb") as f: + content = await file.read() + f.write(content) + + file_name = file.filename + file_size = len(content) + + if not file_type: + ext = Path(file_name).suffix.lower() + if ext in ['.xlsx', '.xls']: + file_type = 'excel' + elif ext == '.csv': + file_type = 'csv' + else: + raise HTTPException( + status_code=400, + detail=f"不支持的文件类型: {ext}" + ) + else: + if not os.path.exists(file_path): + raise HTTPException( + status_code=404, + detail=f"文件不存在: {file_path}" + ) + file_name = Path(file_path).name + file_size = os.path.getsize(file_path) + + if not file_type: + ext = Path(file_name).suffix.lower() + if ext in ['.xlsx', '.xls']: + file_type = 'excel' + elif ext == '.csv': + file_type = 'csv' + else: + raise HTTPException( + status_code=400, + detail=f"不支持的文件类型: {ext}" + ) + + # 根据文件类型解析 + if file_type == 'excel': + tables = parse_sql_result_excel(file_path) + elif file_type == 'csv': + tables = parse_sql_result_csv(file_path) + else: + raise HTTPException( + status_code=400, + detail=f"不支持的文件类型: {file_type}" + ) + + # 计算统计信息 + total_fields = sum(table.field_count for table in tables) + parse_time = time.time() - start_time + + # 构建响应 + response_data = { + "tables": [table.dict() for table in tables], + "total_tables": len(tables), + "total_fields": total_fields, + "parse_time": round(parse_time, 2), + "file_info": { + "file_name": file_name, + "file_size": file_size, + "file_type": file_type + } + } + + return { + "success": True, + "code": 200, + "message": "SQL 结果解析成功", + "data": response_data + } + + except HTTPException: + raise + except Exception as e: + return JSONResponse( + status_code=500, + content={ + "success": False, + "code": 500, + "message": "SQL 结果解析失败", + "error": { + "error_code": "PARSE_ERROR", + "error_detail": str(e) + } + } + ) +``` + +--- + +## 🧪 测试用例 + +### 单元测试示例 + +```python +import pytest +from fastapi.testclient import TestClient +import pandas as pd +import tempfile + +client = TestClient(app) + +def test_parse_sql_result_excel(): + """测试解析 Excel 格式的 SQL 结果""" + # 创建测试数据 + test_data = { + '表英文名': ['t_user', 't_user', 't_order'], + '表中文名/描述': ['用户表', '用户表', '订单表'], + '字段英文名': ['user_id', 'user_name', 'order_id'], + '字段中文名': ['用户ID', '用户名', '订单ID'], + '字段类型': ['varchar(64)', 'varchar(50)', 'bigint'] + } + df = pd.DataFrame(test_data) + + with tempfile.NamedTemporaryFile(suffix='.xlsx', delete=False) as tmp: + df.to_excel(tmp.name, index=False) + + with open(tmp.name, 'rb') as f: + response = client.post( + "/api/v1/inventory/parse-sql-result", + files={"file": ("test.xlsx", f, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")}, + data={"project_id": "test_project"} + ) + + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + assert data["data"]["total_tables"] == 2 # t_user 和 t_order + assert data["data"]["total_fields"] == 3 + +def test_invalid_file_format(): + """测试无效文件格式""" + response = client.post( + "/api/v1/inventory/parse-sql-result", + files={"file": ("test.txt", b"invalid content", "text/plain")}, + data={"project_id": "test_project"} + ) + + assert response.status_code == 400 + data = response.json() + assert data["success"] is False +``` + +--- + +## ⚠️ 注意事项 + +### 1. 列名映射 + +由于不同数据库导出的列名可能不同,需要支持多种列名映射: +- `表英文名` / `TABLE_NAME` / `table_name` +- `字段英文名` / `COLUMN_NAME` / `column_name` +- `字段类型` / `COLUMN_TYPE` / `column_type` + +### 2. CSV 编码问题 + +CSV 文件可能存在编码问题(GBK、UTF-8 等),需要尝试多种编码: +- UTF-8(优先) +- GBK +- GB2312 +- Latin-1 + +### 3. 数据清洗 + +- 去除空行和空值 +- 标准化表名和字段名(去除前后空格) +- 处理特殊字符 + +### 4. 错误处理 + +- 文件格式验证 +- 必要列验证 +- 数据完整性验证 +- 异常捕获和日志记录 + +--- + +## 📝 开发检查清单 + +- [ ] 支持 Excel (.xlsx, .xls) 格式解析 +- [ ] 支持 CSV (.csv) 格式解析 +- [ ] 支持多种列名映射 +- [ ] CSV 编码自动检测 +- [ ] 数据清洗和验证 +- [ ] 错误处理和异常捕获 +- [ ] 单元测试覆盖 +- [ ] 日志记录 + +--- + +## 🔗 相关文档 + +- [接口清单表格](../Python接口清单表格.md) +- [Python技术人员工作量文档](../Python技术人员工作量文档.md) diff --git a/docs/03-parse-business-tables.md b/docs/03-parse-business-tables.md new file mode 100644 index 0000000..622e05e --- /dev/null +++ b/docs/03-parse-business-tables.md @@ -0,0 +1,547 @@ +# 接口开发说明 - 业务表解析接口 + +## 📋 接口基本信息 + +- **接口路径**: `/api/v1/inventory/parse-business-tables` +- **请求方法**: `POST` +- **接口功能**: 解析业务人员手动导出的核心业务表(Excel/CSV),支持批量文件解析和表结构识别 +- **涉及页面**: `InventoryStep.vue` - 方案三(业务关键表导入) +- **是否涉及大模型**: ❌ 否 +- **工作量评估**: 3 人日 +- **优先级**: 中 + +--- + +## 🎯 功能描述 + +该接口用于解析业务人员手动导出的核心业务表文件,支持: +- **批量文件上传**: 一次可上传多个文件 +- **格式支持**: Excel (.xlsx, .xls)、CSV (.csv) +- **表结构识别**: 自动识别 Excel 中的表结构(通过 Sheet 名称或文件名) +- **进度反馈**: 支持批量处理时的进度反馈 + +适用场景: +- SaaS 系统(如 Salesforce、金蝶、有赞)无法直接连接数据库 +- 业务人员手动导出核心业务表 +- 需要批量处理多个文件 + +--- + +## 🔧 技术实现方案 + +### 技术栈 + +```python +# 核心依赖 +fastapi>=0.104.0 # Web 框架 +pydantic>=2.0.0 # 数据验证 +celery>=5.3.0 # 异步任务(可选) + +# 数据处理 +pandas>=2.0.0 # 批量文件处理 +openpyxl>=3.1.0 # Excel 处理 +``` + +### 实现思路 + +1. **批量文件上传**: 接收多个文件 +2. **文件解析**: 使用 `pandas` 批量读取文件 +3. **表结构识别**: 根据文件名或 Sheet 名称识别表名 +4. **字段识别**: 从 Excel/CSV 的表头识别字段名和类型 +5. **进度反馈**: 使用异步任务或进度回调 +6. **结果汇总**: 汇总所有文件的解析结果 + +--- + +## 📥 请求格式 + +### 请求方式 + +**Content-Type**: `multipart/form-data` + +### 请求参数 + +```http +POST /api/v1/inventory/parse-business-tables +Content-Type: multipart/form-data + +files: [文件1, 文件2, ...] # 多个文件 +project_id: string +``` + +或 + +```json +{ + "file_paths": ["/path/to/file1.xlsx", "/path/to/file2.csv", ...], + "project_id": "project_001" +} +``` + +### 请求参数说明 + +| 参数名 | 类型 | 必填 | 说明 | +|--------|------|------|------| +| `files` | File[] | 是 | 上传的文件列表(方式一,支持多个) | +| `file_paths` | string[] | 是 | 文件路径列表(方式二) | +| `project_id` | string | 是 | 项目ID | + +--- + +## 📤 响应格式 + +### 成功响应 + +```json +{ + "success": true, + "code": 200, + "message": "业务表解析成功", + "data": { + "tables": [ + { + "raw_name": "orders", + "display_name": "订单流水明细表", + "description": "从文件 orders.xlsx 解析", + "source_file": "orders.xlsx", + "fields": [ + { + "raw_name": "order_id", + "display_name": "订单ID", + "type": "string", + "comment": null, + "inferred_type": "varchar(64)" + } + ], + "field_count": 10, + "row_count": 10000 + } + ], + "total_tables": 5, + "total_fields": 150, + "total_files": 5, + "success_files": 5, + "failed_files": [], + "parse_time": 3.45, + "file_info": { + "processed_files": [ + { + "file_name": "orders.xlsx", + "file_size": 1024000, + "tables_extracted": 1, + "status": "success" + } + ] + } + } +} +``` + +### 异步任务响应(如果使用异步处理) + +```json +{ + "success": true, + "code": 202, + "message": "任务已提交,正在处理中", + "data": { + "task_id": "task_123456", + "total_files": 5, + "status": "processing", + "estimated_time": 30 + } +} +``` + +--- + +## 💻 代码实现示例 + +### FastAPI 实现(同步版本) + +```python +from fastapi import FastAPI, UploadFile, File, Form, HTTPException +from fastapi.responses import JSONResponse +from pydantic import BaseModel +from typing import Optional, List, Dict +import pandas as pd +import os +from pathlib import Path +import time +from collections import defaultdict + +app = FastAPI() + +class FieldInfo(BaseModel): + raw_name: str + display_name: Optional[str] = None + type: str + comment: Optional[str] = None + inferred_type: Optional[str] = None + +class TableInfo(BaseModel): + raw_name: str + display_name: Optional[str] = None + description: Optional[str] = None + source_file: str + fields: List[FieldInfo] + field_count: int + row_count: Optional[int] = None + +def infer_field_type(pd_type: str) -> str: + """根据 pandas 类型推断数据库字段类型""" + type_mapping = { + 'object': 'varchar(255)', + 'int64': 'bigint', + 'int32': 'int', + 'float64': 'double', + 'float32': 'float', + 'bool': 'tinyint', + 'datetime64[ns]': 'datetime', + 'date': 'date' + } + return type_mapping.get(str(pd_type), 'varchar(255)') + +def parse_excel_file(file_path: str, file_name: str) -> List[TableInfo]: + """解析单个 Excel 文件""" + tables = [] + + try: + # 读取所有 Sheet + excel_file = pd.ExcelFile(file_path) + + for sheet_name in excel_file.sheet_names: + df = pd.read_excel(file_path, sheet_name=sheet_name) + + # 跳过空 Sheet + if df.empty: + continue + + # 识别字段 + fields = [] + for col in df.columns: + # 推断字段类型 + col_type = str(df[col].dtype) + inferred_type = infer_field_type(col_type) + + field = FieldInfo( + raw_name=str(col).strip(), + display_name=str(col).strip(), + type=inferred_type, + comment=None, + inferred_type=inferred_type + ) + fields.append(field) + + if fields: + # 使用 Sheet 名称或文件名作为表名 + table_name = sheet_name.lower().replace(' ', '_').replace('-', '_') + if not table_name: + table_name = Path(file_name).stem.lower().replace(' ', '_').replace('-', '_') + + table = TableInfo( + raw_name=table_name, + display_name=sheet_name, + description=f"从文件 {file_name} 的 Sheet '{sheet_name}' 解析", + source_file=file_name, + fields=fields, + field_count=len(fields), + row_count=len(df) + ) + tables.append(table) + + except Exception as e: + raise Exception(f"解析文件 {file_name} 失败: {str(e)}") + + return tables + +def parse_csv_file(file_path: str, file_name: str) -> List[TableInfo]: + """解析单个 CSV 文件""" + tables = [] + + try: + # 尝试多种编码 + encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1'] + df = None + + for encoding in encodings: + try: + df = pd.read_csv(file_path, encoding=encoding) + break + except UnicodeDecodeError: + continue + + if df is None: + raise ValueError("无法解析 CSV 文件,请检查文件编码") + + if df.empty: + return tables + + # 识别字段 + fields = [] + for col in df.columns: + col_type = str(df[col].dtype) + inferred_type = infer_field_type(col_type) + + field = FieldInfo( + raw_name=str(col).strip(), + display_name=str(col).strip(), + type=inferred_type, + comment=None, + inferred_type=inferred_type + ) + fields.append(field) + + if fields: + # 使用文件名作为表名 + table_name = Path(file_name).stem.lower().replace(' ', '_').replace('-', '_') + + table = TableInfo( + raw_name=table_name, + display_name=Path(file_name).stem, + description=f"从文件 {file_name} 解析", + source_file=file_name, + fields=fields, + field_count=len(fields), + row_count=len(df) + ) + tables.append(table) + + except Exception as e: + raise Exception(f"解析文件 {file_name} 失败: {str(e)}") + + return tables + +@app.post("/api/v1/inventory/parse-business-tables") +async def parse_business_tables( + files: List[UploadFile] = File(...), + project_id: str = Form(...) +): + """ + 业务表解析接口 + + 批量解析业务人员导出的核心业务表文件 + """ + start_time = time.time() + upload_dir = Path("/tmp/uploads") + upload_dir.mkdir(exist_ok=True) + + all_tables = [] + processed_files = [] + failed_files = [] + + try: + # 处理每个文件 + for file in files: + file_name = file.filename + file_path = str(upload_dir / file_name) + + try: + # 保存文件 + with open(file_path, "wb") as f: + content = await file.read() + f.write(content) + + file_size = len(content) + + # 根据文件扩展名选择解析方法 + ext = Path(file_name).suffix.lower() + if ext in ['.xlsx', '.xls']: + tables = parse_excel_file(file_path, file_name) + elif ext == '.csv': + tables = parse_csv_file(file_path, file_name) + else: + failed_files.append({ + "file_name": file_name, + "error": f"不支持的文件类型: {ext}" + }) + continue + + all_tables.extend(tables) + processed_files.append({ + "file_name": file_name, + "file_size": file_size, + "tables_extracted": len(tables), + "status": "success" + }) + + # 清理临时文件 + os.remove(file_path) + + except Exception as e: + failed_files.append({ + "file_name": file_name, + "error": str(e) + }) + # 清理临时文件 + if os.path.exists(file_path): + os.remove(file_path) + + # 计算统计信息 + total_fields = sum(table.field_count for table in all_tables) + parse_time = time.time() - start_time + + # 构建响应 + response_data = { + "tables": [table.dict() for table in all_tables], + "total_tables": len(all_tables), + "total_fields": total_fields, + "total_files": len(files), + "success_files": len(processed_files), + "failed_files": failed_files, + "parse_time": round(parse_time, 2), + "file_info": { + "processed_files": processed_files + } + } + + return { + "success": True, + "code": 200, + "message": f"成功解析 {len(processed_files)} 个文件,提取 {len(all_tables)} 个表", + "data": response_data + } + + except Exception as e: + return JSONResponse( + status_code=500, + content={ + "success": False, + "code": 500, + "message": "业务表解析失败", + "error": { + "error_code": "PARSE_ERROR", + "error_detail": str(e) + } + } + ) +``` + +### 异步版本(使用 Celery,可选) + +```python +from celery import Celery + +celery_app = Celery('tasks', broker='redis://localhost:6379') + +@celery_app.task +def parse_business_tables_async(file_paths: List[str], project_id: str): + """异步解析业务表""" + # 解析逻辑同上 + pass + +@app.post("/api/v1/inventory/parse-business-tables-async") +async def parse_business_tables_async_endpoint( + files: List[UploadFile] = File(...), + project_id: str = Form(...) +): + """异步业务表解析接口""" + # 保存文件 + file_paths = [] + for file in files: + file_path = f"/tmp/uploads/{file.filename}" + with open(file_path, "wb") as f: + content = await file.read() + f.write(content) + file_paths.append(file_path) + + # 提交异步任务 + task = parse_business_tables_async.delay(file_paths, project_id) + + return { + "success": True, + "code": 202, + "message": "任务已提交,正在处理中", + "data": { + "task_id": task.id, + "total_files": len(files), + "status": "processing", + "estimated_time": len(files) * 10 # 估算时间(秒) + } + } + +@app.get("/api/v1/inventory/parse-business-tables-status/{task_id}") +async def get_parse_status(task_id: str): + """查询解析任务状态""" + task = celery_app.AsyncResult(task_id) + + if task.ready(): + return { + "success": True, + "code": 200, + "data": { + "task_id": task_id, + "status": "completed", + "result": task.result + } + } + else: + return { + "success": True, + "code": 200, + "data": { + "task_id": task_id, + "status": "processing", + "progress": task.info.get('progress', 0) if task.info else 0 + } + } +``` + +--- + +## ⚠️ 注意事项 + +### 1. 批量处理性能 + +- 对于大量文件,建议使用异步处理 +- 设置合理的文件大小限制 +- 考虑并行处理以提高性能 + +### 2. 表名识别 + +由于是业务人员手动导出,表名识别可能不准确: +- 优先使用 Excel Sheet 名称 +- 其次使用文件名 +- 提供手动修正功能(可选) + +### 3. 字段类型推断 + +- 基于 pandas 类型推断,可能不够准确 +- 后续可通过 AI 识别接口进一步优化 +- 记录推断类型,便于后续验证 + +### 4. 错误处理 + +- 单个文件失败不应影响其他文件处理 +- 记录详细的错误信息 +- 提供失败文件列表 + +### 5. 资源管理 + +- 及时清理临时文件 +- 控制并发文件数量 +- 限制单个文件大小 + +--- + +## 📝 开发检查清单 + +- [ ] 支持批量文件上传 +- [ ] 支持 Excel (.xlsx, .xls) 格式 +- [ ] 支持 CSV (.csv) 格式 +- [ ] Excel 多 Sheet 支持 +- [ ] CSV 编码自动检测 +- [ ] 字段类型推断 +- [ ] 进度反馈(异步版本) +- [ ] 错误处理(单个文件失败不影响其他) +- [ ] 临时文件清理 +- [ ] 单元测试覆盖 + +--- + +## 🔗 相关文档 + +- [接口清单表格](../Python接口清单表格.md) +- [接口 1.1 - 文档解析接口](./01-parse-document.md) +- [接口 1.2 - SQL 结果解析接口](./02-parse-sql-result.md) +- [接口 1.4 - 数据资产智能识别接口](./04-ai-analyze.md) - 可进一步优化识别结果 diff --git a/docs/04-ai-analyze.md b/docs/04-ai-analyze.md new file mode 100644 index 0000000..f0f1832 --- /dev/null +++ b/docs/04-ai-analyze.md @@ -0,0 +1,751 @@ +# 接口开发说明 - 数据资产智能识别接口 ⭐⭐⭐ + +## 📋 接口基本信息 + +- **接口路径**: `/api/v1/inventory/ai-analyze` +- **请求方法**: `POST` +- **接口功能**: 使用大模型识别数据资产的中文名称、业务含义、PII 敏感信息、重要数据特征,并提供置信度评分 +- **涉及页面**: `InventoryStep.vue` - AI 盘点处理阶段 +- **是否涉及大模型**: ✅ **是**(核心功能) +- **工作量评估**: **15 人日** +- **优先级**: **高** + +--- + +## 🎯 功能描述 + +该接口是数据资产盘点系统的核心功能,使用大模型技术智能识别和标注数据资产,具体功能包括: + +1. **表名和字段名中文命名识别** + - 将英文表名/字段名转换为中文名称 + - 识别业务含义 + +2. **业务含义描述生成** + - 自动生成表的中文描述 + - 自动生成字段的中文描述 + +3. **PII(个人信息)识别** + - 识别敏感个人信息(SPI) + - 符合《个人信息保护法》(PIPL) 要求 + - 识别类型:手机号、身份证、姓名、邮箱、地址等 + +4. **重要数据识别** + - 识别《数据安全法》定义的重要数据 + - 涉及国家安全、公共利益的数据 + +5. **置信度评分** + - 评估识别结果的可靠性(0-100%) + - 考虑字段命名规范度、注释完整性等因素 + +--- + +## 🔧 技术实现方案 + +### 技术栈 + +```python +# 核心依赖 +fastapi>=0.104.0 # Web 框架 +pydantic>=2.0.0 # 数据验证 +httpx>=0.24.0 # HTTP 客户端(用于调用大模型 API) + +# 大模型 SDK +openai>=1.0.0 # OpenAI API (如果使用 GPT-4) +dashscope>=1.14.0 # 通义千问 API +qianfan>=0.1.0 # 文心一言 API + +# 工具库 +python-dotenv>=1.0.0 # 环境变量管理 +loguru>=0.7.0 # 日志管理 +redis>=5.0.0 # 缓存(可选) +``` + +### 大模型选择建议 + +| 场景 | 推荐模型 | 理由 | +|------|---------|------| +| 数据资产识别 | 通义千问 / GPT-4 | 需要准确理解表结构和业务含义 | + +### 实现思路 + +1. **输入数据准备**: 整理表结构信息、行业背景、业务上下文 +2. **提示词构建**: 根据输入数据构建专业的提示词 +3. **大模型调用**: 调用大模型 API 进行识别 +4. **结果解析**: 解析大模型返回的 JSON 结果 +5. **规则引擎验证**: 使用规则引擎验证和补充识别结果 +6. **置信度评分**: 计算识别结果的置信度 +7. **结果验证**: 验证数据格式和逻辑正确性 + +--- + +## 📥 请求格式 + +### 请求方式 + +**Content-Type**: `application/json` + +### 请求参数 + +```json +{ + "tables": [ + { + "raw_name": "t_user_base_01", + "fields": [ + { + "raw_name": "user_id", + "type": "varchar(64)", + "comment": "用户ID" + }, + { + "raw_name": "phone", + "type": "varchar(11)", + "comment": "手机号" + }, + { + "raw_name": "id_card", + "type": "varchar(18)", + "comment": "身份证号" + } + ] + } + ], + "project_id": "project_001", + "industry": "retail-fresh", + "context": "某连锁生鲜零售企业,主营水果、蔬菜等生鲜产品", + "options": { + "model": "qwen-max", + "temperature": 0.3, + "enable_pii_detection": true, + "enable_important_data_detection": true + } +} +``` + +### 请求参数说明 + +| 参数名 | 类型 | 必填 | 说明 | +|--------|------|------|------| +| `tables` | array | 是 | 表列表,每个表包含表名和字段列表 | +| `tables[].raw_name` | string | 是 | 表名(英文/原始名称) | +| `tables[].fields` | array | 是 | 字段列表 | +| `tables[].fields[].raw_name` | string | 是 | 字段名(英文) | +| `tables[].fields[].type` | string | 是 | 字段类型 | +| `tables[].fields[].comment` | string | 否 | 字段注释(如果有) | +| `project_id` | string | 是 | 项目ID | +| `industry` | string | 否 | 行业信息(如:retail-fresh) | +| `context` | string | 否 | 业务背景信息 | +| `options` | object | 否 | 可选配置 | +| `options.model` | string | 否 | 大模型选择(qwen-max/gpt-4/ernie-bot) | +| `options.temperature` | float | 否 | 温度参数(0.0-1.0),默认 0.3 | +| `options.enable_pii_detection` | boolean | 否 | 是否启用 PII 识别,默认 true | +| `options.enable_important_data_detection` | boolean | 否 | 是否启用重要数据识别,默认 true | + +--- + +## 📤 响应格式 + +### 成功响应 + +```json +{ + "success": true, + "code": 200, + "message": "数据资产识别成功", + "data": { + "tables": [ + { + "raw_name": "t_user_base_01", + "ai_name": "会员基础信息表", + "desc": "存储C端注册用户的核心身份信息", + "confidence": 98, + "ai_completed": true, + "fields": [ + { + "raw_name": "user_id", + "ai_name": "用户ID", + "desc": "用户的唯一标识符", + "type": "varchar(64)", + "pii": [], + "pii_type": null, + "is_important_data": false, + "confidence": 95 + }, + { + "raw_name": "phone", + "ai_name": "手机号", + "desc": "用户的联系电话", + "type": "varchar(11)", + "pii": ["手机号"], + "pii_type": "contact", + "is_important_data": false, + "confidence": 98 + }, + { + "raw_name": "id_card", + "ai_name": "身份证号", + "desc": "用户的身份证号码", + "type": "varchar(18)", + "pii": ["身份证号"], + "pii_type": "identity", + "is_important_data": false, + "confidence": 99 + } + ], + "pii": ["手机号", "身份证号"], + "important": false, + "important_data_types": [] + } + ], + "statistics": { + "total_tables": 1, + "total_fields": 3, + "pii_fields_count": 2, + "important_data_fields_count": 0, + "average_confidence": 97.3 + }, + "processing_time": 5.2, + "model_used": "qwen-max", + "token_usage": { + "prompt_tokens": 1200, + "completion_tokens": 800, + "total_tokens": 2000 + } + } +} +``` + +### 失败响应 + +```json +{ + "success": false, + "code": 500, + "message": "数据资产识别失败", + "error": { + "error_code": "AI_ANALYZE_ERROR", + "error_detail": "大模型 API 调用失败: Rate limit exceeded", + "retryable": true + } +} +``` + +--- + +## 💻 代码实现示例 + +### 提示词模板设计 + +```python +SYSTEM_PROMPT = """你是一位专业的数据资产管理专家,擅长识别数据资产的中文名称、业务含义、敏感信息和重要数据特征。 + +## 你的专业能力 +- 深入理解数据资产管理、数据合规(PIPL、数据安全法)等法规要求 +- 熟悉各种业务场景下的数据资产命名规范 +- 能够准确识别敏感个人信息(SPI)和重要数据 +- 具备优秀的文本理解和生成能力 + +## 输出要求 +1. **准确性**: 中文命名必须准确反映业务含义 +2. **合规性**: PII 识别必须符合《个人信息保护法》(PIPL) +3. **完整性**: 重要数据识别必须符合《数据安全法》 +4. **专业性**: 使用专业术语,符合行业标准 +5. **结构化**: 严格按照JSON格式输出 +""" + +USER_PROMPT_TEMPLATE = """请基于以下信息识别数据资产: + +## 行业背景 +{industry_info} + +## 业务背景 +{context_info} + +## 表结构信息 +{tables_info} + +## 识别要求 +1. 为每个表生成中文名称(ai_name)和业务描述(desc) +2. 为每个字段生成中文名称(ai_name)和业务描述(desc) +3. 识别敏感个人信息(PII): + - 手机号、身份证号、姓名、邮箱、地址等 + - 生物识别信息(人脸、指纹等) + - 医疗健康信息 + - 金融账户信息 + - 行踪轨迹信息 +4. 识别重要数据(符合《数据安全法》): + - 涉及国家安全的数据 + - 涉及公共利益的数据 + - 高精度地理信息(军事禁区周边) + - 关键物资流向(稀土、芯片等) +5. 计算置信度评分(0-100): + - 字段命名规范度 + - 注释完整性 + - 业务含义明确度 + +## 输出格式(JSON) +{json_schema} +""" + +JSON_SCHEMA = """ +{ + "tables": [ + { + "raw_name": "string", + "ai_name": "string", + "desc": "string", + "confidence": "integer (0-100)", + "fields": [ + { + "raw_name": "string", + "ai_name": "string", + "desc": "string", + "pii": ["string"], + "pii_type": "string | null", + "is_important_data": "boolean", + "confidence": "integer (0-100)" + } + ], + "pii": ["string"], + "important": "boolean", + "important_data_types": ["string"] + } + ] +} +""" +``` + +### FastAPI 实现 + +```python +from fastapi import FastAPI, HTTPException +from fastapi.responses import JSONResponse +from pydantic import BaseModel, Field +from typing import Optional, List, Dict +import json +import os +from dotenv import load_dotenv +import httpx +import time +from loguru import logger + +load_dotenv() + +app = FastAPI() + +# 大模型配置 +MODEL_CONFIG = { + "qwen-max": { + "api_key": os.getenv("DASHSCOPE_API_KEY"), + "base_url": "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation", + "model_name": "qwen-max" + }, + "gpt-4": { + "api_key": os.getenv("OPENAI_API_KEY"), + "base_url": "https://api.openai.com/v1/chat/completions", + "model_name": "gpt-4" + } +} + +class FieldInput(BaseModel): + raw_name: str + type: str + comment: Optional[str] = None + +class TableInput(BaseModel): + raw_name: str + fields: List[FieldInput] + +class AnalyzeRequest(BaseModel): + tables: List[TableInput] + project_id: str + industry: Optional[str] = None + context: Optional[str] = None + options: Optional[Dict] = None + +class FieldOutput(BaseModel): + raw_name: str + ai_name: str + desc: str + type: str + pii: List[str] = [] + pii_type: Optional[str] = None + is_important_data: bool = False + confidence: int = Field(ge=0, le=100) + +class TableOutput(BaseModel): + raw_name: str + ai_name: str + desc: str + confidence: int = Field(ge=0, le=100) + ai_completed: bool = True + fields: List[FieldOutput] + pii: List[str] = [] + important: bool = False + important_data_types: List[str] = [] + +def build_prompt(tables: List[TableInput], industry: str = None, context: str = None) -> str: + """构建提示词""" + # 格式化表信息 + tables_info = [] + for table in tables: + table_info = f"表名: {table.raw_name}\n字段列表:\n" + for field in table.fields: + field_info = f" - {field.raw_name} ({field.type})" + if field.comment: + field_info += f" - {field.comment}" + table_info += field_info + "\n" + tables_info.append(table_info) + + tables_info_str = "\n\n".join(tables_info) + + # 行业信息 + industry_info = industry if industry else "未指定" + + # 业务背景 + context_info = context if context else "未提供业务背景信息" + + # 构建用户提示词 + user_prompt = USER_PROMPT_TEMPLATE.format( + industry_info=industry_info, + context_info=context_info, + tables_info=tables_info_str, + json_schema=JSON_SCHEMA + ) + + return user_prompt + +async def call_llm_api(prompt: str, model: str = "qwen-max", temperature: float = 0.3) -> str: + """调用大模型 API""" + config = MODEL_CONFIG.get(model) + if not config: + raise ValueError(f"不支持的大模型: {model}") + + headers = { + "Authorization": f"Bearer {config['api_key']}", + "Content-Type": "application/json" + } + + if model == "qwen-max": + # 通义千问 API + payload = { + "model": config["model_name"], + "input": { + "messages": [ + {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "user", "content": prompt} + ] + }, + "parameters": { + "temperature": temperature, + "result_format": "message" + } + } + elif model == "gpt-4": + # OpenAI API + payload = { + "model": config["model_name"], + "messages": [ + {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "user", "content": prompt} + ], + "temperature": temperature, + "response_format": {"type": "json_object"} + } + + async with httpx.AsyncClient(timeout=60.0) as client: + try: + response = await client.post( + config["base_url"], + headers=headers, + json=payload + ) + response.raise_for_status() + result = response.json() + + # 解析响应(根据不同的 API 格式) + if model == "qwen-max": + content = result["output"]["choices"][0]["message"]["content"] + elif model == "gpt-4": + content = result["choices"][0]["message"]["content"] + + return content + + except httpx.HTTPError as e: + logger.error(f"大模型 API 调用失败: {str(e)}") + raise Exception(f"大模型 API 调用失败: {str(e)}") + +def parse_llm_response(response_text: str) -> Dict: + """解析大模型返回的 JSON 结果""" + try: + # 提取 JSON 部分(如果返回的是 Markdown 格式) + if "```json" in response_text: + json_text = response_text.split("```json")[1].split("```")[0].strip() + elif "```" in response_text: + json_text = response_text.split("```")[1].split("```")[0].strip() + else: + json_text = response_text.strip() + + # 解析 JSON + result = json.loads(json_text) + return result + + except json.JSONDecodeError as e: + logger.error(f"JSON 解析失败: {str(e)}") + logger.error(f"原始响应: {response_text}") + raise Exception(f"大模型返回的 JSON 格式错误: {str(e)}") + +def validate_pii_detection(field: FieldOutput, field_input: FieldInput) -> FieldOutput: + """使用规则引擎验证和补充 PII 识别""" + # PII 关键词规则 + pii_keywords = { + "phone": ["手机", "phone", "mobile", "tel", "telephone"], + "id_card": ["身份证", "id_card", "idcard", "identity"], + "name": ["姓名", "name", "real_name"], + "email": ["邮箱", "email", "mail"], + "address": ["地址", "address", "addr"] + } + + field_name_lower = field.raw_name.lower() + + # 如果 AI 未识别,使用规则引擎识别 + if not field.pii: + for pii_type, keywords in pii_keywords.items(): + if any(keyword in field_name_lower for keyword in keywords): + field.pii = [pii_type] + field.pii_type = pii_type + break + + return field + +def calculate_confidence(field: FieldInput, field_output: FieldOutput) -> int: + """计算置信度评分""" + score = 50 # 基础分 + + # 命名规范度(30分) + if field.raw_name.islower() and '_' in field.raw_name: + score += 15 # 蛇形命名 + elif field.raw_name.islower() and field.raw_name.isalnum(): + score += 10 # 小写字母数字 + + # 注释完整性(20分) + if field.comment: + score += 20 + + # AI 识别结果(50分) + if field_output.ai_name and field_output.ai_name != field.raw_name: + score += 25 + if field_output.desc: + score += 25 + + return min(score, 100) + +@app.post("/api/v1/inventory/ai-analyze") +async def ai_analyze(request: AnalyzeRequest): + """ + 数据资产智能识别接口 + + 使用大模型识别数据资产的中文名称、业务含义、PII 敏感信息、重要数据特征 + """ + start_time = time.time() + + try: + # 获取配置 + model = request.options.get("model", "qwen-max") if request.options else "qwen-max" + temperature = request.options.get("temperature", 0.3) if request.options else 0.3 + enable_pii = request.options.get("enable_pii_detection", True) if request.options else True + enable_important = request.options.get("enable_important_data_detection", True) if request.options else True + + # 构建提示词 + prompt = build_prompt( + tables=request.tables, + industry=request.industry, + context=request.context + ) + + logger.info(f"调用大模型 {model} 进行数据资产识别") + + # 调用大模型 + response_text = await call_llm_api(prompt, model=model, temperature=temperature) + + # 解析结果 + llm_result = parse_llm_response(response_text) + + # 转换为标准格式并验证 + tables_output = [] + total_pii_fields = 0 + total_important_fields = 0 + total_confidence = 0 + + for table_result, table_input in zip(llm_result.get("tables", []), request.tables): + fields_output = [] + table_pii = [] + table_important = False + + for field_result, field_input in zip(table_result.get("fields", []), table_input.fields): + field_output = FieldOutput( + raw_name=field_result.get("raw_name", field_input.raw_name), + ai_name=field_result.get("ai_name", field_input.raw_name), + desc=field_result.get("desc", ""), + type=field_input.type, + pii=field_result.get("pii", []), + pii_type=field_result.get("pii_type"), + is_important_data=field_result.get("is_important_data", False), + confidence=field_result.get("confidence", 80) + ) + + # 规则引擎验证和补充 + if enable_pii: + field_output = validate_pii_detection(field_output, field_input) + + # 重新计算置信度 + field_output.confidence = calculate_confidence(field_input, field_output) + + # 收集 PII 信息 + if field_output.pii: + table_pii.extend(field_output.pii) + total_pii_fields += 1 + + # 收集重要数据信息 + if field_output.is_important_data: + table_important = True + total_important_fields += 1 + + fields_output.append(field_output) + total_confidence += field_output.confidence + + table_output = TableOutput( + raw_name=table_result.get("raw_name", table_input.raw_name), + ai_name=table_result.get("ai_name", table_input.raw_name), + desc=table_result.get("desc", ""), + confidence=table_result.get("confidence", 80), + ai_completed=True, + fields=fields_output, + pii=list(set(table_pii)), # 去重 + important=table_important, + important_data_types=table_result.get("important_data_types", []) + ) + + tables_output.append(table_output) + + # 计算统计信息 + total_fields = sum(len(table.fields) for table in tables_output) + avg_confidence = total_confidence / total_fields if total_fields > 0 else 0 + processing_time = time.time() - start_time + + # 构建响应 + response_data = { + "tables": [table.dict() for table in tables_output], + "statistics": { + "total_tables": len(tables_output), + "total_fields": total_fields, + "pii_fields_count": total_pii_fields, + "important_data_fields_count": total_important_fields, + "average_confidence": round(avg_confidence, 2) + }, + "processing_time": round(processing_time, 2), + "model_used": model, + "token_usage": { + "prompt_tokens": len(prompt) // 4, # 粗略估算 + "completion_tokens": len(response_text) // 4, + "total_tokens": (len(prompt) + len(response_text)) // 4 + } + } + + return { + "success": True, + "code": 200, + "message": "数据资产识别成功", + "data": response_data + } + + except Exception as e: + logger.error(f"数据资产识别失败: {str(e)}") + return JSONResponse( + status_code=500, + content={ + "success": False, + "code": 500, + "message": "数据资产识别失败", + "error": { + "error_code": "AI_ANALYZE_ERROR", + "error_detail": str(e), + "retryable": "Rate limit" in str(e) or "timeout" in str(e).lower() + } + } + ) +``` + +--- + +## ⚠️ 注意事项 + +### 1. 提示词工程 + +- **系统提示词**: 定义 AI 角色为"数据资产管理专家" +- **少样本学习**: 提供 5-10 个典型示例 +- **约束条件**: 明确 PII 和重要数据的识别标准 +- **输出格式**: 使用 JSON Schema 确保输出格式正确 + +### 2. PII 识别规则 + +必须符合《个人信息保护法》(PIPL),识别以下类型: +- **身份信息**: 姓名、身份证号、护照号 +- **联系信息**: 手机号、邮箱、地址 +- **生物识别**: 人脸、指纹、声纹 +- **医疗健康**: 体检报告、疾病信息 +- **金融账户**: 银行卡号、账户信息 +- **行踪轨迹**: GPS 位置、行程记录 + +### 3. 重要数据识别规则 + +必须符合《数据安全法》,识别以下类型: +- **国家安全**: 军事信息、国家秘密 +- **公共利益**: 关键基础设施信息 +- **高精度地理**: 军事禁区周边位置 +- **关键物资**: 稀土、芯片等关键物资流向 + +### 4. 错误处理和重试 + +- **API 限流**: 实现指数退避重试策略 +- **超时处理**: 设置合理的超时时间(60秒) +- **降级策略**: API 失败时使用规则引擎作为降级方案 +- **日志记录**: 详细记录每次 API 调用的请求和响应 + +### 5. 性能优化 + +- **批量处理**: 对于大量表,考虑批量调用 API +- **缓存机制**: 相同输入缓存结果,减少 API 调用 +- **异步处理**: 对于大量数据,考虑异步处理 + +### 6. 成本控制 + +- **Token 优化**: 优化提示词,减少 Token 消耗 +- **模型选择**: 根据需求选择合适的模型(平衡成本和质量) +- **缓存策略**: 对相同输入进行缓存 + +--- + +## 📝 开发检查清单 + +- [ ] 大模型 API 集成(通义千问/GPT-4) +- [ ] 提示词工程设计和优化 +- [ ] PII 识别规则引擎 +- [ ] 重要数据识别规则引擎 +- [ ] 置信度评分算法 +- [ ] JSON 解析和验证 +- [ ] 错误处理和重试机制 +- [ ] 缓存机制(可选) +- [ ] 日志记录 +- [ ] 单元测试覆盖 +- [ ] 性能测试 + +--- + +## 🔗 相关文档 + +- [接口清单表格](../Python接口清单表格.md) +- [Python技术人员工作量文档](../Python技术人员工作量文档.md) +- [数据资产盘点报告-大模型接口设计文档](../数据资产盘点报告-大模型接口设计文档.md) +- [通义千问 API 文档](https://help.aliyun.com/zh/model-studio/) +- [OpenAI API 文档](https://platform.openai.com/docs) diff --git a/docs/05-scenario-recommendation.md b/docs/05-scenario-recommendation.md new file mode 100644 index 0000000..ec5b2d0 --- /dev/null +++ b/docs/05-scenario-recommendation.md @@ -0,0 +1,145 @@ +# 接口开发说明 - 潜在场景推荐接口 ⭐⭐ + +## 📋 接口基本信息 + +- **接口路径**: `/api/v1/value/scenario-recommendation` +- **请求方法**: `POST` +- **接口功能**: 基于企业背景、数据资产清单和存量场景,使用 AI 推荐潜在的数据应用场景 +- **涉及页面**: `ValueStep.vue` - AI 推荐潜在场景清单 +- **是否涉及大模型**: ✅ **是** +- **工作量评估**: **12 人日** +- **优先级**: **高** + +--- + +## 🎯 功能描述 + +该接口使用大模型技术,基于企业背景、数据资产清单和存量场景,智能推荐潜在的数据应用场景,包括: + +1. **场景分类**: 降本增效、营销增长、金融服务、决策支持等 +2. **推荐指数评分**: 1-5星评分 +3. **场景依赖分析**: 分析场景依赖哪些数据资产 +4. **商业价值评估**: 评估场景的商业价值和实施难度 + +--- + +## 📥 请求格式 + +### 请求参数 + +```json +{ + "project_id": "project_001", + "company_info": { + "industry": ["retail-fresh"], + "description": "某连锁生鲜零售企业,主营水果、蔬菜等生鲜产品,拥有线下门店500家", + "data_scale": "100TB", + "data_sources": ["self-generated"] + }, + "data_assets": [ + { + "name": "会员基础信息表", + "core_tables": ["Dim_Customer"], + "description": "存储C端注册用户的核心身份信息" + }, + { + "name": "订单流水记录表", + "core_tables": ["Fact_Sales"], + "description": "全渠道销售交易明细" + } + ], + "existing_scenarios": [ + { + "name": "月度销售经营报表", + "description": "统计各区域门店的月度GMV,维度单一" + } + ], + "options": { + "model": "qwen-max", + "recommendation_count": 10, + "exclude_types": [] + } +} +``` + +--- + +## 📤 响应格式 + +### 成功响应 + +```json +{ + "success": true, + "code": 200, + "message": "场景推荐成功", + "data": { + "recommended_scenarios": [ + { + "id": 1, + "name": "精准会员营销", + "type": "营销增长", + "recommendation_index": 5, + "desc": "基于用户画像与历史交易行为,实现千人千面的优惠券发放。", + "dependencies": ["会员基础信息表", "订单流水记录表"], + "business_value": "提升复购率 15-20%", + "implementation_difficulty": "中等", + "estimated_roi": "高", + "technical_requirements": ["用户画像引擎", "推荐算法"], + "data_requirements": ["会员基础信息", "交易历史", "行为数据"] + } + ], + "total_count": 10, + "generation_time": 8.5, + "model_used": "qwen-max" + } +} +``` + +--- + +## 💻 提示词模板 + +```python +SCENARIO_RECOMMENDATION_PROMPT = """基于以下企业信息,推荐潜在的数据应用场景: + +## 企业信息 +行业: {industry} +企业描述: {company_description} +数据规模: {data_scale} +数据来源: {data_sources} + +## 可用数据资产 +{data_assets_info} + +## 存量场景(避免重复推荐) +{existing_scenarios_info} + +## 推荐要求 +1. 推荐 {count} 个潜在数据应用场景 +2. 场景分类:降本增效、营销增长、金融服务、决策支持、风险控制等 +3. 推荐指数评分:1-5星(综合考虑业务价值、实施难度、数据准备度) +4. 分析场景依赖的数据资产 +5. 评估商业价值和实施难度 +6. 避免与存量场景重复 + +## 输出格式(JSON) +{json_schema} +""" +``` + +--- + +## ⚠️ 注意事项 + +1. **场景分类**: 需要明确定义场景分类标准 +2. **推荐指数算法**: 综合考虑业务价值、实施难度、数据准备度 +3. **依赖分析**: 准确识别场景依赖的数据资产 +4. **避免重复**: 与存量场景对比,避免重复推荐 + +--- + +## 🔗 相关文档 + +- [接口清单表格](../Python接口清单表格.md) +- [Python技术人员工作量文档](../Python技术人员工作量文档.md) diff --git a/docs/06-scenario-optimization.md b/docs/06-scenario-optimization.md new file mode 100644 index 0000000..108e750 --- /dev/null +++ b/docs/06-scenario-optimization.md @@ -0,0 +1,86 @@ +# 接口开发说明 - 存量场景优化建议接口 + +## 📋 接口基本信息 + +- **接口路径**: `/api/v1/value/scenario-optimization` +- **请求方法**: `POST` +- **接口功能**: 基于存量场景信息和截图,分析场景不足,提供优化建议和改进方向 +- **涉及页面**: `ContextStep.vue` - 生成场景挖掘与优化建议按钮 +- **是否涉及大模型**: ✅ **是** +- **工作量评估**: 8 人日 +- **优先级**: 中 + +--- + +## 🎯 功能描述 + +该接口使用大模型技术分析存量场景的不足,并提供优化建议,支持: +1. **图片识别(OCR)**: 如果上传了场景截图,使用 OCR 识别内容 +2. **场景分析**: 分析现有场景的功能和不足 +3. **优化建议**: 提供具体的优化建议和改进方向 +4. **价值提升**: 识别可提升的价值点 + +--- + +## 📥 请求格式 + +```json +{ + "existing_scenarios": [ + { + "name": "月度销售经营报表", + "description": "统计各区域门店的月度GMV,维度单一", + "image_url": "https://example.com/screenshot.png" // 可选 + } + ], + "data_assets": [...], + "company_info": {...} +} +``` + +--- + +## 📤 响应格式 + +```json +{ + "success": true, + "data": { + "optimization_suggestions": [ + { + "scenario_name": "月度销售经营报表", + "current_status": "维度单一,仅统计GMV", + "suggestions": [ + "增加时间维度分析(同比、环比)", + "增加商品类别维度分析", + "增加区域对比分析" + ], + "potential_value": "提升决策支持能力 30%" + } + ] + } +} +``` + +--- + +## 💻 技术实现要点 + +1. **OCR 集成**: 使用 PaddleOCR 识别场景截图 +2. **大模型分析**: 调用大模型分析场景不足 +3. **建议生成**: 基于分析结果生成优化建议 + +--- + +## ⚠️ 注意事项 + +1. **图片处理**: 支持常见图片格式(PNG、JPG、JPEG) +2. **OCR 准确性**: 需要处理 OCR 识别错误的情况 +3. **建议可操作性**: 优化建议必须具体、可执行 + +--- + +## 🔗 相关文档 + +- [接口清单表格](../Python接口清单表格.md) +- [Python技术人员工作量文档](../Python技术人员工作量文档.md) diff --git a/docs/07-generate-report.md b/docs/07-generate-report.md new file mode 100644 index 0000000..012c374 --- /dev/null +++ b/docs/07-generate-report.md @@ -0,0 +1,150 @@ +# 接口开发说明 - 完整报告生成接口 ⭐⭐⭐ + +## 📋 接口基本信息 + +- **接口路径**: `/api/v1/delivery/generate-report` +- **请求方法**: `POST` +- **接口功能**: 基于数据盘点结果、背景调研信息和价值挖掘场景,使用大模型生成完整的数据资产盘点工作总结报告(四个章节) +- **涉及页面**: `DeliveryStep.vue` - 成果交付页面 +- **是否涉及大模型**: ✅ **是**(核心功能) +- **工作量评估**: **20 人日** +- **优先级**: **高** + +--- + +## 🎯 功能描述 + +该接口是数据资产盘点系统的核心输出功能,使用大模型生成完整的工作总结报告,包含四个章节: + +1. **章节一**: 企业数字化情况简介(企业背景、信息化建设现状、业务流与数据流) +2. **章节二**: 数据资源统计(数据总量、存储分布、数据来源结构) +3. **章节三**: 数据资产情况盘点(资产构成、应用场景、合规风险提示) +4. **章节四**: 专家建议与下一步计划(合规整改、技术演进、价值深化) + +支持功能: +- **分阶段生成**: 支持分阶段生成,提高质量和可控性 +- **内容验证**: 验证统计数据逻辑正确性(如百分比总和为100%) +- **合规性检查**: 验证合规风险分析的完整性 +- **格式化输出**: 返回结构化的 JSON 格式 + +--- + +## 📥 请求格式 + +```json +{ + "project_id": "project_001", + "project_info": { + "project_name": "数据资产盘点项目", + "industry": "retail-fresh", + "company_name": "某连锁生鲜零售企业" + }, + "inventory_data": { + "total_tables": 14582, + "total_fields": 245000, + "total_data_volume": "58 PB", + "storage_distribution": [...], + "data_source_structure": { + "structured": 35, + "semi_structured": 65 + }, + "identified_assets": [...] + }, + "context_data": { + "enterprise_background": "...", + "informatization_status": "...", + "business_flow": "..." + }, + "value_data": { + "selected_scenarios": [...] + }, + "options": { + "language": "zh-CN", + "detail_level": "standard", + "generation_mode": "full | staged" + } +} +``` + +--- + +## 📤 响应格式 + +详见《数据资产盘点报告-大模型接口设计文档.md》中的详细响应格式定义。 + +--- + +## 💻 实现要点 + +### 1. 分阶段生成策略(推荐) + +```python +# 阶段一:生成章节一和章节二 +stage1_result = await generate_sections_1_2(inventory_data, context_data) + +# 阶段二:生成章节三(重点合规风险分析) +stage2_result = await generate_section_3(identified_assets, stage1_result) + +# 阶段三:生成章节四(基于前面章节的分析结果) +stage3_result = await generate_section_4(stage1_result, stage2_result, value_data) +``` + +### 2. 数据验证 + +```python +def validate_report_data(report_data: dict) -> bool: + """验证报告数据""" + # 验证百分比总和为100% + section2 = report_data.get("section2", {}) + structured = section2.get("data_source_structure", {}).get("structured", {}).get("percentage", 0) + semi_structured = section2.get("data_source_structure", {}).get("semi_structured", {}).get("percentage", 0) + + if structured + semi_structured != 100: + raise ValueError("数据来源结构百分比总和必须为100%") + + # 验证合规风险分析完整性 + section3 = report_data.get("section3", {}) + assets = section3.get("assets", []) + + for asset in assets: + if not asset.get("compliance_risks", {}).get("warnings"): + logger.warning(f"资产 {asset.get('title')} 缺少合规风险分析") + + return True +``` + +### 3. 提示词模板 + +详见《数据资产盘点报告-大模型接口设计文档.md》中的提示词工程设计方案。 + +--- + +## ⚠️ 注意事项 + +1. **长文本生成**: 需要使用支持长文本的模型(GPT-4 / 通义千问 Max) +2. **Token 消耗**: 报告生成会消耗大量 Token,需要优化提示词 +3. **数据准确性**: 统计数据必须准确,基于输入数据 +4. **合规性**: 合规风险分析必须符合 PIPL、数据安全法等法规 +5. **建议可操作性**: 专家建议必须具体、可执行 + +--- + +## 📝 开发检查清单 + +- [ ] 大模型集成(GPT-4 / 通义千问 Max) +- [ ] 分阶段生成策略实现 +- [ ] 四个章节的提示词工程 +- [ ] 数据验证引擎 +- [ ] 合规性验证 +- [ ] 错误处理和重试机制 +- [ ] 缓存机制(可选) +- [ ] 日志记录 +- [ ] 单元测试 + +--- + +## 🔗 相关文档 + +- [数据资产盘点报告-大模型接口设计文档](../数据资产盘点报告-大模型接口设计文档.md) - **详细设计文档** +- [接口清单表格](../Python接口清单表格.md) +- [Python技术人员工作量文档](../Python技术人员工作量文档.md) diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 0000000..d062e72 --- /dev/null +++ b/docs/README.md @@ -0,0 +1,150 @@ +# API 接口开发文档索引 + +## 📋 文档说明 + +本目录包含数据资源盘点系统中所有需要 Python 开发的接口的详细开发说明文档。每个接口都有独立的文档,包含完整的开发指导信息。 + +--- + +## 📚 接口文档列表 + +### 模块一:数据盘点智能分析服务 + +| 序号 | 接口名称 | 文档路径 | 是否大模型 | 优先级 | 工作量 | +|------|---------|---------|-----------|--------|--------| +| 1.1 | [文档解析接口](./01-parse-document.md) | `01-parse-document.md` | ❌ | 中 | 5 人日 | +| 1.2 | [SQL 结果解析接口](./02-parse-sql-result.md) | `02-parse-sql-result.md` | ❌ | 低 | 2 人日 | +| 1.3 | [业务表解析接口](./03-parse-business-tables.md) | `03-parse-business-tables.md` | ❌ | 中 | 3 人日 | +| 1.4 | [数据资产智能识别接口 ⭐⭐⭐](./04-ai-analyze.md) | `04-ai-analyze.md` | ✅ **是** | **高** | **15 人日** | + +### 模块二:场景挖掘智能推荐服务 + +| 序号 | 接口名称 | 文档路径 | 是否大模型 | 优先级 | 工作量 | +|------|---------|---------|-----------|--------|--------| +| 2.1 | [潜在场景推荐接口 ⭐⭐](./05-scenario-recommendation.md) | `05-scenario-recommendation.md` | ✅ **是** | **高** | **12 人日** | +| 2.2 | [存量场景优化建议接口](./06-scenario-optimization.md) | `06-scenario-optimization.md` | ✅ **是** | 中 | 8 人日 | + +### 模块三:数据资产盘点报告生成服务 + +| 序号 | 接口名称 | 文档路径 | 是否大模型 | 优先级 | 工作量 | +|------|---------|---------|-----------|--------|--------| +| 3.1 | [完整报告生成接口 ⭐⭐⭐](./07-generate-report.md) | `07-generate-report.md` | ✅ **是** | **高** | **20 人日** | + +--- + +## 📊 文档内容结构 + +每个接口文档包含以下内容: + +1. **接口基本信息** + - 接口路径、请求方法 + - 功能描述、涉及页面 + - 工作量评估、优先级 + +2. **功能描述** + - 详细的功能说明 + - 适用场景 + +3. **技术实现方案** + - 技术栈推荐 + - 实现思路 + - 架构设计 + +4. **请求/响应格式** + - 详细的请求参数说明 + - 响应格式定义 + - 字段说明 + +5. **代码实现示例** + - FastAPI 实现代码 + - 关键逻辑示例 + - 最佳实践 + +6. **测试用例** + - 单元测试示例 + - 集成测试建议 + +7. **注意事项** + - 常见问题和解决方案 + - 性能优化建议 + - 安全注意事项 + +8. **开发检查清单** + - 开发任务清单 + - 验收标准 + +--- + +## 🎯 快速导航 + +### 按优先级排序 + +**高优先级(核心功能)**: +1. [数据资产智能识别接口](./04-ai-analyze.md) - 15 人日 ⭐⭐⭐ +2. [完整报告生成接口](./07-generate-report.md) - 20 人日 ⭐⭐⭐ +3. [潜在场景推荐接口](./05-scenario-recommendation.md) - 12 人日 ⭐⭐ + +**中优先级**: +4. [文档解析接口](./01-parse-document.md) - 5 人日 +5. [业务表解析接口](./03-parse-business-tables.md) - 3 人日 +6. [存量场景优化建议接口](./06-scenario-optimization.md) - 8 人日 + +**低优先级**: +7. [SQL 结果解析接口](./02-parse-sql-result.md) - 2 人日 + +### 按功能分类 + +**大模型接口(4个)**: +- [数据资产智能识别接口](./04-ai-analyze.md) +- [潜在场景推荐接口](./05-scenario-recommendation.md) +- [存量场景优化建议接口](./06-scenario-optimization.md) +- [完整报告生成接口](./07-generate-report.md) + +**数据解析接口(3个)**: +- [文档解析接口](./01-parse-document.md) +- [SQL 结果解析接口](./02-parse-sql-result.md) +- [业务表解析接口](./03-parse-business-tables.md) + +--- + +## 📈 开发建议 + +### 第一阶段(MVP 版本)- 4 周 + +**推荐顺序**: +1. [数据资产智能识别接口](./04-ai-analyze.md) - 核心功能 +2. [完整报告生成接口](./07-generate-report.md) - 核心功能(简化版) +3. [文档解析接口](./01-parse-document.md) - 基础功能 + +### 第二阶段(完善版本)- 3 周 + +**推荐顺序**: +1. [潜在场景推荐接口](./05-scenario-recommendation.md) +2. [存量场景优化建议接口](./06-scenario-optimization.md) +3. [业务表解析接口](./03-parse-business-tables.md) +4. [SQL 结果解析接口](./02-parse-sql-result.md) + +--- + +## 🔗 相关文档 + +- [接口清单表格](../Python接口清单表格.md) - 接口总览和统计 +- [Python技术人员工作量文档](../Python技术人员工作量文档.md) - 详细工作量评估 +- [数据资产盘点报告-大模型接口设计文档](../数据资产盘点报告-大模型接口设计文档.md) - 报告生成接口详细设计 + +--- + +## 📞 联系方式 + +如有接口开发相关问题,请联系: +- **Python 技术负责人**: [待填写] +- **大模型技术顾问**: [待填写] +- **接口对接负责人**: [待填写] + +--- + +## 📅 更新记录 + +| 版本 | 日期 | 更新内容 | 作者 | +|------|------|---------|------| +| v1.0 | 2025-01-XX | 初始版本,包含 7 个接口的完整开发说明文档 | AI Assistant |