File size: 9,488 Bytes
7c06aef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import asyncio
import pandas as pd
import time
import os
from datetime import datetime, timedelta
from tqdm.asyncio import tqdm_asyncio
from models import models
from tasks import tasks
from languages import languages
import json

# Optional dependency: the Google Cloud Storage client.
# GCS_AVAILABLE records whether the import worked so callers can skip uploads.
try:
    from google.cloud import storage
except ImportError:
    GCS_AVAILABLE = False
    print("❌ Google Cloud Storage not available - install with: pip install google-cloud-storage")
else:
    GCS_AVAILABLE = True
    print("βœ… Google Cloud Storage available")

async def save_results_to_gcs(results, bucket_name="ai-language-eval-results"):
    """Upload ``results`` as JSON to a Google Cloud Storage bucket.

    The payload is written twice: once under a timestamped name
    (``results_<YYYYmmdd_HHMMSS>.json``) and once as ``results_latest.json``.
    The bucket is created in ``us-central1`` when it does not exist yet.

    Args:
        results: JSON-serialisable object to upload.
        bucket_name: Name of the target bucket.

    Returns:
        None. When the storage client could not be imported the function only
        prints a notice; any error raised during the upload is caught and
        printed rather than propagated.
    """
    if not GCS_AVAILABLE:
        print("❌ Google Cloud Storage not available")
        return

    json_type = "application/json"
    try:
        client = storage.Client()
        target_bucket = client.bucket(bucket_name)

        # Create the bucket on first use
        if not target_bucket.exists():
            target_bucket = client.create_bucket(bucket_name, location="us-central1")
            print(f"πŸ“¦ Created bucket: {bucket_name}")

        # Timestamped copy of this run
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        stamped_name = f"results_{stamp}.json"
        stamped_blob = target_bucket.blob(stamped_name)

        # Serialise once, upload the same payload to both objects
        payload = json.dumps(results, indent=2)
        stamped_blob.upload_from_string(payload, content_type=json_type)

        gcs_uri = f"gs://{bucket_name}/{stamped_name}"
        print(f"πŸ’Ύ Results saved to GCS: {gcs_uri}")
        print(f"πŸ“Š Download with: gsutil cp {gcs_uri} ./")

        # Stable name that always points at the most recent upload
        target_bucket.blob("results_latest.json").upload_from_string(
            payload, content_type=json_type
        )
        print(f"πŸ’Ύ Latest results: gs://{bucket_name}/results_latest.json")

    except Exception as e:
        print(f"❌ Failed to save to GCS: {e}")
        print("πŸ’Ύ Results saved locally to results.json")

# Module-level placeholder; rebound to the return value of evaluate() when
# this file is executed as a script.
results = pd.DataFrame()

