| | from fastapi import FastAPI, BackgroundTasks, HTTPException, Request |
| | from pydantic import BaseModel |
| | from typing import List, Optional, Dict, Any |
| | import logging |
| | import os |
| | import json |
| |
|
| | |
| | from agents.technical_auditor import TechnicalAuditorAgent |
| | from agents.content_optimizer import ContentOptimizationAgent |
| | from agents.competitor_intelligence import CompetitorIntelligenceAgent |
| | from agents.backlink_indexing import BacklinkIndexingAgent |
| | from agents.performance_analytics import PerformanceAnalyticsAgent |
| | from agents.orchestrator import OrchestratorAgent |
| |
|
# Configure root logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Application object that the route decorators in this module register on.
app = FastAPI(
    title="SEO Multi-Agent System",
    version="1.0.0",
)
| |
|
def setup_credentials_from_env(credentials_dir="/app/credentials"):
    """Write credential JSON held in environment variables to files on disk.

    The agents in this module are configured with credential file paths, so
    the contents of ``GSC_CREDENTIALS`` and ``GA4_CREDENTIALS`` are written to
    ``gsc-credentials.json`` and ``ga4-credentials.json`` inside
    *credentials_dir*. This lets the secrets be supplied without committing
    them to the repository. A variable that is unset or empty is skipped and
    any file already on disk is left untouched.

    After writing, the GSC file is read back (best effort) and the service
    account e-mail it contains is logged as a configuration hint.

    Args:
        credentials_dir: Directory that receives the files; it is created if
            missing. Defaults to ``/app/credentials``.

    Raises:
        OSError: If the directory cannot be created or a file cannot be
            written.
    """
    log = logging.getLogger(__name__)
    os.makedirs(credentials_dir, exist_ok=True)

    sources = (
        ("GSC", "GSC_CREDENTIALS", "gsc-credentials.json"),
        ("GA4", "GA4_CREDENTIALS", "ga4-credentials.json"),
    )
    for label, env_var, filename in sources:
        payload = os.environ.get(env_var)
        if not payload:
            continue
        log.info("Loading %s credentials from Environment Variable", label)
        _write_private_file(os.path.join(credentials_dir, filename), payload)

    # Diagnostic only: a missing or malformed file must never abort start-up,
    # so every expected failure is reported and then ignored.
    gsc_path = os.path.join(credentials_dir, "gsc-credentials.json")
    try:
        with open(gsc_path, "r", encoding="utf-8") as f:
            creds = json.load(f)
        # AttributeError covers valid JSON that is not an object.
        client_email = creds.get("client_email")
    except (OSError, ValueError, AttributeError) as e:
        log.warning("Could not read credentials for debug: %s", e)
    else:
        log.info("🕵️ DEBUG: Using Service Account Email: %s", client_email)
        log.info(
            "🕵️ DEBUG: Make sure THIS email is an Owner in GSC for https://fixyfile.com"
        )


def _write_private_file(path, content):
    """Write *content* to *path* with owner-only read/write permissions."""
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        # The mode given to os.open only applies when the file is created;
        # tighten a pre-existing file too, before the secret is written.
        os.chmod(path, 0o600)
        f.write(content)
| |
|
| | |
# Write the credential files at import time, before the agents that are
# configured with those file paths are constructed further down.
setup_credentials_from_env()
| |
|
def get_credential_path(filename: str) -> str:
    """Return the first existing location of the credential file *filename*.

    Candidates are checked in this order: ``/app/credentials/<filename>``,
    ``/app/<filename>``, ``./<filename>`` and finally *filename* itself.

    Args:
        filename: Name of the credential file (for example
            ``"gsc-credentials.json"``).

    Returns:
        The first candidate that is an existing regular file. When none
        exists, ``/app/credentials/<filename>`` is returned so the caller
        still receives a path.
    """
    default_path = f"/app/credentials/{filename}"
    candidates = (
        default_path,
        f"/app/{filename}",
        f"./{filename}",
        filename,
    )
    for candidate in candidates:
        # isfile rather than exists: a directory with a matching name (or
        # "./" for an empty filename) is not a usable credentials file.
        if os.path.isfile(candidate):
            return candidate
    return default_path
| |
|
| | |
| | |
# Agent instances shared by every route handler in this module.
technical_agent = TechnicalAuditorAgent()
content_agent = ContentOptimizationAgent()
competitor_agent = CompetitorIntelligenceAgent()

# Resolve the GSC credentials file once; both agents below receive it.
_gsc_credentials_path = get_credential_path("gsc-credentials.json")

indexing_agent = BacklinkIndexingAgent(
    gsc_credentials_path=_gsc_credentials_path,
    site_url="https://fixyfile.com",
)
analytics_agent = PerformanceAnalyticsAgent(
    # The GA4_PROPERTY_ID environment variable overrides the property id.
    # When it is unset or empty, the previous hard-coded placeholder is
    # passed through unchanged.
    ga4_property_id=os.environ.get("GA4_PROPERTY_ID") or "YOUR_GA4_ID",
    ga4_credentials_path=get_credential_path("ga4-credentials.json"),
    gsc_credentials_path=_gsc_credentials_path,
    # NOTE(review): the indexing agent is given the "https://" form of the
    # site and this agent the "sc-domain:" form; confirm both are intended.
    site_url="sc-domain:fixyfile.com",
)

