| """ |
| Celery tasks for background processing |
| """ |
| import os |
| import sys |
| import logging |
| from celery import Celery |
| from datetime import datetime |
| from app.config import get_settings |
|
|
| |
| backend_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| project_root = os.path.dirname(backend_dir) |
| simulation_path = os.path.join(project_root, "simulation") |
| if simulation_path not in sys.path: |
| sys.path.insert(0, project_root) |
|
|
| settings = get_settings() |
| logger = logging.getLogger(__name__) |
|
|
| |
| celery_app = Celery( |
| "agentsociety", |
| broker=settings.redis_url, |
| backend=settings.redis_url |
| ) |
|
|
| import ssl as ssl_module |
|
|
| celery_app.conf.update( |
| task_serializer="json", |
| accept_content=["json"], |
| result_serializer="json", |
| timezone="UTC", |
| enable_utc=True, |
| task_track_started=True, |
| task_time_limit=3600, |
| broker_connection_retry_on_startup=True, |
| ) |
|
|
| |
| if settings.redis_url.startswith("rediss://"): |
| celery_app.conf.update( |
| broker_use_ssl={"ssl_cert_reqs": ssl_module.CERT_REQUIRED}, |
| redis_backend_use_ssl={"ssl_cert_reqs": ssl_module.CERT_REQUIRED}, |
| ) |
|
|
|
|
| @celery_app.task(bind=True) |
| def process_video_task(self, project_id: str): |
| """ |
| Background task to process video with VLM |
| """ |
| from app.database import SessionLocal |
| from app.models import Project |
| from app.services.vlm_service import process_video |
| |
| db = SessionLocal() |
| |
| try: |
| |
| project = db.query(Project).filter(Project.id == project_id).first() |
| if not project: |
| logger.error(f"Project not found: {project_id}") |
| return {"error": "Project not found"} |
| |
| |
| project.status = "PROCESSING" |
| db.commit() |
| |
| logger.info(f"Processing video for project {project_id}") |
| |
| |
| descriptions, duration = process_video(project.video_path) |
| |
| |
| project.vlm_generated_context = descriptions |
| project.video_duration_seconds = duration |
| project.status = "READY" |
| db.commit() |
| |
| logger.info(f"Video processing complete for project {project_id}") |
| |
| return { |
| "project_id": project_id, |
| "status": "READY", |
| "duration": duration |
| } |
| |
| except Exception as e: |
| logger.error(f"Video processing failed for project {project_id}: {e}") |
| try: |
| project = db.query(Project).filter(Project.id == project_id).first() |
| if project: |
| project.status = "FAILED" |
| db.commit() |
| except: |
| pass |
| return {"error": str(e)} |
| finally: |
| db.close() |
|
|
|
|
| @celery_app.task(bind=True) |
| def run_simulation_task(self, simulation_id: str): |
| """ |
| Background task to queue simulation for Ray worker |
| |
| This task: |
| 1. Validates the simulation and project |
| 2. Publishes request to Redis 'simulation_requests' channel |
| 3. Ray worker (separate process) handles the actual simulation |
| 4. Results listener updates the database when complete |
| """ |
| from app.database import SessionLocal |
| from app.models import SimulationRun, Project |
| import json |
| import redis |
| |
| db = SessionLocal() |
| redis_kwargs = {} |
| if settings.redis_url.startswith("rediss://"): |
| redis_kwargs["ssl_cert_reqs"] = ssl_module.CERT_REQUIRED |
| redis_client = redis.from_url(settings.redis_url, **redis_kwargs) |
| |
| try: |
| |
| simulation = db.query(SimulationRun).filter(SimulationRun.id == simulation_id).first() |
| if not simulation: |
| logger.error(f"Simulation not found: {simulation_id}") |
| return {"error": "Simulation not found"} |
| |
| |
| project = db.query(Project).filter(Project.id == simulation.project_id).first() |
| if not project or not project.vlm_generated_context: |
| logger.error(f"Project not ready for simulation: {simulation.project_id}") |
| simulation.status = "FAILED" |
| simulation.error_message = "Project video analysis not complete" |
| db.commit() |
| return {"error": "Project not ready"} |
| |
| |
| simulation.status = "RUNNING" |
| simulation.started_at = datetime.utcnow() |
| db.commit() |
| |
| logger.info(f"Sending simulation {simulation_id} to Ray worker") |
| |
| |
| request = { |
| "simulation_id": str(simulation.id), |
| "project_id": str(project.id), |
| "ad_content": project.vlm_generated_context, |
| "demographic_filter": project.demographic_filter, |
| "num_agents": simulation.num_agents, |
| "simulation_days": simulation.simulation_days |
| } |
| |
| redis_client.publish("simulation_requests", json.dumps(request)) |
| |
| logger.info(f"Simulation {simulation_id} published to Ray worker queue") |
| |
| return { |
| "simulation_id": simulation_id, |
| "status": "RUNNING", |
| "message": "Simulation sent to Ray worker for processing" |
| } |
| |
| except Exception as e: |
| logger.error(f"Failed to queue simulation {simulation_id}: {e}") |
| try: |
| simulation = db.query(SimulationRun).filter(SimulationRun.id == simulation_id).first() |
| if simulation: |
| simulation.status = "FAILED" |
| simulation.error_message = str(e) |
| simulation.completed_at = datetime.utcnow() |
| db.commit() |
| except: |
| pass |
| return {"error": str(e)} |
| finally: |
| db.close() |