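"""Benchmark inference entry point (summary inferred from the code below).

Runs OSINT environment episodes with a single-agent or swarm-agent runner,
logs [START]/[STEP]/[END] lines for the evaluation harness, and optionally
appends leaderboard records and exports an HTML dashboard.
"""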
from __future__ import annotations

import json
import os
from pathlib import Path
from typing import Any

from osint_env.agents.single_agent import SingleAgentRunner
from osint_env.agents.swarm_agent import SwarmAgentRunner
from osint_env.config import clone_environment_config, load_seeding_config, load_shared_config
from osint_env.domain.models import EnvironmentConfig
from osint_env.env.environment import OSINTEnvironment
from osint_env.env.reward import compute_graph_f1
from osint_env.eval.leaderboard import append_leaderboard_record, load_leaderboard
from osint_env.eval.metrics import EvalMetrics
from osint_env.llm import build_llm_client
from osint_env.viz import export_dashboard

CONFIG_PATH = os.getenv("CONFIG_PATH", "datasets/fixed_levels/shared_config_fixed_levels.json")
SEED_FILE = os.getenv("SEED_FILE", "datasets/fixed_levels/seed_fixed_levels.json")
AGENT_MODE = os.getenv("AGENT_MODE", "swarm")
LLM_PROVIDER = os.getenv("LLM_PROVIDER", "openai")
MODEL_NAME = os.getenv("MODEL_NAME", "gpt-5.4")
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "")
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
OPENAI_API_KEY_ENV = os.getenv("OPENAI_API_KEY_ENV", "OPENAI_API_KEY")
API_BASE_URL = os.getenv("API_BASE_URL", "https://api.openai.com/v1")
API_KEY = os.getenv("API_KEY", "")
HF_SPACE_URL = os.getenv("HF_SPACE_URL", "")
HF_TOKEN = os.getenv("HF_TOKEN", "")
LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME", "")
LLM_TIMEOUT_SECONDS = int(os.getenv("LLM_TIMEOUT_SECONDS", "0"))
EPISODES = int(os.getenv("EPISODES", "1"))
SUCCESS_SCORE_THRESHOLD = float(os.getenv("SUCCESS_SCORE_THRESHOLD", "0.67"))
TASK_INDICES_RAW = os.getenv("TASK_INDICES", "")
DATASET_MODE = os.getenv("DATASET_MODE", "")
METAQA_ROOT = os.getenv("METAQA_ROOT", "")
METAQA_KB_PATH = os.getenv("METAQA_KB_PATH", "")
METAQA_VARIANT = os.getenv("METAQA_VARIANT", "")
METAQA_HOPS_RAW = os.getenv("METAQA_HOPS", "")
METAQA_SPLITS_RAW = os.getenv("METAQA_SPLITS", "")
WRITE_BENCHMARK_ARTIFACTS = os.getenv("WRITE_BENCHMARK_ARTIFACTS", "1").strip().lower() in {
    "1",
    "true",
    "yes",
    "y",
    "on",
}
LEADERBOARD_PATH = os.getenv("LEADERBOARD_PATH", "datasets/fixed_levels/leaderboard_fixed_levels.json")
DASHBOARD_PATH = os.getenv("DASHBOARD_PATH", "datasets/fixed_levels/dashboard_fixed_levels.html")
RUN_NAME = os.getenv("RUN_NAME", "fixed_levels_qwen_swarm")
BENCHMARK = "osint-openenv"
TASK_NAME = "fixed_levels_easy_mid_hard"

def _parse_task_indices(raw: str) -> list[int]:
    out: list[int] = []
    for token in str(raw or "").split(","):
        stripped = token.strip()
        if not stripped:
            continue
        try:
            out.append(int(stripped))
        except ValueError:
            continue
    return out


def _parse_csv_tokens(raw: str) -> list[str]:
    return [token.strip() for token in str(raw or "").split(",") if token.strip()]


def _normalize_ollama_base_url(url: str) -> str:
    normalized = str(url or "").strip().rstrip("/")
    if normalized.endswith("/v1"):
        normalized = normalized[:-3].rstrip("/")
    return normalized or "http://127.0.0.1:11434"


def _normalize_openai_base_url(url: str) -> str:
    normalized = str(url or "").strip().rstrip("/")
    if not normalized:
        return ""
    if normalized.endswith("/v1"):
        return normalized
    return f"{normalized}/v1"

TASK_INDICES = _parse_task_indices(TASK_INDICES_RAW)


def log_start(task: str, env: str, model: str) -> None:
    print(f"[START] task={task} env={env} model={model}", flush=True)


def log_step(step: int, action: str, reward: float, done: bool, error: str | None) -> None:
    error_text = "null" if error is None else str(error)
    print(
        f"[STEP] step={step} action={action} reward={reward:.2f} done={str(bool(done)).lower()} error={error_text}",
        flush=True,
    )


def log_end(task: str, success: bool, steps: int, score: float, rewards: list[float]) -> None:
    rewards_text = ",".join(f"{value:.2f}" for value in rewards)
    print(
        f"[END] success={str(bool(success)).lower()} steps={steps} score={score:.2f} rewards={rewards_text}",
        flush=True,
    )

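# Example emitted lines (values are illustrative):
#   [START] task=easy env=osint-openenv model=gpt-4o
#   [STEP] step=3 action=search(q=acme) reward=0.10 done=false error=null
#   [END] success=true steps=42 score=0.71 rewards=0.10,0.00,0.61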
def _looks_like_placeholder_api_key(value: str) -> bool:
    token = str(value or "").strip().lower()
    if not token:
        return True
    placeholder_markers = [
        "your_openai_api_key",
        "your-key",
        "your_key",
        "your real",
        "real-openai-key",
        "replace-me",
        "changeme",
        "example",
        "<api-key>",
    ]
    if token.startswith("your_") or token.startswith("sk-your-"):
        return True
    return any(marker in token for marker in placeholder_markers)

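# Examples (hypothetical values): "sk-your-key-here" and "" are treated as placeholders;
# a real provider key containing none of the markers above returns False.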
def _format_action(action: dict[str, Any]) -> str:
    action_type = str(action.get("action_type", "")).upper()
    payload = dict(action.get("payload", {}))
    if action_type == "ANSWER":
        return f"answer({str(payload.get('answer', 'unknown')).strip()})"
    if action_type == "ADD_EDGE":
        try:
            conf = float(payload.get("confidence", 1.0))
        except (TypeError, ValueError):
            conf = 1.0
        return (
            "add_edge("
            f"{payload.get('src', '')},"
            f"{payload.get('rel', '')},"
            f"{payload.get('dst', '')},"
            f"{conf:.2f}"
            ")"
        )
    tool_name = str(payload.get("tool_name", "tool")).strip() or "tool"
    args = payload.get("args", {})
    if not isinstance(args, dict) or not args:
        return f"{tool_name}()"
    args_text = ",".join(f"{key}={value}" for key, value in sorted(args.items()))
    return f"{tool_name}({args_text})"

def _assistant_tool_call_id(message: dict[str, Any]) -> str | None:
    tool_calls = list(message.get("tool_calls", []))
    if not tool_calls:
        return None
    tool_call_id = tool_calls[0].get("id")
    return str(tool_call_id) if tool_call_id else None


def _tool_result_message(assistant_message: dict[str, Any], result: dict[str, Any]) -> dict[str, Any] | None:
    tool_call_id = _assistant_tool_call_id(assistant_message)
    if not tool_call_id:
        return None
    return {
        "role": "tool",
        "tool_call_id": tool_call_id,
        "content": json.dumps(result, sort_keys=True),
    }

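# Example (assumes OpenAI-style tool_calls on the assistant message; values are illustrative):
#   assistant = {"role": "assistant", "tool_calls": [{"id": "call_1"}]}
#   _tool_result_message(assistant, {"ok": True})
#     -> {"role": "tool", "tool_call_id": "call_1", "content": '{"ok": true}'}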
def _resolve_environment_config() -> EnvironmentConfig:
    shared = load_shared_config(CONFIG_PATH)
    env_cfg = clone_environment_config(shared.environment)
    if SEED_FILE and Path(SEED_FILE).exists():
        env_cfg.seeding = load_seeding_config(SEED_FILE)
    mode = AGENT_MODE.strip().lower()
    if mode == "single":
        env_cfg.swarm.enabled = False
    elif mode == "swarm":
        env_cfg.swarm.enabled = True
    # Inference submissions must route all calls through OpenAI-compatible client config.
    env_cfg.llm.provider = "openai"
    env_cfg.llm.model = MODEL_NAME.strip()
    if LLM_TIMEOUT_SECONDS > 0:
        env_cfg.llm.timeout_seconds = int(LLM_TIMEOUT_SECONDS)
    # Evaluation harnesses inject API_BASE_URL/HF_TOKEN for proxy-enforced requests.
    resolved_openai_base = API_BASE_URL.strip() or OPENAI_BASE_URL.strip() or HF_SPACE_URL.strip()
    if resolved_openai_base:
        env_cfg.llm.openai_base_url = _normalize_openai_base_url(resolved_openai_base)
    if HF_TOKEN.strip():
        env_cfg.llm.openai_api_key = HF_TOKEN.strip()
    elif API_KEY.strip():
        env_cfg.llm.openai_api_key = API_KEY.strip()
    elif OPENAI_API_KEY.strip():
        env_cfg.llm.openai_api_key = OPENAI_API_KEY.strip()
    if OPENAI_API_KEY_ENV.strip():
        env_cfg.llm.openai_api_key_env = OPENAI_API_KEY_ENV.strip()
    dataset_mode = DATASET_MODE.strip().lower()
    if dataset_mode in {"canonical", "metaqa"}:
        env_cfg.dataset_mode = dataset_mode
    if METAQA_ROOT.strip():
        env_cfg.metaqa_root = METAQA_ROOT.strip()
    if METAQA_KB_PATH.strip():
        env_cfg.metaqa_kb_path = METAQA_KB_PATH.strip()
    metaqa_variant = METAQA_VARIANT.strip().lower()
    if metaqa_variant in {"vanilla", "ntm"}:
        env_cfg.metaqa_variant = metaqa_variant
    metaqa_hops = _parse_csv_tokens(METAQA_HOPS_RAW)
    if metaqa_hops:
        env_cfg.metaqa_hops = metaqa_hops
    metaqa_splits = _parse_csv_tokens(METAQA_SPLITS_RAW)
    if metaqa_splits:
        env_cfg.metaqa_splits = metaqa_splits
    return env_cfg

def _runner_for(env: OSINTEnvironment, llm: Any) -> SingleAgentRunner | SwarmAgentRunner:
    if env.config.swarm.enabled:
        return SwarmAgentRunner(env=env, llm=llm)
    return SingleAgentRunner(env=env, llm=llm)


def _normalize_difficulty(value: str) -> str:
    token = str(value or "").strip().lower()
    if token in {"easy", "e"}:
        return "easy"
    if token in {"mid", "medium", "m"}:
        return "medium"
    if token in {"high", "hard", "h"}:
        return "hard"
    return "hard"


def _task_difficulty(env: OSINTEnvironment, task_index: int) -> str:
    idx = int(task_index) % max(1, len(env.tasks))
    task = env.tasks[idx]
    if isinstance(task.metadata, dict) and "difficulty" in task.metadata:
        return _normalize_difficulty(str(task.metadata.get("difficulty", "")))
    if idx < 10:
        return "easy"
    if idx < 20:
        return "medium"
    return "hard"

def _episode_row(env: OSINTEnvironment, info: dict[str, Any]) -> dict[str, Any]:
    if env.state is None:
        return {
            "task_id": "unknown",
            "task_type": "unknown",
            "question": "",
            "task_answer": str(info.get("task_answer", "")),
            "agent_answer": str(info.get("agent_answer", "")),
            "graph_f1": 0.0,
            "reward": float(info.get("total_reward", 0.0) or 0.0),
            "steps": int(info.get("step_count", 0) or 0),
            "tool_calls": int(info.get("tool_calls", 0) or 0),
            "success": int(info.get("agent_answer") == info.get("task_answer")),
            "reward_components": dict(info.get("reward_components", {})),
            "pred_edges": [],
            "truth_edges": [],
        }
    graph_f1 = compute_graph_f1(env.memory_graph.edges, env.state.task.supporting_edges)
    return {
        "task_id": env.state.task.task_id,
        "task_type": env.state.task.task_type,
        "question": env.state.task.question,
        "task_answer": str(info.get("task_answer", "")),
        "agent_answer": str(info.get("agent_answer", "")) if info.get("agent_answer") is not None else "",
        "graph_f1": graph_f1,
        "reward": float(info.get("total_reward", 0.0) or 0.0),
        "steps": int(info.get("step_count", 0) or 0),
        "tool_calls": int(info.get("tool_calls", 0) or 0),
        "success": int(info.get("agent_answer") == info.get("task_answer")),
        "reward_components": dict(info.get("reward_components", {})),
        "spawn_count": int(info.get("spawn_count", 0) or 0),
        "spawn_critical_steps": int(info.get("spawn_critical_steps", 0) or 0),
        "pred_edges": [
            {
                "src": edge.src,
                "rel": edge.rel,
                "dst": edge.dst,
                "confidence": float(edge.confidence),
            }
            for edge in env.memory_graph.edges
        ],
        "truth_edges": [
            {
                "src": edge.src,
                "rel": edge.rel,
                "dst": edge.dst,
                "confidence": float(edge.confidence),
            }
            for edge in env.state.task.supporting_edges
        ],
    }

def _last_action_error(observation: Any, info: dict[str, Any]) -> str | None:
    raw = info.get("last_action_error") if isinstance(info, dict) else None
    if raw is not None:
        return str(raw)
    tool_outputs = getattr(observation, "tool_outputs", None)
    if isinstance(tool_outputs, list) and tool_outputs:
        last = tool_outputs[-1]
        if isinstance(last, dict):
            output = last.get("output")
            if isinstance(output, dict) and output.get("error") is not None:
                return str(output.get("error"))
    return None


def _install_step_logger(env: OSINTEnvironment) -> tuple[list[float], dict[str, int], Any]:
    rewards: list[float] = []
    counters = {"steps": 0}
    original_step = env.step

    def _logged_step(action: Any):
        observation, reward, done, info = original_step(action)
        counters["steps"] += 1
        reward_value = float(reward or 0.0)
        rewards.append(reward_value)
        action_type = getattr(action, "action_type", "")
        action_type_value = str(getattr(action_type, "value", action_type))
        action_text = _format_action(
            {
                "action_type": action_type_value,
                "payload": dict(getattr(action, "payload", {}) or {}),
            }
        )
        log_step(
            step=counters["steps"],
            action=action_text,
            reward=reward_value,
            done=bool(done),
            error=_last_action_error(observation, info if isinstance(info, dict) else {}),
        )
        return observation, reward, done, info

    env.step = _logged_step
    return rewards, counters, original_step

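# Note: the wrapper above replaces env.step in place; callers are expected to restore the
# returned original_step when finished (as _run_with_runner does in its finally block).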
def _validate_required_configuration() -> None:
    missing: list[str] = []
    api_base = API_BASE_URL.strip()
    model_name = MODEL_NAME.strip()
    hf_token = HF_TOKEN.strip()
    api_key = API_KEY.strip()
    openai_key = OPENAI_API_KEY.strip()
    if not api_base or api_base == "<your-active-endpoint>":
        missing.append("API_BASE_URL")
    if not model_name or model_name == "<your-active-model>":
        missing.append("MODEL_NAME")
    if not (hf_token or api_key or openai_key):
        missing.append("HF_TOKEN|API_KEY|OPENAI_API_KEY")
    # Required when using docker-image based env construction.
    if os.getenv("REQUIRE_LOCAL_IMAGE_NAME", "0").strip().lower() in {"1", "true", "yes", "on"}:
        if not LOCAL_IMAGE_NAME.strip():
            missing.append("LOCAL_IMAGE_NAME")
    if missing:
        raise RuntimeError(f"Missing required environment variables: {', '.join(sorted(set(missing)))}")


def _task_targets(env: OSINTEnvironment, episodes: int, task_indices: list[int]) -> list[int | None]:
    if task_indices:
        task_count = max(1, len(env.tasks))
        return [index % task_count for index in task_indices]
    return [None] * max(1, episodes)

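# Example (assuming a 30-task environment): TASK_INDICES="0,12,35" -> [0, 12, 5];
# with no explicit indices and EPISODES=3 -> [None, None, None], in which case the
# environment's own task pointer decides which task each episode runs.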
def _run_with_runner(
    env: OSINTEnvironment,
    llm: Any,
    episodes: int,
    task_indices: list[int],
) -> tuple[dict[str, Any], list[dict[str, Any]], list[float], int]:
    metrics = EvalMetrics()
    episode_rows: list[dict[str, Any]] = []
    rewards, counters, original_step = _install_step_logger(env)
    single_runner = SingleAgentRunner(env=env, llm=llm)
    swarm_runner = SwarmAgentRunner(env=env, llm=llm) if env.config.swarm.enabled else None
    try:
        for task_index in _task_targets(env, episodes, task_indices):
            task_count = max(1, len(env.tasks))
            selected_index = env._task_idx % task_count if task_index is None else int(task_index) % task_count
            if task_index is not None:
                # Keep compatibility with explicit task selection from the previous inference script.
                env._task_idx = selected_index
            difficulty = _task_difficulty(env, selected_index)
            if difficulty == "easy":
                runner: SingleAgentRunner | SwarmAgentRunner = single_runner
            elif swarm_runner is not None:
                runner = swarm_runner
            else:
                runner = single_runner
            info = runner.run_episode()
            if env.state is None:
                continue
            graph_f1 = compute_graph_f1(env.memory_graph.edges, env.state.task.supporting_edges)
            metrics.add(info, task_type=env.state.task.task_type, graph_f1=graph_f1)
            episode_rows.append(_episode_row(env, info))
    finally:
        env.step = original_step
    return metrics.summary(), episode_rows, rewards, int(counters["steps"])

def _maybe_write_artifacts(
    env: OSINTEnvironment,
    summary: dict[str, Any],
    episodes: int,
    episode_rows: list[dict[str, Any]],
) -> tuple[dict[str, Any] | None, str | None]:
    if not WRITE_BENCHMARK_ARTIFACTS:
        return None, None
    record = append_leaderboard_record(
        path=LEADERBOARD_PATH,
        summary=summary,
        episodes=episodes,
        run_name=RUN_NAME or None,
        config={
            "seed": env.config.seed,
            "max_steps": env.config.max_steps,
            "swarm_enabled": env.config.swarm.enabled,
            "max_agents": env.config.swarm.max_agents,
            "max_breadth": env.config.swarm.max_breadth,
            "max_width": env.config.swarm.max_width,
            "max_depth": env.config.swarm.max_depth,
            "seeded_questions": len(env.config.seeding.seeded_questions),
            "llm_provider": env.config.llm.provider,
            "llm_model": env.config.llm.model,
        },
    )
    leaderboard = load_leaderboard(LEADERBOARD_PATH)
    dashboard = export_dashboard(
        env=env,
        evaluation={"summary": summary, "episodes": episode_rows},
        leaderboard_records=leaderboard,
        output_path=DASHBOARD_PATH,
    )
    return record, dashboard

def main() -> None:
    _validate_required_configuration()
    env_cfg = _resolve_environment_config()
    llm_client = build_llm_client(env_cfg.llm)
    episodes_given = "EPISODES" in os.environ and str(os.getenv("EPISODES", "")).strip() != ""
    task_indices_given = bool(TASK_INDICES)
    if not episodes_given and not task_indices_given:
        runs: list[tuple[str, list[int], int]] = [
            ("easy", list(range(0, 10)), 10),
            ("mid", list(range(10, 20)), 10),
            ("hard", list(range(20, 30)), 10),
        ]
    else:
        selected_indices = TASK_INDICES if task_indices_given else []
        episodes = len(selected_indices) if selected_indices else max(1, EPISODES)
        runs = [(TASK_NAME, selected_indices, episodes)]
    for task_name, run_indices, run_episodes in runs:
        env: OSINTEnvironment | None = None
        rewards: list[float] = []
        steps_taken = 0
        score = 0.0
        success = False
        env = OSINTEnvironment(env_cfg, llm=llm_client)
        log_start(task=task_name, env=BENCHMARK, model=env_cfg.llm.model)
        try:
            summary, episode_rows, rewards, steps_taken = _run_with_runner(
                env=env,
                llm=llm_client,
                episodes=run_episodes,
                task_indices=run_indices,
            )
            score = float(summary.get("avg_reward", 0.0) or 0.0)
            score = max(0.0, min(1.0, score))
            success = score >= SUCCESS_SCORE_THRESHOLD
            _maybe_write_artifacts(
                env=env,
                summary=summary,
                episodes=run_episodes,
                episode_rows=episode_rows,
            )
        finally:
            if env is not None:
                close_fn = getattr(env, "close", None)
                if callable(close_fn):
                    close_fn()
        log_end(task=task_name, success=success, steps=steps_taken, score=score, rewards=rewards)


if __name__ == "__main__":
    main()