#!/usr/bin/env python3
"""
Asynchronous Complete GAIA Test System

Main orchestrator for concurrent testing of all GAIA questions with honest accuracy measurement.
"""

import asyncio
import json
import logging
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import sys
import os

# Add the project root to the Python path
sys.path.insert(0, str(Path(__file__).parent))
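
# The imports below are project-local modules; the sys.path insertion above
# assumes they live in the same directory as this file.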
from async_question_processor import AsyncQuestionProcessor
from classification_analyzer import ClassificationAnalyzer
from summary_report_generator import SummaryReportGenerator


class AsyncGAIATestSystem:
    """Main orchestrator for asynchronous GAIA testing with honest accuracy measurement."""

    def __init__(self,
                 max_concurrent: int = 3,
                 timeout_seconds: int = 900,
                 output_dir: str = "logs"):
        """
        Initialize the async test system.

        Args:
            max_concurrent: Maximum number of concurrent question processors
            timeout_seconds: Timeout per question (15 minutes default)
            output_dir: Directory for test results and logs
        """
        self.max_concurrent = max_concurrent
        self.timeout_seconds = timeout_seconds
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)

        # Create timestamped session directory
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.session_dir = self.output_dir / f"session_{timestamp}"
        self.session_dir.mkdir(exist_ok=True)

        # Initialize components
        self.processor = AsyncQuestionProcessor(
            session_dir=self.session_dir,
            timeout_seconds=self.timeout_seconds
        )
        self.analyzer = ClassificationAnalyzer()
        self.reporter = SummaryReportGenerator()

        # Setup logging
        self.setup_logging()

        # Test results tracking
        self.results: Dict[str, Dict] = {}
        self.start_time: Optional[float] = None
        self.end_time: Optional[float] = None
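
    # Programmatic usage (a minimal sketch; the CLI in main() below is the
    # primary entry point):
    #
    #   system = AsyncGAIATestSystem(max_concurrent=5, timeout_seconds=600)
    #   summary = asyncio.run(system.run_complete_test())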

    def setup_logging(self):
        """Setup comprehensive logging for the test session."""
        log_file = self.session_dir / "async_test_system.log"

        # Configure logger (getLogger returns the same named logger each call,
        # so clear handlers left over from a previous session to avoid duplicate output)
        self.logger = logging.getLogger("AsyncGAIATest")
        self.logger.setLevel(logging.INFO)
        self.logger.handlers.clear()

        # File handler
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(logging.INFO)

        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)

        # Formatter
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)

        # Add handlers
        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)

    async def load_questions(self) -> List[Dict]:
        """Load GAIA questions from the standard source."""
        questions_file = Path("gaia_questions_list.txt")
        if not questions_file.exists():
            self.logger.error(f"Questions file not found: {questions_file}")
            return []
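
        # Expected file format (an assumption inferred from the parsing below):
        # one JSON object per line, each carrying at least a "task_id" field,
        # e.g. {"task_id": "<id>", ...}. Lines that do not start with '{' are ignored.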
        questions = []
        try:
            with open(questions_file, 'r') as f:
                for line in f:
                    line = line.strip()
                    if line and line.startswith('{'):
                        try:
                            question = json.loads(line)
                            questions.append(question)
                        except json.JSONDecodeError as e:
                            self.logger.warning(f"Failed to parse question line: {line[:50]}... - {e}")
            self.logger.info(f"Loaded {len(questions)} questions for testing")
            return questions
        except Exception as e:
            self.logger.error(f"Failed to load questions: {e}")
            return []

    async def process_question_batch(self, questions: List[Dict]) -> Dict[str, Dict]:
        """Process a batch of questions concurrently."""
        # Create semaphore to limit concurrent processing
        semaphore = asyncio.Semaphore(self.max_concurrent)

        async def process_single_question(question: Dict) -> Tuple[str, Dict]:
            """Process a single question with semaphore control."""
            async with semaphore:
                question_id = question.get('task_id', 'unknown')
                self.logger.info(f"Starting processing for question {question_id}")
                try:
                    result = await self.processor.process_question(question)
                    self.logger.info(f"Completed processing for question {question_id}")
                    return question_id, result
                except Exception as e:
                    self.logger.error(f"Failed to process question {question_id}: {e}")
                    return question_id, {
                        'status': 'error',
                        'error': str(e),
                        'timestamp': datetime.now().isoformat()
                    }

        # Create one coroutine per question
        tasks = [process_single_question(q) for q in questions]

        # Process all questions concurrently
        self.logger.info(f"Starting concurrent processing of {len(questions)} questions (max_concurrent={self.max_concurrent})")
        results = await asyncio.gather(*tasks, return_exceptions=True)
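
        # With return_exceptions=True, asyncio.gather returns raised exceptions
        # as values in the results list instead of propagating them; they are
        # logged and skipped in the loop below.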

        # Organize results
        organized_results = {}
        for result in results:
            if isinstance(result, Exception):
                self.logger.error(f"Task failed with exception: {result}")
                continue
            question_id, question_result = result
            organized_results[question_id] = question_result

        return organized_results

    async def run_complete_test(self) -> Dict:
        """Run the complete asynchronous GAIA test system."""
        self.logger.info("=" * 80)
        self.logger.info("ASYNC GAIA TEST SYSTEM - STARTING COMPLETE TEST")
        self.logger.info("=" * 80)

        self.start_time = time.time()

        try:
            # Load questions
            self.logger.info("Loading GAIA questions...")
            questions = await self.load_questions()
            if not questions:
                self.logger.error("No questions loaded. Aborting test.")
                return {"status": "error", "message": "No questions loaded"}

            self.logger.info(f"Processing {len(questions)} questions with max_concurrent={self.max_concurrent}")

            # Process questions concurrently
            self.results = await self.process_question_batch(questions)

            self.end_time = time.time()
            total_duration = self.end_time - self.start_time
            self.logger.info(f"All questions processed in {total_duration:.2f} seconds")

            # Generate analysis and reports
            await self.generate_comprehensive_analysis()

            # Create session summary
            session_summary = {
                "session_id": self.session_dir.name,
                "start_time": datetime.fromtimestamp(self.start_time).isoformat(),
                "end_time": datetime.fromtimestamp(self.end_time).isoformat(),
                "total_duration_seconds": total_duration,
                "questions_processed": len(self.results),
                "max_concurrent": self.max_concurrent,
                "timeout_seconds": self.timeout_seconds,
                "session_dir": str(self.session_dir),
                "results": self.results
            }

            # Save session summary
            summary_file = self.session_dir / "session_summary.json"
            with open(summary_file, 'w') as f:
                json.dump(session_summary, f, indent=2)

            # Generate evaluation report
            await self.generate_evaluation_report()

            self.logger.info(f"Session summary saved to: {summary_file}")
            return session_summary

        except Exception as e:
            self.logger.error(f"Complete test failed: {e}")
            return {"status": "error", "message": str(e)}

    async def generate_comprehensive_analysis(self):
        """Generate comprehensive analysis and reports."""
        self.logger.info("Generating comprehensive analysis...")

        try:
            # Classification-based analysis
            classification_report = await self.analyzer.analyze_by_classification(
                self.results, self.session_dir
            )

            # Master summary report
            summary_report = await self.reporter.generate_master_report(
                self.results, self.session_dir, classification_report
            )

            self.logger.info("Analysis and reports generated successfully")
        except Exception as e:
            self.logger.error(f"Failed to generate analysis: {e}")

    async def generate_evaluation_report(self):
        """Generate evaluation report with final vs expected answers."""
        self.logger.info("Generating evaluation report...")

        try:
            # Load expected answers from validation metadata
            validation_file = Path("gaia_validation_metadata.jsonl")
            expected_answers = {}
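
            # gaia_validation_metadata.jsonl is assumed to be JSON Lines, with
            # each record providing at least "task_id", "Final answer", and
            # "Question" keys (the only fields read below).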
            if validation_file.exists():
                with open(validation_file, 'r') as f:
                    for line in f:
                        try:
                            data = json.loads(line.strip())
                            task_id = data.get('task_id')
                            final_answer = data.get('Final answer')
                            question_text = data.get('Question')
                            if task_id and final_answer:
                                expected_answers[task_id] = {
                                    'final_answer': final_answer,
                                    'question_text': question_text
                                }
                        except json.JSONDecodeError:
                            continue

            self.logger.info(f"Loaded expected answers for {len(expected_answers)} questions from validation metadata")

            evaluation_data = []
            correct_count = 0
            total_count = len(self.results)

            for question_id, result in self.results.items():
                validation_data = expected_answers.get(question_id, {})
                expected_answer = validation_data.get('final_answer', 'N/A')
                question_text = validation_data.get('question_text', 'N/A')

                # Extract final answer from solver result
                solver_result = result.get('solver_result', {})
                final_answer = solver_result.get('answer', 'N/A')

                # Check if correct (simple string comparison)
                is_correct = str(final_answer).strip().lower() == str(expected_answer).strip().lower()
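                # Note: this is an exact, case-insensitive string match, so
                # answers that differ only in formatting (units, punctuation,
                # thousands separators) are scored as incorrect.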
                if is_correct:
                    correct_count += 1

                evaluation_entry = {
                    'question_id': question_id,
                    'question_text': question_text,
                    'final_answer': final_answer,
                    'expected_answer': expected_answer,
                    'is_correct': is_correct,
                    'classification': result.get('classification', {}),
                    'execution_time': solver_result.get('execution_time', 0),
                    'status': solver_result.get('status', 'unknown')
                }
                evaluation_data.append(evaluation_entry)

            # Calculate accuracy
            accuracy = (correct_count / total_count) * 100 if total_count > 0 else 0

            evaluation_report = {
                'session_id': self.session_dir.name,
                'timestamp': datetime.now().isoformat(),
                'total_questions': total_count,
                'correct_answers': correct_count,
                'accuracy_percentage': accuracy,
                'target_accuracy': 70.0,
                'target_achieved': accuracy >= 70.0,
                'detailed_results': evaluation_data
            }

            # Save evaluation report
            eval_file = self.session_dir / "evaluation_report.json"
            with open(eval_file, 'w') as f:
                json.dump(evaluation_report, f, indent=2)

            # Create human-readable summary
            summary_text = f"""
# GAIA Test Evaluation Summary
## Session: {self.session_dir.name}
- **Total Questions**: {total_count}
- **Correct Answers**: {correct_count}
- **Accuracy**: {accuracy:.1f}%
- **Target**: 70.0%
- **Target Achieved**: {'✅ YES' if accuracy >= 70.0 else '❌ NO'}
## Question-by-Question Results:
"""

            for entry in evaluation_data:
                status_emoji = "✅" if entry['is_correct'] else "❌"
                summary_text += f"""
### {entry['question_id']}
{status_emoji} **Status**: {'CORRECT' if entry['is_correct'] else 'INCORRECT'}
- **Question**: {entry['question_text'][:100]}...
- **Final Answer**: {entry['final_answer']}
- **Expected Answer**: {entry['expected_answer']}
- **Execution Time**: {entry['execution_time']:.2f}s
"""

            summary_file = self.session_dir / "evaluation_summary.md"
            with open(summary_file, 'w') as f:
                f.write(summary_text)

            self.logger.info(f"Evaluation report saved to: {eval_file}")
            self.logger.info(f"Human-readable summary saved to: {summary_file}")
            self.logger.info(f"ACCURACY ACHIEVED: {accuracy:.1f}% ({correct_count}/{total_count})")

        except Exception as e:
            self.logger.error(f"Failed to generate evaluation report: {e}")


def main():
    """Main entry point for the async test system."""
    import argparse

    parser = argparse.ArgumentParser(description="Asynchronous GAIA Test System")
    parser.add_argument('--max-concurrent', type=int, default=3,
                        help='Maximum concurrent question processors (default: 3)')
    parser.add_argument('--timeout', type=int, default=900,
                        help='Timeout per question in seconds (default: 900)')
    parser.add_argument('--output-dir', type=str, default='logs',
                        help='Output directory for results (default: logs)')
    args = parser.parse_args()

    # Create and run the test system
    system = AsyncGAIATestSystem(
        max_concurrent=args.max_concurrent,
        timeout_seconds=args.timeout,
        output_dir=args.output_dir
    )

    # Run the async test
    try:
        result = asyncio.run(system.run_complete_test())
        if result.get("status") == "error":
            print(f"Test failed: {result.get('message')}")
            sys.exit(1)
        else:
            print("Test completed successfully!")
            print(f"Results saved to: {system.session_dir}")
    except KeyboardInterrupt:
        print("\nTest interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"Test failed with exception: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()