| """ |
| State-less scheduler – caller (Next.js) orchestrates storage & quota. |
| Only duty: run analytics on cron, return JSON. |
| """ |
| import asyncio |
| import pandas as pd |
| from apscheduler.schedulers.asyncio import AsyncIOScheduler |
| from app.engine.analytics import AnalyticsService |
| from app.service.industry_svc import (eda, forecast, basket, market_dynamics, |
| supply_chain, customer_insights, |
| operational_efficiency, risk_assessment, |
| sustainability) |
| from app.utils.detect_industry import detect_industry |
| from app.utils.email import send_pdf_email |
| import os |
| from datetime import datetime |
| import aiohttp |
|
|
# Module-wide scheduler instance. The job-registration helpers in this module
# add and remove jobs on it, and start_scheduler() starts it.
sched = AsyncIOScheduler()
|
|
| |
| |
| |
# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task can be garbage-collected
# before it finishes.
_background_tasks: set[asyncio.Task] = set()

# Keyword arguments an analytic needs in addition to the canonical rows.
_REQUIRED_KWARGS = {"forecast": ("date_col", "value_col")}

# Analytic name -> factory returning the awaitable that computes it.
# Lambdas keep the service lookups lazy: the names are resolved only when a
# job actually runs, not when this table is built.
_ANALYTIC_RUNNERS = {
    "eda": lambda data, industry, opts: eda(data, industry),
    "forecast": lambda data, industry, opts: forecast(
        data, opts["date_col"], opts["value_col"]
    ),
    "basket": lambda data, industry, opts: basket(data, 0.01, 0.3, 1.0),
    "market-dynamics": lambda data, industry, opts: market_dynamics(data),
    "supply-chain": lambda data, industry, opts: supply_chain(data),
    "customer-insights": lambda data, industry, opts: customer_insights(data),
    "operational-efficiency": lambda data, industry, opts: operational_efficiency(data),
    "risk-assessment": lambda data, industry, opts: risk_assessment(data),
    "sustainability": lambda data, industry, opts: sustainability(data),
}


async def run_analytic_job(org_id: str, analytic_type: str, **kwargs) -> dict:
    """Run one analytic for an organisation, publish the result and return it.

    Steps:
        1. Validate ``analytic_type`` and its required keyword arguments
           before any data is loaded or purged.
        2. Load the organisation's canonical rows via ``canonify_df(org_id)``
           (the original notes describe this as the last 6 h of raw rows,
           canonified through DuckDB -- not verifiable from this module).
        3. Detect the industry and compute the requested analytic.
        4. Call ``log_kpis_and_purge(org_id)``.
        5. POST the result to ``{NEXT_PUBLIC_ORIGIN}/analytics/report/sync``
           and start a background task that e-mails the report.

    Args:
        org_id: Identifier of the organisation whose data is analysed.
        analytic_type: One of the keys of ``_ANALYTIC_RUNNERS``.
        **kwargs: Extra options. ``"forecast"`` requires ``date_col`` and
            ``value_col``; the other analytics ignore them.

    Returns:
        ``{"error": <message>}`` when the analytic is unknown, a required
        option is missing, or no data is available; otherwise a dict with
        the keys ``orgId``, ``analytic``, ``industry``, ``results`` and
        ``timestamp``.

    Exceptions raised by the analytics services or by the HTTP call are not
    caught here and propagate to the caller.
    """
    # A non-string type can never name an analytic; treat it as unknown
    # instead of letting an unhashable value blow up the dict lookup.
    runner = _ANALYTIC_RUNNERS.get(analytic_type) if isinstance(analytic_type, str) else None
    if runner is None:
        return {"error": "Unknown analytic"}

    # Scheduled jobs are registered with positional args only, so a missing
    # option is an expected condition, not a programming error.
    missing = [name for name in _REQUIRED_KWARGS.get(analytic_type, ()) if name not in kwargs]
    if missing:
        return {
            "error": f"Missing required parameter(s) for {analytic_type}: {', '.join(missing)}"
        }

    # Kept as function-level imports, as in the original (presumably to
    # avoid an import cycle -- TODO confirm).
    from app.mapper import canonify_df
    from app.tasks.kpi_logger import log_kpis_and_purge

    df = canonify_df(org_id)
    if df.empty:
        return {"error": "No recent data found"}

    data = df.to_dict("records")
    industry, _ = detect_industry(df)

    result = await runner(data, industry, kwargs)

    log_kpis_and_purge(org_id)

    async with aiohttp.ClientSession() as session:
        # The response body is not used; the context manager only makes sure
        # the response (and its connection) is released.
        async with session.post(
            f"{os.getenv('NEXT_PUBLIC_ORIGIN')}/analytics/report/sync",
            json={
                "orgId": org_id,
                "type": analytic_type,
                "results": result,
                "lastRun": datetime.utcnow().isoformat(),
            },
            headers={"x-api-key": os.getenv("ANALYTICS_KEY")},
        ):
            pass

    pdf_url = f"{os.getenv('PUBLIC_URL', '')}/api/reports/{org_id}/{analytic_type}.pdf"
    email_task = asyncio.create_task(
        send_pdf_email(org_id, f"{analytic_type} report", {"pdf": pdf_url, "data": result})
    )
    # Hold the task until it completes, then drop the reference.
    _background_tasks.add(email_task)
    email_task.add_done_callback(_background_tasks.discard)

    return {
        "orgId": org_id,
        "analytic": analytic_type,
        "industry": industry,
        "results": result,
        "timestamp": datetime.utcnow().isoformat(),
    }
