"""
Audio segment processing for creating meaningful lyric segments for video generation.
This module takes Whisper transcription results and intelligently segments them
at natural pause points for synchronized video scene changes.
"""

import re
from typing import List, Dict, Any
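
# Illustrative sketch of the input shape this module assumes (hypothetical values,
# not real Whisper output). The keys mirror how they are read below: a top-level
# 'segments' list, each entry carrying 'text', 'start', 'end', and an optional
# 'words' list with per-word 'word'/'start'/'end' fields.
#
# transcription_result = {
#     'segments': [
#         {'text': ' Walking down the street', 'start': 0.0, 'end': 2.4,
#          'words': [{'word': 'Walking', 'start': 0.0, 'end': 0.6},
#                    {'word': 'down', 'start': 0.6, 'end': 0.9},
#                    {'word': 'the', 'start': 0.9, 'end': 1.0},
#                    {'word': 'street', 'start': 1.0, 'end': 2.4}]},
#     ]
# }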


def segment_lyrics(transcription_result: Dict[str, Any], min_segment_duration: float = 2.0, max_segment_duration: float = 8.0) -> List[Dict[str, Any]]:
    """
    Segment the transcription into meaningful chunks for video generation.
    
    This function takes the raw Whisper transcription and creates logical segments
    by identifying natural pause points in the audio. Each segment represents
    a coherent lyrical phrase that will correspond to one video scene.
    
    Args:
        transcription_result: Dictionary from Whisper transcription containing 'segments'
        min_segment_duration: Minimum duration for a segment in seconds
        max_segment_duration: Maximum duration for a segment in seconds
        
    Returns:
        List of segment dictionaries with keys:
        - 'text': The lyrical text for this segment
        - 'start': Start time in seconds
        - 'end': End time in seconds  
        - 'words': List of word-level timestamps (if available)
    """
    if not transcription_result or 'segments' not in transcription_result:
        return []
    
    raw_segments = transcription_result['segments']
    if not raw_segments:
        return []
    
    # First, merge very short segments and split very long ones
    processed_segments = []
    
    for segment in raw_segments:
        duration = segment.get('end', 0) - segment.get('start', 0)
        text = segment.get('text', '').strip()
        
        if duration < min_segment_duration:
            # Try to merge with previous segment if it exists and won't exceed max duration
            if (processed_segments and 
                (processed_segments[-1]['end'] - processed_segments[-1]['start'] + duration) <= max_segment_duration):
                # Merge with previous segment
                processed_segments[-1]['text'] += ' ' + text
                processed_segments[-1]['end'] = segment.get('end', processed_segments[-1]['end'])
                if 'words' in segment and 'words' in processed_segments[-1]:
                    processed_segments[-1]['words'].extend(segment['words'])
            else:
                # Add as new segment even if short
                processed_segments.append({
                    'text': text,
                    'start': segment.get('start', 0),
                    'end': segment.get('end', 0),
                    'words': list(segment.get('words', []))  # copy so later merges don't mutate the caller's data
                })
        elif duration > max_segment_duration:
            # Split long segments at natural break points
            split_segments = _split_long_segment(segment, max_segment_duration)
            processed_segments.extend(split_segments)
        else:
            # Duration is just right
            processed_segments.append({
                'text': text,
                'start': segment.get('start', 0),
                'end': segment.get('end', 0),
                'words': list(segment.get('words', []))  # copy so later merges don't mutate the caller's data
            })
    
    # Second pass: apply intelligent segmentation based on content
    final_segments = _apply_intelligent_segmentation(processed_segments, max_segment_duration)
    
    # Ensure no empty segments
    final_segments = [seg for seg in final_segments if seg['text'].strip()]
    
    return final_segments


def _split_long_segment(segment: Dict[str, Any], max_duration: float) -> List[Dict[str, Any]]:
    """
    Split a long segment into smaller ones at natural break points.
    """
    words = segment.get('words', [])
    start_time = segment.get('start', 0)
    end_time = segment.get('end', 0)
    duration = end_time - start_time
    
    if not words or duration <= max_duration:
        return [segment]
    
    # Try to split at punctuation marks or word boundaries
    split_points = []
    
    # Find punctuation-based split points
    for i, word in enumerate(words):
        word_text = word.get('word', '').strip()
        if re.search(r'[.!?;,:]', word_text):
            split_points.append(i)
    
    # If no punctuation, split at word boundaries roughly evenly
    if not split_points:
        target_splits = int(duration / max_duration)
        # Guard against a zero step when there are fewer words than target splits
        words_per_split = max(1, len(words) // (target_splits + 1))
        split_points = [i * words_per_split for i in range(1, target_splits + 1) if i * words_per_split < len(words)]
    
    if not split_points:
        return [segment]
    
    # Create segments from split points
    segments = []
    last_idx = 0
    
    for split_idx in split_points:
        if split_idx >= len(words):
            continue
            
        segment_words = words[last_idx:split_idx + 1]
        if segment_words:
            segments.append({
                'text': ' '.join([w.get('word', '') for w in segment_words]).strip(),
                'start': segment_words[0].get('start', start_time),
                'end': segment_words[-1].get('end', end_time),
                'words': segment_words
            })
        last_idx = split_idx + 1
    
    # Add remaining words as final segment
    if last_idx < len(words):
        segment_words = words[last_idx:]
        segments.append({
            'text': ' '.join([w.get('word', '') for w in segment_words]).strip(),
            'start': segment_words[0].get('start', start_time),
            'end': segment_words[-1].get('end', end_time),
            'words': segment_words
        })
    
    return segments
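
# Worked example for _split_long_segment (hypothetical timings, max_duration=8.0):
# a 12 s segment whose words read "walking down the street, thinking of you. never
# looking back" has punctuation on "street," and "you.", so the split points fall
# after those words and three segments come out:
#   'walking down the street,' / 'thinking of you.' / 'never looking back'
# Note the pieces are only as short as the punctuation allows; sparse punctuation
# can still leave a piece longer than max_duration.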


def _apply_intelligent_segmentation(segments: List[Dict[str, Any]], max_duration: float) -> List[Dict[str, Any]]:
    """
    Apply intelligent segmentation rules based on lyrical content and timing.
    """
    if not segments:
        return []
    
    final_segments = []
    current_segment = None
    
    for segment in segments:
        text = segment['text'].strip()
        
        # Skip empty segments
        if not text:
            continue
        
        # If no current segment, start a new one
        if current_segment is None:
            current_segment = segment.copy()
            continue
        
        # Check if we should merge with current segment
        should_merge = _should_merge_segments(current_segment, segment, max_duration)
        
        if should_merge:
            # Merge segments
            current_segment['text'] += ' ' + text  # use the stripped text computed above
            current_segment['end'] = segment['end']
            if 'words' in segment and 'words' in current_segment:
                current_segment['words'].extend(segment['words'])
        else:
            # Finalize current segment and start new one
            final_segments.append(current_segment)
            current_segment = segment.copy()
    
    # Add the last segment
    if current_segment is not None:
        final_segments.append(current_segment)
    
    return final_segments


def _should_merge_segments(current: Dict[str, Any], next_seg: Dict[str, Any], max_duration: float) -> bool:
    """
    Determine if two segments should be merged based on content and timing.
    """
    # Check duration constraint
    merged_duration = next_seg['end'] - current['start']
    if merged_duration > max_duration:
        return False
    
    current_text = current['text'].strip()
    next_text = next_seg['text'].strip()
    
    # Don't merge if current segment ends with strong punctuation
    if re.search(r'[.!?]$', current_text):
        return False
    
    # Merge if current segment is very short (likely incomplete phrase)
    if len(current_text.split()) < 3:
        return True
    
    # Merge if next segment starts with a lowercase word (continuation)
    if next_text and next_text[0].islower():
        return True
    
    # Merge if there's a short gap between segments (< 0.5 seconds)
    gap = next_seg['start'] - current['end']
    if gap < 0.5:
        return True
    
    # Don't merge by default
    return False
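
# Decision examples for _should_merge_segments (hypothetical text and timings,
# rules applied top to bottom as in the function above, max_duration=8.0):
#   merged span would exceed 8 s                      -> not merged
#   current text ends with '.', '!' or '?'            -> not merged
#   current text 'oh yeah' (fewer than 3 words)       -> merged
#   next text starts lowercase, e.g. 'and then some'  -> merged
#   gap of 0.3 s between current end and next start   -> merged
#   none of the above                                 -> not merged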


def get_segment_info(segments: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Get summary information about the segments.
    
    Args:
        segments: List of segment dictionaries
        
    Returns:
        Dictionary with segment statistics
    """
    if not segments:
        return {
            'total_segments': 0,
            'total_duration': 0,
            'average_duration': 0,
            'shortest_duration': 0,
            'longest_duration': 0
        }
    
    durations = [seg['end'] - seg['start'] for seg in segments]
    # Span from the first segment's start to the last segment's end (gaps included)
    total_duration = segments[-1]['end'] - segments[0]['start']
    
    return {
        'total_segments': len(segments),
        'total_duration': total_duration,
        'average_duration': sum(durations) / len(durations),
        'shortest_duration': min(durations),
        'longest_duration': max(durations),
        'segments_preview': [
            {
                'text': seg['text'][:50] + ('...' if len(seg['text']) > 50 else ''),
                'duration': seg['end'] - seg['start']
            }
            for seg in segments[:5]
        ]
    }
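

# Minimal usage sketch. The transcription dict below is hard-coded, hypothetical
# data shaped like the structure this module reads; a real pipeline would pass in
# an actual Whisper transcription result instead.
if __name__ == '__main__':
    example_result = {
        'segments': [
            {
                'text': ' Walking down the street tonight',
                'start': 0.0,
                'end': 3.1,
                'words': [
                    {'word': 'Walking', 'start': 0.0, 'end': 0.5},
                    {'word': 'down', 'start': 0.5, 'end': 0.8},
                    {'word': 'the', 'start': 0.8, 'end': 0.9},
                    {'word': 'street', 'start': 0.9, 'end': 1.4},
                    {'word': 'tonight', 'start': 1.4, 'end': 3.1},
                ],
            },
            {
                'text': ' thinking of you',
                'start': 3.2,
                'end': 4.5,
                'words': [
                    {'word': 'thinking', 'start': 3.2, 'end': 3.7},
                    {'word': 'of', 'start': 3.7, 'end': 3.9},
                    {'word': 'you', 'start': 3.9, 'end': 4.5},
                ],
            },
        ]
    }

    demo_segments = segment_lyrics(example_result)
    for seg in demo_segments:
        print(f"{seg['start']:.2f}-{seg['end']:.2f}: {seg['text']}")
    print(get_segment_info(demo_segments))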