finyx_data_ai/app/services/scenario_optimization_service.py
2026-01-11 07:48:19 +08:00

303 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
场景优化服务
"""
import time
import base64
from typing import List, Optional
from app.schemas.scenario_optimization import (
ScenarioOptimizationRequest,
ScenarioOptimizationResponse,
OptimizationSuggestion,
)
from app.utils.llm_client import llm_client
from app.utils.logger import logger
from app.core.config import settings
from app.core.exceptions import LLMAPIException
# ==================== 提示词模板 ====================
SYSTEM_PROMPT = """你是一位专业的数据应用场景优化专家,擅长分析现有数据应用场景的不足,并提供具体的优化建议。
## 你的专业能力
- 深入理解各行业的数据应用场景和最佳实践
- 熟悉场景优化和价值提升的方法
- 能够识别场景中的痛点和改进空间
- 具备优秀的场景分析和优化建议能力
## 输出要求
1. **准确性**:分析必须基于实际场景信息
2. **价值性**:优化建议必须具有明确的商业价值
3. **可操作性**:建议必须具体、可执行
4. **专业性**:使用专业术语,符合行业标准
5. **结构化**严格按照JSON格式输出
"""
def build_scenario_optimization_prompt(
existing_scenarios: List[dict],
data_assets: List[dict],
company_info: dict = None,
screenshot_analysis: Optional[str] = None
) -> str:
"""构建场景优化提示词"""
# 格式化存量场景
scenarios_info = "\n".join([
f"- {scenario.get('name', '')}{scenario.get('description', '')}"
for scenario in existing_scenarios
])
# 格式化数据资产
assets_info = "\n".join([
f"- {asset.get('name', '')}{asset.get('description', '')}\n 核心表:{', '.join(asset.get('core_tables', []))}"
for asset in data_assets
]) if data_assets else "无数据资产信息"
# 格式化企业信息
company_str = ""
if company_info:
company_str = f"""
## 企业信息
行业: {', '.join(company_info.get('industry', []))}
描述: {company_info.get('description', '')}
数据规模: {company_info.get('data_scale', '')}
数据来源: {', '.join(company_info.get('data_sources', []))}
"""
# 添加截图分析(如果有)
screenshot_str = ""
if screenshot_analysis:
screenshot_str = f"""
## 场景截图分析
{screenshot_analysis}
"""
prompt = f"""请基于以下信息分析存量场景并提供优化建议:
{company_str}
## 存量场景
{scenarios_info}
## 可用数据资产
{assets_info}
{screenshot_str}
## 输出要求
1. 分析每个场景的当前状态和不足
2. 提供具体的优化建议(至少 3 条)
3. 识别可提升的价值点
4. 建议必须具体、可执行
## 输出格式JSON
{{
"optimization_suggestions": [
{{
"scenario_name": "场景名称",
"current_status": "当前状态描述",
"suggestions": ["建议1", "建议2", "建议3"],
"potential_value": "潜在价值描述"
}}
]
}}
"""
return prompt
async def analyze_scenario_screenshots(screenshots: List[str]) -> str:
"""
使用视觉大模型分析场景截图
Args:
screenshots: 场景截图列表Base64 编码的图片数据)
Returns:
截图分析结果
"""
if not screenshots:
return ""
try:
# 检查是否配置了视觉大模型
if not settings.VISION_MODEL:
logger.warning("未配置视觉大模型,跳过截图分析")
return ""
# 构建视觉大模型的提示词
vision_prompt = """请分析以下数据应用场景截图,重点关注:
1. **界面布局**: 界面设计是否合理、美观
2. **数据展示**: 数据展示是否清晰、直观
3. **交互体验**: 交互设计是否流畅、易用
4. **功能完整性**: 功能是否完整、实用
5. **用户体验**: 整体用户体验如何
请用中文详细描述你的分析结果,包括发现的不足和改进建议。"""
# 构建消息(包含图片)
messages = [
{
"role": "user",
"content": [
{"type": "text", "text": vision_prompt}
]
}
]
# 添加图片到消息
for idx, screenshot in enumerate(screenshots):
# 验证 Base64 格式
if "," in screenshot:
# 移除 data URL 前缀(如 "data:image/png;base64,"
image_data = screenshot.split(",")[1]
else:
image_data = screenshot
# 添加图片
messages[0]["content"].append({
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_data}"
}
})
# 调用视觉大模型
logger.info(f"调用视觉大模型分析 {len(screenshots)} 张截图")
# 使用 LLM 客户端调用视觉模型
# 注意:这里需要特殊处理,因为视觉模型需要传递图片
# 我们直接调用硅基流动 API因为视觉模型部署在硅基流动
import httpx
import json
payload = {
"model": settings.VISION_MODEL,
"messages": messages,
"temperature": 0.3
}
headers = {
"Authorization": f"Bearer {settings.SILICONFLOW_API_KEY}",
"Content-Type": "application/json"
}
async with httpx.AsyncClient(timeout=60) as client:
response = await client.post(
settings.VISION_MODEL_BASE_URL,
headers=headers,
json=payload
)
response.raise_for_status()
result = response.json()
# 解析响应
analysis_text = result["choices"][0]["message"]["content"]
logger.info(f"视觉大模型分析完成,结果长度: {len(analysis_text)} 字符")
return analysis_text
except Exception as e:
logger.error(f"视觉大模型分析截图失败: {str(e)}")
# 返回空字符串,不影响主流程
return ""
# ==================== 主要服务类 ====================
class ScenarioOptimizationService:
"""场景优化服务"""
@staticmethod
async def optimize(request: ScenarioOptimizationRequest) -> dict:
"""
优化存量场景
Args:
request: 场景优化请求,包含存量场景、数据资产、企业信息等
Returns:
优化建议结果
"""
start_time = time.time()
logger.info(
f"开始场景优化 - 存量场景数: {len(request.existing_scenarios)}, "
f"数据资产数: {len(request.data_assets) if request.data_assets else 0}, "
f"场景截图数: {len(request.scenario_screenshots) if request.scenario_screenshots else 0}"
)
try:
# 获取配置
model = settings.DEFAULT_LLM_MODEL
temperature = settings.DEFAULT_TEMPERATURE
logger.info(f"使用模型: {model}")
# 分析场景截图(如果有)
screenshot_analysis = None
if request.scenario_screenshots:
screenshot_analysis = await analyze_scenario_screenshots(request.scenario_screenshots)
if screenshot_analysis:
logger.info(f"截图分析完成,结果长度: {len(screenshot_analysis)} 字符")
# 构建提示词
prompt = build_scenario_optimization_prompt(
existing_scenarios=request.existing_scenarios,
data_assets=request.data_assets or [],
company_info=request.company_info,
screenshot_analysis=screenshot_analysis
)
logger.debug(f"提示词长度: {len(prompt)} 字符")
# 调用大模型
response_text = await llm_client.call(
prompt=prompt,
system_prompt=SYSTEM_PROMPT,
temperature=temperature,
model=model
)
# 解析结果
llm_result = llm_client.parse_json_response(response_text)
logger.info("大模型返回结果解析成功")
# 转换为标准格式
optimization_suggestions = []
suggestions_data = llm_result.get("optimization_suggestions", [])
for idx, suggestion_data in enumerate(suggestions_data):
suggestion = OptimizationSuggestion(
scenario_name=suggestion_data.get("scenario_name", ""),
current_status=suggestion_data.get("current_status", ""),
suggestions=suggestion_data.get("suggestions", []),
potential_value=suggestion_data.get("potential_value", "")
)
optimization_suggestions.append(suggestion)
# 计算生成时间
generation_time = time.time() - start_time
# 构建响应数据
response_data = {
"optimization_suggestions": [suggestion.dict() for suggestion in optimization_suggestions],
"generation_time": round(generation_time, 2),
"model_used": model
}
logger.info(
f"场景优化完成 - 建议数: {len(optimization_suggestions)}, "
f"耗时: {generation_time:.2f}"
)
return response_data
except Exception as e:
logger.exception(f"场景优化失败: {str(e)}")
raise LLMAPIException(
f"场景优化失败: {str(e)}",
error_detail=str(e),
retryable="Rate limit" in str(e) or "timeout" in str(e).lower()
)