import os
import json
import argparse
import tempfile
from typing import Dict, List, Union
from datetime import datetime

from dotenv import load_dotenv
from moviepy import VideoFileClip

from mllm_tools.litellm import LiteLLMWrapper
from mllm_tools.gemini import GeminiWrapper
from eval_suite.utils import calculate_geometric_mean
from eval_suite.text_utils import parse_srt_to_text, fix_transcript, evaluate_text
from eval_suite.video_utils import evaluate_video_chunk_new
from eval_suite.image_utils import evaluate_sampled_images

load_dotenv()

with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "src", "utils", "allowed_models.json")) as f:
    ALLOWED_MODELS = json.load(f)["allowed_models"]
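
# allowed_models.json is expected to hold the model identifiers that may be passed on
# the command line; an illustrative shape (the listed values are assumptions, not from
# the source):
#   {"allowed_models": ["azure/gpt-4o", "gemini/gemini-1.5-pro-002"]}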


def combine_results(output_folder: str, combined_file: str, results: Dict[str, Dict]) -> None:
    """
    Combine all evaluation results into a single file.

    Args:
        output_folder (str): Directory to store the combined file.
        combined_file (str): Name of the combined file.
        results (Dict[str, Dict]): Dictionary of evaluation results with file names as keys.

    Returns:
        None
    """
    combined_path = os.path.join(output_folder, combined_file)
    with open(combined_path, 'w') as output_file:
        json.dump(results, output_file, indent=4)


def save_individual_result(output_folder: str, file_name: str, result: Dict) -> None:
    """
    Save individual evaluation result to a file.

    Args:
        output_folder (str): Directory to store the evaluation file.
        file_name (str): Name of the file.
        result (Dict): Evaluation result.

    Returns:
        None
    """
    current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
    result_file = f"evaluation_{file_name}_{current_time}.json"
    os.makedirs(output_folder, exist_ok=True)
    result_path = os.path.join(output_folder, result_file)
    with open(result_path, 'w') as output_file:
        json.dump(result, output_file, indent=4)
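
# For example, saving the result for a folder named "my_theorem" produces a file such as
#   evaluation_my_theorem_20250101_120000.json
# (the folder name and timestamp here are hypothetical).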


def evaluate_text_file(model, transcript_path, retry_limit):
    """
    Evaluate a text file using the provided model.

    Args:
        model: The model to use for evaluation.
        transcript_path (str): Path to the transcript file (.srt or .txt).
        retry_limit (int): Number of retry attempts for evaluation.

    Returns:
        Dict or None: Evaluation results if successful, None if file format unsupported.
    """
    if not transcript_path.endswith(('.srt', '.txt')):
        print(f"Skipping {transcript_path}: Unsupported file format for text evaluation.")
        return None

    if transcript_path.endswith(".srt"):
        transcript = parse_srt_to_text(transcript_path)
    elif transcript_path.endswith(".txt"):
        with open(transcript_path) as f:
            transcript = f.read().strip()
    else:
        raise ValueError("Unrecognized transcript file format.")

    # If the transcript has almost no capital letters (e.g. raw ASR output), have the
    # model restore casing before evaluation. Guard against transcripts with no
    # alphabetic characters to avoid a division by zero.
    alpha_count = sum(1 for c in transcript if c.isalpha())
    upper_count = sum(1 for c in transcript if c.isupper())
    capital_letter_proportion = upper_count / alpha_count if alpha_count else 0.0
    if capital_letter_proportion < 0.01:
        transcript = fix_transcript(model, transcript)

    print(f"Performing text evaluation: {os.path.basename(transcript_path)}")
    result = evaluate_text(model, transcript, retry_limit)
    return result
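
# Illustrative arithmetic (numbers are assumptions): a transcript with 1,000 alphabetic
# characters and fewer than 10 uppercase letters has a proportion below 0.01, so
# fix_transcript is invoked before scoring.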


def evaluate_video_file(model, video_path, transcript_path, description_path, target_fps=None, output_folder=None):
    """
    Evaluate a video file using the provided model.

    Args:
        model: The model to use for evaluation.
        video_path (str): Path to the video file.
        transcript_path (str): Path to the transcript file.
        description_path (str): Path to the description file.
        target_fps (int, optional): Target frames per second for video processing.
        output_folder (str, optional): Directory to store output files.

    Returns:
        Dict or None: Evaluation results if successful, None if file format unsupported.
    """
    if not video_path.endswith(('.mp4', '.mkv')):
        print(f"Skipping {video_path}: Unsupported file format for video evaluation.")
        return None

    moviepy_temp_dir = os.path.join(output_folder, "moviepy_temp")

    # Chunking: split the video into equal-length chunks and evaluate each one
    num_chunks = 10
    with VideoFileClip(video_path) as clip:
        duration = clip.duration
        chunk_duration = duration / num_chunks

        results = []
        # Create a temporary directory in the output_folder
        temp_dir_parent = output_folder or os.getcwd()
        with tempfile.TemporaryDirectory(dir=temp_dir_parent) as temp_dir:
            for i in range(num_chunks):
                start = i * chunk_duration
                end = min(start + chunk_duration, duration)
                chunk = clip.subclipped(start, end)
                chunk_path = os.path.join(temp_dir, f"chunk_{i+1}.mp4")
                # Explicitly set the temp_audiofile path with matching codec
                temp_audiofile = os.path.join(moviepy_temp_dir, f"temp_audio_chunk_{i+1}.m4a")
                chunk.write_videofile(
                    chunk_path,
                    codec="libx264",
                    audio_codec="aac",
                    temp_audiofile=temp_audiofile,
                    audio_bitrate="192k",
                    preset="ultrafast",  # Speed up encoding
                    logger=None
                )
                # Save the processed chunk inside output_folder/processed_videos
                processed_videos_dir = os.path.join(output_folder, "processed_videos")
                save_path = os.path.join(processed_videos_dir, f"processed_chunk_{i+1}.mp4")
                result = evaluate_video_chunk_new(
                    model,
                    chunk_path,
                    transcript_path,
                    description_path,
                    target_fps=target_fps,
                    save_processed_video=save_path
                )
                results.append(result)

    # Aggregate the per-criterion scores across chunks
    score_dict = {}
    for key in results[0]["evaluation"].keys():
        score_dict[key] = []
        for result in results:
            score_dict[key].append(result["evaluation"][key]["score"])

    evaluation = {}
    for key, scores in score_dict.items():
        evaluation[key] = {"score": calculate_geometric_mean(scores)}

    result_json = {
        "evaluation": evaluation,
        "video_chunks": results
    }
    return result_json
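
# Illustrative arithmetic (numbers are assumptions, not from the source): a 60-second
# video with num_chunks=10 yields ten 6-second clips; each clip is scored per criterion,
# and the ten scores for each criterion are combined with calculate_geometric_mean.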


def extract_scores(data: Union[Dict, List]) -> List[int]:
    """
    Extract all score values from a nested dictionary or list structure.

    Args:
        data (Union[Dict, List]): The data structure to extract scores from.

    Returns:
        List[int]: List of extracted score values.
    """
    scores = []
    if isinstance(data, dict):
        for key, value in data.items():
            if "chunks" in key:
                continue
            elif isinstance(value, (dict, list)):
                scores.extend(extract_scores(value))
            elif key == 'score':
                scores.append(value)
    elif isinstance(data, list):
        for item in data:
            scores.extend(extract_scores(item))
    return scores
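
# Illustrative example (the criterion key names are assumptions): for
#   {"evaluation": {"accuracy": {"score": 4}, "visual_quality": {"score": 3}},
#    "video_chunks": [...]}
# extract_scores returns [4, 3]; any key containing "chunks" is skipped, so per-chunk
# scores are not counted twice in the overall score.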


def calculate_overall_score(result: Dict) -> float:
    """
    Calculate the overall score from evaluation results.

    Args:
        result (Dict): Dictionary containing evaluation results.

    Returns:
        float: The calculated overall score.
    """
    scores = extract_scores(result)
    overall_score = calculate_geometric_mean(scores)
    return overall_score
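
# Assuming calculate_geometric_mean implements the standard definition, the overall
# score is (s_1 * s_2 * ... * s_n) ** (1 / n) over the n extracted scores.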


def process_topic_name(topic_name: str) -> str:
    """
    Process a topic name by capitalizing words and handling special characters.

    Args:
        topic_name (str): The topic name to process.

    Returns:
        str: The processed topic name.
    """
    words = topic_name.replace("_s_", "'s_").split("_")
    return " ".join([word.capitalize() for word in words])


def merge_dicts(dict1: dict, dict2: dict) -> dict:
    """
    Recursively merge two dictionaries.

    Args:
        dict1 (dict): First dictionary.
        dict2 (dict): Second dictionary.

    Returns:
        dict: Merged dictionary.
    """
    merged = dict1.copy()
    for key, value in dict2.items():
        if key in merged and isinstance(merged[key], dict) and isinstance(value, dict):
            merged[key] = merge_dicts(merged[key], value)
        else:
            merged[key] = value
    return merged
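
# For example (the key names are illustrative),
#   merge_dicts({"evaluation": {"text": {"score": 3}}},
#               {"evaluation": {"video": {"score": 4}}})
# returns {"evaluation": {"text": {"score": 3}, "video": {"score": 4}}}, which is how
# the text, video, and image results are folded into one report when eval_type is "all".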


def process_theorem(models, file_path: str, eval_type: str, retry_limit: int,
                    target_fps: int = None, use_parent_folder_as_topic: bool = False,
                    output_folder: str = None) -> tuple[str, dict]:
    """
    Process a theorem file or directory for evaluation.

    Args:
        models: Dictionary of models for different evaluation types.
        file_path (str): Path to the file or directory to evaluate.
        eval_type (str): Type of evaluation to perform.
        retry_limit (int): Number of retry attempts.
        target_fps (int, optional): Target frames per second for video processing.
        use_parent_folder_as_topic (bool, optional): Use parent folder name as topic.
        output_folder (str, optional): Directory to store output files.

    Returns:
        tuple[str, dict]: Tuple of file name and evaluation results.
    """
    ext_map = {
        'text': ('.txt', '.srt'),
        'video': ('.mp4', '.mkv')
    }

    # Handle single file evaluation
    if os.path.isfile(file_path):
        file_ext = os.path.splitext(file_path)[1].lower()
        file_name = os.path.basename(file_path)
        if eval_type == "text" and file_ext in ext_map['text']:
            return file_name, evaluate_text_file(models['text'], file_path, retry_limit)
        elif eval_type == "video" and file_ext in ext_map['video']:
            if use_parent_folder_as_topic:
                topic_name = os.path.basename(os.path.dirname(file_path))
            else:
                topic_name = None
            # Only process the topic name when one was derived from the parent folder
            topic_name = process_topic_name(topic_name) if topic_name else None
            return file_name, evaluate_video_file(models['video'], file_path, None, topic_name, target_fps, output_folder)
        elif eval_type == "image" and file_ext in ext_map['video']:
            if use_parent_folder_as_topic:
                topic_name = os.path.basename(os.path.dirname(file_path))
            else:
                topic_name = None
            topic_name = process_topic_name(topic_name) if topic_name else None
            return file_name, evaluate_sampled_images(models['image'], file_path, topic_name, num_chunks=10, output_folder=output_folder)
        elif eval_type == "all":
            raise ValueError("Evaluation type 'all' is not supported for a single file. Try passing a folder with both a video and a subtitle file.")
        else:
            raise ValueError(f"File type of {file_path} does not match evaluation type {eval_type!r}")

    # Handle directory evaluation
    theorem_dir = file_path
    all_files = os.listdir(theorem_dir)

    # Look for transcript files, prioritizing .srt over .txt if both exist
    transcript_file_candidates = [f for f in all_files if f.endswith(ext_map['text']) and not f.endswith('_scene_outline.txt')]
    srt_files = [f for f in transcript_file_candidates if f.endswith('.srt')]
    txt_files = [f for f in transcript_file_candidates if f.endswith('.txt')]

    transcript_path = None
    if srt_files:
        transcript_path = os.path.join(theorem_dir, srt_files[0])
    elif txt_files:
        transcript_path = os.path.join(theorem_dir, txt_files[0])

    video_file_candidates = [f for f in all_files if f.endswith(ext_map['video'])]
    video_path = os.path.join(theorem_dir, video_file_candidates[0]) if len(video_file_candidates) == 1 else None

    topic_name = os.path.basename(theorem_dir)
    topic_name = process_topic_name(topic_name)

    if not video_path:
        print(f"Skipping {theorem_dir}: No video file found")
        return None, None

    text_result = video_result = image_result = None
    if eval_type == "text" or eval_type == "all":
        if transcript_path is None:
            print(f"Warning: No suitable transcript file found in {theorem_dir}")
        else:
            text_result = evaluate_text_file(models['text'], transcript_path, retry_limit)
    if eval_type == "video" or eval_type == "all":
        assert video_path is not None, f"Expected 1 video file, got {len(video_file_candidates)} for {theorem_dir}"
        video_result = evaluate_video_file(models['video'], video_path, transcript_path, topic_name, target_fps, output_folder)
    if eval_type == "image" or eval_type == "all":
        assert video_path is not None, f"Expected 1 video file, got {len(video_file_candidates)} for {theorem_dir}"
        image_result = evaluate_sampled_images(models['image'], video_path, topic_name, num_chunks=10, output_folder=output_folder)

    if eval_type == "all":
        result = {}
        if text_result:
            result = merge_dicts(result, text_result)
        if video_result:
            result = merge_dicts(result, video_result)
        if image_result:
            result = merge_dicts(result, image_result)
        if result:
            result["evaluation"]["overall_score"] = calculate_overall_score(result)
    else:
        result = text_result if eval_type == "text" else video_result if eval_type == "video" else image_result if eval_type == "image" else None

    file_name = os.path.basename(theorem_dir)
    return file_name, result
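
# Expected theorem-folder layout (file names here are hypothetical):
#   my_theorem/
#       my_theorem.mp4    # exactly one .mp4/.mkv video is required
#       my_theorem.srt    # optional transcript; .srt is preferred over .txt,
#                         # and *_scene_outline.txt files are ignored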


def main():
    """
    Main function to run the evaluation script.

    Parses command line arguments and orchestrates the evaluation process
    for text, video, and image content using specified AI models.
    """
    parser = argparse.ArgumentParser(description='Automatic evaluation of theorem explanation videos with LLMs')
    parser.add_argument('--model_text', type=str,
                        choices=ALLOWED_MODELS,
                        default='azure/gpt-4o',
                        help='Select the AI model to use for text evaluation')
    parser.add_argument('--model_video', type=str,
                        choices=['gemini/gemini-1.5-pro-002',
                                 'gemini/gemini-2.0-flash-exp',
                                 'gemini/gemini-2.0-pro-exp-02-05'],
                        default='gemini/gemini-1.5-pro-002',
                        help='Select the AI model to use for video evaluation')
    parser.add_argument('--model_image', type=str,
                        choices=ALLOWED_MODELS,
                        default='azure/gpt-4o',
                        help='Select the AI model to use for image evaluation')
    parser.add_argument('--eval_type', type=str, choices=['text', 'video', 'image', 'all'], default='all', help='Type of evaluation to perform')
    parser.add_argument('--file_path', type=str, help='Path to a file or a theorem folder', required=True)
    parser.add_argument('--output_folder', type=str, help='Directory to store the evaluation files', required=True)
    parser.add_argument('--retry_limit', type=int, default=3, help='Number of retry attempts for each inference')
    parser.add_argument('--combine', action='store_true', help='Combine all results into a single JSON file')
    parser.add_argument('--bulk_evaluate', action='store_true', help='Evaluate a folder of theorems together', default=False)
    parser.add_argument('--target_fps', type=int, help='Target FPS for video processing. If not set, original video FPS will be used', required=False)
    parser.add_argument('--use_parent_folder_as_topic', action='store_true', help='Use parent folder name as topic name for single file evaluation', default=True)
    parser.add_argument('--max_workers', type=int, default=4, help='Maximum number of concurrent workers for parallel processing')
    args = parser.parse_args()

    # Initialize separate models
    text_model = LiteLLMWrapper(
        model_name=args.model_text,
        temperature=0.0,
    )
    video_model = GeminiWrapper(
        model_name=args.model_video,
        temperature=0.0,
    )
    image_model = LiteLLMWrapper(
        model_name=args.model_image,
        temperature=0.0,
    )
    models = {
        'text': text_model,
        'video': video_model,
        'image': image_model
    }

    theorem_dirs = []
    if args.bulk_evaluate:
        assert os.path.isdir(args.file_path), "File path must be a folder for --bulk_evaluate"
        for root, _, filenames in os.walk(args.file_path):
            if not any(f.endswith(".mp4") for f in filenames):
                continue
            theorem_dirs.append(root)
    elif os.path.isdir(args.file_path):
        assert any(f.endswith(".mp4") for f in os.listdir(args.file_path)), "The provided folder must contain a video file"
        theorem_dirs.append(args.file_path)

    # Create output directory and its temp subdirectories if they don't exist
    os.makedirs(args.output_folder, exist_ok=True)
    moviepy_temp_dir = os.path.join(args.output_folder, "moviepy_temp")
    os.makedirs(moviepy_temp_dir, exist_ok=True)
    VideoFileClip.DEFAULT_TEMP_DIR = moviepy_temp_dir
    processed_videos_dir = os.path.join(args.output_folder, "processed_videos")
    os.makedirs(processed_videos_dir, exist_ok=True)

    results = {}
    if theorem_dirs:
        for theorem_dir in theorem_dirs:
            file_name, result = process_theorem(
                models,
                theorem_dir,
                args.eval_type,
                args.retry_limit,
                args.target_fps,
                args.use_parent_folder_as_topic,
                args.output_folder
            )
            if result is not None:
                results[file_name] = result
                if not args.combine:
                    save_individual_result(args.output_folder, file_name, result)
    else:
        file_name, result = process_theorem(
            models,
            args.file_path,
            args.eval_type,
            args.retry_limit,
            args.target_fps,
            args.use_parent_folder_as_topic,
            args.output_folder
        )
        if result is not None:
            results[file_name] = result
            if not args.combine:
                save_individual_result(args.output_folder, file_name, result)

    if args.combine:
        if len(results) > 1:
            current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
            combined_file = f"evaluation_{current_time}.json"
            combine_results(args.output_folder, combined_file, results)
            print("Combining results completed.")
        else:
            for file_name, result in results.items():
                save_individual_result(args.output_folder, file_name, result)

    os.rmdir(moviepy_temp_dir)


if __name__ == "__main__":
    main()
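
# Example invocations (the script name and paths are placeholders):
#   python evaluate.py --file_path output/my_theorem --output_folder eval_results --eval_type text
#   python evaluate.py --file_path output --bulk_evaluate --combine --output_folder eval_results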