| | """ |
| | Background task processing (no Celery, no AWS). |
| | """ |
| |
|
| | import json |
| | import os |
| | import threading |
| | import time |
| | from datetime import datetime, timezone |
| |
|
| | from .inference import run_inference |
| | from .config import OUTPUTS_DIR |
| |
|
| | |
| | runs: dict[str, dict] = {} |
| | runs_lock = threading.Lock() |
| |
|
| | |
| | FORCE_ERROR = os.getenv("FORCE_ERROR") == "1" |
| | SLEEP_SECS = int(os.getenv("SLEEP_SECS", "0")) |
| |
|
| | |
| | |
| | |
| |
|
| | |
| |
|
| |
|
| | def run_task( |
| | run_id: str, |
| | image_path: str, |
| | topics: list = None, |
| | creators: list = None, |
| | model: str = "paintingclip", |
| | ) -> None: |
| | """ |
| | Process a single run: load image from disk, run ML inference, save output, update status. |
| | """ |
| | print(f"π Starting task for run {run_id}") |
| | print(f"π Image path: {image_path}") |
| | print(f"π Topics: {topics}, Creators: {creators}, Model: {model}") |
| | |
| | |
| | print(f"π Environment check:") |
| | print(f" STUB_MODE: {os.getenv('STUB_MODE', 'not set')}") |
| | print(f" Current working directory: {os.getcwd()}") |
| | print(f" Image file exists: {os.path.exists(image_path)}") |
| | if os.path.exists(image_path): |
| | print(f" Image file size: {os.path.getsize(image_path)} bytes") |
| | |
| | |
| | try: |
| | from .patch_inference import _prepare_image |
| | _prepare_image.cache_clear() |
| | print(f"β
Cleared patch inference cache") |
| | except ImportError as e: |
| | print(f"β οΈ patch_inference import failed: {e}") |
| |
|
| | |
| | with runs_lock: |
| | if run_id not in runs: |
| | print(f"β Run {run_id} not found in runs store") |
| | return |
| | runs[run_id]["status"] = "processing" |
| | runs[run_id]["startedAt"] = datetime.now(timezone.utc).isoformat(timespec="seconds") |
| | runs[run_id]["updatedAt"] = runs[run_id]["startedAt"] |
| | print(f"β
Run {run_id} marked as processing") |
| |
|
| | try: |
| | |
| | if not os.path.exists(image_path): |
| | raise FileNotFoundError(f"Image file not found: {image_path}") |
| |
|
| | if SLEEP_SECS: |
| | time.sleep(SLEEP_SECS) |
| |
|
| | print(f"π About to call run_inference...") |
| | |
| | |
| | labels = run_inference( |
| | image_path, filter_topics=topics, filter_creators=creators, model_type=model |
| | ) |
| | |
| | print(f"β
run_inference completed successfully") |
| | print(f"β
Labels type: {type(labels)}") |
| | print(f"β
Labels length: {len(labels) if isinstance(labels, list) else 'not a list'}") |
| |
|
| | |
| | if FORCE_ERROR: |
| | raise RuntimeError("Forced error for testing") |
| |
|
| | |
| | print(f"π Saving results to outputs directory...") |
| | os.makedirs(OUTPUTS_DIR, exist_ok=True) |
| | output_filename = f"{run_id}.json" |
| | output_path = os.path.join(OUTPUTS_DIR, output_filename) |
| | output_key = f"outputs/{output_filename}" |
| |
|
| | with open(output_path, "w") as f: |
| | json.dump(labels, f) |
| |
|
| | |
| | if not os.path.exists(output_path): |
| | raise RuntimeError(f"Failed to create output file: {output_path}") |
| |
|
| | |
| | with runs_lock: |
| | runs[run_id]["status"] = "done" |
| | runs[run_id]["outputKey"] = output_key |
| | runs[run_id]["finishedAt"] = datetime.now(timezone.utc).isoformat(timespec="seconds") |
| | runs[run_id]["updatedAt"] = runs[run_id]["finishedAt"] |
| | runs[run_id].pop("errorMessage", None) |
| | print(f"β
Task completed successfully for run {run_id}") |
| | print(f"β
Output saved to: {output_path}") |
| | print(f"β
Output key: {output_key}") |
| |
|
| | except Exception as exc: |
| | |
| | print(f"β Error in run {run_id}: {exc}") |
| | print(f"β Error type: {type(exc).__name__}") |
| | import traceback |
| | print(f"β Full traceback:") |
| | traceback.print_exc() |
| |
|
| | with runs_lock: |
| | if run_id in runs: |
| | runs[run_id]["status"] = "error" |
| | runs[run_id]["errorMessage"] = str(exc)[:500] |
| | runs[run_id]["updatedAt"] = datetime.now(timezone.utc).isoformat(timespec="seconds") |
| | print(f"β Run {run_id} marked as error: {runs[run_id]['errorMessage']}") |
| |
|