| """ |
| Database-based Task Scheduler for Apex Ores |
| |
| Remplace les cron jobs traditionnels par un système robuste basé sur la DB: |
| - Rattrapage automatique des tâches manquées après redémarrage |
| - Historique complet des exécutions |
| - Interface admin pour monitoring et contrôle |
| - Tolérance aux pannes serveur |
| |
| Usage: |
| # Mode daemon (boucle infinie) |
| python -m app.task_runner --daemon |
| |
| # Mode one-shot (pour cron ou test) |
| python -m app.task_runner |
| |
| # Via Flask CLI |
| flask run-scheduler |
| """ |
|
|
| import sys |
| import os |
| import socket |
| import traceback |
| from datetime import datetime, timedelta, timezone, date, time |
| from io import StringIO |
| from contextlib import redirect_stdout |
|
|
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
|
|
| from app import create_app, db |
| from app.models import ScheduledTask, TaskExecutionLog |
|
|
|
|
class TaskRunner:
    """Run database-scheduled tasks and record each execution.

    The runner owns a Flask application (built with ``create_app``) and uses
    its application context for every database access. Each execution is
    recorded as a ``TaskExecutionLog`` row tagged with the hostname and
    process id of this runner.
    """

    def __init__(self):
        """Create the Flask app and capture the host name and process id."""
        self.app = create_app()
        self.hostname = socket.gethostname()
        self.pid = os.getpid()

    def run_pending_tasks(self):
        """Execute every active task whose ``next_run_at`` is unset or due."""
        with self.app.app_context():
            now = datetime.now(timezone.utc)

            pending_tasks = ScheduledTask.query.filter(
                # The comparison builds a SQL expression, so ``== True`` is
                # intentional here (``is True`` would not produce SQL).
                ScheduledTask.is_active == True,  # noqa: E712
                db.or_(
                    ScheduledTask.next_run_at.is_(None),
                    ScheduledTask.next_run_at <= now,
                ),
            ).all()

            print(f"[{now}] {len(pending_tasks)} tâche(s) à exécuter")

            for task in pending_tasks:
                # Stay in this context/session: the tasks were loaded here.
                self._execute_in_active_context(task)

    def execute_task(self, task):
        """Execute one task inside its own application context.

        Args:
            task: The ``ScheduledTask`` to run. It may belong to another
                session; it is merged into the session of the context
                opened here before being used.
        """
        with self.app.app_context():
            # NOTE(review): assuming ``db`` is a Flask-SQLAlchemy instance,
            # ``db.session`` can be scoped to the application context, so the
            # caller's ``task`` may live in a different session than the one
            # committed below. merge() yields the copy owned by this session.
            self._execute_in_active_context(db.session.merge(task))

    def _execute_in_active_context(self, task):
        """Run ``task`` and persist its log using the current ``db.session``.

        Must be called with an application context already active and with
        ``task`` attached to the current session.
        """
        print(f"\n[{datetime.now(timezone.utc)}] Exécution de: {task.name}")

        execution_log = TaskExecutionLog(
            task_id=task.id,
            task_name=task.name,
            server_hostname=self.hostname,
            process_id=self.pid,
        )
        db.session.add(execution_log)
        task.mark_running()
        execution_log.status = "running"
        # Commit before running so the "running" state is stored even if the
        # task function never returns.
        db.session.commit()

        try:
            output = self.run_task_function(task.name)

            execution_log.status = "success"
            execution_log.completed_at = datetime.now(timezone.utc)
            execution_log.output_log = output
            task.mark_success()

            print(f" ✓ {task.name} terminé avec succès")

        except Exception as e:
            error_msg = f"{str(e)}\n{traceback.format_exc()}"

            # Discard whatever the failed task left pending in the session;
            # otherwise the commit below could persist partial work or fail
            # on a session that is waiting for a rollback.
            db.session.rollback()

            execution_log.status = "failed"
            execution_log.completed_at = datetime.now(timezone.utc)
            # Stored message is capped at 2000 characters.
            execution_log.error_message = error_msg[:2000]
            task.mark_failed(str(e))

            print(f" ✗ {task.name} a échoué: {str(e)}")

        db.session.commit()

    def run_task_function(self, task_name):
        """Run the handler registered for ``task_name`` and return its stdout.

        Args:
            task_name: One of ``"daily_gains"``, ``"hourly_cleanup"`` or
                ``"weekly_report"``.

        Returns:
            Everything the handler printed to standard output, as one string.

        Raises:
            ValueError: If ``task_name`` has no handler.
        """
        handlers = {
            "daily_gains": self._run_daily_gains,
            "hourly_cleanup": self._run_hourly_cleanup,
            "weekly_report": self._run_weekly_report,
        }
        handler = handlers.get(task_name)
        if handler is None:
            raise ValueError(f"Tâche inconnue: {task_name}")

        captured = StringIO()
        with redirect_stdout(captured):
            handler()

        return captured.getvalue()

    def _run_daily_gains(self):
        """Run the daily job: gains, withdrawals, cleanup, then the summary."""
        # Imported at call time, as in the rest of this module's handlers.
        from scripts.daily_gains import (
            calculate_daily_gains,
            auto_process_withdrawals,
            cleanup_old_notifications,
            print_platform_summary,
        )

        calculate_daily_gains()
        auto_process_withdrawals()
        cleanup_old_notifications()
        print_platform_summary()

    def _run_hourly_cleanup(self):
        """Run the periodic cleanup of old notifications."""
        print("Nettoyage horaire...")
        from scripts.daily_gains import cleanup_old_notifications

        cleanup_old_notifications()
        print("Nettoyage terminé")

    def _run_weekly_report(self):
        """Placeholder for the weekly report; only prints a TODO notice."""
        print("Génération du rapport hebdomadaire...")
        print("TODO: Implémenter le rapport hebdomadaire")

    def check_missed_tasks(self):
        """Execute active tasks that never ran or last ran over 25 hours ago.

        NOTE(review): the 25-hour threshold is applied to every task whatever
        its schedule type -- confirm this is intended for hourly tasks.
        """
        with self.app.app_context():
            now = datetime.now(timezone.utc)

            missed_tasks = ScheduledTask.query.filter(
                # SQL expression, see run_pending_tasks.
                ScheduledTask.is_active == True,  # noqa: E712
                db.or_(
                    ScheduledTask.last_run_at.is_(None),
                    ScheduledTask.last_run_at < now - timedelta(hours=25),
                ),
            ).all()

            if missed_tasks:
                print(f"\n⚠️ {len(missed_tasks)} tâche(s) manquée(s) détectée(s)!")
                for task in missed_tasks:
                    last_run = (
                        task.last_run_at.strftime("%Y-%m-%d %H:%M")
                        if task.last_run_at
                        else "JAMAIS"
                    )
                    print(f" - {task.name} (dernier run: {last_run})")
                    # Stay in this context/session: the tasks were loaded here.
                    self._execute_in_active_context(task)
            else:
                print("\n✓ Aucune tâche manquée")

    def initialize_default_tasks(self):
        """Create the default scheduled tasks that do not exist yet."""
        with self.app.app_context():
            default_tasks = [
                {
                    "name": "daily_gains",
                    "description": "Calcule les gains quotidiens, traite les retraits et commissions",
                    "schedule_type": "daily",
                    # ``time`` is ``datetime.time`` here: 00:05:00.
                    "schedule_time": time(0, 5, 0),
                    "is_active": True,
                },
                {
                    "name": "hourly_cleanup",
                    "description": "Nettoyage des notifications et logs anciens",
                    "schedule_type": "hourly",
                    "is_active": True,
                },
            ]

            for task_data in default_tasks:
                existing = ScheduledTask.query.filter_by(name=task_data["name"]).first()
                if not existing:
                    task = ScheduledTask(**task_data)
                    task.calculate_next_run()
                    db.session.add(task)
                    print(f"✓ Tâche créée: {task_data['name']}")

            db.session.commit()

    def print_status(self):
        """Print a one-line status summary for every scheduled task."""
        # Any status other than success/failed is shown with the neutral icon.
        status_icons = {"success": "✓", "failed": "✗"}

        with self.app.app_context():
            tasks = ScheduledTask.query.all()

            print("\n" + "=" * 70)
            print("STATUT DES TÂCHES PLANIFIÉES")
            print("=" * 70)

            for task in tasks:
                status_icon = status_icons.get(task.last_status, "○")
                next_run = (
                    task.next_run_at.strftime("%d/%m %H:%M")
                    if task.next_run_at
                    else "Non calculé"
                )
                last_run = (
                    task.last_run_at.strftime("%d/%m %H:%M")
                    if task.last_run_at
                    else "Jamais"
                )

                print(
                    f"{status_icon} {task.name:20} | Prochain: {next_run:12} | Dernier: {last_run:12} | Runs: {task.run_count}"
                )

            print("=" * 70)
