Spaces:
Sleeping
Sleeping
| """ | |
| Complete Multi-Stage MCP Pipeline | |
| Orchestrates: Router β Executor β Compiler β Translator | |
| """ | |
| import time | |
| from typing import Dict, Any | |
| from openai import OpenAI | |
| from .router import QueryRouter | |
| from .executor import MCPExecutor, MCP_SERVER_REGISTRY | |
| from .compiler import ResponseCompiler | |
| from .translator import FarmerTranslator | |
| class FarmerChatPipeline: | |
| """Complete multi-stage MCP pipeline""" | |
| def __init__(self, openai_client: OpenAI, location: Dict[str, Any]): | |
| self.location = location | |
| self.router = QueryRouter(openai_client, MCP_SERVER_REGISTRY) | |
| self.executor = MCPExecutor() | |
| self.compiler = ResponseCompiler() | |
| self.translator = FarmerTranslator(openai_client) | |
| async def process_query(self, query: str, verbose: bool = False) -> Dict[str, Any]: | |
| """ | |
| Process farmer query through complete pipeline | |
| Returns: | |
| { | |
| "query": str, | |
| "routing": dict, | |
| "compiled_data": dict, | |
| "advice": str, | |
| "pipeline_time_seconds": float | |
| } | |
| """ | |
| if verbose: | |
| print(f"\nπΎ Processing: {query}") | |
| print(f"π Location: {self.location['name']}") | |
| pipeline_start = time.time() | |
| # STAGE 1: Query Routing | |
| if verbose: | |
| print("π― Stage 1: Routing...") | |
| routing = self.router.route(query, self.location) | |
| if verbose: | |
| print(f" β Servers: {', '.join(routing['required_servers'])}") | |
| # STAGE 2: MCP Execution (Parallel) | |
| if verbose: | |
| print("βοΈ Stage 2: Executing MCP servers...") | |
| raw_results = await self.executor.execute_parallel( | |
| routing['required_servers'], | |
| self.location['lat'], | |
| self.location['lon'] | |
| ) | |
| if verbose: | |
| print(f" β Completed in {raw_results['execution_time_seconds']}s") | |
| # STAGE 3: Response Compilation | |
| if verbose: | |
| print("π Stage 3: Compiling results...") | |
| compiled = self.compiler.compile(raw_results) | |
| if verbose: | |
| print(f" β {compiled['completeness']}") | |
| # STAGE 4: Farmer Translation | |
| if verbose: | |
| print("πΎ Stage 4: Generating advice...") | |
| farmer_advice = self.translator.translate(query, compiled, self.location) | |
| pipeline_time = time.time() - pipeline_start | |
| if verbose: | |
| print(f"β Complete! Total: {pipeline_time:.2f}s\n") | |
| return { | |
| "query": query, | |
| "routing": routing, | |
| "compiled_data": compiled, | |
| "advice": farmer_advice, | |
| "pipeline_time_seconds": round(pipeline_time, 2) | |
| } |