Spaces:
Running
Running
| """ | |
| Cloud Coder Agent - Uses opencode CLI exclusively for code generation. | |
| This agent receives A2A tasks, uses the opencode CLI to generate code, | |
| commits to GitHub, and optionally creates PRs. | |
| """ | |
import contextlib
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional

from app.github_ops import GitHubOps
from app.prompt_templates import get_agent_prompt
from app.utils.logger import logger
# Repository checkout location. The REPO_DIR environment variable (set by the
# HF Space or the environment) takes precedence; when the resulting path does
# not exist, the first existing fallback location is used instead. If none of
# the fallbacks exist either, the original value is kept unchanged.
REPO_DIR = Path(os.getenv("REPO_DIR", "/workspace/OpenSIN-Code"))
if not REPO_DIR.exists():
    _fallback_dirs = (
        Path("/home/user/project/OpenSIN-Code"),
        Path("/home/user/OpenSIN-Code"),
        Path.cwd().parent / "OpenSIN-Code",
    )
    REPO_DIR = next(
        (candidate for candidate in _fallback_dirs if candidate.exists()),
        REPO_DIR,
    )
class CloudCoderAgent:
    """Autonomous cloud coder using opencode CLI.

    The agent takes an A2A task, turns it into a prompt, runs the
    ``opencode`` CLI inside ``REPO_DIR``, commits whatever changed and, when
    the task names a target branch, pushes that branch through ``GitHubOps``.

    Attributes:
        status: ``"idle"`` or ``"processing"``.
        current_task: First 50 characters of the description of the task
            being processed, or ``None`` when no task is running.
        agent_type: Agent specialisation derived from ``agent.json``.
    """

    # Prefix of the ``id`` field in agent.json: a2a-sin-code-{type}
    _AGENT_ID_PREFIX = "a2a-sin-code-"
    # Agent type used whenever it cannot be derived from agent.json.
    _DEFAULT_AGENT_TYPE = "backend"
    # Extra guidance appended to the base prompt for known task types.
    _TYPE_GUIDANCE = {
        "plugin": "\n\nPLUGIN SPECIFIC: Ensure the plugin follows the OpenCode plugin architecture with proper manifest.json, entry points, and A2A integration.",
        "command": "\n\nCOMMAND SPECIFIC: Implement CLI commands following the OpenCode command pattern with proper argument parsing and help text.",
        "tool": "\n\nTOOL SPECIFIC: Create reusable tool functions with clear docstrings, type hints, and error handling.",
        "backend": "\n\nBACKEND SPECIFIC: Focus on API endpoints, database models, business logic, and integrations.",
        "frontend": "\n\nFRONTEND SPECIFIC: Implement UI components with proper accessibility, responsive design, and modern patterns.",
        "fullstack": "\n\nFULLSTACK SPECIFIC: Ensure both frontend and backend are consistent; update API contracts and UI together.",
    }

    def __init__(self):
        """Create an idle agent and detect its type from agent.json."""
        self.status = "idle"
        self.current_task: Optional[str] = None
        self.agent_type = self._detect_agent_type()

    @staticmethod
    def _find_agent_manifest() -> Optional[Path]:
        """Return the path of ``agent.json``, or ``None`` if none is found.

        A module-level ``ROOT`` is honoured when one is defined. Otherwise the
        nearest ancestor directories of this module and the current working
        directory are searched.
        NOTE(review): the intended location of agent.json is not visible in
        this file - confirm the search order matches the deployment layout.
        """
        search_dirs = []
        configured_root = globals().get("ROOT")
        if configured_root is not None:
            search_dirs.append(Path(configured_root))
        # Only look a few levels up so an unrelated agent.json far above the
        # project is not picked up by accident.
        search_dirs.extend(list(Path(__file__).resolve().parents)[:3])
        search_dirs.append(Path.cwd())
        for directory in search_dirs:
            candidate = directory / "agent.json"
            if candidate.is_file():
                return candidate
        return None

    @classmethod
    def _agent_type_from_id(cls, agent_id: Any) -> str:
        """Extract ``{type}`` from an id shaped like ``a2a-sin-code-{type}``.

        Returns the default agent type when the id is not a string, does not
        contain the prefix, or has nothing after it.
        """
        if isinstance(agent_id, str) and cls._AGENT_ID_PREFIX in agent_id:
            suffix = agent_id.split(cls._AGENT_ID_PREFIX)[-1]
            if suffix:
                return suffix
        return cls._DEFAULT_AGENT_TYPE

    def _detect_agent_type(self) -> str:
        """Detect agent type from agent.json.

        Detection is best-effort: a missing, unreadable or malformed manifest
        yields the default agent type instead of raising.
        """
        manifest = self._find_agent_manifest()
        if manifest is None:
            return self._DEFAULT_AGENT_TYPE
        try:
            meta = json.loads(manifest.read_text())
        except (OSError, ValueError):
            # json.JSONDecodeError is a ValueError subclass.
            return self._DEFAULT_AGENT_TYPE
        agent_id = meta.get("id", "") if isinstance(meta, dict) else ""
        return self._agent_type_from_id(agent_id)

    def get_status(self) -> str:
        """Return human-readable status."""
        if self.current_task:
            return f"working on {self.current_task[:20]}..."
        return "idle"

    def is_healthy(self) -> bool:
        """Check if agent is healthy, i.e. ``opencode --version`` exits 0."""
        try:
            result = subprocess.run(
                ["opencode", "--version"], capture_output=True, text=True, timeout=5
            )
        except (OSError, subprocess.SubprocessError):
            # Missing binary (OSError) or timeout (SubprocessError).
            return False
        return result.returncode == 0

    @staticmethod
    def _store_guard(tasks_store):
        """Return a context manager that guards mutations of ``tasks_store``.

        Uses the store's ``_lock`` attribute when it has one; otherwise a
        no-op context manager, so plain dicts work too.
        """
        lock = getattr(tasks_store, "_lock", None)
        return lock if lock is not None else contextlib.nullcontext()

    def _update_task(self, tasks_store, task_id: str, **fields: Any) -> None:
        """Set ``fields`` on ``tasks_store[task_id]`` under the store guard."""
        with self._store_guard(tasks_store):
            entry = tasks_store[task_id]
            for key, value in fields.items():
                entry[key] = value

    def process_task(self, task_id: str, task_data: Dict[str, Any], tasks_store: Dict):
        """Process a single A2A task.

        Marks the task as running, generates code with opencode, commits the
        result and pushes ``target_branch`` when it is given and not already
        on the remote. The outcome is written back to
        ``tasks_store[task_id]``: ``status`` becomes ``"completed"`` (with a
        ``result`` dict) or ``"failed"`` (with an ``error`` string).

        Args:
            task_id: Key of the task inside ``tasks_store``.
            task_data: Task parameters; the keys read are ``description``,
                ``type``, ``target_branch`` and ``issue_number``.
            tasks_store: Mapping of task id to a mutable task record. If it
                exposes a ``_lock`` attribute, updates are made while holding
                it.

        Note:
            The ``timestamp`` field is an ISO-8601 UTC string.
        """
        try:
            self._update_task(
                tasks_store,
                task_id,
                status="running",
                timestamp=datetime.now(timezone.utc).isoformat(),
            )
            self.status = "processing"

            # Extract task parameters; a missing or None description is
            # treated as empty so the slicing below cannot fail.
            description = task_data.get("description") or ""
            self.current_task = description[:50]
            task_type = task_data.get("type", self.agent_type)  # detected type as fallback
            target_branch = task_data.get("target_branch")
            issue_number = task_data.get("issue_number")

            # Generate prompt using template
            prompt = get_agent_prompt(task_type, description, issue_number)

            # Call opencode CLI
            result = self._call_opencode(prompt)

            # Commit changes
            if issue_number:
                commit_msg = f"Fix #{issue_number}: {description[:50]}"
            else:
                commit_msg = f"Auto-fix: {description[:50]}"
            commit_sha = self._commit_changes(commit_msg)

            # Push branch if requested
            if target_branch:
                gh = GitHubOps(REPO_DIR)
                if gh.branch_exists_on_remote(target_branch):
                    logger.info("Branch already exists on remote", branch=target_branch)
                else:
                    push_success = gh.push_branch(target_branch)
                    logger.info("Pushed branch", branch=target_branch, success=push_success)

            # Update task result
            self._update_task(
                tasks_store,
                task_id,
                status="completed",
                result={
                    "commit_sha": commit_sha,
                    "branch": target_branch,
                    "summary": result.get("summary", ""),
                    "agent_type": self.agent_type,
                },
            )
            logger.task(task_id, "completed", agent_type=self.agent_type)
        except Exception as e:
            # Top-level task boundary: record the failure instead of raising.
            logger.error("Task failed", task_id=task_id, error=str(e))
            self._update_task(tasks_store, task_id, status="failed", error=str(e))
        finally:
            # Always return to idle, even if recording the outcome failed.
            self.status = "idle"
            self.current_task = None

    def _build_prompt(self, description: str, task_type: str, issue_number: Optional[int]) -> str:
        """Build a detailed prompt for opencode CLI.

        Args:
            description: Task description inserted after ``TASK:``.
            task_type: Developer specialisation named in the prompt; known
                types also get a type-specific paragraph appended.
            issue_number: Issue reference, or a falsy value to render
                ``Not specified``.

        Returns:
            The complete prompt text.
        """
        issue_label = issue_number if issue_number else "Not specified"
        base_prompt = f"""You are an expert {task_type} developer working on the OpenSIN-Code project.
TASK: {description}
Issue Number: {issue_label}
CRITICAL RULES:
1. Use ONLY opencode CLI for any tool/API calls. Never use direct git commands or external APIs.
2. Make minimal, surgical changes - fix the exact issue without refactoring unrelated code.
3. Follow existing code style and patterns in the repository.
4. Write or update tests if they are missing for the fix.
5. Ensure all imports are correct and dependencies are in package.json requirements.
6. If the fix requires new dependencies, add them to the appropriate files.
7. Do NOT create new branches - work on the existing branch.
8. After making changes, EXPLICITLY run tests if a test suite exists.
9. Output format: First a brief summary of what you changed, then the exact commands you ran (with --format json), then the results.
WORKFLOW:
1. First, explore the codebase to understand the structure.
2. Locate the files that need to be changed.
3. Apply the fix.
4. Run tests (if available).
5. Provide a summary of changes.
Begin by exploring the repository structure."""
        # Add type-specific guidance (nothing is appended for unknown types)
        return base_prompt + self._TYPE_GUIDANCE.get(task_type, "")

    @staticmethod
    def _extract_text(stdout: str) -> str:
        """Join the text of all ``text`` events found in opencode's output.

        Each line of ``stdout`` is parsed as JSON. Lines that are not valid
        JSON, are not JSON objects, or are not ``text`` events are ignored.
        """
        parts = []
        for line in stdout.splitlines():
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(event, dict) or event.get("type") != "text":
                continue
            part = event.get("part")
            text = part.get("text", "") if isinstance(part, dict) else ""
            if isinstance(text, str):
                parts.append(text)
        return "\n".join(parts)

    def _call_opencode(self, prompt: str) -> Dict[str, Any]:
        """Run ``opencode run`` with JSON output inside ``REPO_DIR``.

        Args:
            prompt: Prompt text passed to the CLI as a single argument.

        Returns:
            A dict with ``summary`` (joined text events), ``commit_sha``
            (always ``None``), ``files_changed`` (always empty) and
            ``raw_output`` (the CLI's unparsed stdout).

        Raises:
            RuntimeError: If the CLI cannot be started, times out after five
                minutes, or exits with a non-zero status.
        """
        command = [
            "opencode",
            "run",
            prompt,
            "--format",
            "json",
            "--fallback",
            "opencode/minimax-m2.5-free",
        ]
        try:
            proc = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=300,  # 5 minute timeout
                cwd=str(REPO_DIR),
            )
        except subprocess.TimeoutExpired as exc:
            raise RuntimeError("opencode call timed out after 5 minutes") from exc
        except (OSError, ValueError, subprocess.SubprocessError) as exc:
            raise RuntimeError(f"opencode execution failed: {exc}") from exc

        # Checked outside the try so this error is not wrapped a second time.
        if proc.returncode != 0:
            raise RuntimeError(f"opencode failed: {proc.stderr}")

        return {
            "summary": self._extract_text(proc.stdout),
            "commit_sha": None,
            "files_changed": [],
            "raw_output": proc.stdout,
        }

    def _commit_changes(self, commit_msg: str) -> Optional[str]:
        """Commit any pending changes to the repository.

        Args:
            commit_msg: Commit message passed to ``git commit -m``.

        Returns:
            The SHA of the new ``HEAD`` commit, or ``None`` when there was
            nothing to commit.

        Raises:
            RuntimeError: If a git command fails.
        """
        repo = str(REPO_DIR)
        try:
            # Empty porcelain output means a clean tree: nothing to commit.
            status = subprocess.run(
                ["git", "status", "--porcelain"],
                capture_output=True,
                text=True,
                cwd=repo,
                check=True,
            )
            if not status.stdout.strip():
                return None

            # Add all changes
            subprocess.run(
                ["git", "add", "."], capture_output=True, text=True, cwd=repo, check=True
            )

            # Commit
            commit = subprocess.run(
                ["git", "commit", "-m", commit_msg],
                capture_output=True,
                text=True,
                cwd=repo,
            )
            if commit.returncode != 0:
                # git reports "nothing to commit" on stdout, so inspect both
                # streams before treating the failure as an error.
                combined = f"{commit.stdout}\n{commit.stderr}".lower()
                if "nothing to commit" in combined:
                    return None
                raise RuntimeError(f"git commit failed: {commit.stderr or commit.stdout}")

            # The commit output does not end with the SHA; ask git directly.
            head = subprocess.run(
                ["git", "rev-parse", "HEAD"],
                capture_output=True,
                text=True,
                cwd=repo,
                check=True,
            )
            return head.stdout.strip()
        except subprocess.CalledProcessError as e:
            raise RuntimeError(f"git operation failed: {e}") from e