# The orchestrator is created without Redis settings and with an empty
# agent mapping.
orchestrator = OrchestratorAgent(redis_host=None, redis_port=None, agents={})
| |
|
| | |
| |
|
class PageAuditRequest(BaseModel):
    """Request body accepted by ``POST /audit/execute``."""

    # Address of the page handed to the technical auditor.
    url: str
    # Optional list of check names, defaulting to ["all"]. The route handler
    # in this file does not read this field.
    checks: Optional[List[str]] = ["all"]
| |
|
class ContentOptimizeRequest(BaseModel):
    """Request body accepted by ``POST /optimize``."""

    # Address of the page handed to the content agent.
    url: str
    # Optional list of task names, defaulting to ["all"]. The route handler
    # in this file does not read this field.
    tasks: Optional[List[str]] = ["all"]
| |
|
class CompetitorAnalysisRequest(BaseModel):
    """Request body accepted by ``POST /competitor/analyze``."""

    # Keyword passed to the competitor agent's report generation.
    keyword: str
| |
|
class IndexingRequest(BaseModel):
    """Request body accepted by ``POST /index/submit``."""

    # URLs passed to the indexing agent for submission.
    urls: List[str]
| |
|
class WorkflowTrigger(BaseModel):
    """Request body accepted by ``POST /workflow/trigger``."""

    # Name of the workflow passed to the orchestrator.
    workflow_name: str
    # Free-form data passed to the orchestrator together with the name.
    payload: Dict[str, Any]
| |
|
| | |
| |
|
@app.get("/")
async def root():
    """Return a static description of the service and its agent names."""
    agent_names = ["technical", "content", "competitor", "indexing", "performance"]
    return {
        "service": "SEO Multi-Agent System",
        "status": "running",
        "agents": agent_names,
    }
| |
|
@app.get("/health")
async def health_check():
    """Return a fixed "healthy" payload."""
    # Static response: no agent or orchestrator state is inspected here.
    payload = {"status": "healthy", "orchestrator": "online"}
    return payload
| |
|
| | |
@app.post("/audit/execute")
async def execute_audit(request: PageAuditRequest, background_tasks: BackgroundTasks):
    """Schedule a technical audit of ``request.url`` as a background task.

    The response only acknowledges the request; it does not contain the
    audit result.
    """
    target_url = request.url
    # Only the URL is forwarded; request.checks is not used by this handler.
    background_tasks.add_task(technical_agent.audit_page, target_url)
    return {"status": "started", "url": target_url}
| |
|
| | |
@app.post("/optimize")
async def optimize_content(request: ContentOptimizeRequest):
    """Analyze the page at ``request.url`` and return the content agent's result."""
    # Only the URL is forwarded; request.tasks is not used by this handler.
    # NOTE(review): the agent call is not awaited, so it presumably runs
    # synchronously inside this coroutine - confirm that is acceptable.
    return content_agent.analyze_page(request.url)
| |
|
| | |
@app.post("/competitor/analyze")
async def analyze_competitor(request: CompetitorAnalysisRequest, background_tasks: BackgroundTasks):
    """Schedule a competitive report for ``request.keyword`` as a background task.

    The response only acknowledges the request; it does not contain the
    report.
    """
    keyword = request.keyword
    background_tasks.add_task(competitor_agent.generate_competitive_report, keyword)
    return {"status": "started", "keyword": keyword}
| |
|
| | |
@app.post("/index/submit")
async def submit_indexing(request: IndexingRequest):
    """Pass ``request.urls`` to the indexing agent and return its results."""
    return indexing_agent.auto_submit_new_pages(request.urls)
| |
|
@app.get("/index/status")
async def indexing_status():
    """Return the indexing agent's status and error reports in one payload."""
    # The status call is evaluated before the errors call.
    return {
        "indexed_pages": indexing_agent.check_indexing_status(),
        "errors": indexing_agent.get_indexing_errors(),
    }
| |
|
| | |
@app.get("/analytics/underperforming")
async def get_underperforming():
    """Return the analytics agent's underperforming pages for the last 30 days.

    Best effort: if the agent raises, the failure is logged with its
    traceback and an empty list is returned instead of an error response.
    """
    try:
        return analytics_agent.get_underperforming_pages(days=30)
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error getting underperforming pages: %s", e)
        return []
| |
|
@app.get("/report/weekly")
async def weekly_report():
    """Return the analytics agent's weekly report.

    Best effort: if the agent raises, the failure is logged with its
    traceback and ``{"error": <message>}`` is returned instead of an error
    response.
    """
    try:
        return analytics_agent.generate_weekly_report()
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error generating weekly report: %s", e)
        return {"error": str(e)}
| |
|
@app.get("/monitor/algorithm")
async def check_algorithm():
    """Return the analytics agent's algorithm-update detection result.

    Best effort: if the agent raises, the failure is logged with its
    traceback and a payload with ``volatility_detected`` set to ``False``
    plus the error message is returned instead of an error response.
    """
    try:
        return analytics_agent.detect_algorithm_update()
    except Exception as e:
        # Log the failure like the other analytics handlers do; previously
        # it was reported to the client only.
        logger.exception("Error detecting algorithm update: %s", e)
        return {"volatility_detected": False, "error": str(e)}
| |
|
@app.get("/monitor/cwv")
async def check_cwv(url: str):
    """Return the analytics agent's Core Web Vitals result for ``url``."""
    return analytics_agent.monitor_core_web_vitals(url)
| |
|
| | |
@app.post("/workflow/trigger")
async def trigger_workflow(request: WorkflowTrigger):
    """Create a workflow through the orchestrator and return its id."""
    name, payload = request.workflow_name, request.payload
    workflow_id = orchestrator.create_workflow(name, payload)
    return {"workflow_id": workflow_id, "status": "started"}
| |
|