|
|
|
|
def run_scheduler_daemon(poll_interval=60):
    """Run the scheduler as a daemon: poll for due tasks until interrupted.

    On start-up the default tasks are created if missing, missed tasks are
    executed and the task status table is printed. The loop then runs the
    pending tasks, sleeps, and repeats until Ctrl+C.

    Args:
        poll_interval: Seconds to sleep between two polling cycles
            (defaults to 60, i.e. one check per minute).
    """
    # Imported once, under its own name: at module level ``time`` is bound to
    # ``datetime.time``, so ``import time`` here would reuse that name.
    from time import sleep

    runner = TaskRunner()

    print("=" * 70)
    print("SCHEDULER DB-BASED - MODE DAEMON")
    print(f"Démarré à: {datetime.now(timezone.utc)}")
    print(f"Hostname: {runner.hostname} | PID: {runner.pid}")
    print("=" * 70)

    runner.initialize_default_tasks()
    runner.check_missed_tasks()
    runner.print_status()

    print("\nEn attente des tâches... (Ctrl+C pour arrêter)\n")

    try:
        while True:
            try:
                runner.run_pending_tasks()
            except Exception as e:
                # One failed cycle (e.g. the database being unreachable) must
                # not stop the daemon: report it and retry on the next cycle.
                print(f"[Scheduler] Erreur lors de l'exécution des tâches: {e}")
                traceback.print_exc()
            sleep(poll_interval)
    except KeyboardInterrupt:
        print("\n\nArrêt du scheduler.")
        print(f"Terminé à: {datetime.now(timezone.utc)}")
