Researcher / src /scheduler.py
amarck's picture
Add HF Spaces support, preference seeding, archive search, tests
430d0f8
"""APScheduler — configurable pipeline trigger running inside the web process."""
import logging
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Single shared scheduler instance: start_scheduler() starts it and
# reschedule() swaps its job at runtime.
scheduler = BackgroundScheduler()
# Fixed id of the one scheduled job, so it can be replaced or removed by id.
JOB_ID = "weekly_run"
# Default: Sunday 22:00 UTC
# NOTE(review): no timezone= is passed to the trigger, so the "UTC" label
# presumably relies on the process's local timezone being UTC — confirm.
_DEFAULT_CRON = CronTrigger(day_of_week="sun", hour=22, minute=0)
def _parse_cron_trigger(cron_str: str) -> CronTrigger | None:
    """Parse a 5-field cron string into a CronTrigger.

    Args:
        cron_str: Whitespace-separated ``minute hour day month day_of_week``
            expression. An empty, whitespace-only or ``None`` value selects
            manual-only mode.

    Returns:
        ``None`` for manual-only mode, a ``CronTrigger`` built from the five
        fields, or ``_DEFAULT_CRON`` when the string has the wrong number of
        fields or a field is rejected by ``CronTrigger``.
    """
    # One split covers both the "blank" check and the field extraction:
    # str.split() with no arguments drops all surrounding/repeated whitespace.
    fields = (cron_str or "").split()
    if not fields:
        return None
    if len(fields) != 5:
        log.warning("Invalid cron string '%s' (expected 5 fields) — using default", cron_str)
        return _DEFAULT_CRON

    minute, hour, day, month, day_of_week = fields
    try:
        # NOTE(review): fields are passed through verbatim. APScheduler is
        # documented as counting weekdays from Monday, while classic crontab
        # uses 0 = Sunday — confirm numeric day_of_week values mean what the
        # configuration author expects.
        return CronTrigger(
            minute=minute,
            hour=hour,
            day=day,
            month=month,
            day_of_week=day_of_week,
        )
    except (ValueError, TypeError) as e:
        log.warning("Failed to parse cron '%s': %s — using default", cron_str, e)
        return _DEFAULT_CRON
def _score_enrich_report(run_pipeline, domain: str) -> None:
    """Run a paper pipeline, then score, enrich and report on the run it returns.

    Args:
        run_pipeline: Zero-argument callable that executes the pipeline and
            returns its run id.
        domain: Pipeline key (``"aiml"`` or ``"security"``) passed to every
            post-processing step.
    """
    from src.scoring import rescore_top, score_run

    run_id = run_pipeline()
    score_run(run_id, domain)
    rescore_top(run_id, domain)
    # Imported only after scoring has finished, exactly where the original
    # code imported it (presumably to avoid a circular import with the web
    # app — confirm).
    from src.web.app import _enrich_s2, _generate_report

    _enrich_s2(run_id, domain)
    _generate_report(run_id, domain)


def _run_aiml() -> None:
    """Run the AI/ML pipeline followed by scoring, enrichment and its report."""
    from src.pipelines.aiml import run_aiml_pipeline

    _score_enrich_report(run_aiml_pipeline, "aiml")


def _run_security() -> None:
    """Run the security pipeline followed by scoring, enrichment and its report."""
    from src.pipelines.security import run_security_pipeline

    _score_enrich_report(run_security_pipeline, "security")


def _run_github() -> None:
    """Run the GitHub pipeline."""
    from src.pipelines.github import run_github_pipeline

    run_github_pipeline()


def _run_events() -> None:
    """Run the events pipeline."""
    from src.pipelines.events import run_events_pipeline

    run_events_pipeline()


def _recompute_preferences() -> None:
    """Recompute preferences when at least one signal has been recorded.

    Any failure is logged with its traceback and swallowed so that it cannot
    abort the scheduled run.
    """
    try:
        from src.preferences import compute_preferences
        from src.db import get_signal_counts

        counts = get_signal_counts()
        if sum(counts.values()) > 0:
            compute_preferences()
            log.info("Preferences recomputed after scheduled run")
    except Exception:
        log.exception("Post-run preference recompute failed")


def weekly_run():
    """Run all enabled pipelines: aiml → security → github → events.

    Each pipeline is skipped when ``is_pipeline_enabled`` reports it as
    disabled. A failure inside one pipeline (including a failed import of its
    module) is logged with its traceback and does not stop the remaining
    pipelines. Preferences are recomputed once all pipelines have been
    attempted.
    """
    from src.config import is_pipeline_enabled

    log.info("Starting scheduled run ...")

    # (config key, label used in log messages, stage runner) in execution order.
    stages = (
        ("aiml", "AI/ML", _run_aiml),
        ("security", "Security", _run_security),
        ("github", "GitHub", _run_github),
        ("events", "Events", _run_events),
    )
    for key, label, run_stage in stages:
        if not is_pipeline_enabled(key):
            log.info("%s pipeline disabled — skipping", label)
            continue
        try:
            run_stage()
        except Exception:
            # Job boundary: one broken pipeline must not block the others.
            log.exception("%s pipeline failed", label)

    # Recompute preferences after scoring so adjusted rankings reflect new data
    _recompute_preferences()
    log.info("Scheduled run complete")
def start_scheduler():
    """Start the background scheduler with the configured cron job.

    Reads ``SCHEDULE_CRON`` from ``src.config`` at call time. When it parses
    to manual-only mode the scheduler is started without any job; otherwise
    ``weekly_run`` is registered under ``JOB_ID``, replacing a job with the
    same id if one exists.

    Calling this more than once is safe: the scheduler is only started when
    it is not already running.
    """
    from src.config import SCHEDULE_CRON

    trigger = _parse_cron_trigger(SCHEDULE_CRON)
    if trigger is None:
        log.info("Schedule set to manual — no automatic jobs")
    else:
        scheduler.add_job(
            weekly_run,
            trigger=trigger,
            id=JOB_ID,
            name="Scheduled research pipeline",
            replace_existing=True,
        )

    # start() raises if the scheduler is already running, so a repeated call
    # must skip it.
    if not scheduler.running:
        scheduler.start()

    if trigger is not None:
        log.info("Scheduler started with cron: %s", SCHEDULE_CRON)
def reschedule(cron_str: str | None = None):
    """Update the scheduler with a new cron string at runtime.

    Args:
        cron_str: New 5-field cron expression. ``None`` means "read
            ``SCHEDULE_CRON`` from ``src.config``"; an empty or
            whitespace-only string switches to manual-only mode.

    The existing job (if any) is always removed first. A new job is added
    unless the cron string selects manual-only mode.
    """
    # Local import: only this function needs the exception type.
    from apscheduler.jobstores.base import JobLookupError

    if cron_str is None:
        from src.config import SCHEDULE_CRON

        cron_str = SCHEDULE_CRON

    # Remove existing job if present. Only "no such job" is expected here;
    # any other failure is a real error and is allowed to propagate.
    try:
        scheduler.remove_job(JOB_ID)
    except JobLookupError:
        pass  # Job may not exist

    trigger = _parse_cron_trigger(cron_str)
    if trigger is None:
        log.info("Rescheduled to manual — removed automatic job")
        return

    scheduler.add_job(
        weekly_run,
        trigger=trigger,
        id=JOB_ID,
        name="Scheduled research pipeline",
        replace_existing=True,
    )
    log.info("Rescheduled with cron: %s", cron_str)