#!/usr/bin/env python
"""Run validator in parallel across multiple processes."""

import subprocess
import sys
import os
import argparse
import math
from concurrent.futures import ProcessPoolExecutor, as_completed

import pandas as pd


def run_validator_range(args):
    """Run validator for a specific range"""
    (excel_file, solver, reconciler, start, end,
     images, batch_size, output_base, compile_latex) = args

    # Create unique output filename for this range
    range_output = output_base.replace('.xlsx', f'_p{start}_{end}.xlsx')

    cmd = [
        sys.executable, "universal_validator.py",
        excel_file,
        "--model", solver,
        "--reconciliation-model", reconciler,
        "--images", images,
        "--start", str(start),
        "--end", str(end),
        "--batch-size", str(batch_size),
        "--output", range_output
    ]
    if compile_latex:
        cmd.append("--compile-latex")

    print(f"[PARALLEL] Starting process for questions {start+1}-{end}...")

    try:
        # Capture stdout (stderr merged in) so each line can be prefixed
        # with a process tag and streamed to the console as it arrives
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding='utf-8',
            errors='replace',
            bufsize=1
        )

        # Stream output
        output_lines = []
        while True:
            line = process.stdout.readline()
            if not line:
                break
            print(f"[P{start//100+1}] {line.rstrip()}")
            output_lines.append(line)

        process.wait()

        if process.returncode == 0:
            print(f"[PARALLEL] Completed range {start+1}-{end}")
            return (start, end, "success", "")
        else:
            error_msg = "".join(output_lines[-20:])  # Last 20 lines
            print(f"[FAIL] Failed range {start+1}-{end}")
            return (start, end, "failed", error_msg)

    except Exception as e:
        print(f"[ERROR] Error in range {start+1}-{end}: {e}")
        return (start, end, "error", str(e))


def main():
    parser = argparse.ArgumentParser(description='Run validator in parallel')
    parser.add_argument('file', help='Excel file to process')
    parser.add_argument('--num-processes', type=int, default=4,
                        help='Number of parallel processes (default: 4)')
    parser.add_argument('--solver', default='o3-mini',
                        help='Solver model (default: o3-mini)')
    parser.add_argument('--reconciler', default='gpt-4o',
                        help='Reconciliation model (default: gpt-4o)')
    parser.add_argument('--images', default='when_needed',
                        help='Image handling (default: when_needed)')
    parser.add_argument('--batch-size', type=int, default=5,
                        help='Questions per batch (default: 5)')
    parser.add_argument('--questions-per-process', type=int, default=100,
                        help='Questions per process (default: 100)')
    parser.add_argument('--output', type=str, default=None,
                        help='Output filename for merged results')
    parser.add_argument('--start-range', type=int, default=0,
                        help='Start of question range')
    parser.add_argument('--end-range', type=int, default=None,
                        help='End of question range')
    parser.add_argument('--compile-latex', action='store_true',
                        help='Compile LaTeX files to PDF')

    args = parser.parse_args()

    # Count total questions
    print(f"Loading {args.file} to count questions...")
    df = pd.read_excel(args.file, sheet_name='Data')

    # Filter for math questions
    if 'raw_subject' in df.columns:
        math_filter = df['raw_subject'].str.lower().str.contains(
            'math|statistic|calculus|algebra|geometry|trigonometry',
            na=False, regex=True
        )
        df = df[math_filter]

    # Apply range if specified
    if args.start_range > 0 or args.end_range:
        start_idx = args.start_range
        end_idx = args.end_range if args.end_range else len(df)
        df = df.iloc[start_idx:end_idx]
        print(f"Processing range: questions {start_idx+1} to {end_idx}")

    total_questions = len(df)
    print(f"Found {total_questions} math questions to process")

    # Calculate ranges
    questions_per_process = max(args.questions_per_process,
                                math.ceil(total_questions / args.num_processes))
    num_processes = min(args.num_processes,
                        math.ceil(total_questions / questions_per_process))

    # Generate output base filename
    if args.output:
        output_base = args.output
    else:
        from datetime import datetime
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        base_name = os.path.basename(args.file).replace('.xlsx', '')
        output_base = f"{base_name}_validated_{timestamp}_parallel.xlsx"

    ranges = []
    base_start = args.start_range if args.start_range else 0
    for i in range(num_processes):
        start = base_start + i * questions_per_process
        end = min(base_start + (i + 1) * questions_per_process,
                  base_start + total_questions)
        if start < base_start + total_questions:
            ranges.append((
                args.file, args.solver, args.reconciler,
                start, end, args.images, args.batch_size,
                output_base, args.compile_latex
            ))

    print(f"\nWill run {len(ranges)} parallel processes:")
    for i, (_, _, _, start, end, _, _, _, _) in enumerate(ranges, 1):
        print(f"  Process {i}: questions {start+1}-{end}")

    # Skip confirmation in GUI mode (when output is specified)
    if not args.output:
        confirm = input("\nProceed? (Y/n): ").strip().lower()
        if confirm == 'n':
            print("Cancelled")
            return

    # Run in parallel
    print(f"\nStarting {len(ranges)} parallel processes...")

    with ProcessPoolExecutor(max_workers=num_processes) as executor:
        futures = {executor.submit(run_validator_range, r): r for r in ranges}

        completed = 0
        failed = []

        for future in as_completed(futures):
            completed += 1
            start, end, status, error = future.result()

            if status != "success":
                failed.append((start, end, error))

            print(f"Progress: {completed}/{len(ranges)} processes completed")

    # Summary
    print("\n" + "="*60)
    print("PARALLEL VALIDATION COMPLETE")
    print("="*60)

    if failed:
        print(f"\nFailed ranges ({len(failed)}):")
        for start, end, error in failed:
            print(f"  {start}-{end}: {error[:100]}")
        print("\nRerun these ranges individually to retry")
    else:
        print("\nAll ranges completed successfully!")

    # Merge results from all processes
    print("\nMerging results from all processes...")
    merge_results(args.file, output_base, ranges)

    # Clean up intermediate files
    for _, _, _, start, end, _, _, _, _ in ranges:
        temp_file = output_base.replace('.xlsx', f'_p{start}_{end}.xlsx')
        if os.path.exists(temp_file):
            os.remove(temp_file)
            print(f"  Cleaned up: {temp_file}")

    print(f"\nFinal results saved to: {output_base}")
    print(f"Results from {len(ranges)} processes have been merged")


def merge_results(original_file, output_file, ranges):
    """Merge results from parallel processes into a single file"""
    # Load original data
    original_df = pd.read_excel(original_file, sheet_name='Data')

    # Process each range file and update the dataframe
    for _, _, _, start, end, _, _, _, _ in ranges:
        temp_file = output_file.replace('.xlsx', f'_p{start}_{end}.xlsx')

        if os.path.exists(temp_file):
            try:
                temp_df = pd.read_excel(temp_file, sheet_name='Data')

                # Update the original dataframe with results from this range
                for idx in range(start, min(end, len(temp_df))):
                    if idx < len(original_df):
                        for col in ['model_answer_file', 'answer_match',
                                    'latex_file', 'quality_rating',
                                    'difficulty_level', 'quality_comment']:
                            if col in temp_df.columns:
                                original_df.at[idx, col] = temp_df.at[idx, col]

                print(f"  Merged results from questions {start+1}-{end}")
            except Exception as e:
                print(f"  Warning: Could not merge {temp_file}: {e}")

    # Save merged results
    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        original_df.to_excel(writer, sheet_name='Data', index=False)
        # Copy other sheets if they exist
        try:
            xl = pd.ExcelFile(original_file)
            for sheet_name in xl.sheet_names:
                if sheet_name != 'Data':
                    df = pd.read_excel(original_file, sheet_name=sheet_name)
                    df.to_excel(writer, sheet_name=sheet_name, index=False)
        except Exception:
            pass


if __name__ == "__main__":
    main()
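
# Example invocation (a sketch, not part of the script). The script name
# run_parallel_validator.py and the workbook questions.xlsx are placeholders;
# substitute the real file names. The flag values shown match the argparse
# defaults above. Passing --output skips the interactive confirmation prompt,
# which is how a GUI front end is expected to drive this script.
#
#   python run_parallel_validator.py questions.xlsx \
#       --num-processes 4 \
#       --questions-per-process 100 \
#       --solver o3-mini \
#       --reconciler gpt-4o \
#       --batch-size 5 \
#       --output questions_validated.xlsx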