Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Git Task Environment - Optimized for task-based isolation. | |
| This module provides an optimized Git environment for scenarios where: | |
| - Multiple tasks share the same base repository states | |
| - Tasks need fast reset() to reproducible states | |
| - Each task has an isolated workspace | |
| - A shared Gitea service provides repository storage | |
| """ | |
| import uuid | |
| from openenv.core.env_server import Action, Environment, Observation | |
| from openenv.core.tools.git_server_client import GitServerClient | |
| from ..models import GitAction, GitObservation, GitState | |
class GitTaskEnvironment(Environment):
    """
    Git environment optimized for task-based isolation.

    This environment connects to a shared Gitea service and provides:

    - Fast reset() via git operations (no server restart)
    - Isolated workspace per environment instance
    - Shared repository cache across tasks
    - Reproducible base states from specific commits

    Architecture:
        Shared Gitea Service (external)
                ↓
        GitTaskEnvironment instances (many)
                ↓
        Isolated workspaces (/workspace)

    Args:
        gitea_url: URL of shared Gitea service (e.g., "http://gitea:3000")
        username: Gitea username for authentication (required)
        password: Gitea password for authentication (required)
        workspace_dir: Directory for git operations (default: /workspace)
        task_repos: Dict mapping task names to (repo_name, commit) tuples
            for pre-configuring task base states

    Example (Basic):
        >>> env = GitTaskEnvironment(
        ...     gitea_url="http://localhost:3000",
        ...     username="gitea-user",
        ...     password="gitea-password",
        ... )
        >>> obs = env.reset()
        >>> # Clone and work (GitAction is defined in this package's models module)
        >>> obs = env.step(GitAction(action_type="clone_repo", repo_name="my-repo"))
        >>> obs = env.step(GitAction(action_type="execute_git_command", command="status", working_dir="my-repo"))

    Example (Task-based):
        >>> # Pre-configure tasks with specific repo states
        >>> env = GitTaskEnvironment(
        ...     gitea_url="http://localhost:3000",
        ...     username="gitea-user",
        ...     password="gitea-password",
        ...     task_repos={
        ...         "task1": ("my-repo", "abc123"),  # Specific commit
        ...         "task2": ("my-repo", "def456"),  # Different commit
        ...     },
        ... )
        >>> # Reset to task1 base state
        >>> obs = env.reset(task_id="task1")
        >>> # Work on task...
        >>> # Reset to task2 base state
        >>> obs = env.reset(task_id="task2")
    """

    def __init__(
        self,
        gitea_url: str,
        username: str,
        password: str,
        workspace_dir: str = "/workspace",
        task_repos: dict[str, tuple[str, str]] | None = None,
    ):
        """
        Initialize the Git task environment.

        Builds the Git server client, creates the initial state and waits
        for the Gitea service; a warning is printed when it is not ready.

        Args:
            gitea_url: URL of the shared Gitea service.
            username: Gitea username passed to the client.
            password: Gitea password passed to the client.
            workspace_dir: Directory used for git operations.
            task_repos: Optional mapping of task id -> (repo_name, commit).
                ``None`` is treated as "no pre-configured tasks".
        """
        super().__init__()
        self.workspace_dir = workspace_dir
        self.task_repos = task_repos or {}

        # Client for the external Gitea service.
        self._git_client = GitServerClient(
            gitea_url=gitea_url,
            username=username,
            password=password,
            workspace_dir=workspace_dir,
        )

        self._state = GitState(workspace_path=workspace_dir)
        self._current_task_id: str | None = None

        # Record readiness as a strict bool so the state never carries a
        # truthy/falsy non-bool value returned by the client.
        ready = bool(self._git_client.wait_for_ready())
        self._state.gitea_ready = ready
        if not ready:
            print("Warning: Gitea server not ready")

    def reset(self, task_id: str | None = None) -> Observation:
        """
        Reset the environment to a clean state.

        A fresh ``GitState`` (new episode id, step count 0) is always created.
        Then:

        - If ``task_id`` is given AND present in ``task_repos``: the task's
          repository is reset to its configured commit when the workspace
          already exists, or cloned at that commit otherwise.
        - In every other case (no ``task_id``, or a ``task_id`` that is not
          configured) only the generic "ready" observation is returned and
          no repository is touched.

        Args:
            task_id: Optional task identifier for task-specific base states.

        Returns:
            A ``GitObservation`` describing the outcome. Failures while
            preparing a configured task are reported with ``success=False``
            rather than raised.
        """
        self._state = GitState(
            episode_id=str(uuid.uuid4()),
            step_count=0,
            gitea_ready=self._git_client.is_ready,
            workspace_path=self.workspace_dir,
        )
        # Remembered so that later clone actions can pick the task's commit.
        self._current_task_id = task_id

        if task_id and task_id in self.task_repos:
            repo_name, commit = self.task_repos[task_id]
            try:
                if self._git_client.workspace_exists(repo_name):
                    # Workspace already present: reset it in place.
                    self._git_client.reset_workspace(repo_name, commit)
                    message = f"Reset to task '{task_id}' base state (repo: {repo_name}@{commit})"
                else:
                    # No workspace yet: clone at the configured commit.
                    self._git_client.clone_to_workspace(repo_name, commit=commit)
                    message = f"Initialized task '{task_id}' (repo: {repo_name}@{commit})"

                current_commit = self._git_client.get_current_commit(repo_name)
                return GitObservation(
                    success=True,
                    message=message,
                    output=f"Workspace: {self.workspace_dir}/{repo_name}\nCommit: {current_commit}\nTask: {task_id}",
                )
            except Exception as e:
                # Reset is a boundary: surface the failure as an observation.
                return GitObservation(
                    success=False,
                    message=f"Failed to reset task '{task_id}'",
                    error=str(e),
                )

        # Default reset: just ready state, no pre-configured repos.
        return GitObservation(
            success=True,
            message="Git task environment ready.",
            output=f"Workspace: {self.workspace_dir}\nGitea: {self._git_client.gitea_url}\nUse GitAction with action_type='clone_repo' to clone repositories.",
        )

    def step(self, action: Action) -> Observation:
        """
        Execute a Git action and return an observation.

        Supported action types:

        - "clone_repo": Clone repository to workspace
        - "execute_git_command": Execute git command
        - "list_repos": List available repositories

        Args:
            action: ``GitAction`` to execute.

        Returns:
            ``GitObservation`` with the execution results. Unsupported action
            types and handler errors yield ``success=False``.

        Raises:
            ValueError: If ``action`` is not a ``GitAction``.
        """
        if not isinstance(action, GitAction):
            raise ValueError(f"Expected GitAction, got {type(action)}")

        self._state.step_count += 1

        try:
            action_type = action.action_type
            if action_type == "clone_repo":
                return self._handle_clone_repo(action)
            if action_type == "list_repos":
                return self._handle_list_repos(action)
            if action_type == "execute_git_command":
                return self._handle_git_command(action)

            # Report the rejected action_type itself; the class name is always
            # the same here and would not tell the caller what was wrong.
            return GitObservation(
                success=False,
                message=f"Action not supported in task mode: {action_type}",
                error="Use shared Gitea for repository migration/creation",
            )
        except Exception as e:
            return GitObservation(
                success=False, message=f"Action failed: {e}", error=str(e)
            )

    def _handle_clone_repo(self, action: GitAction) -> GitObservation:
        """
        Handle the repository clone action.

        The clone uses "main" unless the current task is configured for the
        same repository, in which case the task's commit is used.

        Args:
            action: Action providing ``repo_name`` and ``target_dir``.

        Returns:
            ``GitObservation`` with the clone path and commit on success, or
            ``success=False`` with the error text on failure.
        """
        try:
            commit = "main"  # Default when no task-specific commit applies.

            task_entry = (
                self.task_repos.get(self._current_task_id)
                if self._current_task_id
                else None
            )
            if task_entry is not None:
                task_repo, task_commit = task_entry
                if task_repo == action.repo_name:
                    commit = task_commit

            clone_path = self._git_client.clone_to_workspace(
                action.repo_name, action.target_dir, commit=commit
            )
            return GitObservation(
                success=True,
                message=f"Successfully cloned {action.repo_name}",
                output=f"Cloned to: {clone_path}\nCommit: {commit}",
            )
        except Exception as e:
            return GitObservation(
                success=False,
                message=f"Failed to clone repository: {action.repo_name}",
                error=str(e),
            )

    def _handle_list_repos(self, action: GitAction) -> GitObservation:
        """
        Handle the list repositories action.

        Args:
            action: The triggering action (not inspected by this handler).

        Returns:
            ``GitObservation`` whose ``output`` is a human-readable listing and
            whose ``repos`` carries the raw entries returned by the client;
            ``success=False`` with the error text on failure.
        """
        try:
            repos = self._git_client.list_repositories()

            if not repos:
                output = "No repositories available."
            else:
                # Collect lines and join once instead of repeated string +=.
                lines = ["Available repositories:"]
                for repo in repos:
                    lines.append(f" - {repo['name']}: {repo['clone_url']}")
                    if repo.get("description"):
                        lines.append(f" {repo['description']}")
                # Every line, including the last, is newline-terminated.
                output = "\n".join(lines) + "\n"

            return GitObservation(
                success=True,
                message=f"Found {len(repos)} repositories",
                output=output,
                repos=repos,
            )
        except Exception as e:
            return GitObservation(
                success=False, message="Failed to list repositories", error=str(e)
            )

    def _handle_git_command(self, action: GitAction) -> GitObservation:
        """
        Handle the git command execution action.

        Args:
            action: Action providing ``command`` and ``working_dir``.

        Returns:
            ``GitObservation`` with stdout as ``output`` and stderr as
            ``error``; ``success`` is True only for exit code 0. Exceptions
            from the client are reported with ``success=False``.
        """
        try:
            exit_code, stdout, stderr = self._git_client.execute_git_command(
                action.command, action.working_dir
            )
            success = exit_code == 0
            outcome = "succeeded" if success else "failed"
            return GitObservation(
                success=success,
                message=f"Git command {outcome}",
                output=stdout,
                error=stderr,
            )
        except Exception as e:
            return GitObservation(
                success=False,
                message=f"Failed to execute git command: {action.command}",
                error=str(e),
            )

    # NOTE(review): kept as a plain method to preserve the existing interface;
    # confirm whether the Environment base class expects `state` as a property.
    def state(self) -> GitState:
        """Get current environment state."""
        return self._state