Spaces:
Build error
Build error
| #!/usr/bin/env python3 | |
| """ | |
| Example demonstrating parallel execution capabilities. | |
| This script shows how to use the ParallelExecutor for: | |
| 1. Parallel tool execution | |
| 2. Parallel agent execution | |
| 3. Map-reduce operations | |
| 4. Performance monitoring | |
| """ | |
| import asyncio | |
| import sys | |
| import os | |
| # Add src to Python path | |
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) | |
| from src.application.executors.parallel_executor import ParallelExecutor, ParallelFSMReactAgent | |
| from src.infrastructure.monitoring.decorators import get_metrics_summary, reset_metrics | |
| async def demo_parallel_tool_execution(): | |
| """Demonstrate parallel tool execution""" | |
| print("\n=== Parallel Tool Execution Demo ===") | |
| # Create parallel executor | |
| executor = ParallelExecutor(max_workers=5) | |
| # Define mock tools that simulate real operations | |
| async def web_search(query: str) -> str: | |
| await asyncio.sleep(1) # Simulate API call | |
| return f"Search results for: {query}" | |
| async def calculate(expression: str) -> float: | |
| await asyncio.sleep(0.5) # Simulate calculation | |
| return eval(expression) # Note: unsafe in production | |
| async def analyze_text(text: str) -> dict: | |
| await asyncio.sleep(2) # Simulate analysis | |
| return { | |
| "length": len(text), | |
| "words": len(text.split()), | |
| "sentences": len(text.split('.')), | |
| "avg_word_length": sum(len(word) for word in text.split()) / len(text.split()) if text.split() else 0 | |
| } | |
| async def fetch_weather(city: str) -> dict: | |
| await asyncio.sleep(1.5) # Simulate API call | |
| return { | |
| "city": city, | |
| "temperature": 22.5, | |
| "condition": "sunny", | |
| "humidity": 65 | |
| } | |
| async def translate_text(text: str, target_language: str) -> str: | |
| await asyncio.sleep(1) # Simulate translation | |
| return f"Translated '{text}' to {target_language}" | |
| # Execute tools in parallel | |
| tools = [web_search, calculate, analyze_text, fetch_weather, translate_text] | |
| inputs = [ | |
| {"query": "parallel execution python"}, | |
| {"expression": "2 + 2 * 3"}, | |
| {"text": "This is a sample text for analysis. It contains multiple sentences."}, | |
| {"city": "New York"}, | |
| {"text": "Hello world", "target_language": "Spanish"} | |
| ] | |
| print("Executing 5 tools in parallel...") | |
| start_time = asyncio.get_event_loop().time() | |
| results = await executor.execute_tools_parallel(tools, inputs, timeout=10.0) | |
| end_time = asyncio.get_event_loop().time() | |
| total_time = end_time - start_time | |
| print(f"Completed in {total_time:.2f} seconds") | |
| print("Results:") | |
| for i, (success, result) in enumerate(results): | |
| tool_name = tools[i].__name__ | |
| if success: | |
| print(f" ✓ {tool_name}: {result}") | |
| else: | |
| print(f" ✗ {tool_name}: Error - {result}") | |
| # Cleanup | |
| executor.shutdown() | |
| async def demo_map_reduce(): | |
| """Demonstrate map-reduce operations""" | |
| print("\n=== Map-Reduce Demo ===") | |
| executor = ParallelExecutor(max_workers=8) | |
| # Define map and reduce functions | |
| async def process_number(num: int) -> int: | |
| await asyncio.sleep(0.1) # Simulate processing | |
| return num * num | |
| def sum_results(results: list) -> int: | |
| return sum(results) | |
| # Process a large dataset | |
| items = list(range(100)) | |
| print(f"Processing {len(items)} items with map-reduce...") | |
| start_time = asyncio.get_event_loop().time() | |
| final_result = await executor.map_reduce( | |
| process_number, sum_results, items, chunk_size=10 | |
| ) | |
| end_time = asyncio.get_event_loop().time() | |
| total_time = end_time - start_time | |
| print(f"Sum of squares: {final_result}") | |
| print(f"Completed in {total_time:.2f} seconds") | |
| # Cleanup | |
| executor.shutdown() | |
| async def demo_parallel_agent_execution(): | |
| """Demonstrate parallel agent execution""" | |
| print("\n=== Parallel Agent Execution Demo ===") | |
| executor = ParallelExecutor(max_workers=3) | |
| # Mock agents | |
| class MockAgent: | |
| def __init__(self, agent_id: str, name: str): | |
| self.agent_id = agent_id | |
| self.name = name | |
| async def execute(self, task: dict) -> dict: | |
| await asyncio.sleep(1) # Simulate agent processing | |
| return { | |
| "agent_id": self.agent_id, | |
| "agent_name": self.name, | |
| "task": task["description"], | |
| "result": f"Processed by {self.name}", | |
| "status": "completed" | |
| } | |
| # Create mock agents | |
| agents = [ | |
| MockAgent("agent_1", "Research Agent"), | |
| MockAgent("agent_2", "Analysis Agent"), | |
| MockAgent("agent_3", "Synthesis Agent") | |
| ] | |
| # Define tasks | |
| tasks = [ | |
| {"description": "Research market trends"}, | |
| {"description": "Analyze competitor data"}, | |
| {"description": "Synthesize findings"} | |
| ] | |
| print("Executing 3 agents in parallel...") | |
| start_time = asyncio.get_event_loop().time() | |
| results = await executor.execute_agents_parallel(agents, tasks, max_concurrent=2) | |
| end_time = asyncio.get_event_loop().time() | |
| total_time = end_time - start_time | |
| print(f"Completed in {total_time:.2f} seconds") | |
| print("Results:") | |
| for agent_id, result in results: | |
| if "error" not in result: | |
| print(f" ✓ {agent_id}: {result['result']}") | |
| else: | |
| print(f" ✗ {agent_id}: Error - {result['error']}") | |
| # Cleanup | |
| executor.shutdown() | |
| async def demo_performance_monitoring(): | |
| """Demonstrate performance monitoring""" | |
| print("\n=== Performance Monitoring Demo ===") | |
| # Reset metrics | |
| reset_metrics() | |
| # Run some operations to generate metrics | |
| executor = ParallelExecutor(max_workers=4) | |
| async def monitored_operation(name: str, duration: float): | |
| await asyncio.sleep(duration) | |
| return f"Operation {name} completed" | |
| # Execute multiple monitored operations | |
| operations = [ | |
| ("A", 0.5), | |
| ("B", 1.0), | |
| ("C", 0.3), | |
| ("D", 0.8) | |
| ] | |
| tasks = [monitored_operation(name, duration) for name, duration in operations] | |
| await asyncio.gather(*tasks) | |
| # Get metrics summary | |
| summary = get_metrics_summary() | |
| print("Performance Metrics Summary:") | |
| for key, value in summary.items(): | |
| if key != "timestamp": | |
| print(f" {key}: {value}") | |
| # Cleanup | |
| executor.shutdown() | |
| async def demo_parallel_fsm_agent(): | |
| """Demonstrate parallel FSM agent""" | |
| print("\n=== Parallel FSM Agent Demo ===") | |
| # Mock tools for the FSM agent | |
| class MockTool: | |
| def __init__(self, name: str, func): | |
| self.name = name | |
| self.func = func | |
| async def search_tool(query: str) -> str: | |
| await asyncio.sleep(1) | |
| return f"Search results for: {query}" | |
| async def calculate_tool(expression: str) -> float: | |
| await asyncio.sleep(0.5) | |
| return eval(expression) | |
| async def analyze_tool(text: str) -> dict: | |
| await asyncio.sleep(1.5) | |
| return {"word_count": len(text.split()), "char_count": len(text)} | |
| # Create tools | |
| tools = [ | |
| MockTool("search", search_tool), | |
| MockTool("calculate", calculate_tool), | |
| MockTool("analyze", analyze_tool) | |
| ] | |
| # Create parallel FSM agent | |
| agent = ParallelFSMReactAgent(tools, max_parallel_tools=3) | |
| # Define tool calls | |
| tool_calls = [ | |
| {"tool_name": "search", "arguments": {"query": "parallel processing"}}, | |
| {"tool_name": "calculate", "arguments": {"expression": "10 * 5 + 2"}}, | |
| {"tool_name": "analyze", "arguments": {"text": "This is a sample text for analysis."}} | |
| ] | |
| print("Executing tool calls in parallel with FSM agent...") | |
| start_time = asyncio.get_event_loop().time() | |
| results = await agent.execute_tools_parallel(tool_calls) | |
| end_time = asyncio.get_event_loop().time() | |
| total_time = end_time - start_time | |
| print(f"Completed in {total_time:.2f} seconds") | |
| print("Results:") | |
| for result in results: | |
| tool_name = result["tool_name"] | |
| if result["success"]: | |
| print(f" ✓ {tool_name}: {result['result']}") | |
| else: | |
| print(f" ✗ {tool_name}: Error - {result['error']}") | |
| async def main(): | |
| """Run all demos""" | |
| print("🚀 Parallel Execution Demo Suite") | |
| print("=" * 50) | |
| try: | |
| await demo_parallel_tool_execution() | |
| await demo_map_reduce() | |
| await demo_parallel_agent_execution() | |
| await demo_performance_monitoring() | |
| await demo_parallel_fsm_agent() | |
| print("\n✅ All demos completed successfully!") | |
| except Exception as e: | |
| print(f"\n❌ Demo failed: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| if __name__ == "__main__": | |
| asyncio.run(main()) |