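"""Run the language-model evaluation grid.

For every configured model x language x task combination that has no score
yet in results.json, this script runs the evaluations asynchronously in
batches, aggregates the scores, and saves them locally (results.json,
models.json, languages.json) and, when credentials are available, to
Google Cloud Storage.
"""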
import asyncio
import json
import os
import time
from datetime import datetime, timedelta

import pandas as pd
from tqdm.asyncio import tqdm_asyncio

from models import models
from tasks import tasks
from languages import languages

# Google Cloud Storage imports (optional dependency)
try:
    from google.cloud import storage

    GCS_AVAILABLE = True
    print("✅ Google Cloud Storage available")
except ImportError:
    GCS_AVAILABLE = False
    print("❌ Google Cloud Storage not available - install with: pip install google-cloud-storage")

async def save_results_to_gcs(results, bucket_name="ai-language-eval-results"):
    """Save results to Google Cloud Storage."""
    if not GCS_AVAILABLE:
        print("❌ Google Cloud Storage not available")
        return
    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        # Create bucket if it doesn't exist
        if not bucket.exists():
            bucket = storage_client.create_bucket(bucket_name, location="us-central1")
            print(f"📦 Created bucket: {bucket_name}")
        # Save results with timestamp
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        blob_name = f"results_{timestamp}.json"
        blob = bucket.blob(blob_name)
        # Convert results to JSON and upload
        results_json = json.dumps(results, indent=2)
        blob.upload_from_string(results_json, content_type='application/json')
        print(f"💾 Results saved to GCS: gs://{bucket_name}/{blob_name}")
        print(f"📥 Download with: gsutil cp gs://{bucket_name}/{blob_name} ./")
        # Also save latest results under a stable name
        latest_blob = bucket.blob("results_latest.json")
        latest_blob.upload_from_string(results_json, content_type='application/json')
        print(f"💾 Latest results: gs://{bucket_name}/results_latest.json")
    except Exception as e:
        print(f"❌ Failed to save to GCS: {e}")
        print("💾 Results saved locally to results.json")

async def evaluate():
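    """Evaluate every model/language/task combination not yet scored.

    The number of sentences per task and the number of languages are
    controlled by the N_SENTENCES and MAX_LANGUAGES environment variables.
    """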
    n_sentences = int(os.environ.get("N_SENTENCES", 1))  # default 1 for quick testing

    # Load models and languages
    models_df = pd.DataFrame(models)
    languages_df = pd.DataFrame(languages)
    print(f"🚀 Running full evaluation with {len(models_df)} models.")
    start_time = time.time()
    print(f"🕐 Starting full evaluation at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"📊 Evaluating {n_sentences} sentences per task")

    # Evaluate top languages by speakers (configurable via MAX_LANGUAGES env var)
    max_languages = int(os.environ.get("MAX_LANGUAGES", 2))  # default 2 for quick testing
    # Use languages_df here: .head() is a DataFrame method, not available on
    # the imported `languages` object itself
    top_languages = languages_df.head(max_languages)  # top N by number of speakers
    print(f"🌍 Evaluating top {len(top_languages)} languages by speakers (max: {max_languages})")

    # For testing, just use all available languages up to max_languages
    # FIXME: we should not need this for-loop, but it helps
    for n_languages in [min(max_languages, len(top_languages))]:
        print(f"Running evaluations for {n_languages} languages")
        try:
            old_results = pd.read_json("results.json")
        except (FileNotFoundError, ValueError):
            old_results = pd.DataFrame()
        if old_results.empty:
            old_results = pd.DataFrame(columns=["model", "bcp_47", "task", "metric", "origin", "score"])
        old_models = pd.read_json("models.json")
| # get all combinations of model, language and task | |
| combis = [ | |
| (model, lang.bcp_47, task_name) | |
| for model in models_df["id"] | |
| for lang in top_languages.iloc[:n_languages].itertuples() | |
| for task_name, task in tasks.items() | |
| if task_name in models_df[models_df["id"] == model]["tasks"].iloc[0] | |
| ] | |
| # filter out combinations that have already been evaluated | |
| combis = pd.DataFrame(combis, columns=["model", "bcp_47", "task"]) | |
| combis = combis.merge(old_results, on=["model", "bcp_47", "task"], how="left") | |
| combis = combis[combis["metric"].isna()][["model", "bcp_47", "task"]] | |
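        # (The left-merge plus isna() filter acts as an anti-join: only
        # combinations with no score in old_results survive.)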
        # Run evaluations in batches to prevent HTTP pool exhaustion
        all_tasks = []
        for i in range(n_sentences):
            for model, bcp_47, task_name in combis.itertuples(index=False):
                # All tasks now use the same signature
                all_tasks.append((tasks[task_name], model, bcp_47, i))
| print(f"β³ Processing {len(all_tasks)} evaluation tasks in batches...") | |
| batch_size = 50 # Process 50 tasks at a time | |
| all_results = [] | |
| for i in range(0, len(all_tasks), batch_size): | |
| batch = all_tasks[i:i+batch_size] | |
| print(f"π¦ Processing batch {i//batch_size + 1}/{(len(all_tasks) + batch_size - 1)//batch_size} ({len(batch)} tasks)") | |
| # Show what's being evaluated in this batch | |
| batch_summary = {} | |
| for task_data in batch: | |
| task_func, model, bcp_47, sentence_nr = task_data | |
| # Extract task name from function - handle both partial functions and regular functions | |
| if hasattr(task_func, 'func'): | |
| task_name = task_func.func.__name__.replace('_and_evaluate', '') | |
| else: | |
| task_name = task_func.__name__.replace('_and_evaluate', '') | |
| if task_name not in batch_summary: | |
| batch_summary[task_name] = set() | |
| batch_summary[task_name].add(bcp_47) | |
| for task_name, languages_set in batch_summary.items(): | |
| lang_list = ', '.join(sorted(languages_set)) | |
| print(f" π {task_name}: {lang_list}") | |
            batch_coroutines = [
                task_func(model, bcp_47, sentence_nr)
                for task_func, model, bcp_47, sentence_nr in batch
            ]
            batch_results = await asyncio.gather(*batch_coroutines, return_exceptions=True)
            all_results.extend(batch_results)
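            # With return_exceptions=True, a failed call yields an Exception
            # object instead of aborting the whole batch; failures are counted
            # and dropped in the filtering step below.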
            # Small delay between batches to avoid overwhelming the API
            await asyncio.sleep(1)
        results = all_results

        # Filter out exceptions and flatten per-task result lists
        valid_results = []
        exception_count = 0
        for r in results:
            if isinstance(r, Exception):
                exception_count += 1
                continue
            if isinstance(r, list):
                valid_results.extend(r)
            else:
                valid_results.append(r)
        print(f"⚠️ Encountered {exception_count} API errors (model unavailable/rate limits)")
        print(f"✅ Successfully processed {len(valid_results)} evaluations")
        # Save partial results even if some failed; args is defined up here
        # because the models/languages saves below need it in every branch
        args = dict(orient="records", indent=2, force_ascii=False)
        if valid_results:
            results = valid_results
            # Aggregate results like main branch: mean score per
            # (model, language, task, metric, origin) across sentence repetitions
            results_df = pd.DataFrame(results)
            if len(results_df) > 0:
                results_df = (
                    results_df.groupby(["model", "bcp_47", "task", "metric", "origin"])
                    .agg({"score": "mean"})
                    .reset_index()
                )
                # Merge with the old results loaded above (re-reading
                # results.json here would fail on a first run and adds nothing)
                results_df = pd.concat([old_results, results_df])
                results_df = results_df.sort_values(by=["model", "bcp_47", "task", "metric"])
                results_df.to_json("results.json", **args)
                print(f"💾 Saved {len(results_df)} aggregated results to results.json")
            else:
                print("⚠️ No valid results to aggregate")
        else:
            print("⚠️ No valid results to save - all API calls failed")

        # Save up-to-date info on models and languages (like main branch)
        all_models = pd.concat([pd.DataFrame(models), old_models])
        all_models = all_models.drop_duplicates(subset=["id"]).sort_values(by=["id"])
        all_models.to_json("models.json", **args)
        pd.DataFrame(languages).to_json("languages.json", **args)
        # Continue with the next batch even if this one had errors

        # Time estimation
        elapsed = time.time() - start_time
        elapsed_str = str(timedelta(seconds=int(elapsed)))
        if n_languages < max_languages:
            remaining_batches = (max_languages - n_languages) // 10
            batch_count = max(1, n_languages // 10)  # avoid division by zero
            estimated_remaining = elapsed * remaining_batches / batch_count
            eta = datetime.now() + timedelta(seconds=estimated_remaining)
            print(f"⏱️ Batch completed in {elapsed_str}. ETA for full run: {eta.strftime('%H:%M:%S')}")
        else:
            print(f"✅ Full evaluation completed in {elapsed_str}")
            print(f"🏁 Finished at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    # The aggregated scores were already written to results.json above;
    # re-dumping the raw result list here would overwrite them (and could
    # fail on non-serializable Exception objects), so just report and upload.
    print("💾 Results saved to results.json")

    # Save to Google Cloud Storage (valid results only)
    await save_results_to_gcs(valid_results)
    return results


| if __name__ == "__main__": | |
| results = asyncio.run(evaluate()) |
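# Example invocation (env var values are illustrative; script filename assumed):
#   N_SENTENCES=10 MAX_LANGUAGES=20 python evaluate.py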