vidtranslator / download.py
altryne's picture
Removing vtt_file t0 make better jsons
7b98a4b unverified
import shutil
import sys
import time
from pathlib import Path
import anvil.server
import anvil.media
from whisper.utils import write_srt, write_vtt
from yt_dlp import YoutubeDL
from yt_dlp.utils import DownloadError
import os
import tempfile
import json
import argparse
import whisper
from whisper.tokenizer import LANGUAGES, TO_LANGUAGE_CODE
import ffmpeg
from utils.subs import bake_subs, get_srt
from utils.utils import get_args
original_dir = os.getcwd()
output_dir = Path('output')
args = get_args()
model_size: str = args.get("model", os.environ.get("WHISPER_MODEL", "large"))
preload_model: bool = args.get("preload")
if preload_model:
print("Preloading model")
model = whisper.load_model(model_size)
def download_generator(url, translate_action=True, source_language='Autodetect', corrected_subtitles=None):
# Step 1 : check if video is available
yield {"message": f"Checking {url} for videos"}
try:
meta = check_download(url)
# print(json.dumps(meta, indent=2))
# if(meta['duration'] > 159) :
# raise Exception("Video is too long, please use videos less than 159 seconds")
yield {"message": f"Found video with {meta['duration']} seconds duration from {meta['extractor']}", "meta": meta}
tempdir = output_dir/f"{meta['id']}"
except Exception as e:
yield {"message": f"{e}"}
return
# Step 2 : Download video and extract audio
try:
# check if we already have the folder and the main files
if(tempdir.is_dir() and (tempdir/f"{meta['id']}.{meta['ext']}").is_file() and (tempdir/f"{meta['id']}.mp3").is_file()):
yield {"message": f"Using cached files"}
video = str((tempdir/f"{meta['id']}.{meta['ext']}").resolve())
audio = str((tempdir/f"{meta['id']}.mp3").resolve())
else:
yield {"message": f"Starting download with URL {url}, this may take a while"}
meta, video, audio = download(url, tempdir)
yield {"message": f"Downloaded video and extracted audio", "video": video, "audio": audio, "meta": meta}
except Exception as e:
os.chdir(original_dir)
yield {"message": f"{e}"}
raise e
srt_path = tempdir / f"{meta['id']}.srt"
vtt_path = tempdir / f"{meta['id']}.vtt"
if not corrected_subtitles:
### Step 3 : Transcribe with whisper
yield {"message": f"[PLEASE WAIT] Starting whisper transcribe with {meta['id']}.mp3"}
try:
whisper_result = transcribe(audio, translate_action, source_language)
with open(srt_path, "w", encoding="utf-8") as srt:
write_srt(whisper_result["segments"], file=srt)
with open(vtt_path, "w", encoding="utf-8") as vtt:
write_vtt(whisper_result["segments"], file=vtt)
whisper_result["srt"] = Path(srt_path).read_text()
whisper_result["vtt"] = Path(vtt_path).read_text()
yield {"message": f"Transcribe successful", "whisper_result": whisper_result, "meta": meta, "srt_path": srt_path, "vtt_path": vtt_path}
except Exception as e:
os.chdir(original_dir)
yield {"message": f"{e}"}
raise e
else:
### step 3.5 : use corrected subtitles
yield {"message": f"Using corrected subtitles"}
with open(srt_path, "w", encoding="utf-8") as srt:
srt.write(corrected_subtitles)
yield {"message": f"Transcribe successful", "srt_path": srt_path, "meta": meta}
### Step 4 : Bake subtitles into video with ffmpeg
yield {"message": f"[PLEASE WAIT] baking subtitles into video"}
try:
print('Stating to bake subtitles')
subbed_video_path = tempdir / f"{meta['id']}_translated.mp4"
fontsdir = Path('fonts')
bake_subs(video, subbed_video_path.absolute() , srt_path.absolute(), fontsdir, translate_action)
yield {"message": f"Subtitled video ready!", "sub_video": str(subbed_video_path.absolute()), "meta": meta, "vtt_path": vtt_path}
except ffmpeg.Error as e:
print('stdout:', e.stdout.decode('utf8'))
print('stderr:', e.stderr.decode('utf8'))
raise e
except Exception as e:
print('stdout:', e.stdout.decode('utf8'))
print('stderr:', e.stderr.decode('utf8'))
os.chdir(original_dir)
print('error', file=sys.stderr)
raise e
yield {"message": f"{e}"}
def user_uploaded_video_generator(video, translate_action=True, source_language='Autodetect', corrected_subtitles=None):
video_name = Path(video).stem
# create tempdir
tempdir = output_dir / video_name
tempdir.mkdir(parents=True, exist_ok=True)
# copy video with shutil.copy2
video_path = tempdir / Path(video).name
shutil.copy2(video, video_path)
yield {"message": f"Extracting audio from {video_name}", "video": video_path}
# TODO : extract audio from videos
output_audio = tempdir / f"{video_name}.mp3"
ffmpeg.input(video_path).output(filename=output_audio).run()
yield {"message": f"Got audio from {video_name}", "video": video, "audio": output_audio}
# Run whisper on the audio with language unless auto
try:
audio_file = output_audio
print(f"Starting whisper transcribe with {output_audio}")
transcribe_whisper_result = transcribe(audio_file, translate_action=False, language='Autodetect', override_model_size=model_size)
yield {"message": f"Finished transcription, starting translation to {transcribe_whisper_result['language']}"}
detected_language = LANGUAGES[transcribe_whisper_result["language"]]
translate_whisper_result = transcribe(audio_file, translate_action=True, language=detected_language, override_model_size=model_size)
yield {"message": f"Finished translation to English, preparing subtitle files"}
with open(tempdir / f"{video_name}.vtt", "w", encoding="utf-8") as vtt:
write_vtt(transcribe_whisper_result['segments'], file=vtt)
# yield {"message": f"Created VTT files", "vtt_path": f"{video_name}.vtt", "vtt_en_path": f"{video_name}.en.vtt"}
# write_srt(transcribe_whisper_result['segments'], tempdir / f"{video_name}.srt")
# write_srt(translate_whisper_result['segments'], tempdir / f"{video_name}_en.srt")
# yield {"message": f"Created SRT files", "srt_path": f"{video_name}.srt", "srt_en_path": f"{video_name}.en.srt"}
# print(f"Transcribe successful!")
except Exception as e:
print(f"Could not transcribe file: {e}")
return
def caption_generator(social_media_url,uid, language="Autodetect", model_size=model_size):
with tempfile.TemporaryDirectory() as tempdir:
tempdir = Path(tempdir)
# try:
# print(f"Downloading {social_media_url} ")
# meta = check_download(social_media_url)
# print(f"Downloaded {meta['id']}.mp3 from {meta['uploader_id']} and url {meta['webpage_url']}")
# except Exception as e:
# print(f"Could not download file: {e}")
# raise
try:
print(f"Starting audio only download with URL {social_media_url}, this may take a while")
meta, audio = download_audio(social_media_url, tempdir, id=uid)
print(f"Downloaded video and extracted audio")
except Exception as e:
print(f"Could not download file: {e}")
raise
# Run whisper on the audio with language unless auto
try:
print(f"Starting whisper transcribe with {uid}.mp3")
transcribe_whisper_result = transcribe(audio, translate_action=False, language=language, override_model_size=model_size)
detected_language = LANGUAGES[transcribe_whisper_result["language"]]
print(f"Transcribe successful!, writing files")
vtt_path = tempdir / f"{transcribe_whisper_result['language']}.vtt"
with open(vtt_path.resolve(), "w", encoding="utf-8") as vtt:
write_vtt(transcribe_whisper_result["segments"], file=vtt)
whisper_result_captions = [
{
"language_tag": transcribe_whisper_result["language"],
"vtt_text": vtt_path.read_text(encoding="utf-8"),
},
]
if detected_language != "en":
print(f"Transcribe successful! Starting translation to English")
translate_whisper_result = transcribe(audio, translate_action=True, language=detected_language, override_model_size=model_size)
en_vtt_path = tempdir / f"en.vtt"
with open(en_vtt_path.resolve(), "w", encoding="utf-8") as en_vtt:
write_vtt(translate_whisper_result["segments"], file=en_vtt)
print(f"Finished translation to English, preparing subtitle files")
whisper_result_captions.append(
{
"language_tag": "en",
"vtt_text": en_vtt_path.read_text(encoding="utf-8"),
}
)
except Exception as e:
print(f"Could not transcribe file: {e}")
raise
print(f"Finished processing {uid} file, returning results")
print(whisper_result_captions)
return 'success', whisper_result_captions, detected_language
# Run whisper with translation task enabled (and save to different srt file)
# Call anvil background task with both files, and both the plain texts
def progress_hook(d):
if d['status'] == 'downloading':
print("downloading " + str(round(float(d['downloaded_bytes']) / float(d['total_bytes']) * 100, 1)) + "%")
yield f"{d['_percent_str']} downloaded"
if d['status'] == 'finished':
filename = d['filename']
print(filename)
yield f"Downloaded {filename}"
def download(url, tempdir, format="bestvideo[ext=mp4]+bestaudio/best", verbose=False, keepVideo=True, filename="%(id)s.%(ext)s"):
try:
ydl_opts = {
"format": format,
"keepvideo": keepVideo,
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}],
"skip_download": False,
"outtmpl": f"{tempdir}/{filename}",
"noplaylist": True,
"verbose": verbose,
"quiet": False,
"progress_hooks": [progress_hook],
}
ydl = YoutubeDL(ydl_opts)
meta = ydl.extract_info(
url,
download=True,
)
except DownloadError as e:
raise e
else:
audio = tempdir / f"{meta['id']}.mp3"
if (keepVideo):
video = tempdir / f"{meta['id']}.{meta['ext']}"
return meta, str(video.resolve()), str(audio.resolve())
else:
return meta, None, str(audio.resolve())
def download_audio(url, tempdir, format="bestaudio/best", verbose=False, id=None):
filename = f"{id}.%(ext)s"
try:
ydl_opts = {
"format": format,
"keepvideo": False,
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}],
"skip_download": False,
"outtmpl": f"{tempdir}/{filename}",
"noplaylist": True,
"verbose": verbose,
"quiet": False,
"progress_hooks": [progress_hook],
}
ydl = YoutubeDL(ydl_opts)
meta = ydl.extract_info(
url,
download=True,
)
except DownloadError as e:
raise e
else:
audio = tempdir / f"{id}.mp3"
return meta, str(audio.resolve())
def check_download(url):
ydl_opts = {
"format": "bestvideo[ext=mp4]+bestaudio/best",
"skip_download": True,
"verbose": False,
}
ydl = YoutubeDL(ydl_opts)
try:
meta = ydl.extract_info(
url,
download=False,
)
except DownloadError as e:
raise e
else:
return meta
def transcribe(audio, translate_action=True, language='Autodetect', override_model_size=''):
"""
Transcribe audio file with whisper
:param audio: - The audio file to transcribe
:param translate_action: Bool - Whether to translate to English or keep original language
:param language: String - The language to transcribe to, default is Autodetect
:param override_model_size: Bool - Whether to override the model size
:return:
"""
task = "translate" if translate_action else "transcribe"
model_size_to_load = override_model_size if override_model_size else model_size
print(f'Starting {task} with whisper size {model_size_to_load} on {audio}')
global model
if not preload_model or model_size != override_model_size:
model = whisper.load_model(model_size_to_load)
props = {
"task": task,
}
if language != 'Autodetect':
props["language"] = TO_LANGUAGE_CODE[language.lower()] if len(language) > 2 else language
output = model.transcribe(audio, verbose=True, **props)
output['segments'] = output['segments']
output['requested_language'] = language.lower()
print(f'Finished transcribe from {LANGUAGES[output["language"]].capitalize()}', output["text"])
return output