# project_naka / code / cap_agent.py
# Uploaded by ilessio-aiflowlab via huggingface_hub (revision 665e529, verified).
"""CaPAgent — the core code-as-policy agent.
Generates Python robot control code, executes in sandbox, iterates
with multi-turn feedback. Orchestrates VDM, ensemble, and skill library.
"""
from __future__ import annotations
import argparse
import logging
import time
from pathlib import Path
from typing import Any
import numpy as np
from anima_naka.agent.ensemble import EnsembleReasoner
from anima_naka.agent.llm_client import LLMClient
from anima_naka.agent.prompts import build_initial_prompt, build_multiturn_prompt
from anima_naka.agent.vdm import VisualDifferencingModule
from anima_naka.config import NakaConfig
from anima_naka.constants import SINGLE_TURN_TIERS
from anima_naka.executor.code_parser import CodeParser
from anima_naka.skills.library import SkillLibrary
from anima_naka.types import TrialResult
# Module-level logger; handler/level configuration is left to the application.
logger = logging.getLogger("anima_naka.agent")
class CaPAgent:
    """Code-as-Policy agent — NAKA's heart.

    Takes a task description + API docs, generates Python robot-control code
    with an LLM, executes it in the environment, and iterates with multi-turn
    feedback. Optional subsystems: visual differencing (VDM), a skill library
    injected into prompts, and an ensemble of reasoners.
    """
    def __init__(self, config: NakaConfig):
        """Build the agent; each optional subsystem is created only when the
        corresponding ``use_*`` flag in *config* is set."""
        self.config = config
        self.llm = LLMClient(model=config.agent_model, temperature=config.agent_temperature)
        # VDM turns RGB frames into textual scene/change descriptions.
        self.vdm: VisualDifferencingModule | None = None
        if config.use_visual_differencing:
            self.vdm = VisualDifferencingModule(self.llm)
        # Skill library stores reusable code to inject into future prompts.
        self.skill_library: SkillLibrary | None = None
        if config.use_skill_library:
            self.skill_library = SkillLibrary(config.skill_library_path)
        # Ensemble replaces the single-LLM call in ``_generate`` when enabled.
        self.ensemble: EnsembleReasoner | None = None
        if config.use_parallel_ensemble:
            self.ensemble = EnsembleReasoner()
    def run_trial(self, env: Any) -> TrialResult:
        """Execute a complete trial: reset -> code gen -> execute -> multi-turn -> result.

        Args:
            env: Gym-style environment. ``reset()`` must return ``(obs, info)``
                with ``info["task_description"]`` and ``info["api_documentation"]``;
                ``step(code_str)`` must return the 5-tuple
                ``(obs, reward, terminated, truncated, exec_info)`` where
                ``exec_info`` carries "execution_result", "stdout", "stderr".

        Returns:
            TrialResult with final reward, per-turn code blocks, execution
            results and wall-clock duration. ``trial_id`` is always 0 here —
            presumably relabeled by the caller; TODO confirm.
        """
        start_time = time.perf_counter()
        obs, info = env.reset()
        # Optionally prepend previously learned skills to the initial prompt.
        skill_code = ""
        if self.skill_library:
            skill_code = self.skill_library.get_prompt_injection()
        messages = build_initial_prompt(
            task_description=info["task_description"],
            api_documentation=info["api_documentation"],
            skill_library_code=skill_code,
        )
        # If VDM is enabled and a camera view is present, append a textual
        # scene description to the last message of the prompt.
        if self.vdm and "robot0_robotview" in obs:
            rgb = self._get_rgb(obs)
            if rgb is not None:
                scene_desc = self.vdm.describe_scene(rgb, info["task_description"])
                messages[-1]["content"] += f"\n\nCurrent scene observation:\n{scene_desc}"
        response = self._generate(messages)
        code_blocks: list[str] = []
        execution_results = []
        prev_rgb = self._get_rgb(obs)  # pre-step frame for change detection
        reward = 0.0
        # Multi-turn loop: execute the first code block from the response,
        # then ask the LLM to finish or retry, up to ``multiturn_limit`` turns.
        for _ in range(self.config.multiturn_limit):
            code_blocks_raw = CodeParser.extract_code_blocks(response)
            if not code_blocks_raw:
                # No parseable code in the response — nothing to execute.
                break
            code = code_blocks_raw[0]  # only the first block is executed
            code_blocks.append(code)
            obs, reward, terminated, truncated, exec_info = env.step(code)
            execution_results.append(exec_info["execution_result"])
            if terminated or reward >= 1.0:
                # Task solved or environment terminated: stop iterating.
                break
            if truncated:
                break
            if self.config.tier in SINGLE_TURN_TIERS:
                # Single-turn evaluation tiers never get feedback turns.
                break
            # Summarize visual changes between pre- and post-step frames.
            visual_diff = None
            if self.vdm and prev_rgb is not None:
                curr_rgb = self._get_rgb(obs)
                if curr_rgb is not None:
                    visual_diff = self.vdm.describe_changes(
                        prev_rgb,
                        curr_rgb,
                        info["task_description"],
                    )
            # Ask the LLM to decide: finish, or continue with revised code.
            decision_messages = build_multiturn_prompt(
                executed_code=code,
                stdout=exec_info["stdout"],
                stderr=exec_info["stderr"],
                visual_diff=visual_diff,
                reward=reward,
                task_completed=exec_info.get("task_completed", False),
            )
            decision_response = self._generate(decision_messages)
            decision, new_code = CodeParser.parse_decision(decision_response)
            if decision == "finish" or new_code is None:
                break
            # Feed the revised response back through extraction next turn.
            response = new_code
            prev_rgb = self._get_rgb(obs)
        # On success, distill the executed code into reusable skills.
        if reward >= 1.0 and self.skill_library:
            self.skill_library.extract_and_add(code_blocks, info.get("task_description", ""))
        duration = time.perf_counter() - start_time
        return TrialResult(
            trial_id=0,
            reward=reward,
            task_completed=reward >= 1.0,
            turns=len(code_blocks),
            code_blocks=code_blocks,
            execution_results=execution_results,
            duration_s=duration,
        )
    def _generate(self, messages: list[dict[str, str]]) -> str:
        """Generate code using LLM (single model, or ensemble when configured)."""
        if self.ensemble:
            return self.ensemble.generate(messages)
        return self.llm.query(messages)
    @staticmethod
    def _get_rgb(obs: dict) -> np.ndarray | None:
        """Extract the RGB image from the observation dict, or None if absent.

        Expects ``obs["robot0_robotview"]["images"]["rgb"]``; any missing key
        or non-dict level yields None rather than raising.
        """
        try:
            return obs["robot0_robotview"]["images"]["rgb"]
        except (KeyError, TypeError):
            return None
