""" 场景优化服务 """ import time import base64 from typing import List, Optional from app.schemas.scenario_optimization import ( ScenarioOptimizationRequest, ScenarioOptimizationResponse, OptimizationSuggestion, ) from app.utils.llm_client import llm_client from app.utils.logger import logger from app.core.config import settings from app.core.exceptions import LLMAPIException # ==================== 提示词模板 ==================== SYSTEM_PROMPT = """你是一位专业的数据应用场景优化专家,擅长分析现有数据应用场景的不足,并提供具体的优化建议。 ## 你的专业能力 - 深入理解各行业的数据应用场景和最佳实践 - 熟悉场景优化和价值提升的方法 - 能够识别场景中的痛点和改进空间 - 具备优秀的场景分析和优化建议能力 ## 输出要求 1. **准确性**:分析必须基于实际场景信息 2. **价值性**:优化建议必须具有明确的商业价值 3. **可操作性**:建议必须具体、可执行 4. **专业性**:使用专业术语,符合行业标准 5. **结构化**:严格按照JSON格式输出 """ def build_scenario_optimization_prompt( existing_scenarios: List[dict], data_assets: List[dict], company_info: dict = None, screenshot_analysis: Optional[str] = None ) -> str: """构建场景优化提示词""" # 格式化存量场景 scenarios_info = "\n".join([ f"- {scenario.get('name', '')}:{scenario.get('description', '')}" for scenario in existing_scenarios ]) # 格式化数据资产 assets_info = "\n".join([ f"- {asset.get('name', '')}:{asset.get('description', '')}\n 核心表:{', '.join(asset.get('core_tables', []))}" for asset in data_assets ]) if data_assets else "无数据资产信息" # 格式化企业信息 company_str = "" if company_info: company_str = f""" ## 企业信息 行业: {', '.join(company_info.get('industry', []))} 描述: {company_info.get('description', '')} 数据规模: {company_info.get('data_scale', '')} 数据来源: {', '.join(company_info.get('data_sources', []))} """ # 添加截图分析(如果有) screenshot_str = "" if screenshot_analysis: screenshot_str = f""" ## 场景截图分析 {screenshot_analysis} """ prompt = f"""请基于以下信息分析存量场景并提供优化建议: {company_str} ## 存量场景 {scenarios_info} ## 可用数据资产 {assets_info} {screenshot_str} ## 输出要求 1. 分析每个场景的当前状态和不足 2. 提供具体的优化建议(至少 3 条) 3. 识别可提升的价值点 4. 建议必须具体、可执行 ## 输出格式(JSON) {{ "optimization_suggestions": [ {{ "scenario_name": "场景名称", "current_status": "当前状态描述", "suggestions": ["建议1", "建议2", "建议3"], "potential_value": "潜在价值描述" }} ] }} """ return prompt async def analyze_scenario_screenshots(screenshots: List[str]) -> str: """ 使用视觉大模型分析场景截图 Args: screenshots: 场景截图列表(Base64 编码的图片数据) Returns: 截图分析结果 """ if not screenshots: return "" try: # 检查是否配置了视觉大模型 if not settings.VISION_MODEL: logger.warning("未配置视觉大模型,跳过截图分析") return "" # 构建视觉大模型的提示词 vision_prompt = """请分析以下数据应用场景截图,重点关注: 1. **界面布局**: 界面设计是否合理、美观 2. **数据展示**: 数据展示是否清晰、直观 3. **交互体验**: 交互设计是否流畅、易用 4. **功能完整性**: 功能是否完整、实用 5. **用户体验**: 整体用户体验如何 请用中文详细描述你的分析结果,包括发现的不足和改进建议。""" # 构建消息(包含图片) messages = [ { "role": "user", "content": [ {"type": "text", "text": vision_prompt} ] } ] # 添加图片到消息 for idx, screenshot in enumerate(screenshots): # 验证 Base64 格式 if "," in screenshot: # 移除 data URL 前缀(如 "data:image/png;base64,") image_data = screenshot.split(",")[1] else: image_data = screenshot # 添加图片 messages[0]["content"].append({ "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{image_data}" } }) # 调用视觉大模型 logger.info(f"调用视觉大模型分析 {len(screenshots)} 张截图") # 使用 LLM 客户端调用视觉模型 # 注意:这里需要特殊处理,因为视觉模型需要传递图片 # 我们直接调用硅基流动 API,因为视觉模型部署在硅基流动 import httpx import json payload = { "model": settings.VISION_MODEL, "messages": messages, "temperature": 0.3 } headers = { "Authorization": f"Bearer {settings.SILICONFLOW_API_KEY}", "Content-Type": "application/json" } async with httpx.AsyncClient(timeout=60) as client: response = await client.post( settings.VISION_MODEL_BASE_URL, headers=headers, json=payload ) response.raise_for_status() result = response.json() # 解析响应 analysis_text = result["choices"][0]["message"]["content"] logger.info(f"视觉大模型分析完成,结果长度: {len(analysis_text)} 字符") return analysis_text except Exception as e: logger.error(f"视觉大模型分析截图失败: {str(e)}") # 返回空字符串,不影响主流程 return "" # ==================== 主要服务类 ==================== class ScenarioOptimizationService: """场景优化服务""" @staticmethod async def optimize(request: ScenarioOptimizationRequest) -> dict: """ 优化存量场景 Args: request: 场景优化请求,包含存量场景、数据资产、企业信息等 Returns: 优化建议结果 """ start_time = time.time() logger.info( f"开始场景优化 - 存量场景数: {len(request.existing_scenarios)}, " f"数据资产数: {len(request.data_assets) if request.data_assets else 0}, " f"场景截图数: {len(request.scenario_screenshots) if request.scenario_screenshots else 0}" ) try: # 获取配置 model = settings.DEFAULT_LLM_MODEL temperature = settings.DEFAULT_TEMPERATURE logger.info(f"使用模型: {model}") # 分析场景截图(如果有) screenshot_analysis = None if request.scenario_screenshots: screenshot_analysis = await analyze_scenario_screenshots(request.scenario_screenshots) if screenshot_analysis: logger.info(f"截图分析完成,结果长度: {len(screenshot_analysis)} 字符") # 构建提示词 prompt = build_scenario_optimization_prompt( existing_scenarios=request.existing_scenarios, data_assets=request.data_assets or [], company_info=request.company_info, screenshot_analysis=screenshot_analysis ) logger.debug(f"提示词长度: {len(prompt)} 字符") # 调用大模型 response_text = await llm_client.call( prompt=prompt, system_prompt=SYSTEM_PROMPT, temperature=temperature, model=model ) # 解析结果 llm_result = llm_client.parse_json_response(response_text) logger.info("大模型返回结果解析成功") # 转换为标准格式 optimization_suggestions = [] suggestions_data = llm_result.get("optimization_suggestions", []) for idx, suggestion_data in enumerate(suggestions_data): suggestion = OptimizationSuggestion( scenario_name=suggestion_data.get("scenario_name", ""), current_status=suggestion_data.get("current_status", ""), suggestions=suggestion_data.get("suggestions", []), potential_value=suggestion_data.get("potential_value", "") ) optimization_suggestions.append(suggestion) # 计算生成时间 generation_time = time.time() - start_time # 构建响应数据 response_data = { "optimization_suggestions": [suggestion.dict() for suggestion in optimization_suggestions], "generation_time": round(generation_time, 2), "model_used": model } logger.info( f"场景优化完成 - 建议数: {len(optimization_suggestions)}, " f"耗时: {generation_time:.2f}秒" ) return response_data except Exception as e: logger.exception(f"场景优化失败: {str(e)}") raise LLMAPIException( f"场景优化失败: {str(e)}", error_detail=str(e), retryable="Rate limit" in str(e) or "timeout" in str(e).lower() )