Spaces:
Sleeping
Sleeping
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """ | |
| FastAPI application for the API Integration Debugging Environment. | |
| Endpoints: | |
| - POST /reset: Reset the environment | |
| - POST /step: Execute an action | |
| - GET /state: Get current environment state | |
| - GET /schema: Get action/observation schemas | |
| - WS /ws: WebSocket endpoint for persistent sessions | |
| - GET /tasks: List all tasks with action schema | |
| - POST /grader: Get grader score for current episode | |
| - POST /baseline: Run baseline inference on all tasks | |
| Usage: | |
| uvicorn server.app:app --reload --host 0.0.0.0 --port 8000 | |
| """ | |
| import os | |
| from typing import Dict, Any, Optional | |
| from fastapi import FastAPI | |
| from pydantic import BaseModel | |
| try: | |
| from openenv.core.env_server.http_server import create_app | |
| except Exception as e: | |
| raise ImportError( | |
| "openenv is required. Install with: uv sync" | |
| ) from e | |
| try: | |
| from ..models import ApiDebugAction, ApiDebugObservation | |
| from .api_debug_env_environment import ApiDebugEnvironment | |
| except ImportError: | |
| from models import ApiDebugAction, ApiDebugObservation | |
| from server.api_debug_env_environment import ApiDebugEnvironment | |
| try: | |
| from ..scenarios import get_all_task_ids, get_scenario | |
| except ImportError: | |
| from scenarios import get_all_task_ids, get_scenario | |
| # βββ Create the core OpenEnv app βββββββββββββββββββββββββββββββββββββββββββββ | |
| app = create_app( | |
| ApiDebugEnvironment, | |
| ApiDebugAction, | |
| ApiDebugObservation, | |
| env_name="api_debug_env", | |
| max_concurrent_envs=3, | |
| ) | |
| # βββ Root endpoint (required: hackathon validator pings / and expects 200) ββββ | |
| async def root(): | |
| """Root endpoint β returns environment info and available endpoints.""" | |
| return { | |
| "name": "api_debug_env", | |
| "description": "API Integration Debugging Environment β diagnose and fix broken API integrations", | |
| "version": "2.0.0", | |
| "status": "running", | |
| "features": [ | |
| "Cascading failure simulation", | |
| "Dynamic service health tracking", | |
| "Multi-dimensional rubric grading", | |
| "Seed-based scenario randomization", | |
| ], | |
| "endpoints": ["/reset", "/step", "/state", "/tasks", "/grader", "/baseline", "/health", "/schema", "/docs"], | |
| } | |
| # βββ Hackathon-required endpoints βββββββββββββββββββββββββββββββββββββββββββββ | |
| # Store environment instances per task for grading | |
| _grading_envs: Dict[str, ApiDebugEnvironment] = {} | |
| class GraderRequest(BaseModel): | |
| task_id: str = "easy" | |
| class BaselineRequest(BaseModel): | |
| api_key: Optional[str] = None | |
| async def list_tasks(): | |
| """Return list of all tasks with action schema and dependency info.""" | |
| tasks = [] | |
| for task_id in get_all_task_ids(): | |
| scenario = get_scenario(task_id) | |
| tasks.append({ | |
| "task_id": task_id, | |
| "difficulty": scenario.difficulty, | |
| "description": scenario.description, | |
| "max_steps": scenario.max_steps, | |
| "issues_count": len(scenario.issues), | |
| "services": scenario.services, | |
| "service_dependencies": { | |
| svc: node.depends_on | |
| for svc, node in scenario.service_graph.items() | |
| }, | |
| "context": scenario.context, | |
| "action_schema": { | |
| "action_type": { | |
| "type": "string", | |
| "enum": ["inspect_logs", "inspect_config", "inspect_endpoint", "submit_fix"], | |
| }, | |
| "target": { | |
| "type": "string", | |
| "enum": scenario.services, | |
| }, | |
| "fix_payload": { | |
| "type": "object", | |
| "required": False, | |
| }, | |
| }, | |
| }) | |
| return {"tasks": tasks} | |
| async def run_grader(request: GraderRequest): | |
| """Return grader score for a completed episode.""" | |
| task_id = request.task_id | |
| if task_id in _grading_envs: | |
| env = _grading_envs[task_id] | |
| score = env.grade() | |
| return { | |
| "task_id": task_id, | |
| "score": score, | |
| "issues_fixed": len(env._issues_fixed), | |
| "issues_total": len(env._scenario.issues) if env._scenario else 0, | |
| "steps_used": env._state.step_count, | |
| "grading_rubric": { | |
| "fix_score_weight": 0.40, | |
| "diagnosis_score_weight": 0.20, | |
| "efficiency_score_weight": 0.15, | |
| "strategy_score_weight": 0.25, | |
| }, | |
| } | |
| return { | |
| "task_id": task_id, | |
| "score": 0.001, | |
| "message": "No completed episode found. Run the environment first.", | |
| } | |
| async def run_baseline(request: Optional[BaselineRequest] = None): | |
| """ | |
| Run a rule-based baseline agent on all tasks. | |
| The baseline follows a proper debugging strategy: | |
| 1. Inspect logs for each service (diagnosis phase) | |
| 2. Inspect configs for services with issues (investigation phase) | |
| 3. Submit known fixes (resolution phase) | |
| Returns baseline scores for each task. | |
| """ | |
| # Known fixes for each task (a heuristic baseline, not an LLM) | |
| known_fixes = { | |
| "easy": [ | |
| {"target": "payment_client", "fix": {"headers.Authorization": "Bearer sk_live_token123"}}, | |
| {"target": "payment_client", "fix": {"headers.Content-Type": "application/json"}}, | |
| ], | |
| "medium": [ | |
| {"target": "webhook_sender", "fix": {"rate_limit.requests_per_second": 10}}, | |
| {"target": "webhook_sender", "fix": {"retry": {"max_retries": 3, "backoff_factor": 2, "retry_on_status": [429, 500]}}}, | |
| {"target": "webhook_sender", "fix": {"headers.X-Webhook-Signature": "sha256=computed_signature"}}, | |
| ], | |
| "hard": [ | |
| {"target": "order_service", "fix": {"inventory_url": "https://inventory.internal/v2/reserve"}}, | |
| {"target": "order_service", "fix": {"timeout": 10}}, | |
| {"target": "order_service", "fix": {"async_mode": True}}, | |
| {"target": "inventory_service", "fix": {"headers.Authorization": "Bearer valid_token_789"}}, | |
| {"target": "inventory_service", "fix": {"token_refresh_url": "https://auth.internal/refresh", "auto_refresh": True}}, | |
| ], | |
| } | |
| results = {} | |
| for task_id in get_all_task_ids(): | |
| env = ApiDebugEnvironment(task_id=task_id) | |
| obs = env.reset() | |
| # Phase 1: Inspect all logs (proper diagnosis strategy) | |
| for service in obs.available_targets: | |
| if env._done: | |
| break | |
| obs = env.step(ApiDebugAction( | |
| action_type="inspect_logs", | |
| target=service, | |
| )) | |
| # Phase 2: Inspect configs for services that have issues | |
| for service in obs.available_targets: | |
| if env._done: | |
| break | |
| obs = env.step(ApiDebugAction( | |
| action_type="inspect_config", | |
| target=service, | |
| )) | |
| # Phase 3: Test endpoints to observe failures | |
| for service in obs.available_targets[:2]: # Just test a couple | |
| if env._done: | |
| break | |
| obs = env.step(ApiDebugAction( | |
| action_type="inspect_endpoint", | |
| target=service, | |
| )) | |
| # Phase 4: Submit fixes | |
| for fix_info in known_fixes.get(task_id, []): | |
| if env._done: | |
| break | |
| obs = env.step(ApiDebugAction( | |
| action_type="submit_fix", | |
| target=fix_info["target"], | |
| fix_payload=fix_info["fix"], | |
| )) | |
| # Store for grading | |
| _grading_envs[task_id] = env | |
| score = env.grade() | |
| results[task_id] = { | |
| "score": score, | |
| "steps_used": env._state.step_count, | |
| "issues_found": len(env._issues_found), | |
| "issues_fixed": len(env._issues_fixed), | |
| "issues_total": len(env._scenario.issues) if env._scenario else 0, | |
| } | |
| return {"baseline_scores": results} | |
| # βββ Entry point ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def main(host: str = "0.0.0.0", port: int = 8000): | |
| """Run the server directly.""" | |
| import argparse | |
| import uvicorn | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument("--host", type=str, default=host) | |
| parser.add_argument("--port", type=int, default=port) | |
| args = parser.parse_args() | |
| uvicorn.run(app, host=args.host, port=args.port) | |
| if __name__ == "__main__": | |
| main() | |