import os
import json
import time
import asyncio
import aiohttp
import zipfile
import shutil
from typing import Dict, List, Set, Optional, Tuple, Any
from urllib.parse import quote
from datetime import datetime
from pathlib import Path
import io

from fastapi import FastAPI, BackgroundTasks, HTTPException, status
from pydantic import BaseModel, Field
from huggingface_hub import HfApi, hf_hub_download
AUTO_START_INDEX = 1  # Default start index (1-indexed) used when no saved progress is found
FLOW_ID = os.getenv("FLOW_ID", "flow_default")
FLOW_PORT = int(os.getenv("FLOW_PORT", 8001))
HF_TOKEN = os.getenv("HF_TOKEN", "")
HF_DATASET_ID = os.getenv("HF_DATASET_ID", "samfred2/BG4")  # Source dataset for zip files
HF_OUTPUT_DATASET_ID = os.getenv("HF_OUTPUT_DATASET_ID", "samelias1/helium_tg")  # Target dataset for captions

# Progress and state tracking
PROGRESS_FILE = Path("processing_progress.json")
HF_STATE_FILE = "processing_state_captions.json"  # Shared state file stored in the output dataset
LOCAL_STATE_FOLDER = Path(".state")  # Local folder for the state file
LOCAL_STATE_FOLDER.mkdir(exist_ok=True)

# Directory within the source HF dataset where the zip files are located
ZIP_FILE_PREFIX = "frames_zips/"

# Full list of caption servers used for deployment
CAPTION_SERVERS = [
    "https://elias80-elen-1.hf.space/analyze",
    "https://elias80-elen-2.hf.space/analyze",
    "https://elias80-elen-3.hf.space/analyze",
    "https://elias80-elen-4.hf.space/analyze",
    "https://elias80-elen-5.hf.space/analyze",
    "https://elias80-elen-6.hf.space/analyze",
    "https://elias80-elen-7.hf.space/analyze",
    "https://elias80-elen-8.hf.space/analyze",
    "https://elias80-elen-9.hf.space/analyze",
    "https://elias80-elen-10.hf.space/analyze",
    "https://elias80-elen-11.hf.space/analyze",
    "https://elias80-elen-12.hf.space/analyze",
    "https://elias80-elen-13.hf.space/analyze",
    "https://elias80-elen-14.hf.space/analyze",
    "https://elias80-elen-15.hf.space/analyze",
    "https://elias80-elen-16.hf.space/analyze",
    "https://elias80-elen-17.hf.space/analyze",
    "https://elias80-elen-18.hf.space/analyze",
    "https://elias80-elen-19.hf.space/analyze",
    "https://elias80-elen-20.hf.space/analyze",
    # "https://onewayto-jean-1.hf.space/analyze",
    # "https://onewayto-jean-2.hf.space/analyze",
    # "https://onewayto-jean-3.hf.space/analyze",
    # "https://onewayto-jean-4.hf.space/analyze",
    # "https://onewayto-jean-5.hf.space/analyze",
    # "https://onewayto-jean-6.hf.space/analyze",
    # "https://onewayto-jean-7.hf.space/analyze",
    # "https://onewayto-jean-8.hf.space/analyze",
    # "https://onewayto-jean-9.hf.space/analyze",
    # "https://onewayto-jean-10.hf.space/analyze",
    # "https://onewayto-jean-11.hf.space/analyze",
    # "https://onewayto-jean-12.hf.space/analyze",
    # "https://onewayto-jean-13.hf.space/analyze",
    # "https://onewayto-jean-14.hf.space/analyze",
    # "https://onewayto-jean-15.hf.space/analyze",
    # "https://onewayto-jean-16.hf.space/analyze",
    # "https://onewayto-jean-17.hf.space/analyze",
    # "https://onewayto-jean-18.hf.space/analyze",
    # "https://onewayto-jean-19.hf.space/analyze",
    # "https://onewayto-jean-20.hf.space/analyze",
    "https://Elias2211-bam-1.hf.space/analyze",
    "https://Elias2211-bam-2.hf.space/analyze",
    "https://Elias2211-bam-3.hf.space/analyze",
    "https://Elias2211-bam-4.hf.space/analyze",
    "https://Elias2211-bam-5.hf.space/analyze",
    "https://Elias2211-bam-6.hf.space/analyze",
    "https://Elias2211-bam-7.hf.space/analyze",
    "https://Elias2211-bam-8.hf.space/analyze",
    "https://Elias2211-bam-10.hf.space/analyze",
    "https://Elias2211-bam-11.hf.space/analyze",
    "https://Elias2211-bam-12.hf.space/analyze",
    "https://Elias2211-bam-13.hf.space/analyze",
    "https://Elias2211-bam-14.hf.space/analyze",
    "https://Elias2211-bam-15.hf.space/analyze",
    "https://Elias2211-bam-16.hf.space/analyze",
    "https://Elias2211-bam-17.hf.space/analyze",
    "https://Elias2211-bam-18.hf.space/analyze",
    "https://Elias2211-bam-19.hf.space/analyze",
    "https://Elias2211-bam-20.hf.space/analyze",
    "https://samfred2-isherelike-1.hf.space/analyze",
    "https://samfred2-isherelike-2.hf.space/analyze",
    "https://samfred2-isherelike-3.hf.space/analyze",
    "https://samfred2-isherelike-5.hf.space/analyze",
    "https://samfred2-isherelike-6.hf.space/analyze",
    "https://samfred2-isherelike-7.hf.space/analyze",
    "https://samfred2-isherelike-8.hf.space/analyze",
    "https://samfred2-isherelike-9.hf.space/analyze",
    "https://samfred2-isherelike-10.hf.space/analyze",
    "https://samfred2-isherelike-11.hf.space/analyze",
    "https://samfred2-isherelike-12.hf.space/analyze",
    "https://samfred2-isherelike-13.hf.space/analyze",
    "https://samfred2-isherelike-14.hf.space/analyze",
    "https://samfred2-isherelike-15.hf.space/analyze",
    "https://samfred2-isherelike-16.hf.space/analyze",
    "https://samfred2-isherelike-17.hf.space/analyze",
    "https://samfred2-isherelike-19.hf.space/analyze",
    "https://Fred800-jam-1.hf.space/analyze",
    "https://Fred800-jam-2.hf.space/analyze",
    "https://Fred800-jam-3.hf.space/analyze",
    "https://Fred800-jam-4.hf.space/analyze",
    "https://Fred800-jam-5.hf.space/analyze",
    "https://Fred800-jam-6.hf.space/analyze",
    "https://Fred800-jam-7.hf.space/analyze",
    "https://Fred800-jam-8.hf.space/analyze",
    "https://Fred800-jam-9.hf.space/analyze",
    "https://Fred800-jam-10.hf.space/analyze",
    "https://Fred800-jam-11.hf.space/analyze",
    "https://Fred800-jam-12.hf.space/analyze",
    "https://Fred800-jam-13.hf.space/analyze",
    "https://Fred800-jam-14.hf.space/analyze",
    "https://Fred800-jam-15.hf.space/analyze",
    "https://Fred800-jam-17.hf.space/analyze",
    "https://Fred800-jam-18.hf.space/analyze",
    "https://Fred800-jam-19.hf.space/analyze",
    "https://Fred800-jam-20.hf.space/analyze",
    "https://favoredone-sweet-1.hf.space/analyze",
    "https://favoredone-sweet-2.hf.space/analyze",
    "https://favoredone-sweet-3.hf.space/analyze",
    "https://favoredone-sweet-4.hf.space/analyze",
    "https://favoredone-sweet-5.hf.space/analyze",
    "https://favoredone-sweet-6.hf.space/analyze",
    "https://favoredone-sweet-7.hf.space/analyze",
    "https://favoredone-sweet-8.hf.space/analyze",
    "https://favoredone-sweet-9.hf.space/analyze",
    "https://favoredone-sweet-10.hf.space/analyze",
    "https://favoredone-sweet-11.hf.space/analyze",
    "https://favoredone-sweet-12.hf.space/analyze",
    "https://favoredone-sweet-13.hf.space/analyze",
    "https://favoredone-sweet-14.hf.space/analyze",
    "https://favoredone-sweet-15.hf.space/analyze",
    "https://favoredone-sweet-16.hf.space/analyze",
    "https://favoredone-sweet-17.hf.space/analyze",
    "https://favoredone-sweet-18.hf.space/analyze",
    "https://favoredone-sweet-19.hf.space/analyze",
    "https://favoredone-sweet-20.hf.space/analyze",
    "https://sameli05-sweet-1.hf.space/analyze",
    "https://sameli05-sweet-2.hf.space/analyze",
    "https://sameli05-sweet-3.hf.space/analyze",
    "https://sameli05-sweet-4.hf.space/analyze",
    "https://sameli05-sweet-5.hf.space/analyze",
    "https://sameli05-sweet-6.hf.space/analyze",
    "https://sameli05-sweet-7.hf.space/analyze",
    "https://sameli05-sweet-8.hf.space/analyze",
    "https://sameli05-sweet-9.hf.space/analyze",
    "https://sameli05-sweet-10.hf.space/analyze",
    "https://sameli05-sweet-11.hf.space/analyze",
    "https://sameli05-sweet-12.hf.space/analyze",
    "https://sameli05-sweet-13.hf.space/analyze",
    "https://sameli05-sweet-14.hf.space/analyze",
    "https://sameli05-sweet-15.hf.space/analyze",
    "https://sameli05-sweet-16.hf.space/analyze",
    "https://sameli05-sweet-17.hf.space/analyze",
    "https://sameli05-sweet-18.hf.space/analyze",
    "https://sameli05-sweet-19.hf.space/analyze",
    "https://sameli05-sweet-20.hf.space/analyze",
    "https://michy2-swait-1.hf.space/analyze",
    "https://michy2-swait-2.hf.space/analyze",
    "https://michy2-swait-3.hf.space/analyze",
    "https://michy2-swait-4.hf.space/analyze",
    "https://michy2-swait-5.hf.space/analyze",
    "https://michy2-swait-6.hf.space/analyze",
    "https://michy2-swait-7.hf.space/analyze",
    "https://michy2-swait-8.hf.space/analyze",
    "https://michy2-swait-9.hf.space/analyze",
    "https://michy2-swait-10.hf.space/analyze",
    "https://michy2-swait-11.hf.space/analyze",
    "https://michy2-swait-12.hf.space/analyze",
    "https://michy2-swait-13.hf.space/analyze",
    "https://michy2-swait-14.hf.space/analyze",
    "https://michy2-swait-15.hf.space/analyze",
    "https://michy2-swait-16.hf.space/analyze",
    "https://michy2-swait-17.hf.space/analyze",
    "https://michy2-swait-18.hf.space/analyze",
    "https://michy2-swait-19.hf.space/analyze",
    "https://michy2-swait-20.hf.space/analyze",
]
MODEL_TYPE = "Florence-2-large"

# Temporary storage for images
TEMP_DIR = Path(f"temp_images_{FLOW_ID}")
TEMP_DIR.mkdir(exist_ok=True)


# --- Models ---
class ProcessStartRequest(BaseModel):
    start_index: int = Field(AUTO_START_INDEX, ge=1, description="The index number of the zip file to start processing from (1-indexed).")


class CaptionServer:
    def __init__(self, url):
        self.url = url
        self.busy = False
        self.total_processed = 0
        self.total_time = 0
        self.model = MODEL_TYPE

    def fps(self):
        return self.total_processed / self.total_time if self.total_time > 0 else 0


# Global state for caption servers
servers = [CaptionServer(url) for url in CAPTION_SERVERS]
server_index = 0
# --- Progress and State Management Functions ---
def load_progress() -> Dict:
    """Loads the local processing progress from the JSON file."""
    if PROGRESS_FILE.exists():
        try:
            with PROGRESS_FILE.open('r') as f:
                return json.load(f)
        except json.JSONDecodeError:
            print(f"[{FLOW_ID}] WARNING: Progress file is corrupted. Starting fresh.")
            # Fall through to return the default structure
    # Default structure
    return {
        "last_processed_index": 0,
        "processed_files": {},  # {index: repo_path}
        "file_list": []  # Full list of all zip files found in the dataset
    }


def save_progress(progress_data: Dict):
    """Saves the local processing progress to the JSON file."""
    try:
        with PROGRESS_FILE.open('w') as f:
            json.dump(progress_data, f, indent=4)
    except Exception as e:
        print(f"[{FLOW_ID}] CRITICAL ERROR: Could not save progress to {PROGRESS_FILE}: {e}")
def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
    """Load state from a JSON file, migrating older files to the current structure."""
    if os.path.exists(file_path):
        try:
            with open(file_path, "r") as f:
                data = json.load(f)
            # Migration logic for older state files
            if "file_states" not in data or not isinstance(data["file_states"], dict):
                print(f"[{FLOW_ID}] Initializing 'file_states' dictionary.")
                data["file_states"] = {}
            if "next_download_index" not in data:
                data["next_download_index"] = 0
            return data
        except json.JSONDecodeError:
            print(f"[{FLOW_ID}] WARNING: Corrupted state file: {file_path}")
    return default_value


def save_json_state(file_path: str, data: Dict[str, Any]):
    """Save state to a JSON file."""
    with open(file_path, "w") as f:
        json.dump(data, f, indent=2)
async def download_hf_state() -> Dict[str, Any]:
    """Downloads the state file from Hugging Face or returns a default state."""
    local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
    default_state = {"next_download_index": 0, "file_states": {}}
    try:
        # Check whether the state file exists in the output repo
        files = HfApi(token=HF_TOKEN).list_repo_files(
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset"
        )
        if HF_STATE_FILE not in files:
            print(f"[{FLOW_ID}] State file not found in {HF_OUTPUT_DATASET_ID}. Starting fresh.")
            return default_state
        # Download the file
        hf_hub_download(
            repo_id=HF_OUTPUT_DATASET_ID,
            filename=HF_STATE_FILE,
            repo_type="dataset",
            local_dir=LOCAL_STATE_FOLDER,
            local_dir_use_symlinks=False,
            token=HF_TOKEN
        )
        print(f"[{FLOW_ID}] Successfully downloaded state file.")
        return load_json_state(str(local_path), default_state)
    except Exception as e:
        print(f"[{FLOW_ID}] Failed to download state file: {str(e)}. Starting fresh.")
        return default_state


async def upload_hf_state(state: Dict[str, Any]) -> bool:
    """Uploads the state file to Hugging Face."""
    local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
    try:
        # Save state locally first
        save_json_state(str(local_path), state)
        # Upload to the output (helium) dataset
        HfApi(token=HF_TOKEN).upload_file(
            path_or_fileobj=str(local_path),
            path_in_repo=HF_STATE_FILE,
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset",
            commit_message=f"Update caption processing state: next_index={state['next_download_index']}"
        )
        print(f"[{FLOW_ID}] Successfully uploaded state file.")
        return True
    except Exception as e:
        print(f"[{FLOW_ID}] Failed to upload state file: {str(e)}")
        return False
async def lock_file_for_processing(zip_filename: str, state: Dict[str, Any]) -> bool:
    """Marks a file as 'processing' in the state file and uploads the lock."""
    print(f"[{FLOW_ID}] 🔒 Attempting to lock file: {zip_filename}")
    # Update state locally
    state["file_states"][zip_filename] = "processing"
    # Upload the updated state file immediately to establish the lock
    if await upload_hf_state(state):
        print(f"[{FLOW_ID}] ✅ Successfully locked file: {zip_filename}")
        return True
    else:
        print(f"[{FLOW_ID}] ❌ Failed to lock file: {zip_filename}")
        # Revert the local state change
        if zip_filename in state["file_states"]:
            del state["file_states"][zip_filename]
        return False


async def unlock_file_as_processed(zip_filename: str, state: Dict[str, Any], next_index: int) -> bool:
    """Marks a file as 'processed', updates the index, and uploads the state."""
    print(f"[{FLOW_ID}] 🔓 Marking file as processed: {zip_filename}")
    # Update state locally
    state["file_states"][zip_filename] = "processed"
    state["next_download_index"] = next_index
    # Upload the updated state
    if await upload_hf_state(state):
        print(f"[{FLOW_ID}] ✅ Successfully marked as processed: {zip_filename}")
        return True
    else:
        print(f"[{FLOW_ID}] ❌ Failed to update state for: {zip_filename}")
        return False
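
# For reference, the shared state file maintained by the helpers above has this
# shape (the file names and index below are illustrative, not real entries):
#
#     {
#         "next_download_index": 4,
#         "file_states": {
#             "frames_part_001.zip": "processed",
#             "frames_part_002.zip": "processing",
#             "frames_part_003.zip": "failed"
#         }
#     }
#
# "processing" acts as a best-effort lock between workers. Note that entries
# marked "failed" are not skipped by process_dataset_task, so failed files are
# retried on a subsequent run.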
# --- Hugging Face Utility Functions ---
async def get_zip_file_list(progress_data: Dict) -> List[str]:
    """
    Fetches the list of all zip files from the dataset, or uses the cached list.
    Updates progress_data with the file list if a new list is fetched.
    """
    if progress_data['file_list']:
        print(f"[{FLOW_ID}] Using cached file list with {len(progress_data['file_list'])} files.")
        return progress_data['file_list']
    print(f"[{FLOW_ID}] Fetching full list of zip files from {HF_DATASET_ID}...")
    try:
        api = HfApi(token=HF_TOKEN)
        repo_files = api.list_repo_files(
            repo_id=HF_DATASET_ID,
            repo_type="dataset"
        )
        # Filter for zip files in the specified directory and sort alphabetically for consistent indexing
        zip_files = sorted([
            f for f in repo_files
            if f.startswith(ZIP_FILE_PREFIX) and f.endswith('.zip')
        ])
        if not zip_files:
            raise FileNotFoundError(f"No zip files found in '{ZIP_FILE_PREFIX}' directory of dataset '{HF_DATASET_ID}'.")
        print(f"[{FLOW_ID}] Found {len(zip_files)} zip files.")
        # Update and save the progress data
        progress_data['file_list'] = zip_files
        save_progress(progress_data)
        return zip_files
    except Exception as e:
        print(f"[{FLOW_ID}] Error fetching file list from Hugging Face: {e}")
        return []
async def download_and_extract_zip_by_index(file_index: int, repo_file_full_path: str) -> Optional[Path]:
    """Downloads the zip file for the given index and extracts its contents."""
    # Extract the base name for the extraction directory
    zip_full_name = Path(repo_file_full_path).name
    course_name = zip_full_name.replace('.zip', '')  # Use the file name as the course/job name
    print(f"[{FLOW_ID}] Processing file #{file_index}: {repo_file_full_path}. Full name: {zip_full_name}")
    try:
        # Use hf_hub_download to get the local file path
        zip_path = hf_hub_download(
            repo_id=HF_DATASET_ID,
            filename=repo_file_full_path,  # Full path within the repo
            repo_type="dataset",
            token=HF_TOKEN,
        )
        print(f"[{FLOW_ID}] Downloaded to {zip_path}. Extracting...")
        # Create a temporary directory for extraction
        extract_dir = TEMP_DIR / course_name
        # Ensure a clean directory for extraction
        if extract_dir.exists():
            shutil.rmtree(extract_dir)
        extract_dir.mkdir(exist_ok=True)
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)
        print(f"[{FLOW_ID}] Extraction complete to {extract_dir}.")
        # Clean up the downloaded zip file to save space
        os.remove(zip_path)
        return extract_dir
    except Exception as e:
        print(f"[{FLOW_ID}] Error downloading or extracting zip for {repo_file_full_path}: {e}")
        return None
async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
    """Uploads the final captions JSON file to the output dataset."""
    # Use the full zip name, replacing the extension with .json
    caption_filename = Path(zip_full_name).with_suffix('.json').name
    try:
        print(f"[{FLOW_ID}] Uploading {len(captions)} captions for {zip_full_name} as {caption_filename} to {HF_OUTPUT_DATASET_ID}...")
        # Create the JSON content in memory
        json_content = json.dumps(captions, indent=2, ensure_ascii=False).encode('utf-8')
        api = HfApi(token=HF_TOKEN)
        api.upload_file(
            path_or_fileobj=io.BytesIO(json_content),
            path_in_repo=caption_filename,
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset",
            commit_message=f"[{FLOW_ID}] Captions for {zip_full_name}"
        )
        print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
        return True
    except Exception as e:
        print(f"[{FLOW_ID}] Error uploading captions for {zip_full_name}: {e}")
        return False
# --- Core Processing Functions ---
async def get_available_server(timeout: float = 300.0) -> CaptionServer:
    """Round-robin selection of an available caption server."""
    global server_index
    start_time = time.time()
    while True:
        # Round-robin check for an available server
        for _ in range(len(servers)):
            server = servers[server_index]
            server_index = (server_index + 1) % len(servers)
            if not server.busy:
                return server
        # If all servers are busy, wait briefly and check again
        await asyncio.sleep(0.5)
        # Check whether the timeout has been reached
        if time.time() - start_time > timeout:
            raise TimeoutError(f"Timeout ({timeout}s) waiting for an available caption server.")
async def send_image_for_captioning(image_path: Path, course_name: str, progress_tracker: Dict) -> Optional[Dict]:
    """Sends a single image to a caption server, handling server selection and retries internally."""
    MAX_RETRIES = 3
    for attempt in range(MAX_RETRIES):
        server = None
        try:
            # 1. Get an available server (waits if all are busy, with a timeout)
            server = await get_available_server()
            server.busy = True
            start_time = time.time()
            # Keep the log readable: announce the image only on the first attempt
            if attempt == 0:
                print(f"[{FLOW_ID}] Starting attempt on {image_path.name}...")
            # 2. Prepare request data (read the bytes up front so the file handle is released immediately)
            image_bytes = image_path.read_bytes()
            form_data = aiohttp.FormData()
            form_data.add_field('file',
                                image_bytes,
                                filename=image_path.name,
                                content_type='image/jpeg')
            form_data.add_field('model_choice', MODEL_TYPE)
            # 3. Send the request (10-minute timeout to accommodate slow caption servers)
            async with aiohttp.ClientSession() as session:
                async with session.post(server.url, data=form_data, timeout=600) as resp:
                    if resp.status == 200:
                        result = await resp.json()
                        caption = result.get("caption")
                        if caption:
                            # Update the shared progress counter
                            progress_tracker['completed'] += 1
                            if progress_tracker['completed'] % 50 == 0:
                                print(f"[{FLOW_ID}] PROGRESS: {progress_tracker['completed']}/{progress_tracker['total']} captions completed.")
                            else:
                                print(f"[{FLOW_ID}] Success: {image_path.name} captioned by {server.url}")
                            return {
                                "course": course_name,
                                "image_path": image_path.name,
                                "caption": caption,
                                "timestamp": datetime.now().isoformat()
                            }
                        else:
                            print(f"[{FLOW_ID}] Server {server.url} returned success but no caption for {image_path.name}. Retrying...")
                            continue  # Retry with a different server
                    else:
                        error_text = await resp.text()
                        print(f"[{FLOW_ID}] Error from server {server.url} for {image_path.name}: {resp.status} - {error_text}. Retrying...")
                        continue  # Retry with a different server
        except (aiohttp.ClientError, asyncio.TimeoutError, TimeoutError) as e:
            print(f"[{FLOW_ID}] Connection/timeout error for {image_path.name} on {server.url if server else 'unknown server'}: {e}. Retrying...")
            continue  # Retry with a different server
        except Exception as e:
            print(f"[{FLOW_ID}] Unexpected error during captioning for {image_path.name}: {e}. Retrying...")
            continue  # Retry with a different server
        finally:
            if server:
                end_time = time.time()
                server.busy = False
                server.total_processed += 1
                server.total_time += (end_time - start_time)
    print(f"[{FLOW_ID}] FAILED after {MAX_RETRIES} attempts for {image_path.name}.")
    return None
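
# For reference, each CAPTION_SERVERS endpoint is expected to accept a multipart
# form with a 'file' part and a 'model_choice' field, and to reply with JSON of
# the form {"caption": "<text>"}. Any other status code or payload shape simply
# triggers a retry on the next server in the rotation, up to MAX_RETRIES.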
async def process_dataset_task(start_index: int):
    """Main task: process the dataset sequentially starting from the given 1-indexed file."""
    # Load both local progress and the shared HF state
    progress = load_progress()
    current_state = await download_hf_state()
    file_list = await get_zip_file_list(progress)
    if not file_list:
        print(f"[{FLOW_ID}] ERROR: Cannot proceed. File list is empty.")
        return False
    # Ensure start_index is within bounds
    if start_index > len(file_list):
        print(f"[{FLOW_ID}] WARNING: Start index {start_index} is greater than the total number of files ({len(file_list)}). Exiting.")
        return True
    # Convert the 1-indexed start position to a 0-indexed list position
    start_list_index = start_index - 1
    print(f"[{FLOW_ID}] Starting dataset processing from file index: {start_index} out of {len(file_list)}.")
    global_success = True

    for i in range(start_list_index, len(file_list)):
        file_index = i + 1  # 1-indexed for user display and progress tracking
        repo_file_full_path = file_list[i]
        zip_full_name = Path(repo_file_full_path).name
        course_name = zip_full_name.replace('.zip', '')  # Use the file name as the course/job name

        # Check the file state in the shared HF state
        file_state = current_state["file_states"].get(zip_full_name)
        if file_state == "processed":
            print(f"[{FLOW_ID}] Skipping {zip_full_name}: Already processed in global state.")
            continue
        elif file_state == "processing":
            print(f"[{FLOW_ID}] Skipping {zip_full_name}: Currently being processed by another worker.")
            continue

        # Try to lock the file
        if not await lock_file_for_processing(zip_full_name, current_state):
            print(f"[{FLOW_ID}] Failed to lock {zip_full_name}. Skipping.")
            continue

        extract_dir = None
        current_file_success = False
        try:
            # 1. Download and extract
            extract_dir = await download_and_extract_zip_by_index(file_index, repo_file_full_path)
            if not extract_dir:
                raise Exception("Failed to download or extract zip file.")

            # 2. Find images (recursive glob so subdirectories are included)
            image_paths = [p for p in extract_dir.glob("**/*") if p.is_file() and p.suffix.lower() in ['.jpg', '.jpeg', '.png']]
            print(f"[{FLOW_ID}] Found {len(image_paths)} images to process in {zip_full_name}.")

            if not image_paths:
                print(f"[{FLOW_ID}] No images found in {zip_full_name}. Marking as complete.")
                current_file_success = True
            else:
                # 3. Process images (captioning)
                progress_tracker = {
                    'total': len(image_paths),
                    'completed': 0
                }
                print(f"[{FLOW_ID}] Starting captioning for {progress_tracker['total']} images in {zip_full_name}...")

                # Limit concurrent tasks to the number of available servers
                semaphore = asyncio.Semaphore(len(servers))

                async def limited_send_image_for_captioning(image_path, course_name, progress_tracker):
                    async with semaphore:
                        return await send_image_for_captioning(image_path, course_name, progress_tracker)

                # Run all captioning tasks concurrently
                caption_tasks = [limited_send_image_for_captioning(p, course_name, progress_tracker) for p in image_paths]
                results = await asyncio.gather(*caption_tasks)

                # Filter out failed results
                all_captions = [r for r in results if r is not None]

                # Final progress report for the current file
                if len(all_captions) == len(image_paths):
                    print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Successfully completed all {len(all_captions)} captions.")
                else:
                    print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Completed with partial result: {len(all_captions)}/{len(image_paths)} captions.")

                # Consider the file successful if we have any captions
                current_file_success = len(all_captions) > 0

                # 4. Upload results
                if all_captions:
                    print(f"[{FLOW_ID}] Uploading {len(all_captions)} captions for {zip_full_name}...")
                    if await upload_captions_to_hf(zip_full_name, all_captions):
                        print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
                        current_file_success = True
                    else:
                        print(f"[{FLOW_ID}] Failed to upload captions for {zip_full_name}.")
                        current_file_success = False
                else:
                    print(f"[{FLOW_ID}] No captions generated. Skipping upload for {zip_full_name}.")
                    current_file_success = False
        except Exception as e:
            print(f"[{FLOW_ID}] Critical error in process_dataset_task for file #{file_index} ({zip_full_name}): {e}")
            current_file_success = False
            global_success = False  # Mark the overall task as failed if any file fails critically
        finally:
            # 5. Cleanup and progress update
            if extract_dir and extract_dir.exists():
                shutil.rmtree(extract_dir, ignore_errors=True)
                print(f"[{FLOW_ID}] Cleaned up temporary directory {extract_dir}.")
            if current_file_success:
                # Update both local progress and the shared HF state
                progress['last_processed_index'] = file_index
                progress['processed_files'][str(file_index)] = repo_file_full_path
                save_progress(progress)
                # Update the HF state and unlock the file
                if await unlock_file_as_processed(zip_full_name, current_state, file_index + 1):
                    print(f"[{FLOW_ID}] Progress saved and file unlocked: {zip_full_name}")
                else:
                    print(f"[{FLOW_ID}] Warning: File processed but state update failed: {zip_full_name}")
            else:
                # Mark as failed in the shared state and continue with the next file
                current_state["file_states"][zip_full_name] = "failed"
                await upload_hf_state(current_state)
                print(f"[{FLOW_ID}] File {zip_full_name} marked as failed. Continuing with next file.")
                global_success = False

    print(f"[{FLOW_ID}] All processing loops complete. Overall success: {global_success}")
    return global_success
# --- FastAPI App and Endpoints ---
app = FastAPI(
    title=f"Flow Server {FLOW_ID} API",
    description="Sequentially processes zip files from a dataset, captions images, and tracks progress.",
    version="1.0.0"
)


@app.on_event("startup")
async def startup_event():
    print(f"Flow Server {FLOW_ID} started on port {FLOW_PORT}.")
    # Load both local progress and the shared HF state
    progress = load_progress()
    current_state = await download_hf_state()
    # Get the next_download_index from the HF state if available
    hf_next_index = current_state.get("next_download_index", 0)
    if hf_next_index > 0:
        # Prefer the shared HF state whenever it has a meaningful index
        start_index = hf_next_index
        print(f"[{FLOW_ID}] Using next_download_index from HF state: {start_index}")
    else:
        # Fall back to local progress if the HF state has no meaningful index
        start_index = progress.get('last_processed_index', 0) + 1
        if start_index < AUTO_START_INDEX:
            start_index = AUTO_START_INDEX
    # FastAPI startup events can't use BackgroundTasks directly, so run the long-running
    # process with asyncio.create_task to avoid blocking server startup.
    print(f"[{FLOW_ID}] Auto-starting processing from index: {start_index}...")
    asyncio.create_task(process_dataset_task(start_index))
@app.get("/")
async def root():
    progress = load_progress()
    return {
        "flow_id": FLOW_ID,
        "status": "ready",
        "last_processed_index": progress['last_processed_index'],
        "total_files_in_list": len(progress['file_list']),
        "processed_files_count": len(progress['processed_files']),
        "total_servers": len(servers),
        "busy_servers": sum(1 for s in servers if s.busy),
    }
@app.post("/start")  # route path assumed
async def start_processing(request: ProcessStartRequest, background_tasks: BackgroundTasks):
    """
    Starts the sequential processing of zip files from the given index in the background.
    """
    start_index = request.start_index
    print(f"[{FLOW_ID}] Received request to start processing from index: {start_index}. Starting background task.")
    # Run the heavy processing in a background task so the API call returns immediately.
    # The server already auto-starts on startup; this endpoint allows a manual restart/override.
    background_tasks.add_task(process_dataset_task, start_index)
    return {"status": "processing", "start_index": start_index, "message": "Dataset processing started in background."}
if __name__ == "__main__":
    import uvicorn
    # Note: When running in the sandbox, bind to 0.0.0.0 to expose the port.
    uvicorn.run(app, host="0.0.0.0", port=FLOW_PORT)
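
# --- Example client (hedged sketch) ---
# Assuming the manual-start route is mounted at POST /start as above and the
# server is reachable on localhost at FLOW_PORT (default 8001), processing can
# be (re)started from a chosen 1-indexed zip file with a minimal client:
#
#     import requests
#     resp = requests.post("http://localhost:8001/start", json={"start_index": 1})
#     print(resp.json())  # {"status": "processing", "start_index": 1, ...}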