"""
MutSyncHub Analytics Engine
Enterprise-grade AI analytics platform with zero-cost inference
"""
| | import logging |
| | import os |
| | import time |
| | import uuid |
| | import subprocess |
| | import asyncio |
| | import threading |
| | import pathlib |
| | import json |
| |
|
| | |
| | from fastapi import FastAPI, Depends, HTTPException, Request, Query, BackgroundTasks |
| | from fastapi.middleware.cors import CORSMiddleware |
| | from fastapi.responses import JSONResponse |
| | from contextlib import asynccontextmanager |
| |
|
| | |
| | from app.core.event_hub import event_hub |
| | |
| | |
| | |
| | from app.core.worker_manager import get_worker_manager |
| | from app.deps import rate_limit_org, verify_api_key, check_all_services |
| | from app.tasks.analytics_worker import trigger_kpi_computation |
| | from app.service.vector_service import cleanup_expired_vectors |
| | from app.routers import health, datasources, reports, flags, scheduler, analytics_stream,ai_query,schema |
| | from app.service.llm_service import load_llm_service |
| | from app.deps import get_qstash_client |
| | from prometheus_client import make_asgi_app |
| | |
# Root logging configuration: timestamped, pipe-separated records.
logging.basicConfig(
    format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)

# Module-level logger shared by every handler in this file.
logger = logging.getLogger(__name__)
| |
|
def safe_redis_decode(value):
    """Return *value* as ``str``, UTF-8-decoding it when it arrives as ``bytes``.

    Redis clients may hand back either type depending on their
    ``decode_responses`` setting; this normalises both cases. Non-bytes
    values are passed through untouched.
    """
    return value.decode("utf-8") if isinstance(value, bytes) else value
| | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Enterprise startup/shutdown sequence with health validation.

    Startup: assigns an instance ID, redirects the HuggingFace cache to
    persistent storage, runs a best-effort dependency health check, and
    (unless disabled via env flags) launches the scheduler subprocess,
    the worker-manager listener, the LLM loader, and the QStash client.

    Shutdown: terminates the scheduler subprocess (if it was started)
    and closes all cached per-org and vector DB connections.
    """
    logger.info("=" * 60)
    logger.info("🚀 ANALYTICS ENGINE v3.0 - STARTUP SEQUENCE")
    logger.info("=" * 60)

    app.state.instance_id = f"engine-{uuid.uuid4().hex[:8]}"
    logger.info(f"Instance ID: {app.state.instance_id}")
    logger.info("🚀 STARTUP SEQUENCE")

    # Point every HuggingFace cache variable at the persistent volume so
    # model downloads survive container restarts.
    os.makedirs("/data/hf_cache", exist_ok=True)
    os.environ["HF_HOME"] = "/data/hf_cache"
    os.environ["TRANSFORMERS_CACHE"] = "/data/hf_cache"  # legacy alias of HF_HOME
    os.environ["HF_HUB_CACHE"] = "/data/hf_cache"

    # Symlink ~/.cache/huggingface -> /data/hf_cache for libraries that
    # ignore the env vars. BUG FIX: best-effort now — a symlink failure
    # (permissions, concurrent creation) previously aborted startup.
    cache_dir = pathlib.Path("/data/hf_cache")
    home_cache = pathlib.Path.home() / ".cache" / "huggingface"
    try:
        if not home_cache.exists():
            home_cache.parent.mkdir(parents=True, exist_ok=True)
            home_cache.symlink_to(cache_dir)
    except OSError as e:
        logger.warning(f"⚠️ Could not link HF cache dir: {e}")

    # Best-effort dependency health report; startup continues regardless.
    try:
        services = check_all_services()
        # NOTE(review): status markers assumed to match the strings emitted
        # by check_all_services — confirm against app.deps.
        healthy = [k for k, v in services.items() if "✅" in str(v)]
        unhealthy = [k for k, v in services.items() if "❌" in str(v)]

        logger.info(f"✅ Healthy: {len(healthy)} services")
        for svc in healthy:
            logger.info(f"   ✓ {svc}: {services[svc]}")

        if unhealthy:
            logger.warning(f"⚠️ Unhealthy: {len(unhealthy)} services")
            for svc in unhealthy:
                logger.warning(f"   ✗ {svc}: {services[svc]}")

    except Exception as e:
        logger.error(f"🔴 Startup health check failed: {e}")

    # Scheduler loop runs as a child process; the handle is kept so
    # shutdown below can terminate it.
    scheduler_process = None
    if os.getenv("DISABLE_SCHEDULER") != "1":
        try:
            scheduler_process = subprocess.Popen(["python", "/app/scheduler_loop.py"])
            logger.info(f"✅ Scheduler started (PID: {scheduler_process.pid})")
        except Exception as e:
            logger.warning(f"⚠️ Scheduler failed to start: {e}")
    else:
        logger.info("ℹ️ Scheduler start skipped (DISABLE_SCHEDULER=1)")

    logger.info("✅ Startup sequence complete")

    if os.getenv("DISABLE_WORKER_MANAGER") != "1":
        logger.info("🚀 starting worker manager...")
        try:
            worker_manager = await get_worker_manager()
            asyncio.create_task(worker_manager.start_listener(), name="worker-manager")
        except Exception as e:
            logger.error(f"❌ Failed to start worker manager: {e}")
    else:
        logger.info("ℹ️ Worker manager start skipped (DISABLE_WORKER_MANAGER=1)")

    if os.getenv("DISABLE_LLM_LOAD") != "1":
        try:
            load_llm_service()
            logger.info("🤖 LLM service loading in background...")
        except Exception as e:
            logger.error(f"❌ LLM load failed: {e}")
    else:
        logger.info("ℹ️ LLM loading skipped (DISABLE_LLM_LOAD=1)")

    if os.getenv("DISABLE_QSTASH") != "1":
        try:
            get_qstash_client()
            logger.info("✅ QStash ready")
        except RuntimeError as e:
            logger.warning(f"⚠️ QStash disabled: {e}")
    else:
        logger.info("ℹ️ QStash initialization skipped (DISABLE_QSTASH=1)")

    # ---- application runs while suspended here ----
    yield

    logger.info("=" * 60)
    logger.info("🛑 ANALYTICS ENGINE - SHUTDOWN SEQUENCE")
    logger.info("=" * 60)

    # BUG FIX: terminate() was called unconditionally; scheduler_process
    # is None when the scheduler was disabled or failed to start, which
    # raised AttributeError and skipped the DB cleanup below.
    if scheduler_process is not None:
        scheduler_process.terminate()
        logger.info("   ✓ Stopped scheduler")

    # Imported lazily so the connection caches are read at shutdown time.
    from app.deps import _org_db_connections, _vector_db_conn

    if _org_db_connections:
        for org_id, conn in _org_db_connections.items():
            try:
                conn.close()
                logger.info(f"   ✓ Closed DB: {org_id}")
            except Exception:
                pass  # best-effort close; connection may already be gone

    if _vector_db_conn:
        try:
            _vector_db_conn.close()
            logger.info("   ✓ Closed Vector DB")
        except Exception:
            pass  # best-effort close

    logger.info("✅ Shutdown complete")
| |
|
| | |
# FastAPI application object; OpenAPI docs are served under /api/*.
# Fixed mojibake in the user-facing description (bullets and lock icon
# were mis-decoded UTF-8).
app = FastAPI(
    title="MutSyncHub Analytics Engine",
    version="3.0.0",
    description="""Enterprise-grade AI analytics engine with:

• Hybrid entity detection (Rule-based + LLM)
• Vector similarity search (DuckDB VSS)
• Zero external API costs (Local Mistral-7B)
• Multi-tenant data isolation
• Redis-backed async processing

