# API Development Spec: SQL Result Parsing API
## 📋 API Basics

- **Path**: `/api/v1/inventory/parse-sql-result`
- **Method**: `POST`
- **Function**: Parse the Excel/CSV result file exported after IT runs the SQL script, extracting table names, field names, field types, and related metadata
- **Pages involved**: `InventoryStep.vue`, Option 2 (IT script extraction)
- **Uses an LLM**: ❌ No
- **Estimated effort**: 2 person-days
- **Priority**: Low

---
## 🎯 Functional Description

This endpoint parses the result file exported after the IT department runs the standard SQL script. The script typically queries the `information_schema.COLUMNS` table, and the exported file contains the following columns:

- Table name (TABLE_NAME)
- Table description in Chinese (TABLE_COMMENT)
- Field name (COLUMN_NAME)
- Field name in Chinese (COLUMN_COMMENT)
- Field type (COLUMN_TYPE)

Supported file formats:

- **Excel**: `.xlsx`, `.xls`
- **CSV**: `.csv`

---
## 🔧 Technical Implementation

### Tech Stack

```python
# Core dependencies
fastapi>=0.104.0   # Web framework
pydantic>=2.0.0    # Data validation

# Data processing
pandas>=2.0.0      # CSV/Excel parsing
openpyxl>=3.1.0    # Excel engine (needed by pandas for .xlsx files)
```

### Approach

1. **File upload / path**: Accept an Excel or CSV file
2. **File parsing**: Read the file with `pandas`
3. **Data cleaning**: Drop empty rows and null values, normalize the data
4. **Schema extraction**: Extract table names, field names, and types from the mapped columns
5. **Validation**: Verify data completeness and format correctness
6. **Response**: Return the normalized table-structure data
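Steps 3 and 4 can be sketched on an in-memory DataFrame (headers are assumed to already be in canonical form for brevity; the full implementation below also handles header mapping):

```python
import pandas as pd

# Sample rows as they would look after header normalization
rows = [
    {"table_name": "t_user", "column_name": "user_id", "column_type": "varchar(64)"},
    {"table_name": "t_user", "column_name": "user_name", "column_type": "varchar(50)"},
    {"table_name": "t_order", "column_name": "order_id", "column_type": "bigint"},
]
df = pd.DataFrame(rows)

# Step 3: drop rows missing a table or column name
df = df.dropna(subset=["table_name", "column_name"])

# Step 4: group fields by table name
tables = {
    name: group[["column_name", "column_type"]].to_dict("records")
    for name, group in df.groupby("table_name", sort=False)
}
# tables maps "t_user" -> 2 field records, "t_order" -> 1 field record
```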
---
## 📥 Request Format

### Request Method

**Content-Type**: `multipart/form-data` or `application/json`

### Request Parameters

```http
POST /api/v1/inventory/parse-sql-result
Content-Type: multipart/form-data

file: [binary file]
project_id: string
file_type: excel | csv (optional; auto-detected)
```

or

```json
{
  "file_path": "/path/to/result.xlsx",
  "file_type": "excel | csv",
  "project_id": "project_001"
}
```

### Parameter Reference

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `file` | File | Yes¹ | Uploaded file (method 1) |
| `file_path` | string | Yes¹ | File path (method 2) |
| `file_type` | string | No | File type: `excel` / `csv`; auto-detected from the file extension if omitted |
| `project_id` | string | Yes | Project ID |

¹ Exactly one of `file` / `file_path` must be provided.
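Since `file_type` may be omitted, the extension-based auto-detection can be sketched as a small helper (`detect_file_type` is a hypothetical name used for illustration):

```python
from pathlib import Path

def detect_file_type(file_name: str) -> str:
    """Map a file extension onto the `file_type` values accepted by the API."""
    ext = Path(file_name).suffix.lower()
    if ext in (".xlsx", ".xls"):
        return "excel"
    if ext == ".csv":
        return "csv"
    raise ValueError(f"Unsupported file type: {ext}")
```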
### Standard SQL Script Example

The SQL script the IT department needs to run:

```sql
SELECT
    TABLE_NAME AS '表英文名',
    TABLE_COMMENT AS '表中文名/描述',
    COLUMN_NAME AS '字段英文名',
    COLUMN_COMMENT AS '字段中文名',
    COLUMN_TYPE AS '字段类型'
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = 'your_database_name';
```

Keep the Chinese column aliases as-is: the parser's column-name mapping recognizes these exact headers.
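For illustration, a hypothetical sample of the exported result in CSV form (values are made-up examples):

```csv
表英文名,表中文名/描述,字段英文名,字段中文名,字段类型
t_user,用户表,user_id,用户ID,varchar(64)
t_user,用户表,user_name,用户名,varchar(50)
t_order,订单表,order_id,订单ID,bigint
```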
---
## 📤 Response Format

### Success Response

```json
{
  "success": true,
  "code": 200,
  "message": "SQL result parsed successfully",
  "data": {
    "tables": [
      {
        "raw_name": "t_user_base_01",
        "display_name": "用户基础信息表",
        "description": "存储用户基本信息的表",
        "fields": [
          {
            "raw_name": "user_id",
            "display_name": "用户ID",
            "type": "varchar(64)",
            "comment": "用户的唯一标识符"
          }
        ],
        "field_count": 10
      }
    ],
    "total_tables": 5,
    "total_fields": 245,
    "parse_time": 0.45,
    "file_info": {
      "file_name": "schema_export.xlsx",
      "file_size": 512000,
      "file_type": "excel"
    }
  }
}
```

### Error Response

```json
{
  "success": false,
  "code": 400,
  "message": "Invalid file format or missing required columns",
  "error": {
    "error_code": "INVALID_FILE_FORMAT",
    "error_detail": "Missing required columns: 表英文名, 字段英文名, 字段类型"
  }
}
```

---
## 💻 Implementation Example

### FastAPI Implementation

The Excel and CSV parsers delegate to a shared `parse_dataframe` helper so that the header mapping, validation, and grouping logic exists only once:

```python
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Optional, List, Dict
import pandas as pd
import os
from pathlib import Path
import time

app = FastAPI()

class FieldInfo(BaseModel):
    raw_name: str
    display_name: Optional[str] = None
    type: str
    comment: Optional[str] = None

class TableInfo(BaseModel):
    raw_name: str
    display_name: Optional[str] = None
    description: Optional[str] = None
    fields: List[FieldInfo]
    field_count: int

# Map the possible header variants (the Chinese aliases produced by the
# standard SQL script, plus raw information_schema column names) onto
# canonical internal names.
COLUMN_MAPPING = {
    '表英文名': 'table_name', 'TABLE_NAME': 'table_name', 'table_name': 'table_name',
    '表中文名/描述': 'table_comment', 'TABLE_COMMENT': 'table_comment', 'table_comment': 'table_comment',
    '字段英文名': 'column_name', 'COLUMN_NAME': 'column_name', 'column_name': 'column_name',
    '字段中文名': 'column_comment', 'COLUMN_COMMENT': 'column_comment', 'column_comment': 'column_comment',
    '字段类型': 'column_type', 'COLUMN_TYPE': 'column_type', 'column_type': 'column_type',
}

def parse_dataframe(df: pd.DataFrame) -> List[TableInfo]:
    """Shared parsing logic: normalize headers, validate, clean, group by table."""
    # Normalize column headers
    df.columns = df.columns.str.strip()
    df = df.rename(columns=COLUMN_MAPPING)

    # Validate that the required columns are present
    required_columns = ['table_name', 'column_name', 'column_type']
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        raise ValueError(f"Missing required columns: {', '.join(missing_columns)}")

    # Drop rows with empty table or column names
    df = df.dropna(subset=['table_name', 'column_name'])

    # Group fields by table name
    tables_dict: Dict[str, List[FieldInfo]] = {}
    for _, row in df.iterrows():
        table_name = str(row['table_name']).strip()
        column_name = str(row['column_name']).strip()
        if not table_name or not column_name:
            continue

        comment = str(row.get('column_comment', '')).strip() if pd.notna(row.get('column_comment')) else None
        field = FieldInfo(
            raw_name=column_name,
            display_name=comment,
            type=str(row.get('column_type', 'varchar(255)')).strip() if pd.notna(row.get('column_type')) else 'varchar(255)',
            comment=comment,
        )
        tables_dict.setdefault(table_name, []).append(field)

    # Build TableInfo objects, taking the table comment from the first row of each table
    tables = []
    for table_name, fields in tables_dict.items():
        table_comment = None
        if 'table_comment' in df.columns:
            first_row = df[df['table_name'] == table_name].iloc[0]
            if pd.notna(first_row.get('table_comment')):
                table_comment = str(first_row['table_comment']).strip()

        tables.append(TableInfo(
            raw_name=table_name,
            display_name=table_comment or table_name,
            description=table_comment,
            fields=fields,
            field_count=len(fields),
        ))
    return tables

def parse_sql_result_excel(file_path: str) -> List[TableInfo]:
    """Parse an Excel-format SQL result file."""
    try:
        df = pd.read_excel(file_path)
        return parse_dataframe(df)
    except Exception as e:
        raise Exception(f"Excel parsing failed: {e}")

def parse_sql_result_csv(file_path: str) -> List[TableInfo]:
    """Parse a CSV-format SQL result file, trying several encodings."""
    try:
        df = None
        for encoding in ['utf-8', 'gbk', 'gb2312', 'latin-1']:
            try:
                df = pd.read_csv(file_path, encoding=encoding)
                break
            except UnicodeDecodeError:
                continue
        if df is None:
            raise ValueError("Could not decode the CSV file; please check its encoding")
        return parse_dataframe(df)
    except Exception as e:
        raise Exception(f"CSV parsing failed: {e}")

@app.post("/api/v1/inventory/parse-sql-result")
async def parse_sql_result(
    file: Optional[UploadFile] = File(None),
    file_path: Optional[str] = Form(None),
    file_type: Optional[str] = Form(None),
    project_id: str = Form(...)
):
    """
    SQL result parsing endpoint.

    Parses the Excel/CSV result file exported after IT runs the SQL script.
    """
    start_time = time.time()

    try:
        # Exactly one of `file` / `file_path` must be provided
        if not file and not file_path:
            raise HTTPException(status_code=400, detail="A file or a file path is required")

        if file:
            # Persist the upload to a temp directory.
            # NOTE: sanitize file.filename in production to prevent path traversal.
            upload_dir = Path("/tmp/uploads")
            upload_dir.mkdir(parents=True, exist_ok=True)
            file_path = str(upload_dir / file.filename)

            content = await file.read()
            with open(file_path, "wb") as f:
                f.write(content)

            file_name = file.filename
            file_size = len(content)
        else:
            if not os.path.exists(file_path):
                raise HTTPException(status_code=404, detail=f"File not found: {file_path}")
            file_name = Path(file_path).name
            file_size = os.path.getsize(file_path)

        # Auto-detect the file type from the extension when not given explicitly
        if not file_type:
            ext = Path(file_name).suffix.lower()
            if ext in ('.xlsx', '.xls'):
                file_type = 'excel'
            elif ext == '.csv':
                file_type = 'csv'
            else:
                raise HTTPException(status_code=400, detail=f"Unsupported file type: {ext}")

        # Dispatch on file type
        if file_type == 'excel':
            tables = parse_sql_result_excel(file_path)
        elif file_type == 'csv':
            tables = parse_sql_result_csv(file_path)
        else:
            raise HTTPException(status_code=400, detail=f"Unsupported file type: {file_type}")

        # Aggregate statistics
        total_fields = sum(table.field_count for table in tables)
        parse_time = time.time() - start_time

        response_data = {
            # Pydantic v2: model_dump() replaces the v1 .dict()
            "tables": [table.model_dump() for table in tables],
            "total_tables": len(tables),
            "total_fields": total_fields,
            "parse_time": round(parse_time, 2),
            "file_info": {
                "file_name": file_name,
                "file_size": file_size,
                "file_type": file_type,
            },
        }

        return {
            "success": True,
            "code": 200,
            "message": "SQL result parsed successfully",
            "data": response_data,
        }

    except HTTPException:
        raise
    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={
                "success": False,
                "code": 500,
                "message": "SQL result parsing failed",
                "error": {
                    "error_code": "PARSE_ERROR",
                    "error_detail": str(e),
                },
            },
        )
```

---
## 🧪 Test Cases

### Unit Test Examples

```python
from fastapi.testclient import TestClient
import pandas as pd
import tempfile

from main import app  # import the FastAPI app defined above (adjust the module name)

client = TestClient(app)

def test_parse_sql_result_excel():
    """Parse an Excel-format SQL result file."""
    # Build test data
    test_data = {
        '表英文名': ['t_user', 't_user', 't_order'],
        '表中文名/描述': ['用户表', '用户表', '订单表'],
        '字段英文名': ['user_id', 'user_name', 'order_id'],
        '字段中文名': ['用户ID', '用户名', '订单ID'],
        '字段类型': ['varchar(64)', 'varchar(50)', 'bigint']
    }
    df = pd.DataFrame(test_data)

    with tempfile.NamedTemporaryFile(suffix='.xlsx', delete=False) as tmp:
        df.to_excel(tmp.name, index=False)

    with open(tmp.name, 'rb') as f:
        response = client.post(
            "/api/v1/inventory/parse-sql-result",
            files={"file": ("test.xlsx", f, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")},
            data={"project_id": "test_project"}
        )

    assert response.status_code == 200
    data = response.json()
    assert data["success"] is True
    assert data["data"]["total_tables"] == 2  # t_user and t_order
    assert data["data"]["total_fields"] == 3

def test_invalid_file_format():
    """Reject an unsupported file type."""
    response = client.post(
        "/api/v1/inventory/parse-sql-result",
        files={"file": ("test.txt", b"invalid content", "text/plain")},
        data={"project_id": "test_project"}
    )

    assert response.status_code == 400
    # The unsupported-extension branch raises HTTPException, so FastAPI's
    # default handler returns {"detail": ...} rather than the custom envelope.
    assert "detail" in response.json()
```

---
## ⚠️ Notes

### 1. Column Name Mapping

Different databases export different header names, so multiple variants must be mapped onto the canonical internal names:

- `表英文名` / `TABLE_NAME` / `table_name`
- `字段英文名` / `COLUMN_NAME` / `column_name`
- `字段类型` / `COLUMN_TYPE` / `column_type`
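The mapping can also be applied without pandas; a minimal sketch using a plain alias dict (`normalize_headers` is a hypothetical helper name):

```python
# Alias dict: every known header variant maps onto a canonical name
HEADER_ALIASES = {
    "表英文名": "table_name", "TABLE_NAME": "table_name",
    "字段英文名": "column_name", "COLUMN_NAME": "column_name",
    "字段类型": "column_type", "COLUMN_TYPE": "column_type",
}

def normalize_headers(headers):
    """Trim each header and replace known variants; unknown headers pass through."""
    return [HEADER_ALIASES.get(h.strip(), h.strip()) for h in headers]
```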
### 2. CSV Encoding

CSV files may arrive in various encodings (GBK, UTF-8, etc.), so several must be tried in order:

- UTF-8 (first)
- GBK
- GB2312
- Latin-1
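The fallback order above can be sketched with a small stdlib helper (`decode_with_fallback` is a hypothetical name; Latin-1 accepts any byte sequence, so it must come last):

```python
def decode_with_fallback(data: bytes) -> str:
    """Try the candidate encodings in order; Latin-1 always succeeds, so it is last."""
    for encoding in ('utf-8', 'gbk', 'gb2312', 'latin-1'):
        try:
            return data.decode(encoding)
        except UnicodeDecodeError:
            continue
    raise ValueError("Undecodable input")  # unreachable: latin-1 accepts any bytes
```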
### 3. Data Cleaning

- Drop empty rows and null values
- Normalize table and field names (trim leading/trailing whitespace)
- Handle special characters
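A minimal sketch of the name-trimming rule (`clean_name` is a hypothetical helper; pandas `NaN` values should be filtered with `pd.notna` before this step):

```python
from typing import Optional

def clean_name(raw: object) -> Optional[str]:
    """Trim whitespace; map empty or missing values to None."""
    if raw is None:
        return None
    s = str(raw).strip()
    return s or None
```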
### 4. Error Handling

- File format validation
- Required-column validation
- Data integrity checks
- Exception capture and logging

---

## 📝 Development Checklist

- [ ] Excel (.xlsx, .xls) parsing
- [ ] CSV (.csv) parsing
- [ ] Multiple column-name mappings
- [ ] Automatic CSV encoding detection
- [ ] Data cleaning and validation
- [ ] Error handling and exception capture
- [ ] Unit test coverage
- [ ] Logging

---

## 🔗 Related Documents

- [API Inventory Table](../Python接口清单表格.md)
- [Python Developer Workload Document](../Python技术人员工作量文档.md)