Agent2Robot / src /core /orchestrator.py
sam133
Refactor: Restructure codebase with modular design patterns and fix orchestrator implementation
9529bc2
from typing import List, Dict, Any, Tuple
import imageio
import tempfile
from pathlib import Path
from ..llm.design_generator import LLMFactory
from ..simulation.physics_simulator import SimulationFactory
from .robot_design import RobotDesign
class DesignOrchestrator:
def __init__(self, llm_type: str = "local", simulator_type: str = "pybullet"):
self.llm = LLMFactory.create_llm(llm_type)
self.simulator = SimulationFactory.create_simulator(simulator_type)
def process_design_request(
self,
prompt: str,
vehicle_type: str = "wheeled",
max_iterations: int = 3
) -> Tuple[str, str, str, Dict[str, Any]]:
"""
Process a design request and return the results.
Returns: (design_json, process_log, gif_path, results)
"""
process_log = []
best_design = None
best_score = -1
best_results = None
best_frames = None
for iteration in range(max_iterations):
process_log.append(f"Iteration {iteration + 1}/{max_iterations}")
# Generate design using LLM
design = self.llm.generate_design(prompt)
process_log.append(f"Generated design: {design.to_json()}")
# Run simulation
self.simulator.setup_environment()
self.simulator.create_robot(design)
frames, results = self.simulator.run_simulation()
# Calculate score
score = self._calculate_score(results)
process_log.append(f"Simulation results: {results}")
process_log.append(f"Score: {score}")
# Update best design if better
if score > best_score:
best_score = score
best_design = design
best_results = results
best_frames = frames
# Early exit if perfect score
if score >= 1.0:
process_log.append("Perfect score achieved, stopping iterations")
break
# Generate GIF from best frames
gif_path = self._generate_gif_from_frames(best_frames)
return (
best_design.to_json(),
"\n".join(process_log),
gif_path,
best_results
)
def _calculate_score(self, results: Dict[str, Any]) -> float:
"""Calculate a score between 0 and 1 based on simulation results"""
if not results["success"]:
return 0.0
# Base score for success
score = 0.5
# Additional points for distance
distance = results["final_position"][0]
if distance > 0.8:
score += 0.3
elif distance > 0.6:
score += 0.2
elif distance > 0.4:
score += 0.1
# Additional points for stability
orientation = results["final_orientation"]
if abs(orientation[0]) < 0.1 and abs(orientation[1]) < 0.1:
score += 0.2
return min(score, 1.0)
def _generate_gif_from_frames(self, frames: List[bytes]) -> str:
"""Generate a GIF from simulation frames"""
if not frames:
return ""
# Create temporary directory for GIF
temp_dir = Path(tempfile.gettempdir()) / "robot_sim"
temp_dir.mkdir(exist_ok=True)
gif_path = str(temp_dir / "simulation.gif")
# Convert frames to images and save as GIF
images = []
for frame in frames:
img = imageio.imread(frame)
images.append(img)
imageio.mimsave(gif_path, images, fps=24)
return gif_path