**🔐 All endpoints require X-API-KEY header except /health**""",
    lifespan=lifespan,
    docs_url="/api/docs",
    redoc_url="/api/redoc",
    openapi_url="/api/openapi.json",
    contact={
        "name": "MutSyncHub Enterprise",
        "email": "enterprise@mutsynchub.com",
    },
    license_info={
        "name": "MIT License",
    },
)

# Expose Prometheus metrics as an ASGI sub-application at /metrics.
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
| |
|
| | |
# NOTE(review): @app.on_event is deprecated in recent FastAPI in favour of
# the lifespan handler above; kept as-is so registration order is unchanged.
@app.on_event("startup")
async def start_workers():
    """Start background maintenance: a daily expired-vector cleanup thread.

    KPI trigger events themselves are consumed by the worker manager
    started in ``lifespan``; this hook only spawns the cleanup loop.
    """
    logger.info("✅ Worker manager will handle trigger events")

    def run_cleanup():
        # Daemon loop: purge expired vectors once every 24 hours.
        while True:
            try:
                cleanup_expired_vectors()
            except Exception as e:
                # BUG FIX: an unhandled error here used to kill the thread
                # permanently; log and retry on the next cycle instead.
                logger.error(f"❌ Vector cleanup failed: {e}")
            time.sleep(86400)

    cleanup_thread = threading.Thread(target=run_cleanup, daemon=True)
    cleanup_thread.start()
    logger.info("✅ Vector cleanup scheduler started")
| |
|
| | |
@app.middleware("http")
async def add_request_tracking(request: Request, call_next):
    """
    Attach a per-request ID and timing headers for observability.

    Adds ``X-Request-ID`` and ``X-Response-Time`` to every response and
    emits one access-log line per request.
    """
    request_id = f"req-{uuid.uuid4().hex[:12]}"
    request.state.request_id = request_id

    # BUG FIX: use the monotonic perf_counter() for elapsed time —
    # time.time() can jump under NTP adjustments and yield negative or
    # wildly wrong durations.
    start_time = time.perf_counter()
    response = await call_next(request)
    process_time = time.perf_counter() - start_time

    response.headers["X-Request-ID"] = request_id
    response.headers["X-Response-Time"] = f"{process_time:.3f}s"

    logger.info(
        f"{request.method} {request.url.path} | {response.status_code} "
        f"| {process_time:.3f}s | {request_id}"
    )

    return response
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | |
@app.post("/api/v1/kpi/compute")
async def compute_kpis(
    background_tasks: BackgroundTasks,
    org_id: str = Query(..., description="Organization ID"),
    source_id: str = Query(..., description="Data source ID"),
    api_key: str = Depends(verify_api_key),
    limited_org: str = Depends(rate_limit_org(max_requests=50))
):
    """
    Trigger KPI computation for one org/source pair.

    Serves a cached result when one exists; otherwise queues the
    computation and returns immediately — results are published to the
    Redis analytics stream and can be polled via ``poll_url``.
    """
    try:
        # Serve from cache when a recent computation is still valid.
        cached = event_hub.get_key(f"kpi_cache:{org_id}:{source_id}")
        if cached:
            return {
                "status": "cached",
                "org_id": org_id,
                "data": json.loads(cached),
                # NOTE(review): static values — not read from the actual
                # rate limiter; confirm whether live counters are wanted.
                "rate_limit": {
                    "remaining": 50,
                    "reset_in": 60
                }
            }

        # Cache miss: run in the background so the request returns at once.
        background_tasks.add_task(trigger_kpi_computation, org_id, source_id)

        return {
            "status": "processing",
            "org_id": org_id,
            "message": "KPI computation queued. Poll /analytics/stream/recent for results.",
            "poll_url": f"/api/v1/analytics/stream/recent?org_id={org_id}&source_id={source_id}"
        }
    except Exception as e:
        logger.error(f"❌ KPI compute error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
| |
|
| | |
async def continuous_kpi_refresh():
    """
    Auto-refresh KPIs every 5 minutes for active organizations.

    Each tick scans up to 10 ``entity:*`` keys and triggers a KPI
    computation for any org/source pair that has neither a fresh cache
    entry nor an active worker lock.

    NOTE(review): this coroutine is defined but never scheduled anywhere
    in this module — confirm the intended launch point (e.g. lifespan).
    """
    # Give the rest of the startup sequence a head start.
    await asyncio.sleep(10)

    while True:
        try:
            logger.debug("🔄 KPI scheduler tick...")

            active_keys = event_hub.keys("entity:*")
            for key in active_keys[:10]:  # cap work per tick
                key_parts = safe_redis_decode(key).split(":")
                if len(key_parts) >= 3:
                    org_id, source_id = key_parts[1], key_parts[2]

                    # Fresh cache entry: nothing to recompute.
                    cache_key = f"kpi_cache:{org_id}:{source_id}"
                    if event_hub.exists(cache_key):
                        continue

                    # A worker already holds this pair.
                    if event_hub.exists(f"worker:lock:{org_id}:{source_id}"):
                        continue

                    logger.info(f"⏰ Auto-triggering KPIs for {org_id}/{source_id}")
                    await trigger_kpi_computation(org_id, source_id)
                    await asyncio.sleep(1)  # spread load between triggers

        except Exception as e:
            logger.error(f"❌ Scheduler error: {e}")

        await asyncio.sleep(300)  # 5-minute tick
@app.get("/debug/stream-content")
def debug_stream(
    org_id: str = Query(...),
    source_id: str = Query(...),
    api_key: str = Depends(verify_api_key)
):
    """See what's actually in the Redis stream"""
    # Inspect the analytics stream plus the cached entity/industry keys
    # for one org/source pair — diagnostics only.
    stream_key = f"stream:analytics:{org_id}:{source_id}"
    entity_key = f"entity:{org_id}:{source_id}"
    industry_key = f"industry:{org_id}:{source_id}"

    events = event_hub.read_recent_stream(stream_key, 10)

    payload = {
        "stream_key": stream_key,
        "events_count": len(events),
        "events": events,
    }
    payload["entity_exists"] = bool(event_hub.get_key(entity_key))
    payload["industry_exists"] = bool(event_hub.get_key(industry_key))
    payload["entity_data"] = event_hub.get_key(entity_key)
    payload["industry_data"] = event_hub.get_key(industry_key)
    return payload
