import os
import tempfile
import subprocess
import streamlit as st
from transformers import pipeline, AutoModelForSpeechSeq2Seq, AutoProcessor
import torch
from datetime import timedelta
from deep_translator import GoogleTranslator
import ffmpeg

# Streamlit setup
st.title("Video Translator (English to Arabic)")
st.write("Upload an English video to extract speech, translate it into Arabic, and burn the subtitles into the video.")

def format_time(seconds):
    """Convert seconds to SRT timestamp format (00:00:00,000)."""
    td = timedelta(seconds=seconds)
    # Use total_seconds() so durations of a day or more don't wrap around
    # (timedelta.seconds alone discards the days component).
    hours, remainder = divmod(int(td.total_seconds()), 3600)
    minutes, secs = divmod(remainder, 60)
    milliseconds = td.microseconds // 1000
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"

def extract_audio(video_path):
    """Extract audio from video using ffmpeg."""
    temp_dir = tempfile.gettempdir()
    audio_path = os.path.join(temp_dir, "extracted_audio.wav")
    # Extract a mono 16 kHz WAV, the sample rate Whisper-style ASR models expect;
    # overwrite_output avoids ffmpeg stalling on a "file exists" prompt on reruns.
    ffmpeg.input(video_path).output(audio_path, format='wav', ac=1, ar='16000').run(overwrite_output=True)
    return audio_path
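
# Note: the transformers ASR pipeline can also decode media files on its own
# (it invokes ffmpeg internally), so extracting a WAV here mainly keeps the
# intermediate audio inspectable and usable by the whisper fallback.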

def transcribe_audio(audio_path):
    """Transcribe audio to text using the fractalego/personal-speech-to-text-model."""
    try:
        # Try using fractalego/personal-speech-to-text-model
        device = "cuda" if torch.cuda.is_available() else "cpu"
        model_id = "fractalego/personal-speech-to-text-model"
        model = AutoModelForSpeechSeq2Seq.from_pretrained(model_id)
        processor = AutoProcessor.from_pretrained(model_id)
        model.to(device)
        pipe = pipeline(
            "automatic-speech-recognition",
            model=model,
            tokenizer=processor.tokenizer,
            feature_extractor=processor.feature_extractor,
            max_new_tokens=128,
            chunk_length_s=30,
            batch_size=16,
            return_timestamps=True,
            device=device,
        )
        result = pipe(audio_path)
        # Pipeline chunks look like {'timestamp': (start, end), 'text': ...};
        # normalize them to the {'start', 'end', 'text'} dicts used downstream.
        segments = []
        for chunk in result["chunks"]:
            start, end = chunk["timestamp"]
            segments.append({
                "start": start or 0.0,
                # The final chunk's end timestamp can be None; fall back to start.
                "end": end if end is not None else (start or 0.0),
                "text": chunk["text"].strip(),
            })
        return segments
    except Exception as e:
        print(f"Error using fractalego model: {e}")
        print("Using whisper model as fallback...")
        # Fall back to openai-whisper, whose segments already carry
        # 'start', 'end', and 'text' keys.
        import whisper
        model = whisper.load_model("base")
        result = model.transcribe(audio_path, word_timestamps=True)
        return result["segments"]

def translate_text(text):
    """Translate text from English to Arabic."""
    translator = GoogleTranslator(source='en', target='ar')
    return translator.translate(text)
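
# Optional speed-up (sketch): deep_translator can translate several segments
# in one request via translate_batch; calls may still be rate-limited upstream.
#   texts = [seg['text'] for seg in segments]
#   arabic = GoogleTranslator(source='en', target='ar').translate_batch(texts)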

def create_srt(segments, output_path):
    """Create an SRT file from translated segments, ensuring proper encoding."""
    # UTF-8 with BOM so players detect the encoding for the Arabic text.
    with open(output_path, 'w', encoding='utf-8-sig') as srt_file:
        for i, segment in enumerate(segments, start=1):
            # transcribe_audio() returns plain dicts from both branches.
            start_time = segment.get('start', 0)
            end_time = segment.get('end', 0)
            # Use the original text if no translation was attached.
            translation = segment.get('translation', segment.get('text', ''))
            # Write SRT data
            srt_file.write(f"{i}\n")
            srt_file.write(f"{format_time(start_time)} --> {format_time(end_time)}\n")
            srt_file.write(f"{translation}\n\n")

def burn_subtitles(video_path, srt_path, output_path):
    """Burn subtitles into video using FFmpeg with Arabic support."""
    # force_style's FontName takes a font family name, not a file path; point
    # libass at the directory holding Amiri-Regular.ttf via fontsdir instead.
    fonts_dir = "/usr/share/fonts/truetype"  # Directory containing the Amiri font
    subtitle_filter = (
        f"subtitles='{srt_path}':fontsdir='{fonts_dir}':"
        "force_style='FontName=Amiri,FontSize=24,PrimaryColour=&HFFFFFF,"
        "OutlineColour=&H000000,BorderStyle=3,Alignment=2'"
    )
    # -sub_charenc only applies to subtitle *inputs*, not the subtitles filter,
    # so it is dropped here; libass reads the UTF-8 SRT directly.
    cmd = [
        'ffmpeg', '-y',
        '-i', video_path,
        '-vf', subtitle_filter,
        '-c:v', 'libx264', '-crf', '18',
        '-c:a', 'copy',
        output_path
    ]
    try:
        subprocess.run(cmd, check=True)
        return output_path
    except subprocess.CalledProcessError as e:
        print(f"FFmpeg error: {e}")
        return None
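
# Roughly the equivalent shell command (paths illustrative):
#   ffmpeg -y -i input.mp4 \
#     -vf "subtitles='subs.srt':fontsdir='/usr/share/fonts/truetype':force_style='FontName=Amiri,FontSize=24'" \
#     -c:v libx264 -crf 18 -c:a copy output.mp4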

def process_video(video_path):
    """Process the video: extract audio, transcribe, translate, create SRT, burn subtitles."""
    temp_dir = tempfile.gettempdir()
    file_name = os.path.splitext(os.path.basename(video_path))[0]
    audio_path = extract_audio(video_path)
    segments = transcribe_audio(audio_path)
    translated_segments = []
    for segment in segments:
        text = segment.get('text', '')
        # Segments are plain dicts, so attach the translation as a key
        # (attribute assignment would raise AttributeError on a dict).
        segment['translation'] = translate_text(text)
        translated_segments.append(segment)
    srt_path = os.path.join(temp_dir, f"{file_name}.srt")
    create_srt(translated_segments, srt_path)
    output_path = os.path.join(temp_dir, f"{file_name}_translated.mp4")
    result_path = burn_subtitles(video_path, srt_path, output_path)
    return result_path, srt_path

# Streamlit UI
uploaded_video = st.file_uploader("Upload your video", type=["mp4", "mov", "avi"])
if uploaded_video:
    # Save the uploaded video temporarily
    temp_video_path = os.path.join(tempfile.gettempdir(), uploaded_video.name)
    with open(temp_video_path, "wb") as f:
        f.write(uploaded_video.read())
    st.write("Processing your video...")
    result_path, srt_path = process_video(temp_video_path)
    # Show the processed video and offer the subtitle file for download
    if result_path:
        st.video(result_path)
    else:
        st.error("Subtitle burning failed; check the logs for the FFmpeg error.")
    # download_button expects the file's contents, not a path on disk
    with open(srt_path, "rb") as srt_file:
        st.download_button("Download SRT File", srt_file.read(), file_name=os.path.basename(srt_path))
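
# To run locally (assuming this file is saved as app.py):
#   pip install streamlit transformers torch deep-translator ffmpeg-python openai-whisper
#   streamlit run app.py
# FFmpeg itself and the Amiri font must also be installed on the system.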