async def evaluate():
    """Evaluate every pending (model, language, task) combination and persist the scores.

    Configuration is read from the environment:

    * ``N_SENTENCES``   - sentence indices ``0..N-1`` passed to every task (default 1).
    * ``MAX_LANGUAGES`` - number of leading rows of ``languages`` to evaluate (default 2).

    Combinations that already have a row in ``results.json`` are skipped. The
    remaining ones are awaited in batches of 50 with a one-second pause after
    each batch; calls that raised are counted and dropped. New scores are
    averaged per (model, bcp_47, task, metric, origin), appended to the
    previous contents of ``results.json`` and written back. ``models.json``
    and ``languages.json`` are refreshed on every run. The raw records of this
    run are finally handed to ``save_results_to_gcs``.

    ``results.json`` and ``models.json`` must already exist in the working
    directory; they are read with ``pd.read_json``.

    Returns:
        list: the flattened raw result records produced by this run (empty
        when nothing was pending or every call failed).
    """
    n_sentences = int(os.environ.get("N_SENTENCES", 1))  # Default 1 for quick testing

    # Load models and languages
    models_df = pd.DataFrame(models)
    languages_df = pd.DataFrame(languages)

    print(f"πŸš€ Running full evaluation with {len(models_df)} models.")
    start_time = time.time()
    print(f"πŸš€ Starting full evaluation at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"πŸ“Š Evaluating {n_sentences} sentences per task")

    # Evaluate the leading languages (configurable via MAX_LANGUAGES env var).
    # NOTE(review): presumably `languages` is already ordered by speaker count - confirm upstream.
    max_languages = int(os.environ.get("MAX_LANGUAGES", 2))  # Default 2 for quick testing
    top_languages = languages_df.head(max_languages)
    print(f"🌍 Evaluating top {len(top_languages)} languages by speakers (max: {max_languages})")

    # Shared DataFrame.to_json options. Defined before any branching so the
    # models/languages export below still works when no evaluation succeeded.
    args = dict(orient="records", indent=2, force_ascii=False)
    # Supported task names per model id (first row wins on duplicate ids),
    # looked up once instead of filtering models_df for every combination.
    tasks_by_model = (
        models_df.drop_duplicates(subset=["id"]).set_index("id")["tasks"].to_dict()
    )
    batch_size = 50  # Process 50 tasks at a time
    new_results = []  # raw records produced by this run

    # FIXME we should not need this for-loop, but it helps
    # For testing, just use all available languages up to max_languages
    for n_languages in [min(max_languages, len(top_languages))]:
        print(f"running evaluations for {n_languages} languages")
        stored_results = pd.read_json("results.json")
        if stored_results.empty:
            # give the merge below the columns it joins and filters on
            known_results = pd.DataFrame(
                columns=["model", "bcp_47", "task", "metric", "origin", "score"]
            )
        else:
            known_results = stored_results
        old_models = pd.read_json("models.json")

        # get all combinations of model, language and task
        combis = [
            (model, lang.bcp_47, task_name)
            for model in models_df["id"]
            for lang in top_languages.iloc[:n_languages].itertuples()
            for task_name in tasks
            if task_name in tasks_by_model[model]
        ]
        # filter out combinations that have already been evaluated:
        # after the left merge, unmatched combinations have a missing metric
        pending = pd.DataFrame(combis, columns=["model", "bcp_47", "task"])
        pending = pending.merge(known_results, on=["model", "bcp_47", "task"], how="left")
        pending = pending[pending["metric"].isna()][["model", "bcp_47", "task"]]

        # All tasks share the signature task(model, bcp_47, sentence_nr)
        all_tasks = [
            (tasks[task_name], model, bcp_47, sentence_nr)
            for sentence_nr in range(n_sentences)
            for model, bcp_47, task_name in pending.itertuples(index=False)
        ]

        print(f"⏳ Processing {len(all_tasks)} evaluation tasks in batches...")

        # run evaluations in batches to prevent HTTP pool exhaustion
        total_batches = (len(all_tasks) + batch_size - 1) // batch_size
        all_results = []
        for batch_nr, start in enumerate(range(0, len(all_tasks), batch_size), start=1):
            batch = all_tasks[start:start + batch_size]
            print(f"πŸ“¦ Processing batch {batch_nr}/{total_batches} ({len(batch)} tasks)")

            # Show what's being evaluated in this batch: languages per task
            batch_summary = {}
            for task_func, _model, bcp_47, _sentence_nr in batch:
                # functools.partial objects expose the wrapped function as .func
                inner_func = getattr(task_func, "func", task_func)
                task_name = inner_func.__name__.replace("_and_evaluate", "")
                batch_summary.setdefault(task_name, set()).add(bcp_47)

            for task_name, languages_set in batch_summary.items():
                lang_list = ", ".join(sorted(languages_set))
                print(f"  πŸ”„ {task_name}: {lang_list}")

            batch_coroutines = [
                task_func(model, bcp_47, sentence_nr)
                for task_func, model, bcp_47, sentence_nr in batch
            ]
            # return_exceptions=True: one failing call must not abort the batch
            batch_results = await asyncio.gather(*batch_coroutines, return_exceptions=True)
            all_results.extend(batch_results)

            # Small delay between batches to avoid overwhelming the API
            await asyncio.sleep(1)

        # Filter out exceptions and flatten results
        valid_results = []
        exception_count = 0
        for outcome in all_results:
            # BaseException also covers CancelledError returned by gather()
            if isinstance(outcome, BaseException):
                exception_count += 1
            elif isinstance(outcome, list):
                valid_results.extend(outcome)
            else:
                valid_results.append(outcome)
        new_results.extend(valid_results)

        print(f"⚠️  Encountered {exception_count} API errors (model unavailable/rate limits)")
        print(f"βœ… Successfully processed {len(valid_results)} evaluations")

        # Save partial results even if some failed
        if valid_results:
            # Aggregate results like main branch
            results_df = (
                pd.DataFrame(valid_results)
                .groupby(["model", "bcp_47", "task", "metric", "origin"])
                .agg({"score": "mean"})
                .reset_index()
            )
            # Merge with the results that were on disk at the start of this pass
            results_df = pd.concat([stored_results, results_df])
            results_df = results_df.sort_values(by=["model", "bcp_47", "task", "metric"])
            results_df.to_json("results.json", **args)
            print(f"πŸ’Ύ Saved {len(results_df)} aggregated results to results.json")
        else:
            print("⚠️  No valid results to save - all API calls failed")

        # Save up-to-date info on models and languages (like main branch)
        all_models = pd.concat([models_df, old_models])
        all_models = all_models.drop_duplicates(subset=["id"]).sort_values(by=["id"])
        all_models.to_json("models.json", **args)
        languages_df.to_json("languages.json", **args)

        # Time estimation
        elapsed = time.time() - start_time
        elapsed_str = str(timedelta(seconds=int(elapsed)))
        if n_languages < max_languages:
            remaining_batches = (max_languages - n_languages) // 10
            batch_count = max(1, n_languages // 10)  # Avoid division by zero
            estimated_remaining = elapsed * remaining_batches / batch_count
            eta = datetime.now() + timedelta(seconds=estimated_remaining)
            print(f"⏱️  Batch completed in {elapsed_str}. ETA for full run: {eta.strftime('%H:%M:%S')}")
        else:
            print(f"βœ… Full evaluation completed in {elapsed_str}")
            print(f"πŸŽ‰ Finished at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    # results.json already holds the aggregated, merged scores written above.
    # It must not be overwritten with the raw records of this run: that would
    # discard earlier results and break the "already evaluated" check next run.

    # Save to Google Cloud Storage; skipped when this run produced nothing so
    # an empty payload does not replace results_latest.json in the bucket.
    if new_results:
        await save_results_to_gcs(new_results)

    return new_results


# Script entry point: run the evaluation coroutine to completion and keep its
# return value in the module-level `results`.
if __name__ == "__main__":
    results = asyncio.run(evaluate())