def _normalize_simulator(value: str | None) -> str:
if value is None:
return "mock"
normalized = value.strip().lower()
if normalized in {"real", "robosuite"}:
return "robosuite"
if normalized in {"mock", "none", "sim", ""}:
return "mock"
raise ValueError(f"Unsupported simulator '{value}'. Use mock or robosuite.")
def main() -> None:
    """Entrypoint for module execution.

    Performs a smoke-load check: resolves the config (from TOML when the file
    exists, defaults otherwise), applies an optional ``--sim`` override, and
    prints the resolved configuration.
    """
    parser = argparse.ArgumentParser(description="Run CaP-Agent smoke check")
    parser.add_argument("--config", type=str, default="configs/debug.toml")
    parser.add_argument("--sim", type=_normalize_simulator, default=None)
    args = parser.parse_args()

    config_file = Path(args.config)
    if config_file.exists():
        config = NakaConfig.from_toml(config_file)
    else:
        config = NakaConfig()

    if args.sim is not None:
        # Immutable update: replace only the simulator field.
        config = config.model_copy(update={"simulator": args.sim})

    summary = (
        f"CaP-Agent initialized | config={args.config} | "
        f"simulator={config.simulator} | task={config.task} | tier={config.tier}"
    )
    print(summary)
# Allow running as a script: python -m / direct execution triggers the smoke check.
if __name__ == "__main__":
    main()