""" 场景推荐服务 """ import time from typing import List from app.schemas.value import ( ScenarioRecommendationRequest, ScenarioRecommendationResponse, CompanyInfo, DataAsset, ExistingScenario, RecommendedScenario, ) from app.utils.llm_client import llm_client from app.utils.logger import logger from app.core.config import settings from app.core.exceptions import LLMAPIException # ==================== 提示词模板 ==================== SYSTEM_PROMPT = """你是一位专业的数据应用场景规划专家,擅长基于企业背景、数据资产清单和存量场景,智能推荐潜在的数据应用场景。 ## 你的专业能力 - 深入理解各行业的数据应用场景和最佳实践 - 熟悉数据资产的价值评估和场景依赖分析 - 能够识别高价值的数据应用场景 - 具备优秀的场景规划和推荐能力 ## 输出要求 1. **准确性**:场景推荐必须基于实际的数据资产 2. **价值性**:推荐场景必须具有明确的商业价值 3. **可行性**:场景实施难度评估必须合理 4. **专业性**:使用专业术语,符合行业标准 5. **结构化**:严格按照JSON格式输出 """ def build_scenario_recommendation_prompt( company_info: CompanyInfo, data_assets: List[DataAsset], existing_scenarios: List[ExistingScenario], recommendation_count: int ) -> str: """构建场景推荐提示词""" # 格式化企业信息 industry_str = "、".join(company_info.industry) # 格式化数据资产 assets_info = "\n".join([ f"- {asset.name}:{asset.description}\n 核心表:{', '.join(asset.core_tables)}" for asset in data_assets ]) # 格式化存量场景 scenarios_info = "\n".join([ f"- {scenario.name}:{scenario.description}" for scenario in existing_scenarios ]) prompt = f"""请基于以下信息推荐潜在的数据应用场景: ## 企业信息 行业: {industry_str} 企业描述: {company_info.description} 数据规模: {company_info.data_scale} 数据来源: {', '.join(company_info.data_sources)} ## 可用数据资产 {assets_info} ## 存量场景(避免重复推荐) {scenarios_info} ## 推荐要求 1. 推荐 {recommendation_count} 个潜在数据应用场景 2. 场景分类:降本增效、营销增长、金融服务、决策支持、风险控制等 3. 推荐指数评分:1-5星(综合考虑业务价值、实施难度、数据准备度) 4. 分析场景依赖的数据资产 5. 评估商业价值和实施难度 6. 避免与存量场景重复 ## 输出格式(JSON) {{ "recommended_scenarios": [ {{ "id": 1, "name": "场景名称", "type": "场景分类", "recommendation_index": 5, "desc": "场景详细描述", "dependencies": ["依赖的资产1", "依赖的资产2"], "business_value": "商业价值描述", "implementation_difficulty": "实施难度(低/中/高)", "estimated_roi": "预估ROI(低/中/高)", "technical_requirements": ["技术要求1", "技术要求2"], "data_requirements": ["数据要求1", "数据要求2"] }} ] }} """ return prompt # ==================== 主要服务类 ==================== class ScenarioRecommendationService: """场景推荐服务""" @staticmethod async def recommend(request: ScenarioRecommendationRequest) -> dict: """ 推荐潜在场景 Args: request: 场景推荐请求 Returns: 推荐结果 """ start_time = time.time() logger.info( f"开始场景推荐 - 项目ID: {request.project_id}, " f"资产数: {len(request.data_assets)}, 存量场景数: {len(request.existing_scenarios)}" ) try: # 获取配置 model = request.options.model if request.options else settings.DEFAULT_LLM_MODEL temperature = settings.DEFAULT_TEMPERATURE count = request.options.recommendation_count if request.options else 10 exclude_types = request.options.exclude_types if request.options else [] logger.info(f"使用模型: {model}, 推荐数量: {count}") # 构建提示词 prompt = build_scenario_recommendation_prompt( company_info=request.company_info, data_assets=request.data_assets, existing_scenarios=request.existing_scenarios, recommendation_count=count ) logger.debug(f"提示词长度: {len(prompt)} 字符") # 调用大模型 response_text = await llm_client.call( prompt=prompt, system_prompt=SYSTEM_PROMPT, temperature=temperature, model=model ) # 解析结果 llm_result = llm_client.parse_json_response(response_text) logger.info("大模型返回结果解析成功") # 转换为标准格式 recommended_scenarios = [] scenarios_data = llm_result.get("recommended_scenarios", []) for idx, scenario_data in enumerate(scenarios_data): # 过滤排除的场景类型 if exclude_types and scenario_data.get("type") in exclude_types: continue scenario = RecommendedScenario( id=scenario_data.get("id", idx + 1), name=scenario_data.get("name", ""), type=scenario_data.get("type", ""), recommendation_index=scenario_data.get("recommendation_index", 3), desc=scenario_data.get("desc", ""), dependencies=scenario_data.get("dependencies", []), business_value=scenario_data.get("business_value", ""), implementation_difficulty=scenario_data.get("implementation_difficulty", "中等"), estimated_roi=scenario_data.get("estimated_roi", "中"), technical_requirements=scenario_data.get("technical_requirements", []), data_requirements=scenario_data.get("data_requirements", []) ) recommended_scenarios.append(scenario) # 计算生成时间 generation_time = time.time() - start_time # 构建响应数据 response_data = { "recommended_scenarios": [scenario.dict() for scenario in recommended_scenarios], "total_count": len(recommended_scenarios), "generation_time": round(generation_time, 2), "model_used": model } logger.info( f"场景推荐完成 - 推荐数: {len(recommended_scenarios)}, " f"耗时: {generation_time:.2f}秒" ) return response_data except Exception as e: logger.exception(f"场景推荐失败: {str(e)}") raise LLMAPIException( f"场景推荐失败: {str(e)}", error_detail=str(e), retryable="Rate limit" in str(e) or "timeout" in str(e).lower() )