a2a-sin-code-command / app /coder_agent.py
SIN-Deploy-Bot
Deploy via automated script [skip ci]
8e2543b
Raw
History Blame Contribute Delete
11.2 kB
"""
Cloud Coder Agent - Uses opencode CLI exclusively for code generation.
This agent receives A2A tasks, uses the opencode CLI to generate code,
commits to GitHub, and optionally creates PRs.
"""
import json
import os
import subprocess
import sys
from pathlib import Path
from typing import Any, Dict, Optional
from app.github_ops import GitHubOps
from app.prompt_templates import get_agent_prompt
from app.utils.logger import logger
# Configure paths - will be set by HF Space or environment
REPO_DIR = Path(os.getenv("REPO_DIR", "/workspace/OpenSIN-Code"))
if not REPO_DIR.exists():
# Try fallback paths
for path in [
"/home/user/project/OpenSIN-Code",
"/home/user/OpenSIN-Code",
Path.cwd().parent / "OpenSIN-Code",
]:
if Path(path).exists():
REPO_DIR = Path(path)
break
class CloudCoderAgent:
"""Autonomous cloud coder using opencode CLI."""
def __init__(self):
self.status = "idle"
self.current_task: Optional[str] = None
self.agent_type = self._detect_agent_type()
def _detect_agent_type(self) -> str:
"""Detect agent type from agent.json."""
try:
meta = json.loads((ROOT / "agent.json").read_text())
agent_id = meta.get("id", "")
# Extract type from id: a2a-sin-code-{type}
if "a2a-sin-code-" in agent_id:
return agent_id.split("a2a-sin-code-")[-1]
except Exception:
pass
return "backend" # default fallback
def get_status(self) -> str:
"""Return human-readable status."""
if self.current_task:
return f"working on {self.current_task[:20]}..."
return "idle"
def is_healthy(self) -> bool:
"""Check if agent is healthy."""
# Basic health: can we run opencode?
try:
result = subprocess.run(
["opencode", "--version"], capture_output=True, text=True, timeout=5
)
return result.returncode == 0
except Exception:
return False
def process_task(self, task_id: str, task_data: Dict[str, Any], tasks_store: Dict):
"""Process a single A2A task."""
try:
with tasks_store._lock if hasattr(tasks_store, "_lock") else None:
tasks_store[task_id]["status"] = "running"
tasks_store[task_id]["timestamp"] = str(
subprocess.check_output(["date"]).decode().strip()
)
self.status = "processing"
self.current_task = task_data.get("description", "")[:50]
# Extract task parameters
description = task_data.get("description", "")
task_type = task_data.get("type", self.agent_type) # Use detected type as fallback
target_branch = task_data.get("target_branch")
issue_number = task_data.get("issue_number")
# Generate prompt using template
prompt = get_agent_prompt(task_type, description, issue_number)
# Call opencode CLI
result = self._call_opencode(prompt)
# Commit changes
commit_msg = (
f"Fix #{issue_number}: {description[:50]}"
if issue_number
else f"Auto-fix: {description[:50]}"
)
commit_sha = self._commit_changes(commit_msg)
# Push branch if requested
if target_branch:
gh = GitHubOps(REPO_DIR)
if gh.branch_exists_on_remote(target_branch):
logger.info("Branch already exists on remote", branch=target_branch)
else:
push_success = gh.push_branch(target_branch)
logger.info("Pushed branch", branch=target_branch, success=push_success)
# Update task result
with tasks_store._lock if hasattr(tasks_store, "_lock") else None:
tasks_store[task_id]["status"] = "completed"
tasks_store[task_id]["result"] = {
"commit_sha": commit_sha,
"branch": target_branch,
"summary": result.get("summary", ""),
"agent_type": self.agent_type,
}
logger.task(task_id, "completed", agent_type=self.agent_type)
self.status = "idle"
self.current_task = None
except Exception as e:
logger.error("Task failed", task_id=task_id, error=str(e))
with tasks_store._lock if hasattr(tasks_store, "_lock") else None:
tasks_store[task_id]["status"] = "failed"
tasks_store[task_id]["error"] = str(e)
self.status = "idle"
self.current_task = None
except Exception as e:
with tasks_store._lock if hasattr(tasks_store, "_lock") else None:
tasks_store[task_id]["status"] = "failed"
tasks_store[task_id]["error"] = str(e)
self.status = "idle"
self.current_task = None
def _build_prompt(self, description: str, task_type: str, issue_number: Optional[int]) -> str:
"""Build a detailed prompt for opencode CLI."""
base_prompt = f"""You are an expert {task_type} developer working on the OpenSIN-Code project.
TASK: {description}
Issue Number: {issue_number if issue_number else "Not specified"}
CRITICAL RULES:
1. Use ONLY opencode CLI for any tool/API calls. Never use direct git commands or external APIs.
2. Make minimal, surgical changes - fix the exact issue without refactoring unrelated code.
3. Follow existing code style and patterns in the repository.
4. Write or update tests if they are missing for the fix.
5. Ensure all imports are correct and dependencies are in package.json requirements.
6. If the fix requires new dependencies, add them to the appropriate files.
7. Do NOT create new branches - work on the existing branch.
8. After making changes, EXPLICITLY run tests if a test suite exists.
9. Output format: First a brief summary of what you changed, then the exact commands you ran (with --format json), then the results.
WORKFLOW:
1. First, explore the codebase to understand the structure.
2. Locate the files that need to be changed.
3. Apply the fix.
4. Run tests (if available).
5. Provide a summary of changes.
Begin by exploring the repository structure."""
# Add type-specific guidance
if task_type == "plugin":
base_prompt += "\n\nPLUGIN SPECIFIC: Ensure the plugin follows the OpenCode plugin architecture with proper manifest.json, entry points, and A2A integration."
elif task_type == "command":
base_prompt += "\n\nCOMMAND SPECIFIC: Implement CLI commands following the OpenCode command pattern with proper argument parsing and help text."
elif task_type == "tool":
base_prompt += "\n\nTOOL SPECIFIC: Create reusable tool functions with clear docstrings, type hints, and error handling."
elif task_type == "backend":
base_prompt += "\n\nBACKEND SPECIFIC: Focus on API endpoints, database models, business logic, and integrations."
elif task_type == "frontend":
base_prompt += "\n\nFRONTEND SPECIFIC: Implement UI components with proper accessibility, responsive design, and modern patterns."
elif task_type == "fullstack":
base_prompt += "\n\nFULLSTACK SPECIFIC: Ensure both frontend and backend are consistent; update API contracts and UI together."
return base_prompt
def _call_opencode(self, prompt: str) -> Dict[str, Any]:
"""Call opencode CLI with JSON-RPC streaming."""
try:
# Run opencode with format=json to get structured output
proc = subprocess.run(
[
"opencode",
"run",
prompt,
"--format",
"json",
"--fallback",
"opencode/minimax-m2.5-free",
],
capture_output=True,
text=True,
timeout=300, # 5 minute timeout
cwd=str(REPO_DIR),
)
if proc.returncode != 0:
raise RuntimeError(f"opencode failed: {proc.stderr}")
# Parse streaming JSON responses
result_parts = []
commit_sha = None
files_changed = []
for line in proc.stdout.splitlines():
try:
event = json.loads(line)
if event.get("type") == "text":
content = event.get("part", {}).get("text", "")
result_parts.append(content)
elif event.get("type") == "tool_call":
# Track tool calls (e.g., shell commands)
tool_name = event.get("tool_name", "")
args = event.get("args", {})
if tool_name == "execute_shell_command":
cmd = args.get("cmd", "")
if "git commit" in cmd and commit_sha is None:
# Extract commit SHA if available
pass
except json.JSONDecodeError:
continue
summary = "\n".join(result_parts)
return {
"summary": summary,
"commit_sha": commit_sha,
"files_changed": files_changed,
"raw_output": proc.stdout,
}
except subprocess.TimeoutExpired:
raise RuntimeError("opencode call timed out after 5 minutes")
except Exception as e:
raise RuntimeError(f"opencode execution failed: {e}")
def _commit_changes(self, commit_msg: str) -> str:
"""Commit any pending changes to the repository."""
try:
# Check git status
subprocess.run(
["git", "status", "--porcelain"],
capture_output=True,
text=True,
cwd=str(REPO_DIR),
check=True,
)
# Add all changes
subprocess.run(
["git", "add", "."], capture_output=True, text=True, cwd=str(REPO_DIR), check=True
)
# Commit
result = subprocess.run(
["git", "commit", "-m", commit_msg],
capture_output=True,
text=True,
cwd=str(REPO_DIR),
)
if result.returncode == 0:
# Extract commit SHA
sha = result.stdout.strip().split()[-1] if result.stdout else "unknown"
return sha
else:
# No changes to commit?
if "nothing to commit" in result.stderr.lower():
return None
raise RuntimeError(f"git commit failed: {result.stderr}")
except subprocess.CalledProcessError as e:
raise RuntimeError(f"git operation failed: {e}")