|
|
|
|
def run_scheduler_once():
    """Run a single scheduler pass, then return.

    The pass creates missing default tasks, executes missed tasks, executes
    pending tasks and finally prints the task status table.
    """
    separator = "=" * 70
    runner = TaskRunner()

    print(separator)
    print("SCHEDULER DB-BASED - MODE ONE-SHOT")
    print(f"Exécution à: {datetime.now(timezone.utc)}")
    print(separator)

    # The order matters: defaults must exist before anything is executed.
    steps = (
        runner.initialize_default_tasks,
        runner.check_missed_tasks,
        runner.run_pending_tasks,
        runner.print_status,
    )
    for step in steps:
        step()

    print("\nExécution terminée.")
|
|
|
|
def check_and_run_missed_on_startup():
    """Create default tasks and run missed ones, never raising to the caller.

    Any ``Exception`` raised along the way is printed and swallowed, so a
    scheduler problem does not propagate to whoever calls this at start-up.
    """
    try:
        startup_runner = TaskRunner()
        for step in (
            startup_runner.initialize_default_tasks,
            startup_runner.check_missed_tasks,
        ):
            step()
    except Exception as exc:
        print(f"[Scheduler] Erreur lors de la vérification des tâches: {exc}")
|
|
|
|
if __name__ == "__main__":
    # ``--daemon`` as the first CLI argument selects the polling loop;
    # anything else (including no argument) runs a single pass.
    wants_daemon = sys.argv[1:2] == ["--daemon"]
    entry_point = run_scheduler_daemon if wants_daemon else run_scheduler_once
    entry_point()
|
|