import os
import math
import re

import gradio as gr
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
from moviepy.editor import VideoFileClip


def timestamp_to_seconds(timestamp):
    """Convert an SRT timestamp (HH:MM:SS,mmm) to seconds."""
    # Split hours, minutes, and the remainder (seconds with milliseconds)
    hours, minutes, rest = timestamp.split(':')
    # Seconds and milliseconds are separated by a comma
    seconds, milliseconds = rest.split(',')
    total_seconds = (
        int(hours) * 3600 +
        int(minutes) * 60 +
        int(seconds) +
        int(milliseconds) / 1000
    )
    return total_seconds


def format_time(seconds):
    """Convert seconds to SRT timestamp format (HH:MM:SS,mmm)."""
    m, s = divmod(seconds, 60)
    h, m = divmod(m, 60)
    return f"{int(h):02d}:{int(m):02d}:{s:06.3f}".replace('.', ',')
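
# Quick sanity check of the two helpers above (illustrative values only):
#   timestamp_to_seconds("00:01:05,250")  -> 65.25
#   format_time(65.25)                    -> "00:01:05,250"
# The two functions are inverses of each other, which the SRT generation and
# de-duplication below rely on.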


def clean_srt_duplicates(srt_content, time_threshold=30):
    """
    Remove duplicate captions that start within `time_threshold` seconds of
    each other in SRT content, keeping only the last occurrence of each.
    """
    # Pattern matching one SRT block (index, timestamps, text), including
    # multi-line caption text
    srt_pattern = re.compile(
        r"(\d+)\n(\d{2}:\d{2}:\d{2},\d{3}) --> (\d{2}:\d{2}:\d{2},\d{3})\n(.*?)(?=\n\n|\Z)",
        re.DOTALL
    )

    blocks = []      # Kept blocks as (index, start_time, end_time, text)
    seen_texts = {}  # Maps caption text -> (start time in seconds, SRT index)

    for match in srt_pattern.finditer(srt_content):
        index, start_time, end_time, text = match.groups()
        text = text.strip()

        # Convert the start time to seconds for comparison
        start_seconds = timestamp_to_seconds(start_time)

        # Look for an identical or overlapping caption within the time threshold
        for existing_text, (existing_time, existing_idx) in list(seen_texts.items()):
            time_diff = abs(start_seconds - existing_time)
            is_similar = (text == existing_text or
                          (text and existing_text and
                           (text in existing_text or existing_text in text)))
            if is_similar and time_diff < time_threshold:
                # Drop the earlier occurrence so only the latest one survives
                blocks = [b for b in blocks if b[0] != existing_idx]
                del seen_texts[existing_text]
                break

        blocks.append((index, start_time, end_time, text))
        seen_texts[text] = (start_seconds, index)

    # Rebuild the SRT content with sequential numbering
    cleaned_srt = []
    for i, (_, start_time, end_time, text) in enumerate(blocks, 1):
        cleaned_srt.append(f"{i}\n{start_time} --> {end_time}\n{text}\n\n")

    return ''.join(cleaned_srt)
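
# Illustrative behaviour (hypothetical captions): if the caption "Hello there"
# appears at 00:00:05 and again at 00:00:12, the two start times are 7 seconds
# apart (inside the default 30-second window), so only the 00:00:12 block is
# kept and the surviving blocks are renumbered 1, 2, 3, ...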


def transcribe(video_file, transcribe_to_text, transcribe_to_srt, language):
    """
    Main transcription function that processes a video file and generates
    a text and/or SRT transcription.
    """
    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
    model_id = "openai/whisper-large-v3"

    try:
        # Initialize the model and processor
        model = AutoModelForSpeechSeq2Seq.from_pretrained(
            model_id,
            torch_dtype=torch_dtype,
            low_cpu_mem_usage=True,
            use_safetensors=True
        )
        model.to(device)
        processor = AutoProcessor.from_pretrained(model_id)

        # Speech-recognition pipeline returning text plus timestamped chunks
        pipe = pipeline(
            "automatic-speech-recognition",
            model=model,
            tokenizer=processor.tokenizer,
            feature_extractor=processor.feature_extractor,
            max_new_tokens=128,
            chunk_length_s=60,
            batch_size=4,
            return_timestamps=True,
            torch_dtype=torch_dtype,
            device=device,
        )
        if video_file is None:
            yield "Error: No video file provided.", None
            return

        # Handle both file objects and plain path strings from Gradio
        video_path = video_file.name if hasattr(video_file, 'name') else video_file
        try:
            video = VideoFileClip(video_path)
        except Exception as e:
            yield f"Error processing video file: {str(e)}", None
            return

        # Process the video's audio track in 10-second chunks
        audio = video.audio
        if audio is None:
            yield "Error: The video has no audio track.", None
            video.close()
            return
        duration = video.duration
        n_chunks = math.ceil(duration / 10)
        transcription_txt = ""
        transcription_srt = []
        for i in range(n_chunks):
            start = i * 10
            end = min((i + 1) * 10, duration)
            audio_chunk = audio.subclip(start, end)

            temp_file_path = f"temp_audio_{i}.wav"
            try:
                # Save the audio chunk to a temporary WAV file
                audio_chunk.write_audiofile(
                    temp_file_path,
                    codec='pcm_s16le',
                    verbose=False,
                    logger=None
                )

                # Transcribe the audio chunk
                result = pipe(
                    temp_file_path,
                    generate_kwargs={"language": language}
                )
                transcription_txt += result["text"]

                if transcribe_to_srt:
                    for chunk in result["chunks"]:
                        start_time, end_time = chunk["timestamp"]
                        if start_time is not None and end_time is not None:
                            # Offset chunk-relative timestamps by the chunk's
                            # position in the full video
                            transcription_srt.append({
                                "start": start_time + i * 10,
                                "end": end_time + i * 10,
                                "text": chunk["text"].strip()
                            })
            finally:
                # Clean up the temporary file
                if os.path.exists(temp_file_path):
                    os.remove(temp_file_path)

            # Report progress
            yield f"Progress: {int(((i + 1) / n_chunks) * 100)}%", None
        # Prepare the output
        output = ""
        srt_file_path = None

        if transcribe_to_text:
            output += "Text Transcription:\n" + transcription_txt.strip() + "\n\n"

        if transcribe_to_srt:
            output += "SRT Transcription:\n"
            srt_content = ""
            # Generate the initial SRT content
            for i, sub in enumerate(transcription_srt, 1):
                srt_entry = f"{i}\n{format_time(sub['start'])} --> {format_time(sub['end'])}\n{sub['text']}\n\n"
                srt_content += srt_entry

            # Remove duplicate captions
            cleaned_srt_content = clean_srt_duplicates(srt_content)

            # Save the SRT content to a file
            srt_file_path = "transcription.srt"
            with open(srt_file_path, "w", encoding="utf-8") as srt_file:
                srt_file.write(cleaned_srt_content)
            output += f"\nSRT file saved as: {srt_file_path}"

        # Clean up the video object
        video.close()

        yield output, srt_file_path

    except Exception as e:
        yield f"Error during transcription: {str(e)}", None


# Create the Gradio interface
iface = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Video(label="Upload Video"),
        gr.Checkbox(label="Transcribe to Text", value=True),
        gr.Checkbox(label="Transcribe to SRT", value=True),
        gr.Dropdown(
            choices=['en', 'he', 'it', 'es', 'fr', 'de', 'zh', 'ar'],
            value='en',
            label="Input Video Language"
        )
    ],
    outputs=[
        gr.Textbox(label="Transcription Output"),
        gr.File(label="Download SRT")
    ],
    title="WhisperCap Video Transcription",
    description="Upload a video file to transcribe its audio to plain text and/or an SRT subtitle file.",
    allow_flagging="never"
)

# Launch the interface
if __name__ == "__main__":
    iface.launch(share=True)
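
# Running locally (a sketch, assuming this file is saved as app.py -- the usual
# Hugging Face Spaces entry point -- and that ffmpeg is installed):
#   pip install gradio torch transformers moviepy
#   python app.py
# share=True additionally asks Gradio to create a temporary public link; set it
# to False to keep the server local-only.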