| """ |
| Simple data loader for OpenHands Index leaderboard. |
| Loads JSONL files from local directory or GitHub repository. |
| Uses pydantic models from openhands-index-results for validation. |
| """ |
import json
import logging
from pathlib import Path
from typing import Optional

import pandas as pd
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
# Populated lazily by _ensure_schema_models(); the models stay None when
# openhands-index-results is not importable.
_schema_models_loaded = False
_schema_import_attempted = False
Metadata = None
ScoreEntry = None
|
|
|
|
def _ensure_schema_models():
    """Lazily import pydantic schema models from openhands-index-results.

    The import is attempted at most once per process; later calls reuse the
    cached result instead of re-importing and re-logging on every load.
    """
    global _schema_models_loaded, _schema_import_attempted, Metadata, ScoreEntry

    if _schema_import_attempted:
        return _schema_models_loaded

    _schema_import_attempted = True
    try:
        # validate_schema ships with openhands-index-results and must be
        # importable (e.g. on sys.path) for validation to be enabled.
        from validate_schema import Metadata as _Metadata, ScoreEntry as _ScoreEntry
        Metadata = _Metadata
        ScoreEntry = _ScoreEntry
        _schema_models_loaded = True
        logger.info("Successfully loaded pydantic schema models from openhands-index-results")
    except ImportError as e:
        logger.warning(f"Could not import pydantic schema models: {e}")
        logger.warning("Data will be loaded without schema validation")
        _schema_models_loaded = False

    return _schema_models_loaded
|
|
|
|
| def load_and_validate_agent_data(agent_dir: Path) -> tuple[Optional[dict], Optional[list], list[str]]: |
| """ |
| Load and validate agent data using pydantic models if available. |
| |
| Returns: |
| Tuple of (metadata_dict, scores_list, validation_errors) |
| """ |
    errors = []
    metadata_file = agent_dir / "metadata.json"
    scores_file = agent_dir / "scores.json"

    if not metadata_file.exists() or not scores_file.exists():
        return None, None, [f"Missing metadata.json or scores.json in {agent_dir}"]

    # Parse the raw JSON; a malformed file fails this agent only rather
    # than aborting the whole load.
    try:
        with open(metadata_file) as f:
            metadata_raw = json.load(f)
        with open(scores_file) as f:
            scores_raw = json.load(f)
    except (OSError, json.JSONDecodeError) as e:
        return None, None, [f"Could not read JSON in {agent_dir}: {e}"]
| |
| |
    # Validate with pydantic when the schema models are importable.
    if _ensure_schema_models() and Metadata and ScoreEntry:
        try:
            validated_metadata = Metadata(**metadata_raw)
            metadata_dict = validated_metadata.model_dump(mode='json')
        except Exception as e:
            errors.append(f"Metadata validation error in {agent_dir.name}: {e}")
            metadata_dict = metadata_raw
| |
        # Validate each score entry; keep any extra keys the model would
        # drop, and fall back to the raw entry on validation failure.
        validated_scores = []
| for i, score in enumerate(scores_raw): |
| try: |
| validated_score = ScoreEntry(**score) |
| |
| validated_dict = validated_score.model_dump(mode='json') |
| |
| for key, value in score.items(): |
| if key not in validated_dict: |
| validated_dict[key] = value |
| validated_scores.append(validated_dict) |
| except Exception as e: |
| errors.append(f"Score entry {i} validation error in {agent_dir.name}: {e}") |
| validated_scores.append(score) |
| scores_list = validated_scores |
    else:
        # Schema models unavailable: pass the raw JSON through unvalidated.
        metadata_dict = metadata_raw
        scores_list = scores_raw
| |
| return metadata_dict, scores_list, errors |
|
|
|
|
| class SimpleLeaderboardViewer: |
| """Simple replacement for agent-eval's LeaderboardViewer.""" |
|
|
| AGENT_FILTER_OPENHANDS = "openhands" |
| AGENT_FILTER_ALTERNATIVE = "alternative" |
|
|
| def __init__( |
| self, |
| data_dir: str, |
| config: str, |
| split: str, |
| agent_filter: str = AGENT_FILTER_OPENHANDS, |
| ): |
| """ |
| Args: |
| data_dir: Path to data directory |
| config: Config name (e.g., "1.0.0-dev1") |
| split: Split name (e.g., "validation" or "test") |
| agent_filter: Which submissions to include. |
| ``"openhands"`` (default) loads only the default OpenHands |
| agent runs from ``results/{model}/`` — the canonical |
| leaderboard. ``"alternative"`` loads only third-party |
| harnesses (Claude Code / Codex / Gemini CLI / OpenHands |
| Sub-agents) from ``alternative_agents/{type}/{model}/``, |
| which power the standalone Alternative Agents page. |
| The two are kept on separate pages because their |
| cost/runtime numbers aren't apples-to-apples and mixing |
| them in one ranking would be misleading. |
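
        Example (hypothetical paths)::

            viewer = SimpleLeaderboardViewer(
                data_dir="data",
                config="1.0.0-dev1",
                split="test",
            )
            df = viewer.get_dataframe()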
| """ |
| if agent_filter not in (self.AGENT_FILTER_OPENHANDS, self.AGENT_FILTER_ALTERNATIVE): |
| raise ValueError( |
| f"agent_filter must be one of " |
| f"{{{self.AGENT_FILTER_OPENHANDS!r}, {self.AGENT_FILTER_ALTERNATIVE!r}}}, " |
| f"got {agent_filter!r}" |
| ) |
| self.data_dir = Path(data_dir) |
| self.config = config |
| self.split = split |
| self.agent_filter = agent_filter |
| self.config_path = self.data_dir / config |
| |
| |
        # Map each benchmark to the leaderboard category it contributes to.
        self.benchmark_to_categories = {
| 'swe-bench': ['Issue Resolution'], |
| 'swe-bench-multimodal': ['Frontend'], |
| 'commit0': ['Greenfield'], |
| 'swt-bench': ['Testing'], |
| 'gaia': ['Information Gathering'], |
| } |
| |
| |
        # Invert benchmark -> categories into category -> [benchmarks]
        # for the UI's tag filter.
        self.tag_map = {}
| for benchmark, categories in self.benchmark_to_categories.items(): |
| for category in categories: |
| if category not in self.tag_map: |
| self.tag_map[category] = [] |
| if benchmark not in self.tag_map[category]: |
| self.tag_map[category].append(benchmark) |
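        # Resulting shape, e.g. {'Issue Resolution': ['swe-bench'],
        # 'Frontend': ['swe-bench-multimodal'], ...}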
| |
| |
| |
| |
    # Fallback display name when metadata omits agent_name.
    DEFAULT_AGENT_NAME = "OpenHands"
|
|
| def _records_from_agent_dir(self, agent_dir: Path, default_agent_name: str | None = None) -> tuple[list[dict], list[str]]: |
| """Build per-benchmark records from a single agent directory. |
| |
| Shared by ``_load_from_agent_dirs`` (default OpenHands results) and |
| ``_load_from_alternative_agents_dirs`` (acp-claude / acp-codex / etc.). |
| Returns ``(records, validation_errors)``. Returns an empty list of |
| records when the directory has no scores or is hidden from the |
| leaderboard. |
| """ |
| records: list[dict] = [] |
| metadata, scores, errors = load_and_validate_agent_data(agent_dir) |
|
|
| if metadata is None or scores is None: |
| return records, errors |
|
|
| if metadata.get('hide_from_leaderboard', False): |
| logger.info(f"Skipping {agent_dir.name}: hide_from_leaderboard is True") |
| return records, errors |
|
|
| |
| |
| |
| |
        # Display-name priority: explicit metadata, then the per-directory
        # default (alternative agents), then the global default.
        agent_name = (
            metadata.get('agent_name')
            or default_agent_name
            or self.DEFAULT_AGENT_NAME
        )
|
|
| for score_entry in scores: |
| record = { |
| 'agent_name': agent_name, |
| 'agent_version': metadata.get('agent_version', 'Unknown'), |
| 'llm_base': metadata.get('model', 'unknown'), |
| 'openness': metadata.get('openness', 'unknown'), |
| 'submission_time': score_entry.get('submission_time', metadata.get('submission_time', '')), |
| 'release_date': metadata.get('release_date', ''), |
| 'parameter_count_b': metadata.get('parameter_count_b'), |
| 'active_parameter_count_b': metadata.get('active_parameter_count_b'), |
| 'score': score_entry.get('score'), |
| 'metric': score_entry.get('metric', 'unknown'), |
| 'cost_per_instance': score_entry.get('cost_per_instance'), |
| 'average_runtime': score_entry.get('average_runtime'), |
| 'tags': [score_entry.get('benchmark')], |
| 'full_archive': score_entry.get('full_archive', ''), |
| 'eval_visualization_page': score_entry.get('eval_visualization_page', ''), |
| } |
| records.append(record) |
| return records, errors |
|
|
| def _load_from_agent_dirs(self): |
| """Load agent records based on ``self.agent_filter``. |
| |
| - ``"openhands"`` (default): only ``{config}/results/{model}/``, |
| which is the canonical OpenHands leaderboard. The Home page and |
| the per-category subpages use this. |
| - ``"alternative"``: only |
| ``{config}/alternative_agents/{type}/{model}/`` (acp-claude, |
| acp-codex, acp-gemini, openhands_subagents, ...). The dedicated |
| Alternative Agents page uses this. |
| |
| Returns ``None`` if no records were found (which makes the caller |
| render an empty-state placeholder). |
| """ |
| all_records = [] |
| all_validation_errors = [] |
|
|
        if self.agent_filter == self.AGENT_FILTER_OPENHANDS:
            # Canonical OpenHands runs live under results/{model}/.
            results_dir = self.config_path / "results"
| if results_dir.exists(): |
| for agent_dir in results_dir.iterdir(): |
| if not agent_dir.is_dir(): |
| continue |
| records, errors = self._records_from_agent_dir(agent_dir) |
| all_records.extend(records) |
| all_validation_errors.extend(errors) |
        else:
            # Alternative agents live under alternative_agents/{type}/{model}/;
            # known harness types get a human-readable default display name.
            agent_type_default_name = {
                'acp-claude': 'Claude Code',
                'acp-codex': 'Codex',
                'acp-gemini': 'Gemini CLI',
                'openhands_subagents': 'OpenHands Sub-agents',
            }
| alt_dir = self.config_path / "alternative_agents" |
| if alt_dir.exists(): |
| for type_dir in alt_dir.iterdir(): |
| if not type_dir.is_dir(): |
| continue |
| default_name = agent_type_default_name.get(type_dir.name) |
| for agent_dir in type_dir.iterdir(): |
| if not agent_dir.is_dir(): |
| continue |
| records, errors = self._records_from_agent_dir( |
| agent_dir, default_agent_name=default_name |
| ) |
| all_records.extend(records) |
| all_validation_errors.extend(errors) |
|
|
| |
| if all_validation_errors: |
| logger.warning(f"Schema validation errors ({len(all_validation_errors)} total):") |
| for error in all_validation_errors[:5]: |
| logger.warning(f" - {error}") |
| if len(all_validation_errors) > 5: |
| logger.warning(f" ... and {len(all_validation_errors) - 5} more") |
|
|
| if not all_records: |
| return None |
|
|
| return pd.DataFrame(all_records) |
| |
| def _load(self): |
| """Load data from agent-centric directories and return DataFrame and tag map.""" |
| df = self._load_from_agent_dirs() |
| |
| if df is None: |
| |
| return pd.DataFrame({ |
| "Message": [f"No data found for split '{self.split}' in results directory"] |
| }), {} |
| |
| |
        try:
            # Pivot benchmark-level rows into one wide row per agent.
            transformed_records = []

            from aliases import OPENNESS_MAPPING

            # The five leaderboard categories, in display order.
            ALL_CATEGORIES = ['Issue Resolution', 'Frontend', 'Greenfield', 'Testing', 'Information Gathering']

            # An agent is identified by name + version + base model.
            df['agent_name'] = df['agent_name'].fillna(self.DEFAULT_AGENT_NAME)
            df['agent_id'] = (
                df['agent_name'].astype(str)
                + '_' + df['agent_version'].astype(str)
                + '_' + df['llm_base'].astype(str)
            )
|
|
| for agent_id in df['agent_id'].unique(): |
| agent_records = df[df['agent_id'] == agent_id] |
|
|
| |
| first_record = agent_records.iloc[0] |
| agent_version = first_record['agent_version'] |
| agent_name = first_record['agent_name'] |
|
|
| |
                # Normalize openness labels via the shared alias table
                # (OPENNESS_MAPPING is imported once, above the loop).
                raw_openness = first_record['openness']
                normalized_openness = OPENNESS_MAPPING.get(raw_openness, raw_openness)
|
|
| |
|
|
                # Columns consumed directly by the leaderboard UI.
                record = {
                    'agent_name': agent_name,
                    'SDK version': agent_version,
                    'Language model': first_record['llm_base'],
                    'openness': normalized_openness,
                    'date': first_record['submission_time'],
                    'release_date': first_record.get('release_date', ''),
                    'parameter_count_b': first_record.get('parameter_count_b'),
                    'active_parameter_count_b': first_record.get('active_parameter_count_b'),
                    # Internal bookkeeping fields.
                    'id': agent_id,
                    'source': first_record.get('source', ''),
                    'logs': first_record.get('logs', ''),
                    'visualization': '',
                }
| |
| |
                # Per-category accumulators for averaging.
                category_data = {}
| |
                for _, row in agent_records.iterrows():
                    tags = row['tags'] if isinstance(row['tags'], list) else [row['tags']]
                    for tag in tags:
                        # Per-benchmark columns, e.g. 'swe-bench score'.
                        record[f'{tag} score'] = row['score']
                        record[f'{tag} cost'] = row['cost_per_instance']
                        record[f'{tag} runtime'] = row.get('average_runtime')
| |
| |
                        # Optional per-benchmark artifact links.
                        full_archive_url = row.get('full_archive', '')
                        if full_archive_url:
                            record[f'{tag} download'] = full_archive_url

                        viz_url = row.get('eval_visualization_page', '')
                        if viz_url:
                            record[f'{tag} visualization'] = viz_url
| |
| |
                        # Accumulate per-category data for averaging.
                        if tag in self.benchmark_to_categories:
| for category in self.benchmark_to_categories[tag]: |
| if category not in category_data: |
| category_data[category] = {'scores': [], 'costs': [], 'runtimes': []} |
| category_data[category]['scores'].append(row['score']) |
| category_data[category]['costs'].append(row['cost_per_instance']) |
| category_data[category]['runtimes'].append(row.get('average_runtime')) |
| |
| |
                # Average scores, costs, and runtimes within each category.
                all_costs = []
                all_runtimes = []
                categories_with_scores = 0
                for category in ALL_CATEGORIES:
| if category in category_data and category_data[category]['scores']: |
| data = category_data[category] |
| avg_score = sum(data['scores']) / len(data['scores']) |
| record[f'{category} score'] = avg_score |
| categories_with_scores += 1 |
| if data['costs']: |
| valid_costs = [c for c in data['costs'] if c is not None] |
| if valid_costs: |
| avg_cost = sum(valid_costs) / len(valid_costs) |
| record[f'{category} cost'] = avg_cost |
| all_costs.extend(valid_costs) |
| if data['runtimes']: |
| valid_runtimes = [r for r in data['runtimes'] if r is not None] |
| if valid_runtimes: |
| avg_runtime = sum(valid_runtimes) / len(valid_runtimes) |
| record[f'{category} runtime'] = avg_runtime |
| all_runtimes.extend(valid_runtimes) |
                    # Categories with no scores stay absent so their cells render blank.
| |
| |
| |
                # Overall score averages over all categories; a missing
                # category counts as 0 so incomplete runs are not inflated.
                score_sum = sum(
                    record.get(f'{cat} score', 0) or 0
                    for cat in ALL_CATEGORIES
                )
                record['average score'] = score_sum / len(ALL_CATEGORIES)
| |
| |
| record['average cost'] = sum(all_costs) / len(all_costs) if all_costs else None |
| |
| |
| record['average runtime'] = sum(all_runtimes) / len(all_runtimes) if all_runtimes else None |
| |
| |
| record['categories_completed'] = categories_with_scores |
| |
| transformed_records.append(record) |
| |
| transformed_df = pd.DataFrame(transformed_records) |
| |
| |
            # Fallback: with no category mapping configured, expose each raw
            # benchmark tag as its own group.
            if not self.tag_map:
                all_tags = set()
                for _, row in df.iterrows():
                    tags = row['tags'] if isinstance(row['tags'], list) else [row['tags']]
                    all_tags.update(tags)
                self.tag_map = {tag: [tag] for tag in sorted(all_tags)}
| |
| |
| print(f"[DATA_LOADER] Loaded {len(transformed_df)} agents") |
| if len(transformed_df) > 0: |
| sample_cols = ['agent_name', 'overall_score', 'overall_cost'] |
| available_cols = [c for c in sample_cols if c in transformed_df.columns] |
| print(f"[DATA_LOADER] Sample row: {transformed_df[available_cols].iloc[0].to_dict()}") |
| |
| return transformed_df, self.tag_map |
        except Exception as e:
            logger.exception("Error transforming leaderboard data")
            return pd.DataFrame({
                "Message": [f"Error loading data: {e}"]
            }), {}
| |
| def get_dataframe(self): |
| """Get the raw dataframe.""" |
| df, _ = self._load() |
| return df |
|
|
|
|
| def load_mock_data_locally(data_dir: str = "mock_results"): |
| """ |
| Load mock data from local directory for testing. |
| |
| Args: |
| data_dir: Path to mock results directory |
| |
| Returns: |
| Dictionary mapping split names to SimpleLeaderboardViewer instances |
| """ |
| viewers = {} |
| data_path = Path(data_dir) |
| |
    if not data_path.exists():
        logger.warning(f"Mock data directory '{data_dir}' not found")
        return viewers
| |
| |
    # Each config directory may hold one JSONL file per split; the split
    # name is derived from the file stem.
    for config_dir in data_path.iterdir():
        if config_dir.is_dir():
            config_name = config_dir.name

            for jsonl_file in config_dir.glob("*.jsonl"):
                split_name = jsonl_file.stem
| viewer = SimpleLeaderboardViewer( |
| data_dir=str(data_path), |
| config=config_name, |
| split=split_name |
| ) |
| viewers[split_name] = viewer |
| |
| return viewers |
|
|