#!/usr/bin/env python3
"""
Dataset Tester for ML Inference Service

Tests the generated PyArrow datasets against the running ML inference service.
Validates API requests/responses and measures performance metrics.
"""
import argparse
import json
import statistics
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

import pandas as pd
import pyarrow.parquet as pq
import requests
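
# Note: the datasets are expected to contain at least the columns this script
# reads below: 'image_id', 'test_category', 'api_request' (a JSON string sent
# to the service) and 'expected_response' (a JSON string used for validation).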


class DatasetTester:
    def __init__(self, base_url: str = "http://127.0.0.1:8000", datasets_dir: str = "test_datasets"):
        self.base_url = base_url.rstrip('/')
        self.datasets_dir = Path(datasets_dir)
        self.endpoint = f"{self.base_url}/predict/resnet"
        self.results = []

    def load_dataset(self, dataset_path: Path) -> pd.DataFrame:
        """Load a PyArrow dataset."""
        table = pq.read_table(dataset_path)
        return table.to_pandas()

    def test_api_connection(self) -> bool:
        """Test if the API is running and accessible."""
        try:
            response = requests.get(f"{self.base_url}/docs", timeout=5)
            return response.status_code == 200
        except requests.RequestException:
            return False

    def send_prediction_request(self, api_request_json: str) -> Dict[str, Any]:
        """Send a single prediction request to the API."""
        try:
            request_data = json.loads(api_request_json)
            start_time = time.time()
            response = requests.post(
                self.endpoint,
                json=request_data,
                headers={"Content-Type": "application/json"},
                timeout=30
            )
            end_time = time.time()
            latency_ms = (end_time - start_time) * 1000
            return {
                "success": response.status_code == 200,
                "status_code": response.status_code,
                "response": response.json() if response.status_code == 200 else response.text,
                "latency_ms": round(latency_ms, 2),
                "error": None
            }
        except requests.RequestException as e:
            return {
                "success": False,
                "status_code": None,
                "response": None,
                "latency_ms": None,
                "error": str(e)
            }
        except json.JSONDecodeError as e:
            return {
                "success": False,
                "status_code": None,
                "response": None,
                "latency_ms": None,
                "error": f"JSON decode error: {str(e)}"
            }

    def validate_response(self, actual_response: Dict[str, Any],
                          expected_response_json: str) -> Dict[str, Any]:
        """Validate the API response structure against the expected response."""
        try:
            # Parse the expected response mainly to confirm it is valid JSON.
            expected = json.loads(expected_response_json)
            validation = {
                "structure_valid": True,
                "field_errors": []
            }
            # Check that required fields exist
            required_fields = ["prediction", "confidence", "predicted_label", "model", "mediaType"]
            for field in required_fields:
                if field not in actual_response:
                    validation["field_errors"].append(f"Missing field: {field}")
            # Validate field types and value ranges
            if "confidence" in actual_response:
                if not isinstance(actual_response["confidence"], (int, float)):
                    validation["field_errors"].append("confidence must be numeric")
                elif not (0 <= actual_response["confidence"] <= 1):
                    validation["field_errors"].append("confidence must be between 0 and 1")
            if "predicted_label" in actual_response:
                if not isinstance(actual_response["predicted_label"], int):
                    validation["field_errors"].append("predicted_label must be an integer")
            # Any recorded field error invalidates the structure
            validation["structure_valid"] = not validation["field_errors"]
            return validation
        except json.JSONDecodeError:
            return {
                "structure_valid": False,
                "field_errors": ["Invalid expected response JSON"]
            }
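
    # For reference, a response of roughly this shape passes the structural
    # checks above (illustrative values only; the field names come from
    # required_fields, the concrete values are not taken from the service):
    #   {
    #       "prediction": [...],
    #       "confidence": 0.93,
    #       "predicted_label": 207,
    #       "model": "resnet",
    #       "mediaType": "application/json"
    #   }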

    def test_dataset(self, dataset_path: Path, max_samples: Optional[int] = None) -> Dict[str, Any]:
        """Test a single dataset."""
        print(f"Testing dataset: {dataset_path.name}")
        try:
            df = self.load_dataset(dataset_path)
            if max_samples:
                df = df.head(max_samples)
            results = {
                "dataset_name": dataset_path.stem,
                "total_samples": len(df),
                "tested_samples": 0,
                "successful_requests": 0,
                "failed_requests": 0,
                "validation_errors": 0,
                "latencies_ms": [],
                "errors": [],
                "category": df['test_category'].iloc[0] if not df.empty else "unknown"
            }
            # Use enumerate so the progress counter is correct even if the
            # DataFrame does not carry a default RangeIndex.
            for position, (_, row) in enumerate(df.iterrows(), start=1):
                print(f"  Testing sample {position}/{len(df)}", end="\r")
                # Send API request
                api_result = self.send_prediction_request(row['api_request'])
                results["tested_samples"] += 1
                if api_result["success"]:
                    results["successful_requests"] += 1
                    results["latencies_ms"].append(api_result["latency_ms"])
                    # Validate response structure
                    validation = self.validate_response(
                        api_result["response"],
                        row['expected_response']
                    )
                    if not validation["structure_valid"]:
                        results["validation_errors"] += 1
                        results["errors"].append({
                            "sample_id": row['image_id'],
                            "type": "validation_error",
                            "details": validation["field_errors"]
                        })
                else:
                    results["failed_requests"] += 1
                    results["errors"].append({
                        "sample_id": row['image_id'],
                        "type": "request_failed",
                        "status_code": api_result["status_code"],
                        "error": api_result["error"]
                    })
            # Calculate latency statistics
            if results["latencies_ms"]:
                results["avg_latency_ms"] = round(statistics.mean(results["latencies_ms"]), 2)
                results["min_latency_ms"] = round(min(results["latencies_ms"]), 2)
                results["max_latency_ms"] = round(max(results["latencies_ms"]), 2)
                results["median_latency_ms"] = round(statistics.median(results["latencies_ms"]), 2)
            else:
                results.update({
                    "avg_latency_ms": None,
                    "min_latency_ms": None,
                    "max_latency_ms": None,
                    "median_latency_ms": None
                })
            results["success_rate"] = round(
                results["successful_requests"] / results["tested_samples"] * 100, 2
            ) if results["tested_samples"] > 0 else 0
            print(f"\n  Completed: {results['success_rate']}% success rate")
            return results
        except Exception as e:
            print(f"\n  Failed to test dataset: {str(e)}")
            return {
                "dataset_name": dataset_path.stem,
                "error": str(e),
                "success_rate": 0
            }

    def test_all_datasets(self, max_samples_per_dataset: Optional[int] = None,
                          category_filter: Optional[str] = None) -> Dict[str, Any]:
        """Test all datasets, optionally filtered by category."""
        if not self.test_api_connection():
            print("API is not accessible. Please start the service first:")
            print("  uvicorn main:app --reload")
            return {"error": "API not accessible"}
        print(f"Starting dataset testing against {self.endpoint}")
        parquet_files = list(self.datasets_dir.glob("*.parquet"))
        if not parquet_files:
            print(f"No datasets found in {self.datasets_dir}")
            return {"error": "No datasets found"}
        if category_filter:
            parquet_files = [f for f in parquet_files if category_filter in f.name]
            if not parquet_files:
                print(f"No datasets match category filter '{category_filter}'")
                return {"error": "No datasets found"}
        print(f"Found {len(parquet_files)} datasets to test")
        all_results = []
        start_time = time.time()
        for dataset_file in parquet_files:
            result = self.test_dataset(dataset_file, max_samples_per_dataset)
            all_results.append(result)
        end_time = time.time()
        total_time = end_time - start_time
        summary = self.generate_summary(all_results, total_time)
        self.save_results(summary, all_results)
        return summary

    def generate_summary(self, results: List[Dict[str, Any]], total_time: float) -> Dict[str, Any]:
        """Generate a summary of all test results."""
        successful_datasets = [r for r in results if r.get("success_rate", 0) > 0]
        failed_datasets = [r for r in results if r.get("error") or r.get("success_rate", 0) == 0]
        total_samples = sum(r.get("tested_samples", 0) for r in results)
        total_successful = sum(r.get("successful_requests", 0) for r in results)
        total_failed = sum(r.get("failed_requests", 0) for r in results)
        all_latencies = []
        for r in results:
            all_latencies.extend(r.get("latencies_ms", []))
        summary = {
            "test_summary": {
                "total_datasets": len(results),
                "successful_datasets": len(successful_datasets),
                "failed_datasets": len(failed_datasets),
                "total_samples_tested": total_samples,
                "total_successful_requests": total_successful,
                "total_failed_requests": total_failed,
                "overall_success_rate": round(
                    total_successful / total_samples * 100, 2
                ) if total_samples > 0 else 0,
                "total_test_time_seconds": round(total_time, 2)
            },
            "performance_metrics": {
                "avg_latency_ms": round(statistics.mean(all_latencies), 2) if all_latencies else None,
                "median_latency_ms": round(statistics.median(all_latencies), 2) if all_latencies else None,
                "min_latency_ms": round(min(all_latencies), 2) if all_latencies else None,
                "max_latency_ms": round(max(all_latencies), 2) if all_latencies else None,
                "requests_per_second": round(
                    total_successful / total_time, 2
                ) if total_time > 0 else 0
            },
            "category_breakdown": {},
            "failed_datasets": [r["dataset_name"] for r in failed_datasets]
        }
        # Per-category success-rate breakdown
        categories = {}
        for result in results:
            category = result.get("category", "unknown")
            if category not in categories:
                categories[category] = {
                    "count": 0,
                    "success_rates": [],
                    "avg_success_rate": 0
                }
            categories[category]["count"] += 1
            categories[category]["success_rates"].append(result.get("success_rate", 0))
        for category, data in categories.items():
            data["avg_success_rate"] = round(
                statistics.mean(data["success_rates"]), 2
            ) if data["success_rates"] else 0
        summary["category_breakdown"] = categories
        return summary

    def save_results(self, summary: Dict[str, Any], detailed_results: List[Dict[str, Any]]):
        """Save test results to files."""
        results_dir = Path("test_results")
        results_dir.mkdir(exist_ok=True)
        timestamp = int(time.time())
        # Save summary
        summary_path = results_dir / f"test_summary_{timestamp}.json"
        with open(summary_path, 'w') as f:
            json.dump(summary, f, indent=2)
        # Save detailed results
        detailed_path = results_dir / f"test_detailed_{timestamp}.json"
        with open(detailed_path, 'w') as f:
            json.dump(detailed_results, f, indent=2)
        print("Results saved:")
        print(f"  Summary: {summary_path}")
        print(f"  Details: {detailed_path}")

    def print_summary(self, summary: Dict[str, Any]):
        """Print the test summary to the console."""
        print("\n" + "=" * 60)
        print("DATASET TESTING SUMMARY")
        print("=" * 60)
        ts = summary["test_summary"]
        print(f"Datasets tested: {ts['total_datasets']}")
        print(f"Successful datasets: {ts['successful_datasets']}")
        print(f"Failed datasets: {ts['failed_datasets']}")
        print(f"Total samples: {ts['total_samples_tested']}")
        print(f"Overall success rate: {ts['overall_success_rate']}%")
        print(f"Test duration: {ts['total_test_time_seconds']}s")
        pm = summary["performance_metrics"]
        if pm["avg_latency_ms"] is not None:
            print("\nPerformance:")
            print(f"  Avg latency: {pm['avg_latency_ms']}ms")
            print(f"  Median latency: {pm['median_latency_ms']}ms")
            print(f"  Min latency: {pm['min_latency_ms']}ms")
            print(f"  Max latency: {pm['max_latency_ms']}ms")
            print(f"  Requests/sec: {pm['requests_per_second']}")
        print("\nCategory breakdown:")
        for category, data in summary["category_breakdown"].items():
            print(f"  {category}: {data['count']} datasets, {data['avg_success_rate']}% avg success")
        if summary["failed_datasets"]:
            print(f"\nFailed datasets: {', '.join(summary['failed_datasets'])}")


def main():
    parser = argparse.ArgumentParser(description="Test PyArrow datasets against ML inference service")
    parser.add_argument("--base-url", default="http://127.0.0.1:8000", help="Base URL of the API")
    parser.add_argument("--datasets-dir", default="scripts/test_datasets", help="Directory containing datasets")
    parser.add_argument("--max-samples", type=int, help="Max samples per dataset to test")
    parser.add_argument("--category", help="Filter datasets by category (standard, edge_case, performance, model_comparison)")
    parser.add_argument("--quick", action="store_true", help="Quick test with max 5 samples per dataset")
    args = parser.parse_args()

    tester = DatasetTester(args.base_url, args.datasets_dir)
    max_samples = args.max_samples
    if args.quick:
        max_samples = 5

    results = tester.test_all_datasets(max_samples, args.category)
    if "error" not in results:
        tester.print_summary(results)
        if results["test_summary"]["overall_success_rate"] > 90:
            print("\nExcellent! API is working great with the datasets!")
        elif results["test_summary"]["overall_success_rate"] > 70:
            print("\nGood! API works well, minor issues detected.")
        else:
            print("\nWarning: several issues detected. Check the detailed results.")


if __name__ == "__main__":
    main()
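

# Example usage (the script filename below is an assumption; adjust it to match
# wherever this file is saved):
#   python dataset_tester.py --quick
#   python dataset_tester.py --category edge_case --max-samples 20
#   python dataset_tester.py --base-url http://127.0.0.1:8000 --datasets-dir scripts/test_datasets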