|
|
| |
| |
| |
# Cron trigger fields per supported frequency. Every job fires at 06:00;
# weekly jobs use day_of_week=0 (Monday in APScheduler's cron trigger) and
# monthly jobs run on the 1st.
_CRON_FIELDS_BY_FREQUENCY = {
    "daily": {"hour": 6, "minute": 0},
    "weekly": {"day_of_week": 0, "hour": 6, "minute": 0},
    "monthly": {"day": 1, "hour": 6, "minute": 0},
}


def add_job_to_scheduler(schedule: dict):
    """Register one cron job per analytic listed in ``schedule``.

    Args:
        schedule: Mapping with the keys ``orgId``, ``frequency``
            (``"daily"``, ``"weekly"`` or ``"monthly"``), ``analytics``
            (iterable of analytic names) and ``id``. Each job id is
            ``"<id>_<analytic>"``.

    Raises:
        KeyError: If ``orgId``, ``frequency`` or ``analytics`` is missing,
            or ``id`` is missing while there are analytics to register.

    An unsupported frequency registers nothing; this is now logged rather
    than silently ignored. Jobs are added with ``replace_existing=True`` so
    registering the same schedule again updates its jobs instead of failing
    on the first duplicate id and leaving the schedule half-registered.
    """
    org_id = schedule["orgId"]
    freq = schedule["frequency"]
    analytics = schedule["analytics"]

    # Non-string frequencies (e.g. null in the JSON) are simply unsupported.
    cron_fields = _CRON_FIELDS_BY_FREQUENCY.get(freq) if isinstance(freq, str) else None
    if cron_fields is None:
        import logging

        logging.getLogger(__name__).warning(
            "Unsupported frequency %r for schedule %r; no jobs added",
            freq,
            schedule.get("id"),
        )
        return

    for analytic in analytics:
        sched.add_job(
            run_analytic_job,
            "cron",
            args=[org_id, analytic],
            id=f"{schedule['id']}_{analytic}",
            replace_existing=True,
            **cron_fields,
        )
|
|
def remove_job_from_scheduler(schedule_id: str):
    """Remove every job that belongs to ``schedule_id``.

    Job ids have the form ``"<schedule id>_<analytic>"``. A job is removed
    when its id starts with ``"<schedule_id>_"`` or equals ``schedule_id``
    exactly (so passing a full job id still removes that single job).

    Matching on the bare prefix, as before, also removed jobs of unrelated
    schedules whose id merely begins with the same characters (``"1"``
    matched ``"10_eda"``). An empty ``schedule_id`` is ignored; it used to
    match, and therefore remove, every job.

    NOTE(review): a schedule id that itself contains ``_`` can still be a
    prefix of another schedule's job ids (``"a"`` vs ``"a_b"``); confirm the
    id format used by the caller.
    """
    if not schedule_id:
        return

    prefix = f"{schedule_id}_"
    # get_jobs() returns a list, so removing jobs while looping is safe.
    for job in sched.get_jobs():
        if job.id == schedule_id or job.id.startswith(prefix):
            sched.remove_job(job.id)
|
|
| |
| |
| |
async def load_schedules():
    """Register the jobs described by the ``SCHEDULES`` environment variable.

    ``SCHEDULES`` is expected to hold a JSON list of objects with the key
    ``orgId`` and the optional keys ``frequency`` (default ``"daily"``) and
    ``analytics`` (default ``["eda"]``). Job ids are ``"<orgId>_<analytic>"``,
    exactly as before; any ``id`` key in an entry is not used.

    Loading is best-effort: invalid JSON, a document that is not a list, and
    entries without an ``orgId`` are logged and skipped instead of being
    swallowed silently or aborting the remaining schedules.
    """
    import json
    import logging

    log = logging.getLogger(__name__)

    raw = os.getenv("SCHEDULES", "[]")
    try:
        schedules = json.loads(raw)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        log.warning("SCHEDULES is not valid JSON; no jobs loaded")
        return

    if not isinstance(schedules, list):
        log.warning(
            "SCHEDULES must be a JSON list, got %s; no jobs loaded",
            type(schedules).__name__,
        )
        return

    for entry in schedules:
        if not isinstance(entry, dict) or "orgId" not in entry:
            log.warning("Skipping schedule entry without an orgId: %r", entry)
            continue

        org_id = entry["orgId"]
        # Reuse the registration helper rather than duplicating the cron
        # mapping; "id" is set to the org id to keep the historical job ids.
        add_job_to_scheduler(
            {
                "id": org_id,
                "orgId": org_id,
                "frequency": entry.get("frequency", "daily"),
                "analytics": entry.get("analytics", ["eda"]),
            }
        )
|
|
| |
| |
| |
def start_scheduler():
    """Queue the schedule loader as a task, then start the scheduler.

    ``asyncio.create_task`` needs a running event loop, so this must be
    called from code that is already executing inside one. The loader task
    is created before ``sched.start()`` is called.
    """
    loader = load_schedules()
    asyncio.create_task(loader)
    sched.start()