Spaces:
Sleeping
Sleeping
fix(deploy): resolve multi-mode deployment validator issues with uv, openenv requirements, and standard main() loop
# server/app.py
"""
FastAPI server — v4.0
Core endpoints: POST /reset, POST /step, GET /state, GET /health
Evaluation endpoints: GET /trajectory, GET /evaluate, GET /metrics
Control endpoints: POST /fault-config
Intelligence (v3): GET /classify, GET /strategy, GET /advanced-metrics,
                   POST /compare-agents, GET /improvement-plan, GET /viz-data
Research (v4 NEW): GET /causal-probe, GET /counterfactual, GET /confidence,
                   POST /benchmark, GET /analytics
"""
from contextlib import asynccontextmanager
import os
from typing import Optional

from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles

from .environment import CodebaseNavEnvironment
from .models import (
    RepoAction, StepResult, ResetResult, StateResult,
    TrajectoryResponse, EvaluationResponse, MetricsResponse,
    FaultConfigRequest,
)
from .failure_classifier import FailureClassifier
from .strategy_detector import StrategyDetector
from .advanced_metrics import AdvancedMetricsEngine
from .self_improvement import SelfImprovementEngine
from .multi_agent import MultiAgentComparison
| # Global instances | |
| env = CodebaseNavEnvironment() | |
| failure_clf = FailureClassifier() | |
| strategy_det = StrategyDetector() | |
| adv_metrics = AdvancedMetricsEngine() | |
| improvement = SelfImprovementEngine() | |
| multi_agent = MultiAgentComparison() | |
@asynccontextmanager
async def lifespan(app: "FastAPI"):
    """Application lifespan: no startup work; close the environment on shutdown.

    FastAPI's ``lifespan=`` argument expects an async *context manager*
    factory; the bare async-generator form (without ``@asynccontextmanager``)
    is deprecated in Starlette, so the decorator is required here.
    """
    yield
    # Shutdown: release whatever resources the environment holds.
    env.close()
