File size: 3,970 Bytes
8fb7841
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import os
import tempfile

import numpy as np
from PIL import Image, ImageOps
from moviepy import VideoFileClip

from eval_suite.prompts_raw import _image_eval
from eval_suite.utils import extract_json, convert_score_fields, calculate_geometric_mean
from mllm_tools.utils import _prepare_text_image_inputs
from src.core.parse_video import image_with_most_non_black_space

def extract_key_frames(video_path, output_dir, num_chunks):
    """Extract key frames from a video by dividing it into chunks and selecting representative frames.

    Args:
        video_path (str): Path to the input video file
        output_dir (str): Directory where extracted frames will be saved
        num_chunks (int): Number of chunks to divide the video into

    Returns:
        list: List of paths to the extracted key frames
    """
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    
    # Extract all frames from the video
    clip = VideoFileClip(video_path)
    frames = list(clip.iter_frames(fps=1))  # one frame every second
    
    total_frames = len(frames)
    if total_frames == 0:
        print("No frames extracted from the video.")
        return []
    
    # Determine the number of frames per chunk
    frames_per_chunk = total_frames // num_chunks
    num_chunks = min(num_chunks, (total_frames + frames_per_chunk - 1) // frames_per_chunk)
    
    key_frames = []
    
    # Process each chunk of frames
    for i in range(num_chunks):
        start_idx = i * frames_per_chunk
        end_idx = min((i + 1) * frames_per_chunk, total_frames)
        chunk_frames = frames[start_idx:end_idx]
        
        if chunk_frames:
            # Save the frame with most non-black space
            output_path = os.path.join(output_dir, f"key_frame_{i+1}.jpg")
            result = image_with_most_non_black_space(chunk_frames, output_path)
        else:
            print(f"No frames in chunk {i+1}. Skipping.")
            result = None
        
        if result is not None:
            key_frames.append(output_path)
    clip.close()
    
    return key_frames


def evaluate_sampled_images(model, video_path, description="No description provided", num_chunks=10, output_folder=None):
    """Evaluate sampled frames from a video using an image evaluation model.

    Args:
        model: The image evaluation model to use
        video_path (str): Path to the input video file
        description (str, optional): Description of the video content. Defaults to "No description provided"
        num_chunks (int, optional): Number of chunks to divide the video into. Defaults to 10
        output_folder (str, optional): Directory for temporary files. Defaults to None

    Returns:
        dict: Dictionary containing evaluation scores and individual frame assessments with keys:
            - evaluation: Dictionary of averaged scores for each criterion
            - image_chunks: List of individual frame evaluation results
    """
    with tempfile.TemporaryDirectory(dir=output_folder) as temp_dir:
        key_frames = extract_key_frames(video_path, temp_dir, num_chunks)

        prompt = _image_eval.format(description=description)

        responses = []
        for key_frame in key_frames:
            inputs = _prepare_text_image_inputs(prompt, key_frame)
            response = model(inputs)
            response_json = extract_json(response)
            response_json = convert_score_fields(response_json)
            responses.append(response_json)

    criteria = list(responses[0]["evaluation"].keys())
    scores_dict = {c: [] for c in criteria}
    for response in responses:
        for key, val in response["evaluation"].items():
            scores_dict[key].append(val["score"])

    res_score = {}
    for key, scores in scores_dict.items():
        res_score[key] = {"score": calculate_geometric_mean(scores)}

    return {
        "evaluation": res_score,
        "image_chunks": responses
    }