@app.post("/api/v1/cache/clear")
def clear_cache(org_id: str, source_id: str, api_key: str = Depends(verify_api_key)):
    """Clear entity/industry caches to force fresh reads"""
    # Imported lazily to avoid a module-level import cycle with app.mapper.
    from app.mapper import _ENTITY_CACHE, _INDUSTRY_CACHE

    cache_key = (org_id, source_id)

    # pop() with a default drops the entry when present, no-op otherwise.
    _ENTITY_CACHE.pop(cache_key, None)
    _INDUSTRY_CACHE.pop(cache_key, None)

    return {"status": "cleared", "cache_key": str(cache_key)}
| |
|
| | |
@app.get("/", tags=["root"])
def read_root():
    """
    Service information and discovery.
    """
    # SPACE_ID is set by the HuggingFace Spaces runtime, so its presence
    # distinguishes production from local development.
    is_production = bool(os.getenv("SPACE_ID"))
    info = {
        "status": "operational",
        "service": "MutSyncHub Analytics Engine",
        "version": "3.0.0",
        "mode": "production" if is_production else "development",
        "instance_id": app.state.instance_id,
    }
    info["endpoints"] = {
        "docs": "/api/docs",
        "health": "/api/health/detailed",
        "datasources": "/api/datasources",
    }
    info["features"] = [
        "Hybrid entity detection",
        "Vector similarity search",
        "Multi-tenant isolation",
        "Redis-backed async processing",
    ]
    return info
| |
|
| | |
# Browser origins permitted to call the API with credentials.
ALLOWED_ORIGINS = [
    "https://mut-sync-hub.vercel.app",
    "http://localhost:3000",
    "https://studio.huggingface.co",
]

# CORS: expose the tracking headers set by add_request_tracking and
# cache preflight responses for an hour.
app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
    expose_headers=["X-Request-ID", "X-Response-Time"],
    max_age=3600,
)
| |
|
| | |
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """
    Catch all uncaught exceptions and return a safe 500 response.
    """
    # BUG FIX: request.state.request_id is only set by the tracking
    # middleware; an exception raised before that middleware ran would
    # previously trigger a secondary AttributeError inside this handler.
    request_id = getattr(request.state, "request_id", "unknown")

    logger.error(
        f"🔴 Unhandled error | Path: {request.url.path} | "
        f"Request ID: {request_id} | Error: {str(exc)}",
        exc_info=True
    )

    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "message": "An unexpected error occurred. Check server logs.",
            "request_id": request_id,
            "timestamp": time.time()
        }
    )
| |
|
| | |
| | |
# Router registration. /health is public; every other router requires a
# valid X-API-KEY (verify_api_key dependency). Order matches the original
# registrations exactly.
app.include_router(health.router, prefix="/health")

for _router, _prefix in [
    (datasources.router, "/api/v1/datasources"),
    (reports.router, "/api/v1/reports"),
    (flags.router, "/api/v1/flags"),
    (scheduler.router, "/api/v1/scheduler"),
    (analytics_stream.router, ""),  # mounted at root, no prefix
    (ai_query.router, "/api/v1/ai-query"),
    (schema.router, "/api/v1/schema"),
]:
    app.include_router(_router, prefix=_prefix, dependencies=[Depends(verify_api_key)])