Spaces:
No application file
No application file
sam133
Refactor: Restructure codebase with modular design patterns and fix orchestrator implementation
9529bc2
| from typing import List, Dict, Any, Tuple | |
| import imageio | |
| import tempfile | |
| from pathlib import Path | |
| from ..llm.design_generator import LLMFactory | |
| from ..simulation.physics_simulator import SimulationFactory | |
| from .robot_design import RobotDesign | |
| class DesignOrchestrator: | |
| def __init__(self, llm_type: str = "local", simulator_type: str = "pybullet"): | |
| self.llm = LLMFactory.create_llm(llm_type) | |
| self.simulator = SimulationFactory.create_simulator(simulator_type) | |
| def process_design_request( | |
| self, | |
| prompt: str, | |
| vehicle_type: str = "wheeled", | |
| max_iterations: int = 3 | |
| ) -> Tuple[str, str, str, Dict[str, Any]]: | |
| """ | |
| Process a design request and return the results. | |
| Returns: (design_json, process_log, gif_path, results) | |
| """ | |
| process_log = [] | |
| best_design = None | |
| best_score = -1 | |
| best_results = None | |
| best_frames = None | |
| for iteration in range(max_iterations): | |
| process_log.append(f"Iteration {iteration + 1}/{max_iterations}") | |
| # Generate design using LLM | |
| design = self.llm.generate_design(prompt) | |
| process_log.append(f"Generated design: {design.to_json()}") | |
| # Run simulation | |
| self.simulator.setup_environment() | |
| self.simulator.create_robot(design) | |
| frames, results = self.simulator.run_simulation() | |
| # Calculate score | |
| score = self._calculate_score(results) | |
| process_log.append(f"Simulation results: {results}") | |
| process_log.append(f"Score: {score}") | |
| # Update best design if better | |
| if score > best_score: | |
| best_score = score | |
| best_design = design | |
| best_results = results | |
| best_frames = frames | |
| # Early exit if perfect score | |
| if score >= 1.0: | |
| process_log.append("Perfect score achieved, stopping iterations") | |
| break | |
| # Generate GIF from best frames | |
| gif_path = self._generate_gif_from_frames(best_frames) | |
| return ( | |
| best_design.to_json(), | |
| "\n".join(process_log), | |
| gif_path, | |
| best_results | |
| ) | |
| def _calculate_score(self, results: Dict[str, Any]) -> float: | |
| """Calculate a score between 0 and 1 based on simulation results""" | |
| if not results["success"]: | |
| return 0.0 | |
| # Base score for success | |
| score = 0.5 | |
| # Additional points for distance | |
| distance = results["final_position"][0] | |
| if distance > 0.8: | |
| score += 0.3 | |
| elif distance > 0.6: | |
| score += 0.2 | |
| elif distance > 0.4: | |
| score += 0.1 | |
| # Additional points for stability | |
| orientation = results["final_orientation"] | |
| if abs(orientation[0]) < 0.1 and abs(orientation[1]) < 0.1: | |
| score += 0.2 | |
| return min(score, 1.0) | |
| def _generate_gif_from_frames(self, frames: List[bytes]) -> str: | |
| """Generate a GIF from simulation frames""" | |
| if not frames: | |
| return "" | |
| # Create temporary directory for GIF | |
| temp_dir = Path(tempfile.gettempdir()) / "robot_sim" | |
| temp_dir.mkdir(exist_ok=True) | |
| gif_path = str(temp_dir / "simulation.gif") | |
| # Convert frames to images and save as GIF | |
| images = [] | |
| for frame in frames: | |
| img = imageio.imread(frame) | |
| images.append(img) | |
| imageio.mimsave(gif_path, images, fps=24) | |
| return gif_path |