import json
import os
import tempfile
import time
import uuid
from pathlib import Path

import datasets
import pandas as pd
from datasets import Image
from huggingface_hub import repo_exists, CommitScheduler


def update_dataset_with_new_splits(new_splits: dict, process_name: str = "Main"):
    """
    Add new splits to a regular HuggingFace dataset without downloading existing data.

    This function pushes individual splits to the Hub using the split parameter,
    which preserves all existing splits and only adds/updates the specified ones.

    Key Features:
    - No downloading of existing dataset required
    - Existing splits are preserved (not overwritten)
    - Each split is pushed individually using dataset.push_to_hub(split="name")
    - Efficient for large datasets with many splits

    Args:
        new_splits: dict of {split_name: DataFrame} - splits to add/update
        process_name: Name for logging and commit messages

    Example:
        new_splits = {
            "validation_2024": val_df,
            "test_batch_1": test_df,
            "custom_split": custom_df
        }
        update_dataset_with_new_splits(new_splits)
    """
    repo_id = os.environ["REPO_ID"]
    hf_token = os.environ["HF_TOKEN"]

    print(f"\n[{process_name}] Starting dataset splits update process...")

    # --- Start of Critical Section ---
    if not repo_exists(repo_id, repo_type="dataset", token=hf_token):
        print(f"[{process_name}] Repository {repo_id} not found. Cannot update.")
        return

    # Skip downloading existing dataset - we'll push only new splits
    print(f"[{process_name}] Preparing to push {len(new_splits)} new splits individually...")

    # Prepare each split for individual pushing
    splits_to_push = []
    for split_id, df in new_splits.items():
        new_split_dataset = datasets.Dataset.from_pandas(df)
        splits_to_push.append((split_id, new_split_dataset))
        print(f"[{process_name}] Prepared split '{split_id}' with {len(new_split_dataset)} entries.")

    # Push individual splits to Hub with retry mechanism
    _push_splits_to_hub(splits_to_push, repo_id, hf_token, process_name)

    print(f"[{process_name}] Finished pushing new dataset splits to Hub.")


def update_dataset_with_new_images(image_df: pd.DataFrame, process_name: str = "Main", scheduler: CommitScheduler = None, dataset_dir: Path = None, jsonl_path: Path = None):
    """
    Add new images to an image HuggingFace dataset using a CommitScheduler for
    efficient incremental updates.

    Key Features:
    - Uses CommitScheduler for incremental updates to the dataset repository
    - Saves images as PNG files with unique names
    - Stores metadata in JSONL format for the file-based approach
    - Thread-safe via the scheduler lock

    Args:
        image_df: DataFrame with 'id', 'image', and 'annotated_image' columns
            (images should be PIL Image objects or numpy arrays)
        process_name: Name for logging and commit messages
        scheduler: CommitScheduler that periodically commits the local dataset folder
        dataset_dir: Local folder watched by the scheduler where image files are written
        jsonl_path: Path of the JSONL metadata file inside dataset_dir

    Example:
        img_df = pd.DataFrame([{"id": "img1", "image": pil_image, "annotated_image": annotated_pil_image}])
        update_dataset_with_new_images(img_df, scheduler=scheduler, dataset_dir=dataset_dir, jsonl_path=jsonl_path)
    """
    print(f"\n[{process_name}] Starting image dataset update...")

    # Validate input format for image datasets
    if not hasattr(image_df, 'columns'):
        raise ValueError(f"image_df must be a pandas DataFrame with 'id', 'image' and 'annotated_image' columns, got {type(image_df)}")

    # Validate required columns
    required_columns = ['id', 'image', 'annotated_image']
    missing_columns = [col for col in required_columns if col not in image_df.columns]
    if missing_columns:
        raise ValueError(f"Missing required columns: {missing_columns}. Found columns: {list(image_df.columns)}")

    print(f"[{process_name}] Validated DataFrame with {len(image_df)} entries and columns: {list(image_df.columns)}")

    return _append_images_with_scheduler(image_df, process_name, scheduler, dataset_dir, jsonl_path)
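

# Illustrative setup sketch (assumption): update_dataset_with_new_images expects a
# CommitScheduler plus the local folder and JSONL metadata file it watches, but none of
# those are constructed in this module. The folder name, the "metadata.jsonl" filename,
# the commit interval, and the IMAGE_REPO_ID/HF_TOKEN environment variables below are
# assumptions for demonstration only.
def _example_image_update_setup(pil_image, annotated_pil_image):
    dataset_dir = Path("image_dataset")          # local folder mirrored to the Hub
    dataset_dir.mkdir(parents=True, exist_ok=True)
    jsonl_path = dataset_dir / "metadata.jsonl"  # metadata file written next to the PNGs
    scheduler = CommitScheduler(
        repo_id=os.environ["IMAGE_REPO_ID"],
        repo_type="dataset",
        folder_path=dataset_dir,
        every=5,                                 # commit pending files every 5 minutes
        token=os.environ["HF_TOKEN"],
    )
    img_df = pd.DataFrame([
        {"id": "img1", "image": pil_image, "annotated_image": annotated_pil_image},
    ])
    return update_dataset_with_new_images(
        img_df, process_name="ExampleWorker",
        scheduler=scheduler, dataset_dir=dataset_dir, jsonl_path=jsonl_path,
    )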


def _append_images_with_scheduler(image_df: pd.DataFrame, process_name: str, scheduler, dataset_dir, jsonl_path):
    """
    Append images to an existing dataset using CommitScheduler for efficient incremental updates.
    """
    print(f"[{process_name}] Using CommitScheduler for incremental updates...")
    print(f"[IMAGE_SCHEDULER] Using CommitScheduler for {os.environ['IMAGE_REPO_ID']} with local path: {dataset_dir}")

    # Process each image
    saved_count = 0
    failed_count = 0

    for idx, row in image_df.iterrows():
        # Resolve the id up front so it is available in the error handler below.
        image_id = row.get('id', f"row_{idx}")
        try:
            image = row['image']
            annotated_image = row['annotated_image']  # validated as a required column above

            # Skip if image is None
            if image is None:
                print(f"[{process_name}] Skipping image {image_id}: image is None")
                failed_count += 1
                continue

            if annotated_image is None:
                print(f"[{process_name}] Warning: annotated_image is None for {image_id}")
                failed_count += 1
                continue

            # Generate unique filenames
            unique_filename_orig = f"{uuid.uuid4()}_orig.png"
            unique_filename_ann = f"{uuid.uuid4()}_ann.png"
            image_path_orig = dataset_dir / unique_filename_orig
            image_path_ann = dataset_dir / unique_filename_ann

            # Save images and metadata while holding the scheduler lock
            with scheduler.lock:
                # Save image file
                if hasattr(image, 'save'):
                    # PIL Image object
                    image.save(image_path_orig, format='PNG')
                elif hasattr(image, 'shape'):
                    # Numpy array
                    from PIL import Image as PILImage
                    PILImage.fromarray(image).save(image_path_orig, format='PNG')
                else:
                    print(f"[{process_name}] Warning: Unsupported image type for {image_id}: {type(image)}")
                    failed_count += 1
                    continue

                # Save annotated image file
                if hasattr(annotated_image, 'save'):
                    annotated_image.save(image_path_ann, format='PNG')
                elif hasattr(annotated_image, 'shape'):
                    from PIL import Image as PILImage
                    PILImage.fromarray(annotated_image).save(image_path_ann, format='PNG')
                else:
                    print(f"[{process_name}] Warning: Unsupported annotated_image type for {image_id}: {type(annotated_image)}")
                    failed_count += 1
                    continue

                # Append metadata to JSONL
                metadata = {
                    "id": image_id,
                    "file_name": unique_filename_orig,
                    "annotated_file_name": unique_filename_ann
                }
                with jsonl_path.open("a", encoding="utf-8") as f:
                    json.dump(metadata, f, ensure_ascii=False)
                    f.write("\n")

            saved_count += 1
            print(f"[{process_name}] Saved image {saved_count}/{len(image_df)}: {image_id} -> {unique_filename_orig}")

        except Exception as e:
            print(f"[{process_name}] Error processing image {image_id}: {e}")
            failed_count += 1
            continue

    print(f"[{process_name}] Finished image dataset update:")
    print(f"[{process_name}] - Successfully saved: {saved_count} images")
    print(f"[{process_name}] - Failed: {failed_count} images")
    print(f"[{process_name}] - Images will be automatically committed to the dataset repository")

    if saved_count == 0:
        print(f"[{process_name}] Warning: No images were successfully saved")

    return saved_count, failed_count
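

# Illustrative read-back sketch (assumption): because the helper above writes "file_name"
# entries to a JSONL file alongside the PNGs, the committed repository can typically be
# loaded with the datasets library's automatic imagefolder detection once the scheduler
# has pushed the folder. This assumes jsonl_path is named metadata.jsonl and that the
# default split resolves to "train"; the schema of the loaded dataset may vary.
def _example_load_image_dataset():
    ds = datasets.load_dataset(os.environ["IMAGE_REPO_ID"], split="train")
    print(ds)  # expected to expose an image column plus the JSONL metadata fields
    return ds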


def _push_splits_to_hub(splits_to_push: list, repo_id: str, hf_token: str, process_name: str):
    """
    Helper function to push individual splits to the Hub with a retry mechanism.

    Args:
        splits_to_push: List of (split_name, dataset) tuples
        repo_id: HuggingFace repository ID
        hf_token: HuggingFace token
        process_name: Process name for logging
    """
    max_retries = 5
    successful_splits = []
    failed_splits = []

    for split_name, split_dataset in splits_to_push:
        print(f"[{process_name}] Pushing split '{split_name}' with {len(split_dataset)} entries...")

        for attempt in range(max_retries):
            try:
                print(f"[{process_name}] Pushing split '{split_name}' (Attempt {attempt + 1}/{max_retries})...")
                split_dataset.push_to_hub(
                    repo_id=repo_id,
                    split=split_name,  # This preserves existing splits
                    token=hf_token,
                    commit_message=f"feat: Add split '{split_name}' from {process_name} with {len(split_dataset)} entries"
                )
                print(f"[{process_name}] Split '{split_name}' pushed successfully on attempt {attempt + 1}.")
                successful_splits.append(split_name)
                break  # Exit retry loop on success
            except Exception as e:
                print(f"[{process_name}] Split '{split_name}' push attempt {attempt + 1} failed: {e}")
                if attempt < max_retries - 1:
                    wait_time = 5
                    print(f"[{process_name}] Waiting for {wait_time} seconds before retrying...")
                    time.sleep(wait_time)
                else:
                    print(f"[{process_name}] All {max_retries} push attempts failed for split '{split_name}'.")
                    failed_splits.append(split_name)

    # Report results
    if successful_splits:
        print(f"[{process_name}] Successfully pushed splits: {successful_splits}")
    if failed_splits:
        print(f"[{process_name}] Failed to push splits: {failed_splits}")