| app = FastAPI( | |
| title="Codebase Navigation & Repair β OpenEnv v3", | |
| description=( | |
| "RL environment for AI coding agents β extended with process-based evaluation, " | |
| "failure classification, strategy detection, self-improvement loops, " | |
| "multi-agent comparison, 3D visualization, and advanced metrics." | |
| ), | |
| version="3.0.0", | |
| lifespan=lifespan, | |
| ) | |
| # Serve static files (3D visualizer HTML) | |
| _static_dir = os.path.join(os.path.dirname(__file__), "..", "static") | |
| if os.path.exists(_static_dir): | |
| app.mount("/static", StaticFiles(directory=_static_dir), name="static") | |
| # ββ Core OpenEnv Endpoints ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def reset(task: str = "task1"): | |
| valid_tasks = ["task1", "task2", "task3"] | |
| if task not in valid_tasks: | |
| raise HTTPException(status_code=400, detail=f"task must be one of {valid_tasks}") | |
| try: | |
| return env.reset(task=task) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def step(action: RepoAction): | |
| if env.done: | |
| raise HTTPException(status_code=400, detail="Episode is done. POST /reset to start.") | |
| try: | |
| return env.step(action) | |
| except RuntimeError as e: | |
| raise HTTPException(status_code=400, detail=str(e)) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def state(): | |
| return StateResult( | |
| observation=env.get_state(), | |
| current_score=env.final_score, | |
| total_steps_taken=env.steps_taken, | |
| ) | |
| async def health(): | |
| return {"status": "ok", "environment": "codebase-nav-env", "version": "3.0.0"} | |
| # ββ Evaluation Endpoints ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def get_trajectory(): | |
| traj = env.get_trajectory() | |
| if not traj: | |
| return TrajectoryResponse() | |
| return TrajectoryResponse(**traj) | |
| async def get_evaluation(): | |
| evaluation = env.get_evaluation() | |
| if "error" in evaluation: | |
| return EvaluationResponse() | |
| return EvaluationResponse(**evaluation) | |
| async def get_metrics(): | |
| return MetricsResponse(**env.get_metrics()) | |
| async def set_fault_config(config: FaultConfigRequest): | |
| env.set_fault_config(config.level) | |
| return { | |
| "status": "ok", | |
| "fault_level": config.level, | |
| "message": f"Fault injection set to '{config.level}'. Takes effect on next /reset.", | |
| } | |
| # ββ Intelligence Endpoints (NEW in v3) ββββββββββββββββββββββββββββββββββββββββ | |
| async def classify_failure(): | |
| """ | |
| Classify the failure type of the current/latest episode. | |
| Returns typed failure taxonomy with root cause and remediation. | |
| """ | |
| traj = env.get_trajectory() | |
| if not traj: | |
| return {"error": "No trajectory available. Run an episode first."} | |
| steps = traj.get("steps", []) | |
| meta = env.variant.meta if env.variant else {} | |
| report = failure_clf.classify( | |
| episode_id=traj.get("episode_id", ""), | |
| task=env.current_task or "unknown", | |
| trajectory_steps=steps, | |
| variant_meta=meta, | |
| files_read=list(env.files_read), | |
| files_written=list(env.files_written), | |
| final_score=env.final_score, | |
| security_violations=env.security_violations, | |
| ) | |
| return report.to_dict() | |
| async def detect_strategy(): | |
| """ | |
| Detect the behavioral strategy pattern used by the agent. | |
| Returns: TARGETED_DEBUGGING | SYSTEMATIC_SEARCH | BRUTE_FORCE | | |
| RANDOM_EXPLORATION | SPEC_DRIVEN | MINIMAL_EFFORT | |
| """ | |
| traj = env.get_trajectory() | |
| if not traj: | |
| return {"error": "No trajectory available."} | |
| steps = traj.get("steps", []) | |
| meta = env.variant.meta if env.variant else {} | |
| report = strategy_det.detect( | |
| trajectory_steps=steps, | |
| task=env.current_task or "unknown", | |
| variant_meta=meta, | |
| files_read=list(env.files_read), | |
| final_score=env.final_score, | |
| ) | |
| return report.to_dict() | |
| async def get_advanced_metrics(): | |
| """ | |
| Compute advanced metrics: reasoning efficiency, decision entropy, | |
| exploration ratio, reliability index, consistency, pivot rate. | |
| """ | |
| traj = env.get_trajectory() | |
| if not traj: | |
| return {"error": "No trajectory available."} | |
| steps = traj.get("steps", []) | |
| meta = env.variant.meta if env.variant else {} | |
| report = adv_metrics.compute( | |
| trajectory_steps=steps, | |
| variant_meta=meta, | |
| final_score=env.final_score, | |
| files_read=list(env.files_read), | |
| files_written=list(env.files_written), | |
| ) | |
| return report.to_dict() | |
| async def get_improvement_plan(): | |
| """ | |
| Generate a self-improvement plan based on failure classification. | |
| Returns: what_went_wrong, improved_strategy, step-by-step plan, | |
| system_prompt_addon (for injecting into next agent run). | |
| """ | |
| traj = env.get_trajectory() | |
| if not traj: | |
| return {"error": "No trajectory available."} | |
| steps = traj.get("steps", []) | |
| meta = env.variant.meta if env.variant else {} | |
| # Classify first | |
| fail_report = failure_clf.classify( | |
| episode_id=traj.get("episode_id", ""), | |
| task=env.current_task or "unknown", | |
| trajectory_steps=steps, | |
| variant_meta=meta, | |
| files_read=list(env.files_read), | |
| files_written=list(env.files_written), | |
| final_score=env.final_score, | |
| security_violations=env.security_violations, | |
| ) | |
| plan = improvement.generate_improvement_plan( | |
| episode_id=traj.get("episode_id", ""), | |
| task=env.current_task or "unknown", | |
| failure_type=fail_report.primary_failure, | |
| failure_evidence=[f.evidence for f in fail_report.failures], | |
| original_score=env.final_score, | |
| trajectory_steps=steps, | |
| files_read=list(env.files_read), | |
| files_written=list(env.files_written), | |
| ) | |
| return plan.to_dict() | |
| async def compare_agents(task: str = "task1", agents: str = "all"): | |
| """ | |
| Run multiple agent strategies on the same task and compare side-by-side. | |
| agents: "all" | comma-separated list of: test-first,search-first,minimal,exhaustive | |
| """ | |
| valid_tasks = ["task1", "task2", "task3"] | |
| if task not in valid_tasks: | |
| raise HTTPException(status_code=400, detail=f"task must be one of {valid_tasks}") | |
| if agents == "all": | |
| agent_list = None | |
| else: | |
| agent_list = [a.strip() for a in agents.split(",")] | |
| try: | |
| report = multi_agent.compare(env, task=task, agents=agent_list) | |
| return report.to_dict() | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def get_viz_data(): | |
| """ | |
| Get structured 3D visualization data for the current/latest episode. | |
| Returns nodes (files), edges (dependencies), and step trajectory | |
| in the format expected by the Three.js visualizer. | |
| """ | |
| traj = env.get_trajectory() | |
| if not traj: | |
| return {"error": "No trajectory available."} | |
| # Build file nodes | |
| files = [] | |
| visited = set(env.files_read) | |
| modified = set(env.files_written) | |
| meta = env.variant.meta if env.variant else {} | |
| bug_files = set(meta.get("bug_files", [])) | |
| if env.variant: | |
| tree = env.variant.get_tree() | |
| for f in tree: | |
| ftype = "test" if f.startswith("tests/") else \ | |
| "spec" if f.endswith(".md") else "src" | |
| files.append({ | |
| "name": f, | |
| "type": ftype, | |
| "is_bug_file": f in bug_files, | |
| "visited": f in visited, | |
| "modified": f in modified, | |
| }) | |
| # Build dependency edges from known patterns | |
| deps = [] | |
| test_files = [f["name"] for f in files if f["type"] == "test"] | |
| src_files = [f["name"] for f in files if f["type"] == "src"] | |
| # Simple heuristic: connect tests to src files | |
| for tf in test_files: | |
| for sf in src_files: | |
| deps.append({"from": tf, "to": sf}) | |
| # Build step data for trajectory | |
| steps_data = [] | |
| for step in traj.get("steps", []): | |
| steps_data.append({ | |
| "step": step.get("step_number", 0), | |
| "action": step.get("action_type", ""), | |
| "path": step.get("action_path"), | |
| "reward": step.get("reward", 0.0), | |
| "error": step.get("error"), | |
| "pass_rate": step.get("test_pass_rate"), | |
| }) | |
| # Get strategy | |
| strategy_info = strategy_det.detect( | |
| traj.get("steps", []), | |
| env.current_task or "unknown", | |
| meta, | |
| list(env.files_read), | |
| env.final_score, | |
| ) if traj.get("steps") else None | |
| return { | |
| "task": env.current_task or "unknown", | |
| "variant_id": traj.get("variant_id", "unknown"), | |
| "final_score": env.final_score, | |
| "strategy": strategy_info.strategy if strategy_info else "UNKNOWN", | |
| "failure_type": "β", | |
| "files": files, | |
| "dependencies": deps, | |
| "steps": steps_data, | |
| } | |
| # ββ Research Endpoints (NEW in v4) ββββββββββββββββββββββββββββββββββββββββββββ | |
| from .causal_probe import CausalProbe | |
| from .counterfactual_engine import CounterfactualEngine | |
| from .confidence_calibrator import ConfidenceCalibrator | |
| from .benchmark_runner import BenchmarkRunner | |
| from .analytics_engine import AnalyticsEngine | |
| _causal = CausalProbe() | |
| _counter = CounterfactualEngine() | |
| _calibrator = ConfidenceCalibrator() | |
| _benchmark = BenchmarkRunner() | |
| _analytics = AnalyticsEngine() | |
| async def causal_probe(): | |
| """ | |
| Causal reasoning probe β did the agent understand WHY the bug exists? | |
| Returns: causal_score, understanding_level, chain_coverage, shortcut_detection. | |
| """ | |
| traj = env.get_trajectory() | |
| if not traj: | |
| return {"error": "No trajectory available."} | |
| steps = traj.get("steps", []) | |
| meta = env.variant.meta if env.variant else {} | |
| report = _causal.probe( | |
| episode_id=traj.get("episode_id", ""), | |
| task=env.current_task or "unknown", | |
| trajectory_steps=steps, | |
| variant_meta=meta, | |
| files_read=list(env.files_read), | |
| files_written=list(env.files_written), | |
| final_score=env.final_score, | |
| ) | |
| return report.to_dict() | |
| async def counterfactual(): | |
| """ | |
| Counterfactual robustness test β is the agent's strategy brittle? | |
| Simulates 6 mutations and measures how many the strategy survives. | |
| Returns: robustness_score, brittleness_level, mutations analysis. | |
| """ | |
| traj = env.get_trajectory() | |
| if not traj: | |
| return {"error": "No trajectory available."} | |
| steps = traj.get("steps", []) | |
| meta = env.variant.meta if env.variant else {} | |
| report = _counter.analyze( | |
| episode_id=traj.get("episode_id", ""), | |
| task=env.current_task or "unknown", | |
| trajectory_steps=steps, | |
| variant_meta=meta, | |
| files_read=list(env.files_read), | |
| files_written=list(env.files_written), | |
| final_score=env.final_score, | |
| ) | |
| return report.to_dict() | |
| async def confidence_calibration(): | |
| """ | |
| Confidence calibration β is the agent appropriately confident? | |
| Infers confidence from behavioral proxies and compares to actual performance. | |
| Returns: profile (WELL_CALIBRATED|OVERCONFIDENT|UNDERCONFIDENT), calibration_score. | |
| """ | |
| traj = env.get_trajectory() | |
| if not traj: | |
| return {"error": "No trajectory available."} | |
| steps = traj.get("steps", []) | |
| report = _calibrator.calibrate( | |
| episode_id=traj.get("episode_id", ""), | |
| task=env.current_task or "unknown", | |
| trajectory_steps=steps, | |
| final_score=env.final_score, | |
| ) | |
| return report.to_dict() | |
| async def run_benchmark( | |
| tasks: str = "task1,task2", | |
| agents: str = "all", | |
| benchmark_id: str = None, | |
| ): | |
| """ | |
| Automated benchmark leaderboard. | |
| Runs all selected agents Γ tasks. Returns ranked leaderboard. | |
| tasks: comma-separated task IDs. agents: "all" or comma-separated strategy names. | |
| """ | |
| task_list = [t.strip() for t in tasks.split(",") if t.strip()] | |
| valid_tasks = ["task1", "task2", "task3"] | |
| task_list = [t for t in task_list if t in valid_tasks] | |
| if not task_list: | |
| raise HTTPException(status_code=400, detail=f"tasks must be one of {valid_tasks}") | |
| agent_list = None if agents == "all" else [a.strip() for a in agents.split(",")] | |
| try: | |
| report = _benchmark.run(env, tasks=task_list, agents=agent_list, benchmark_id=benchmark_id) | |
| return report.to_dict() | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def get_analytics(): | |
| """ | |
| Unified research-grade analytics report. | |
| Synthesizes all v3+v4 evaluation dimensions into one report with: | |
| reasoning graph, root cause tree, alternative paths, profile tags, | |
| composite score, executive summary, researcher notes. | |
| """ | |
| traj = env.get_trajectory() | |
| if not traj: | |
| return {"error": "No trajectory available."} | |
| try: | |
| report = _analytics.analyze(env) | |
| return report.to_dict() | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def health_v4(): | |
| return { | |
| "status": "ok", | |
| "environment": "codebase-nav-env", | |
| "version": "4.0.0", | |
| "endpoints": [ | |
| "/reset", "/step", "/state", "/health", | |
| "/trajectory", "/evaluate", "/metrics", "/fault-config", | |
| "/classify", "/strategy", "/advanced-metrics", | |
| "/improvement-plan", "/compare-agents", "/viz-data", | |
| "/causal-probe", "/counterfactual", "/confidence", | |
| "/benchmark", "/analytics", | |
| ], | |
| } | |
| def main(): | |
| import uvicorn | |
| port = int(os.environ.get("PORT", 8000)) | |
| uvicorn.run("server.app:app", host="0.0.0.0", port=port, reload=True) | |
| if __name__ == "__main__": | |
| main() | |