#!/usr/bin/env python3
"""
PyArrow Dataset Generator for ML Inference Service

Generates test datasets for academic challenges and model validation.
Creates 100 PyArrow datasets with various image types and test scenarios.
"""
import base64
import io
import json
import random
from pathlib import Path
from typing import Any, Dict, List

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from PIL import Image, ImageDraw, ImageFont
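
# Third-party dependencies: numpy, pyarrow, and pillow (PIL) must be installed,
# e.g. `pip install numpy pyarrow pillow`.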


class TestDatasetGenerator:
    def __init__(self, output_dir: str = "test_datasets"):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # ImageNet class labels (sample for testing)
        self.imagenet_labels = [
            "tench", "goldfish", "great_white_shark", "tiger_shark", "hammerhead",
            "electric_ray", "stingray", "cock", "hen", "ostrich", "brambling",
            "goldfinch", "house_finch", "junco", "indigo_bunting", "robin",
            "bulbul", "jay", "magpie", "chickadee", "water_ouzel", "kite",
            "bald_eagle", "vulture", "great_grey_owl", "European_fire_salamander",
            "common_newt", "eft", "spotted_salamander", "axolotl", "bullfrog",
            "tree_frog", "tailed_frog", "loggerhead", "leatherback_turtle",
            "mud_turtle", "terrapin", "box_turtle", "banded_gecko", "common_iguana",
            "American_chameleon", "whiptail", "agama", "frilled_lizard", "alligator_lizard",
            "Gila_monster", "green_lizard", "African_chameleon", "Komodo_dragon",
            "African_crocodile", "American_alligator", "triceratops", "thunder_snake"
        ]

    def create_synthetic_image(self, width: int = 224, height: int = 224,
                               image_type: str = "random") -> Image.Image:
        """Create synthetic images for testing."""
        if image_type == "random":
            # Random noise image
            array = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
            return Image.fromarray(array)
        elif image_type == "geometric":
            # Geometric patterns: random rectangles/ellipses on a white canvas
            img = Image.new('RGB', (width, height), color='white')
            draw = ImageDraw.Draw(img)
            for _ in range(random.randint(3, 8)):
                color = tuple(random.randint(0, 255) for _ in range(3))
                shape_type = random.choice(['rectangle', 'ellipse'])
                x1, y1 = random.randint(0, width // 2), random.randint(0, height // 2)
                x2, y2 = x1 + random.randint(20, width // 2), y1 + random.randint(20, height // 2)
                if shape_type == 'rectangle':
                    draw.rectangle([x1, y1, x2, y2], fill=color)
                else:
                    draw.ellipse([x1, y1, x2, y2], fill=color)
            return img
        elif image_type == "gradient":
            # Diagonal RGB gradient, vectorized instead of a per-pixel Python loop
            ys = np.arange(height).reshape(-1, 1)
            xs = np.arange(width).reshape(1, -1)
            array = np.zeros((height, width, 3), dtype=np.uint8)
            array[..., 0] = ys * 255 // height
            array[..., 1] = xs * 255 // width
            array[..., 2] = (ys + xs) * 255 // (height + width)
            return Image.fromarray(array)
        elif image_type == "text":
            img = Image.new('RGB', (width, height), color='white')
            draw = ImageDraw.Draw(img)
            try:
                font = ImageFont.load_default()
            except Exception:
                font = None
            text = f"Test Image {random.randint(1, 1000)}"
            draw.text((width // 4, height // 2), text, fill='black', font=font)
            return img
        else:
            # Fallback (covers "solid"): a single random solid color
            color = tuple(random.randint(0, 255) for _ in range(3))
            return Image.new('RGB', (width, height), color=color)

    def image_to_base64(self, image: Image.Image, format: str = "JPEG") -> str:
        """Convert a PIL image to a base64 string."""
        buffer = io.BytesIO()
        image.save(buffer, format=format)
        image_bytes = buffer.getvalue()
        return base64.b64encode(image_bytes).decode('utf-8')
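
    # Note: base64 encoding inflates payloads by roughly 33%, so the largest
    # edge-case images below (2048x2048) produce multi-megabyte request bodies.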

    def create_api_request(self, image_b64: str, media_type: str = "image/jpeg") -> Dict[str, Any]:
        """Create the request structure expected by the inference service."""
        return {
            "image": {
                "mediaType": media_type,
                "data": image_b64
            }
        }

    def create_expected_response(self, model_name: str = "microsoft/resnet-18",
                                 media_type: str = "image/jpeg") -> Dict[str, Any]:
        """Create a mock expected-response structure."""
        # Pick one label so the textual prediction matches predicted_label
        label_idx = random.randrange(len(self.imagenet_labels))
        return {
            "prediction": self.imagenet_labels[label_idx],
            "confidence": round(random.uniform(0.3, 0.99), 4),
            "predicted_label": label_idx,
            "model": model_name,
            "mediaType": media_type
        }

    def generate_standard_datasets(self, count: int = 25) -> List[Dict[str, Any]]:
        """Generate standard test cases with normal images."""
        datasets = []
        image_types = ["random", "geometric", "gradient", "text", "solid"]
        sizes = [(224, 224), (256, 256), (299, 299), (384, 384)]
        formats = [("JPEG", "image/jpeg"), ("PNG", "image/png")]
        for i in range(count):
            records = []
            for j in range(random.randint(5, 20)):  # 5-20 images per dataset
                img_type = random.choice(image_types)
                size = random.choice(sizes)
                fmt, media_type = random.choice(formats)
                image = self.create_synthetic_image(size[0], size[1], img_type)
                image_b64 = self.image_to_base64(image, fmt)
                api_request = self.create_api_request(image_b64, media_type)
                expected_response = self.create_expected_response()
                record = {
                    "dataset_id": f"standard_{i:03d}",
                    "image_id": f"img_{j:03d}",
                    "image_type": img_type,
                    "image_size": f"{size[0]}x{size[1]}",
                    "format": fmt,
                    "media_type": media_type,
                    "api_request": json.dumps(api_request),
                    "expected_response": json.dumps(expected_response),
                    "test_category": "standard",
                    "difficulty": "normal"
                }
                records.append(record)
            datasets.append({
                "name": f"standard_test_{i:03d}",
                "category": "standard",
                "description": f"Standard test dataset {i + 1} with {len(records)} images",
                "records": records
            })
        return datasets

    def generate_edge_case_datasets(self, count: int = 25) -> List[Dict[str, Any]]:
        """Generate datasets for edge case scenarios."""
        datasets = []
        edge_cases = [
            {"type": "tiny", "size": (32, 32), "difficulty": "high"},
            {"type": "huge", "size": (2048, 2048), "difficulty": "high"},
            {"type": "extreme_aspect", "size": (1000, 50), "difficulty": "medium"},
            {"type": "single_pixel", "size": (1, 1), "difficulty": "extreme"},
            {"type": "corrupted_base64", "size": (224, 224), "difficulty": "extreme"}
        ]
        for i in range(count):
            records = []
            for j, edge_case in enumerate(edge_cases):
                if edge_case["type"] == "corrupted_base64":
                    # Truncate the payload and append text that is not valid
                    # base64, so the service's error path gets exercised
                    image = self.create_synthetic_image(224, 224, "random")
                    image_b64 = self.image_to_base64(image, "PNG")
                    corrupted_b64 = image_b64[:-20] + "CORRUPTED_DATA"
                    api_request = self.create_api_request(corrupted_b64, "image/png")
                    expected_response = {
                        "error": "Invalid image data",
                        "status": "failed"
                    }
                else:
                    image = self.create_synthetic_image(
                        edge_case["size"][0], edge_case["size"][1], "random"
                    )
                    image_b64 = self.image_to_base64(image, "PNG")
                    api_request = self.create_api_request(image_b64, "image/png")
                    expected_response = self.create_expected_response()
                record = {
                    "dataset_id": f"edge_{i:03d}",
                    "image_id": f"edge_{j:03d}",
                    "image_type": edge_case["type"],
                    "image_size": f"{edge_case['size'][0]}x{edge_case['size'][1]}",
                    "format": "PNG",
                    "media_type": "image/png",
                    "api_request": json.dumps(api_request),
                    "expected_response": json.dumps(expected_response),
                    "test_category": "edge_case",
                    "difficulty": edge_case["difficulty"]
                }
                records.append(record)
            datasets.append({
                "name": f"edge_case_{i:03d}",
                "category": "edge_case",
                "description": f"Edge case dataset {i + 1} with challenging scenarios",
                "records": records
            })
        return datasets

    def generate_performance_datasets(self, count: int = 25) -> List[Dict[str, Any]]:
        """Generate performance benchmark datasets."""
        datasets = []
        batch_sizes = [1, 5, 10, 25, 50, 100]
        for i in range(count):
            batch_size = random.choice(batch_sizes)
            records = []
            for j in range(batch_size):
                image = self.create_synthetic_image(224, 224, "random")
                image_b64 = self.image_to_base64(image, "JPEG")
                api_request = self.create_api_request(image_b64)
                expected_response = self.create_expected_response()
                record = {
                    "dataset_id": f"perf_{i:03d}",
                    "image_id": f"batch_{j:03d}",
                    "image_type": "performance_test",
                    "image_size": "224x224",
                    "format": "JPEG",
                    "media_type": "image/jpeg",
                    "api_request": json.dumps(api_request),
                    "expected_response": json.dumps(expected_response),
                    "test_category": "performance",
                    "difficulty": "normal",
                    "batch_size": batch_size,
                    # Linear latency budget: 100 ms per image in the batch
                    "expected_max_latency_ms": batch_size * 100
                }
                records.append(record)
            datasets.append({
                "name": f"performance_test_{i:03d}",
                "category": "performance",
                "description": f"Performance dataset {i + 1} with batch size {batch_size}",
                "records": records
            })
        return datasets

    def generate_model_comparison_datasets(self, count: int = 25) -> List[Dict[str, Any]]:
        """Generate datasets for comparing different models."""
        datasets = []
        model_types = [
            "microsoft/resnet-18", "microsoft/resnet-50", "google/vit-base-patch16-224",
            "facebook/convnext-tiny-224", "microsoft/swin-tiny-patch4-window7-224"
        ]
        for i in range(count):
            # The same images are tested across all model types
            base_images = []
            for _ in range(10):  # 10 base images per comparison dataset
                image = self.create_synthetic_image(224, 224, "geometric")
                base_images.append(self.image_to_base64(image, "JPEG"))
            records = []
            for j, model in enumerate(model_types):
                for k, image_b64 in enumerate(base_images):
                    api_request = self.create_api_request(image_b64)
                    expected_response = self.create_expected_response(model)
                    record = {
                        "dataset_id": f"comparison_{i:03d}",
                        "image_id": f"img_{k:03d}_model_{j}",
                        "image_type": "comparison_base",
                        "image_size": "224x224",
                        "format": "JPEG",
                        "media_type": "image/jpeg",
                        "api_request": json.dumps(api_request),
                        "expected_response": json.dumps(expected_response),
                        "test_category": "model_comparison",
                        "difficulty": "normal",
                        "model_type": model,
                        "comparison_group": k
                    }
                    records.append(record)
            datasets.append({
                "name": f"model_comparison_{i:03d}",
                "category": "model_comparison",
                "description": f"Model comparison dataset {i + 1} testing {len(model_types)} models",
                "records": records
            })
        return datasets

    def save_dataset_to_parquet(self, dataset: Dict[str, Any]):
        """Save a dataset to PyArrow Parquet format."""
        records = dataset["records"]

        # Convert to a PyArrow table
        table = pa.table({
            "dataset_id": [r["dataset_id"] for r in records],
            "image_id": [r["image_id"] for r in records],
            "image_type": [r["image_type"] for r in records],
            "image_size": [r["image_size"] for r in records],
            "format": [r["format"] for r in records],
            "media_type": [r["media_type"] for r in records],
            "api_request": [r["api_request"] for r in records],
            "expected_response": [r["expected_response"] for r in records],
            "test_category": [r["test_category"] for r in records],
            "difficulty": [r["difficulty"] for r in records],
            # Optional fields with defaults
            "batch_size": [r.get("batch_size", 1) for r in records],
            "expected_max_latency_ms": [r.get("expected_max_latency_ms", 1000) for r in records],
            "model_type": [r.get("model_type", "microsoft/resnet-18") for r in records],
            "comparison_group": [r.get("comparison_group", 0) for r in records]
        })
        output_path = self.output_dir / f"{dataset['name']}.parquet"
        pq.write_table(table, output_path)

        # Save metadata as JSON alongside the Parquet file
        metadata = {
            "name": dataset["name"],
            "category": dataset["category"],
            "description": dataset["description"],
            "record_count": len(records),
            "file_size_mb": round(output_path.stat().st_size / (1024 * 1024), 2),
            "schema": [field.name for field in table.schema]
        }
        metadata_path = self.output_dir / f"{dataset['name']}_metadata.json"
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)
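
    # Note: api_request and expected_response are serialized with json.dumps()
    # when records are built, so every column above is a flat scalar type and
    # the Parquet files stay readable from any client; consumers json.loads()
    # those two columns (see the example sketch at the bottom of this file).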

    def generate_all_datasets(self):
        """Generate all 100 datasets."""
        print("Starting dataset generation...")

        print("Generating standard test datasets (25)...")
        standard_datasets = self.generate_standard_datasets(25)
        for dataset in standard_datasets:
            self.save_dataset_to_parquet(dataset)

        print("Generating edge case datasets (25)...")
        edge_datasets = self.generate_edge_case_datasets(25)
        for dataset in edge_datasets:
            self.save_dataset_to_parquet(dataset)

        print("Generating performance datasets (25)...")
        performance_datasets = self.generate_performance_datasets(25)
        for dataset in performance_datasets:
            self.save_dataset_to_parquet(dataset)

        print("Generating model comparison datasets (25)...")
        comparison_datasets = self.generate_model_comparison_datasets(25)
        for dataset in comparison_datasets:
            self.save_dataset_to_parquet(dataset)

        print(f"Generated 100 datasets in {self.output_dir}/")
        self.generate_summary()

    def generate_summary(self):
        """Generate a summary of all datasets."""
        summary = {
            "total_datasets": 100,
            "categories": {
                "standard": 25,
                "edge_case": 25,
                "performance": 25,
                "model_comparison": 25
            },
            "dataset_info": [],
            "usage_instructions": {
                "loading": "Use pyarrow.parquet.read_table('dataset.parquet')",
                "testing": "Run python scripts/test_datasets.py",
                "api_endpoint": "POST /predict/resnet",
                "request_format": "See api_request column in datasets"
            }
        }

        # Add individual dataset info from the per-dataset metadata files
        for parquet_file in self.output_dir.glob("*.parquet"):
            metadata_file = self.output_dir / f"{parquet_file.stem}_metadata.json"
            if metadata_file.exists():
                with open(metadata_file, 'r') as f:
                    metadata = json.load(f)
                summary["dataset_info"].append(metadata)

        summary_path = self.output_dir / "datasets_summary.json"
        with open(summary_path, 'w') as f:
            json.dump(summary, f, indent=2)
        print(f"Summary saved to {summary_path}")


if __name__ == "__main__":
    generator = TestDatasetGenerator()
    generator.generate_all_datasets()
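
# Example consumer (a minimal sketch, not part of the generator itself; the
# endpoint path and json.loads round trip follow usage_instructions above):
#
#   import json
#   import pyarrow.parquet as pq
#
#   table = pq.read_table("test_datasets/standard_test_000.parquet")
#   for row in table.to_pylist():
#       request = json.loads(row["api_request"])
#       expected = json.loads(row["expected_response"])
#       # POST `request` to /predict/resnet and compare with `expected`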