# text-to-3d-backend / utils / job_manager.py
# Uploaded by aniket47 - "Initial FastAPI backend for HF Spaces" (commit 86e7db6)
"""
Job management for tracking async tasks
"""
import time
import threading
import logging
from typing import Dict, Optional, Any
logger = logging.getLogger(__name__)


class JobManager:
    """Track background jobs, their progress and their results.

    State lives in three dictionaries keyed by job id (``active_jobs``,
    ``job_progress`` and ``job_results``), each guarded by its own lock.
    The locks are plain, non-reentrant ``threading.Lock`` objects, so every
    method acquires them one after another and never nests them.

    Unless disabled, a daemon thread periodically calls
    :meth:`cleanup_old_jobs`; call :meth:`shutdown` to stop it.
    """

    DEFAULT_CLEANUP_AGE = 1800  # seconds (30 minutes)
    DEFAULT_CLEANUP_INTERVAL = 300  # seconds (5 minutes)

    def __init__(
        self,
        *,
        cleanup_age: float = DEFAULT_CLEANUP_AGE,
        cleanup_interval: float = DEFAULT_CLEANUP_INTERVAL,
        start_cleanup: bool = True,
    ) -> None:
        """Create an empty manager.

        Args:
            cleanup_age: Age in seconds after which a job is dropped by
                :meth:`cleanup_old_jobs`.
            cleanup_interval: Seconds between two background cleanup runs.
            start_cleanup: When false, no background thread is started and
                :meth:`cleanup_old_jobs` has to be called explicitly.
        """
        self.active_jobs: Dict[str, Dict] = {}
        self.job_progress: Dict[str, Dict] = {}
        self.job_results: Dict[str, Dict] = {}

        self.jobs_lock = threading.Lock()
        self.progress_lock = threading.Lock()
        self.results_lock = threading.Lock()

        self.cleanup_age = cleanup_age
        self.cleanup_interval = cleanup_interval
        self._stop_event = threading.Event()
        self._cleanup_thread: Optional[threading.Thread] = None

        # Start cleanup task
        if start_cleanup:
            self._start_cleanup_task()

    def register_job(self, job_id: str) -> None:
        """Register a new job and initialise its progress record."""
        now = time.time()
        with self.jobs_lock:
            self.active_jobs[job_id] = {
                'cancelled': False,
                'created_at': now,
                'status': 'active',
            }
        with self.progress_lock:
            self.job_progress[job_id] = {
                'stage': 'starting',
                'progress': 0,
                'message': 'Job started...',
                'timestamp': now,
            }
        logger.info(f"📝 Job registered: {job_id}")

    def is_job_cancelled(self, job_id: str) -> bool:
        """Return True if the job exists and has been cancelled."""
        with self.jobs_lock:
            return self.active_jobs.get(job_id, {}).get('cancelled', False)

    def cancel_job(self, job_id: str) -> bool:
        """Flag a job as cancelled.

        Returns:
            True if the job was known and has been flagged, False otherwise.
        """
        with self.jobs_lock:
            job = self.active_jobs.get(job_id)
            if job is None:
                return False
            job['cancelled'] = True
            job['status'] = 'cancelled'
        logger.info(f"❌ Job cancelled: {job_id}")
        return True

    def update_job_progress(
        self, job_id: str, stage: str, progress: int, message: str
    ) -> None:
        """Replace the progress record of a known job.

        Updates for job ids without a progress record are ignored, so a
        worker that is still running after its job was removed does not
        re-create the entry.
        """
        with self.progress_lock:
            known = job_id in self.job_progress
            if known:
                self.job_progress[job_id] = {
                    'stage': stage,
                    'progress': progress,
                    'message': message,
                    'timestamp': time.time(),
                }
        if known:
            logger.info(f"📊 Job {job_id}: {stage} - {progress}% - {message}")

    def get_job_progress(self, job_id: str) -> Optional[Dict]:
        """Return the current progress record, or None for an unknown job."""
        with self.progress_lock:
            return self.job_progress.get(job_id)

    def complete_job(self, job_id: str, results: Dict[str, Any]) -> None:
        """Mark a job as completed and store its results.

        The progress and result records are written even when the job id is
        no longer registered (for example because it was already cleaned
        up); such orphaned records are removed later by
        :meth:`cleanup_old_jobs`.

        Args:
            job_id: Identifier of the finished job.
            results: Result payload; stored as a copy with an added
                ``completed_at`` timestamp.
        """
        now = time.time()
        with self.jobs_lock:
            if job_id in self.active_jobs:
                self.active_jobs[job_id]['status'] = 'completed'
        with self.progress_lock:
            self.job_progress[job_id] = {
                'stage': 'completed',
                'progress': 100,
                'message': 'Job completed successfully!',
                'timestamp': now,
            }
        with self.results_lock:
            self.job_results[job_id] = {
                **results,
                'completed_at': now,
            }
        logger.info(f"✅ Job completed: {job_id}")

    def fail_job(self, job_id: str, error_message: str) -> None:
        """Mark a job as failed and expose the error through its progress.

        As with :meth:`complete_job`, the progress record is written even if
        the job id is not registered any more.
        """
        with self.jobs_lock:
            if job_id in self.active_jobs:
                self.active_jobs[job_id]['status'] = 'failed'
        with self.progress_lock:
            self.job_progress[job_id] = {
                'stage': 'error',
                'progress': 0,
                'message': f'Error: {error_message}',
                'timestamp': time.time(),
            }
        logger.error(f"❌ Job failed: {job_id} - {error_message}")

    def get_job_results(self, job_id: str) -> Optional[Dict]:
        """Return the stored results, or None if there are none."""
        with self.results_lock:
            return self.job_results.get(job_id)

    def get_active_job_count(self) -> int:
        """Return the number of jobs whose status is still ``'active'``."""
        with self.jobs_lock:
            return sum(
                1 for job in self.active_jobs.values()
                if job['status'] == 'active'
            )

    def cleanup_old_jobs(self, max_age: Optional[float] = None) -> None:
        """Remove jobs and orphaned records older than the age limit.

        A registered job is removed once its ``created_at`` is older than
        the limit, whatever its status.  Progress and result records whose
        job id is not registered are removed based on their own timestamps;
        without that sweep, records written by :meth:`complete_job` or
        :meth:`fail_job` for an already removed job would never be freed.

        Args:
            max_age: Age limit in seconds; defaults to ``self.cleanup_age``.
        """
        age_limit = self.cleanup_age if max_age is None else max_age
        now = time.time()

        with self.jobs_lock:
            registered = set(self.active_jobs)
            expired = [
                job_id
                for job_id, job_data in self.active_jobs.items()
                if now - job_data['created_at'] > age_limit
            ]

        orphans = set()
        with self.progress_lock:
            orphans.update(
                job_id
                for job_id, record in self.job_progress.items()
                if job_id not in registered
                and now - record['timestamp'] > age_limit
            )
        with self.results_lock:
            orphans.update(
                job_id
                for job_id, record in self.job_results.items()
                if job_id not in registered
                and now - record['completed_at'] > age_limit
            )

        # Removal happens after every lock has been released: _remove_job
        # acquires the same non-reentrant locks itself.
        for job_id in expired + sorted(orphans):
            self._remove_job(job_id)
            logger.info(f"🧹 Cleaned up old job: {job_id}")

    def shutdown(self, timeout: Optional[float] = 1.0) -> None:
        """Stop the background cleanup thread, if one is running.

        Args:
            timeout: Maximum number of seconds to wait for the thread to
                finish; None waits indefinitely.
        """
        self._stop_event.set()
        thread = self._cleanup_thread
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout)

    def _remove_job(self, job_id: str) -> None:
        """Remove a job from all tracking dictionaries."""
        with self.jobs_lock:
            self.active_jobs.pop(job_id, None)
        with self.progress_lock:
            self.job_progress.pop(job_id, None)
        with self.results_lock:
            self.job_results.pop(job_id, None)

    def _start_cleanup_task(self) -> None:
        """Start the daemon thread that runs the periodic cleanup."""
        if self._cleanup_thread is not None and self._cleanup_thread.is_alive():
            return  # already running

        self._stop_event.clear()

        def cleanup_worker() -> None:
            # Event.wait() returns False on timeout (time for the next run)
            # and True once shutdown() has set the event.
            while not self._stop_event.wait(self.cleanup_interval):
                try:
                    self.cleanup_old_jobs()
                except Exception as e:
                    # Keep the worker alive; the next run may succeed.
                    logger.exception(f"❌ Error in cleanup task: {str(e)}")

        self._cleanup_thread = threading.Thread(
            target=cleanup_worker, daemon=True
        )
        self._cleanup_thread.start()
        logger.info("🧹 Cleanup task started")