Spaces:
Running
Running
| import uuid | |
| import traceback | |
| import logging | |
| from enum import Enum | |
| from datetime import datetime, timedelta | |
| from typing import Dict, Any, Optional, List | |
| from services.supabase_service import db | |
| from services.github_service import GitHubService | |
# Module-level logger, named after this module so records can be filtered by origin.
logger = logging.getLogger(__name__)
class ErrorCategory(Enum):
    """Closed set of subsystem tags attached to every logged error.

    The string value of each member is what gets written to the
    ``category`` field of an error log entry.
    """

    # Third-party services
    GITHUB_API = "github_api"
    GEMINI_API = "gemini_api"
    SUPABASE = "supabase"
    FIREBASE_AUTH = "firebase_auth"

    # Inbound events and background work
    WEBHOOK = "webhook"
    AGENT_TASK = "agent_task"

    # Internal processing stages
    CONTEXT_BUILD = "context_build"
    TEMPLATE = "template"
    VERSIONING = "versioning"

    # Anything else that talks to the outside world
    EXTERNAL_API = "external_api"
class ErrorService:
    """Record application errors in the ``error_logs`` table and report on them.

    All persistence goes through ``db.client`` (the Supabase client imported
    by this module). Critical errors that carry a ``repo_id`` additionally go
    through a GitHub-issue escalation path, which currently only logs what it
    would create.
    """

    def __init__(self):
        # Kept for the GitHub issue escalation, which is not wired up yet.
        self.github_service = GitHubService()

    @staticmethod
    def _format_traceback(error: Exception) -> str:
        """Return the formatted traceback that belongs to *error* itself.

        ``traceback.format_exc()`` formats whatever exception is currently
        being handled, which is "NoneType: None" when the caller is not inside
        an ``except`` block. Formatting from the exception object avoids that.
        """
        return "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )

    @staticmethod
    def _cutoff(**delta) -> str:
        """Return the ISO-8601 timestamp lying ``timedelta(**delta)`` in the past.

        The value is a naive UTC timestamp.
        NOTE(review): assumes ``created_at`` is stored/compared in UTC - confirm.
        """
        return (datetime.utcnow() - timedelta(**delta)).isoformat()

    @staticmethod
    def _build_issue(category: ErrorCategory, error_msg: str,
                     context: Dict[str, Any], tb: str):
        """Build the ``(title, body)`` pair for a critical-error GitHub issue."""
        import json  # local import: only needed on the rare critical path

        # Only add an ellipsis when the message was actually shortened.
        short_msg = error_msg if len(error_msg) <= 50 else f"{error_msg[:50]}..."
        title = f"[ContriBot Error] {category.value}: {short_msg}"

        try:
            # default=str is deliberate: this is a human-readable report, so
            # unknown objects are shown via str() instead of aborting.
            context_text = json.dumps(context, indent=2, default=str)
        except (TypeError, ValueError):
            # e.g. non-string keys or circular references
            context_text = str(context)

        body = "\n".join([
            "## ContriBot Critical Error",
            "",
            "An automated critical error was detected by ContriBot.",
            "",
            f"**Category:** {category.value}",
            "",
            "**Context:**",
            "```json",
            context_text,
            "```",
            "",
            "**Traceback:**",
            "```python",
            tb,
            "```",
            "",
        ])
        return title, body

    async def _escalate_critical(self, category: ErrorCategory, error_msg: str,
                                 context: Dict[str, Any], tb: str,
                                 repo_id: str) -> None:
        """Best-effort escalation of a critical error; never raises."""
        logger.warning(
            "[ERROR_SVC] Critical error detected for repo %s. "
            "Attempting to create GitHub issue.",
            repo_id,
        )
        try:
            repo = await db.get_repo_by_id(repo_id)
            if not (repo and repo.get("github_full_name")):
                return
            issue_title, issue_body = self._build_issue(
                category, error_msg, context, tb
            )
            logger.info(
                "[ERROR_SVC] Would create GitHub issue on %s: %s",
                repo["github_full_name"], issue_title,
            )
            logger.debug("[ERROR_SVC] Issue body:\n%s", issue_body)
            # TODO: fetch the installation token for this repo and call
            # self.github_service.create_issue(installation_id,
            #     repo["github_full_name"], issue_title, issue_body,
            #     labels=["contribot-error"]). Until then this path only logs.
        except Exception as e:
            # Escalation must never mask the original error.
            logger.error(
                "[ERROR_SVC] Failed to create GitHub issue for critical error: %s", e
            )

    async def log_error(
        self,
        category: ErrorCategory,
        error: Exception,
        context: Dict[str, Any],
        repo_id: Optional[str] = None,
        task_id: Optional[str] = None,
        severity: str = "error"
    ):
        """Log an error to the database and optionally create a GitHub issue.

        Args:
            category: Subsystem the error originated from.
            error: The exception to record; its own traceback is stored.
            context: Extra data stored alongside the error.
            repo_id: Repository the error relates to, if any.
            task_id: Task the error relates to, if any.
            severity: Severity label; ``"critical"`` together with a
                ``repo_id`` triggers the GitHub issue escalation path.

        Returns:
            The inserted row as returned by the database, or ``None`` when the
            insert failed or returned no data. Never raises for storage
            failures; they are logged instead.
        """
        error_msg = str(error)
        tb = self._format_traceback(error)

        logger.error(
            "[ERROR_SVC] Logging %s error in %s: %s",
            severity, category.value, error_msg,
        )
        if repo_id:
            logger.error("[ERROR_SVC] Repo ID: %s", repo_id)
        if task_id:
            logger.error("[ERROR_SVC] Task ID: %s", task_id)

        log_data = {
            "repo_id": repo_id,
            "task_id": task_id,
            "category": category.value,
            "severity": severity,
            "message": error_msg,
            "traceback": tb,
            "context": context,
        }

        # Save to Supabase. Failing to record an error must not raise a new one.
        try:
            result = db.client.table("error_logs").insert(log_data).execute()
            log_record = result.data[0] if result.data else None
            logger.info(
                "[ERROR_SVC] Error logged to database with ID: %s",
                log_record.get("id") if log_record else "unknown",
            )
        except Exception as e:
            logger.error("[ERROR_SVC] Failed to log error to Supabase: %s", e)
            return None

        # If critical and we have a repo_id, escalate towards a GitHub issue.
        if severity == "critical" and repo_id:
            await self._escalate_critical(category, error_msg, context, tb, repo_id)

        return log_record

    async def get_errors(self, repo_id: Optional[str] = None, category: Optional[str] = None, since_hours: int = 24) -> List[Dict[str, Any]]:
        """Get recent errors, optionally filtered by repo or category.

        Args:
            repo_id: Only return errors for this repository when given.
            category: Only return errors with this category value when given.
            since_hours: How far back to look, in hours.

        Returns:
            Matching rows, newest first; an empty list when there are none.
        """
        query = (
            db.client.table("error_logs")
            .select("*")
            .gte("created_at", self._cutoff(hours=since_hours))
            .order("created_at", desc=True)
        )
        if repo_id:
            query = query.eq("repo_id", repo_id)
        if category:
            query = query.eq("category", category)

        result = query.execute()
        # Guard against a missing payload so callers always get a list.
        return result.data or []

    async def get_error_summary(self) -> Dict[str, Any]:
        """Get a summary of the errors logged during the last 24 hours.

        Returns:
            A dict with ``total_24h``, ``unresolved`` and ``critical`` counts
            plus ``by_category``, a mapping of category value to count.
        """
        result = (
            db.client.table("error_logs")
            .select("category, severity, resolved")
            .gte("created_at", self._cutoff(hours=24))
            .execute()
        )
        errors = result.data or []

        by_category: Dict[Any, int] = {}
        for entry in errors:
            cat = entry.get("category")
            by_category[cat] = by_category.get(cat, 0) + 1

        return {
            "total_24h": len(errors),
            "unresolved": sum(1 for e in errors if not e.get("resolved")),
            "critical": sum(1 for e in errors if e.get("severity") == "critical"),
            "by_category": by_category,
        }

    async def clear_old_errors(self, days: int = 30):
        """Delete errors older than the specified number of days."""
        (
            db.client.table("error_logs")
            .delete()
            .lt("created_at", self._cutoff(days=days))
            .execute()
        )

    async def mark_resolved(self, error_id: str, resolved: bool = True):
        """Mark an error as resolved or unresolved.

        Returns:
            The updated row, or ``None`` when the update returned no data.
        """
        result = (
            db.client.table("error_logs")
            .update({"resolved": resolved})
            .eq("id", error_id)
            .execute()
        )
        return result.data[0] if result.data else None
# Module-level ErrorService instance, constructed once when this module is imported.
error_service = ErrorService()