303 lines
10 KiB
Python
303 lines
10 KiB
Python
"""
|
||
场景优化服务
|
||
"""
|
||
import time
|
||
import base64
|
||
from typing import List, Optional
|
||
from app.schemas.scenario_optimization import (
|
||
ScenarioOptimizationRequest,
|
||
ScenarioOptimizationResponse,
|
||
OptimizationSuggestion,
|
||
)
|
||
from app.utils.llm_client import llm_client
|
||
from app.utils.logger import logger
|
||
from app.core.config import settings
|
||
from app.core.exceptions import LLMAPIException
|
||
|
||
|
||
# ==================== 提示词模板 ====================
|
||
|
||
SYSTEM_PROMPT = """你是一位专业的数据应用场景优化专家,擅长分析现有数据应用场景的不足,并提供具体的优化建议。
|
||
|
||
## 你的专业能力
|
||
- 深入理解各行业的数据应用场景和最佳实践
|
||
- 熟悉场景优化和价值提升的方法
|
||
- 能够识别场景中的痛点和改进空间
|
||
- 具备优秀的场景分析和优化建议能力
|
||
|
||
## 输出要求
|
||
1. **准确性**:分析必须基于实际场景信息
|
||
2. **价值性**:优化建议必须具有明确的商业价值
|
||
3. **可操作性**:建议必须具体、可执行
|
||
4. **专业性**:使用专业术语,符合行业标准
|
||
5. **结构化**:严格按照JSON格式输出
|
||
"""
|
||
|
||
|
||
def build_scenario_optimization_prompt(
|
||
existing_scenarios: List[dict],
|
||
data_assets: List[dict],
|
||
company_info: dict = None,
|
||
screenshot_analysis: Optional[str] = None
|
||
) -> str:
|
||
"""构建场景优化提示词"""
|
||
|
||
# 格式化存量场景
|
||
scenarios_info = "\n".join([
|
||
f"- {scenario.get('name', '')}:{scenario.get('description', '')}"
|
||
for scenario in existing_scenarios
|
||
])
|
||
|
||
# 格式化数据资产
|
||
assets_info = "\n".join([
|
||
f"- {asset.get('name', '')}:{asset.get('description', '')}\n 核心表:{', '.join(asset.get('core_tables', []))}"
|
||
for asset in data_assets
|
||
]) if data_assets else "无数据资产信息"
|
||
|
||
# 格式化企业信息
|
||
company_str = ""
|
||
if company_info:
|
||
company_str = f"""
|
||
## 企业信息
|
||
行业: {', '.join(company_info.get('industry', []))}
|
||
描述: {company_info.get('description', '')}
|
||
数据规模: {company_info.get('data_scale', '')}
|
||
数据来源: {', '.join(company_info.get('data_sources', []))}
|
||
"""
|
||
|
||
# 添加截图分析(如果有)
|
||
screenshot_str = ""
|
||
if screenshot_analysis:
|
||
screenshot_str = f"""
|
||
## 场景截图分析
|
||
{screenshot_analysis}
|
||
"""
|
||
|
||
prompt = f"""请基于以下信息分析存量场景并提供优化建议:
|
||
|
||
{company_str}
|
||
|
||
## 存量场景
|
||
{scenarios_info}
|
||
|
||
## 可用数据资产
|
||
{assets_info}
|
||
{screenshot_str}
|
||
|
||
## 输出要求
|
||
1. 分析每个场景的当前状态和不足
|
||
2. 提供具体的优化建议(至少 3 条)
|
||
3. 识别可提升的价值点
|
||
4. 建议必须具体、可执行
|
||
|
||
## 输出格式(JSON)
|
||
{{
|
||
"optimization_suggestions": [
|
||
{{
|
||
"scenario_name": "场景名称",
|
||
"current_status": "当前状态描述",
|
||
"suggestions": ["建议1", "建议2", "建议3"],
|
||
"potential_value": "潜在价值描述"
|
||
}}
|
||
]
|
||
}}
|
||
"""
|
||
return prompt
|
||
|
||
|
||
async def analyze_scenario_screenshots(screenshots: List[str]) -> str:
|
||
"""
|
||
使用视觉大模型分析场景截图
|
||
|
||
Args:
|
||
screenshots: 场景截图列表(Base64 编码的图片数据)
|
||
|
||
Returns:
|
||
截图分析结果
|
||
"""
|
||
if not screenshots:
|
||
return ""
|
||
|
||
try:
|
||
# 检查是否配置了视觉大模型
|
||
if not settings.VISION_MODEL:
|
||
logger.warning("未配置视觉大模型,跳过截图分析")
|
||
return ""
|
||
|
||
# 构建视觉大模型的提示词
|
||
vision_prompt = """请分析以下数据应用场景截图,重点关注:
|
||
|
||
1. **界面布局**: 界面设计是否合理、美观
|
||
2. **数据展示**: 数据展示是否清晰、直观
|
||
3. **交互体验**: 交互设计是否流畅、易用
|
||
4. **功能完整性**: 功能是否完整、实用
|
||
5. **用户体验**: 整体用户体验如何
|
||
|
||
请用中文详细描述你的分析结果,包括发现的不足和改进建议。"""
|
||
|
||
# 构建消息(包含图片)
|
||
messages = [
|
||
{
|
||
"role": "user",
|
||
"content": [
|
||
{"type": "text", "text": vision_prompt}
|
||
]
|
||
}
|
||
]
|
||
|
||
# 添加图片到消息
|
||
for idx, screenshot in enumerate(screenshots):
|
||
# 验证 Base64 格式
|
||
if "," in screenshot:
|
||
# 移除 data URL 前缀(如 "data:image/png;base64,")
|
||
image_data = screenshot.split(",")[1]
|
||
else:
|
||
image_data = screenshot
|
||
|
||
# 添加图片
|
||
messages[0]["content"].append({
|
||
"type": "image_url",
|
||
"image_url": {
|
||
"url": f"data:image/jpeg;base64,{image_data}"
|
||
}
|
||
})
|
||
|
||
# 调用视觉大模型
|
||
logger.info(f"调用视觉大模型分析 {len(screenshots)} 张截图")
|
||
|
||
# 使用 LLM 客户端调用视觉模型
|
||
# 注意:这里需要特殊处理,因为视觉模型需要传递图片
|
||
# 我们直接调用硅基流动 API,因为视觉模型部署在硅基流动
|
||
import httpx
|
||
import json
|
||
|
||
payload = {
|
||
"model": settings.VISION_MODEL,
|
||
"messages": messages,
|
||
"temperature": 0.3
|
||
}
|
||
|
||
headers = {
|
||
"Authorization": f"Bearer {settings.SILICONFLOW_API_KEY}",
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
async with httpx.AsyncClient(timeout=60) as client:
|
||
response = await client.post(
|
||
settings.VISION_MODEL_BASE_URL,
|
||
headers=headers,
|
||
json=payload
|
||
)
|
||
response.raise_for_status()
|
||
result = response.json()
|
||
|
||
# 解析响应
|
||
analysis_text = result["choices"][0]["message"]["content"]
|
||
logger.info(f"视觉大模型分析完成,结果长度: {len(analysis_text)} 字符")
|
||
|
||
return analysis_text
|
||
|
||
except Exception as e:
|
||
logger.error(f"视觉大模型分析截图失败: {str(e)}")
|
||
# 返回空字符串,不影响主流程
|
||
return ""
|
||
|
||
|
||
# ==================== 主要服务类 ====================
|
||
|
||
class ScenarioOptimizationService:
|
||
"""场景优化服务"""
|
||
|
||
@staticmethod
|
||
async def optimize(request: ScenarioOptimizationRequest) -> dict:
|
||
"""
|
||
优化存量场景
|
||
|
||
Args:
|
||
request: 场景优化请求,包含存量场景、数据资产、企业信息等
|
||
|
||
Returns:
|
||
优化建议结果
|
||
"""
|
||
start_time = time.time()
|
||
|
||
logger.info(
|
||
f"开始场景优化 - 存量场景数: {len(request.existing_scenarios)}, "
|
||
f"数据资产数: {len(request.data_assets) if request.data_assets else 0}, "
|
||
f"场景截图数: {len(request.scenario_screenshots) if request.scenario_screenshots else 0}"
|
||
)
|
||
|
||
try:
|
||
# 获取配置
|
||
model = settings.DEFAULT_LLM_MODEL
|
||
temperature = settings.DEFAULT_TEMPERATURE
|
||
|
||
logger.info(f"使用模型: {model}")
|
||
|
||
# 分析场景截图(如果有)
|
||
screenshot_analysis = None
|
||
if request.scenario_screenshots:
|
||
screenshot_analysis = await analyze_scenario_screenshots(request.scenario_screenshots)
|
||
if screenshot_analysis:
|
||
logger.info(f"截图分析完成,结果长度: {len(screenshot_analysis)} 字符")
|
||
|
||
# 构建提示词
|
||
prompt = build_scenario_optimization_prompt(
|
||
existing_scenarios=request.existing_scenarios,
|
||
data_assets=request.data_assets or [],
|
||
company_info=request.company_info,
|
||
screenshot_analysis=screenshot_analysis
|
||
)
|
||
|
||
logger.debug(f"提示词长度: {len(prompt)} 字符")
|
||
|
||
# 调用大模型
|
||
response_text = await llm_client.call(
|
||
prompt=prompt,
|
||
system_prompt=SYSTEM_PROMPT,
|
||
temperature=temperature,
|
||
model=model
|
||
)
|
||
|
||
# 解析结果
|
||
llm_result = llm_client.parse_json_response(response_text)
|
||
logger.info("大模型返回结果解析成功")
|
||
|
||
# 转换为标准格式
|
||
optimization_suggestions = []
|
||
suggestions_data = llm_result.get("optimization_suggestions", [])
|
||
|
||
for idx, suggestion_data in enumerate(suggestions_data):
|
||
suggestion = OptimizationSuggestion(
|
||
scenario_name=suggestion_data.get("scenario_name", ""),
|
||
current_status=suggestion_data.get("current_status", ""),
|
||
suggestions=suggestion_data.get("suggestions", []),
|
||
potential_value=suggestion_data.get("potential_value", "")
|
||
)
|
||
optimization_suggestions.append(suggestion)
|
||
|
||
# 计算生成时间
|
||
generation_time = time.time() - start_time
|
||
|
||
# 构建响应数据
|
||
response_data = {
|
||
"optimization_suggestions": [suggestion.dict() for suggestion in optimization_suggestions],
|
||
"generation_time": round(generation_time, 2),
|
||
"model_used": model
|
||
}
|
||
|
||
logger.info(
|
||
f"场景优化完成 - 建议数: {len(optimization_suggestions)}, "
|
||
f"耗时: {generation_time:.2f}秒"
|
||
)
|
||
|
||
return response_data
|
||
|
||
except Exception as e:
|
||
logger.exception(f"场景优化失败: {str(e)}")
|
||
raise LLMAPIException(
|
||
f"场景优化失败: {str(e)}",
|
||
error_detail=str(e),
|
||
retryable="Rate limit" in str(e) or "timeout" in str(e).lower()
|
||
)
|