vidtranslator / download.py
altryne's picture
Removing vtt_file to make better jsons
7b98a4b unverified
raw
history blame contribute delete
No virus
12.4 kB
import shutil
import sys
import time
from pathlib import Path
import anvil.server
import anvil.media
from whisper.utils import write_srt, write_vtt
from yt_dlp import YoutubeDL
from yt_dlp.utils import DownloadError
import os
import tempfile
import json
import argparse
import whisper
from whisper.tokenizer import LANGUAGES, TO_LANGUAGE_CODE
import ffmpeg
from utils.subs import bake_subs, get_srt
from utils.utils import get_args
# Working directory at import time; the generators below chdir back to it
# when a step fails.
original_dir = os.getcwd()

# Root folder holding one sub-folder per processed video.
output_dir = Path("output")

args = get_args()

# Whisper model size: args["model"] when that key is present, otherwise the
# WHISPER_MODEL environment variable, otherwise "large".
model_size: str = args.get(
    "model",
    os.environ.get("WHISPER_MODEL", "large"),
)

# When truthy, the whisper model is loaded once, right here at import time.
preload_model: bool = args.get("preload")

if preload_model:
    print("Preloading model")
    model = whisper.load_model(model_size)
def download_generator(url, translate_action=True, source_language='Autodetect', corrected_subtitles=None):
    """
    Download a video, create subtitles for it and bake them into the video.

    This is a generator: it yields progress dictionaries. Every dictionary has
    a ``"message"`` key; depending on the step it may also carry ``"meta"``,
    ``"video"``, ``"audio"``, ``"whisper_result"``, ``"srt_path"``,
    ``"vtt_path"`` and ``"sub_video"``.

    :param url: URL handed to ``check_download`` / ``download``
    :param translate_action: Bool - forwarded to ``transcribe`` and ``bake_subs``
    :param source_language: String - forwarded to ``transcribe`` as the language
    :param corrected_subtitles: optional subtitle text; when truthy it is written
        to the ``.srt`` file as-is and the whisper step is skipped
    :raises Exception: failures in the download, transcription and baking steps
        are re-raised (after yielding the error message, except for
        ``ffmpeg.Error`` whose captured output is printed instead). A failure
        of the initial availability check only yields the message and ends
        the generator.
    """
    # Step 1 : check if video is available
    yield {"message": f"Checking {url} for videos"}
    try:
        meta = check_download(url)
        yield {"message": f"Found video with {meta['duration']} seconds duration from {meta['extractor']}", "meta": meta}
        tempdir = output_dir/f"{meta['id']}"
    except Exception as e:
        yield {"message": f"{e}"}
        return

    # Step 2 : Download video and extract audio
    try:
        cached_video = tempdir / f"{meta['id']}.{meta['ext']}"
        cached_audio = tempdir / f"{meta['id']}.mp3"
        # Reuse a previous download when the folder and both main files exist.
        if tempdir.is_dir() and cached_video.is_file() and cached_audio.is_file():
            yield {"message": f"Using cached files"}
            video = str(cached_video.resolve())
            audio = str(cached_audio.resolve())
        else:
            yield {"message": f"Starting download with URL {url}, this may take a while"}
            meta, video, audio = download(url, tempdir)
        yield {"message": f"Downloaded video and extracted audio", "video": video, "audio": audio, "meta": meta}
    except Exception as e:
        os.chdir(original_dir)
        yield {"message": f"{e}"}
        raise e

    srt_path = tempdir / f"{meta['id']}.srt"
    vtt_path = tempdir / f"{meta['id']}.vtt"

    if not corrected_subtitles:
        ### Step 3 : Transcribe with whisper
        yield {"message": f"[PLEASE WAIT] Starting whisper transcribe with {meta['id']}.mp3"}
        try:
            whisper_result = transcribe(audio, translate_action, source_language)
            with open(srt_path, "w", encoding="utf-8") as srt:
                write_srt(whisper_result["segments"], file=srt)
            with open(vtt_path, "w", encoding="utf-8") as vtt:
                write_vtt(whisper_result["segments"], file=vtt)
            # Read back with the same encoding the files were written with;
            # the platform default is not UTF-8 everywhere.
            whisper_result["srt"] = srt_path.read_text(encoding="utf-8")
            whisper_result["vtt"] = vtt_path.read_text(encoding="utf-8")
            yield {"message": f"Transcribe successful", "whisper_result": whisper_result, "meta": meta, "srt_path": srt_path, "vtt_path": vtt_path}
        except Exception as e:
            os.chdir(original_dir)
            yield {"message": f"{e}"}
            raise e
    else:
        ### step 3.5 : use corrected subtitles
        yield {"message": f"Using corrected subtitles"}
        with open(srt_path, "w", encoding="utf-8") as srt:
            srt.write(corrected_subtitles)
        yield {"message": f"Transcribe successful", "srt_path": srt_path, "meta": meta}

    ### Step 4 : Bake subtitles into video with ffmpeg
    yield {"message": f"[PLEASE WAIT] baking subtitles into video"}
    try:
        print('Stating to bake subtitles')
        subbed_video_path = tempdir / f"{meta['id']}_translated.mp4"
        fontsdir = Path('fonts')
        bake_subs(video, subbed_video_path.absolute(), srt_path.absolute(), fontsdir, translate_action)
        yield {"message": f"Subtitled video ready!", "sub_video": str(subbed_video_path.absolute()), "meta": meta, "vtt_path": vtt_path}
    except ffmpeg.Error as e:
        # stdout/stderr are None when ffmpeg was run without capturing them.
        print('stdout:', (e.stdout or b'').decode('utf8'))
        print('stderr:', (e.stderr or b'').decode('utf8'))
        raise e
    except Exception as e:
        # Only ffmpeg.Error carries stdout/stderr; touching them here would
        # replace the real error with an AttributeError.
        os.chdir(original_dir)
        print('error', file=sys.stderr)
        yield {"message": f"{e}"}
        raise e
def user_uploaded_video_generator(video, translate_action=True, source_language='Autodetect', corrected_subtitles=None):
    """
    Process a locally available video: copy it, extract audio, run whisper.

    This is a generator yielding progress dictionaries with a ``"message"``
    key and, for some steps, ``"video"`` / ``"audio"`` entries.

    :param video: path of the uploaded video file
    :param translate_action: currently unused by this function
    :param source_language: currently unused by this function
    :param corrected_subtitles: currently unused by this function
    :return: nothing; transcription errors are printed and end the generator
    """
    video_name = Path(video).stem

    # Per-video working folder below the shared output directory.
    tempdir = output_dir / video_name
    tempdir.mkdir(parents=True, exist_ok=True)

    # Copy the upload (with metadata) next to the files derived from it.
    video_path = tempdir / Path(video).name
    shutil.copy2(video, video_path)
    yield {"message": f"Extracting audio from {video_name}", "video": video_path}

    output_audio = tempdir / f"{video_name}.mp3"
    # overwrite_output=True: the folder is reused between runs (exist_ok above),
    # and ffmpeg refuses to replace an existing output file without it.
    ffmpeg.input(str(video_path)).output(filename=str(output_audio)).run(overwrite_output=True)
    yield {"message": f"Got audio from {video_name}", "video": video, "audio": output_audio}

    # Run whisper on the audio with language unless auto
    try:
        # whisper only loads audio from a str path, so do not hand it a Path.
        audio_file = str(output_audio)
        print(f"Starting whisper transcribe with {output_audio}")
        transcribe_whisper_result = transcribe(audio_file, translate_action=False, language='Autodetect', override_model_size=model_size)
        yield {"message": f"Finished transcription, starting translation to {transcribe_whisper_result['language']}"}

        detected_language = LANGUAGES[transcribe_whisper_result["language"]]
        # NOTE(review): the translation result is not written anywhere yet.
        translate_whisper_result = transcribe(audio_file, translate_action=True, language=detected_language, override_model_size=model_size)
        yield {"message": f"Finished translation to English, preparing subtitle files"}

        with open(tempdir / f"{video_name}.vtt", "w", encoding="utf-8") as vtt:
            write_vtt(transcribe_whisper_result['segments'], file=vtt)
    except Exception as e:
        print(f"Could not transcribe file: {e}")
        return
def caption_generator(social_media_url,uid, language="Autodetect", model_size=model_size):
    """
    Download the audio behind a URL and build WebVTT captions with whisper.

    The audio is transcribed in its own language; when that language is not
    English it is additionally translated to English.

    :param social_media_url: URL handed to ``download_audio``
    :param uid: identifier used as the audio file name inside the temp folder
    :param language: String - language passed to ``transcribe``, default is Autodetect
    :param model_size: String - whisper model size passed to ``transcribe``
    :return: tuple ``('success', captions, detected_language)`` where
        ``captions`` is a list of ``{"language_tag", "vtt_text"}`` dicts and
        ``detected_language`` is the ``LANGUAGES`` entry for the detected code
    :raises Exception: download and transcription errors are printed and re-raised
    """
    with tempfile.TemporaryDirectory() as tempdir:
        tempdir = Path(tempdir)

        try:
            print(f"Starting audio only download with URL {social_media_url}, this may take a while")
            meta, audio = download_audio(social_media_url, tempdir, id=uid)
            print(f"Downloaded video and extracted audio")
        except Exception as e:
            print(f"Could not download file: {e}")
            raise

        # Run whisper on the audio with language unless auto
        try:
            print(f"Starting whisper transcribe with {uid}.mp3")
            transcribe_whisper_result = transcribe(audio, translate_action=False, language=language, override_model_size=model_size)
            language_code = transcribe_whisper_result["language"]
            detected_language = LANGUAGES[language_code]
            print(f"Transcribe successful!, writing files")

            vtt_path = tempdir / f"{language_code}.vtt"
            with open(vtt_path.resolve(), "w", encoding="utf-8") as vtt:
                write_vtt(transcribe_whisper_result["segments"], file=vtt)

            whisper_result_captions = [
                {
                    "language_tag": language_code,
                    "vtt_text": vtt_path.read_text(encoding="utf-8"),
                },
            ]

            # Compare the language *code*: detected_language is the LANGUAGES
            # value for that code and would never equal "en", which made
            # English audio get translated again and listed twice.
            if language_code != "en":
                print(f"Transcribe successful! Starting translation to English")
                translate_whisper_result = transcribe(audio, translate_action=True, language=detected_language, override_model_size=model_size)
                en_vtt_path = tempdir / f"en.vtt"
                with open(en_vtt_path.resolve(), "w", encoding="utf-8") as en_vtt:
                    write_vtt(translate_whisper_result["segments"], file=en_vtt)
                print(f"Finished translation to English, preparing subtitle files")
                whisper_result_captions.append(
                    {
                        "language_tag": "en",
                        "vtt_text": en_vtt_path.read_text(encoding="utf-8"),
                    }
                )
        except Exception as e:
            print(f"Could not transcribe file: {e}")
            raise

        print(f"Finished processing {uid} file, returning results")
        print(whisper_result_captions)
        return 'success', whisper_result_captions, detected_language
# Run whisper with translation task enabled (and save to different srt file)
# Call anvil background task with both files, and both the plain texts
def progress_hook(d):
    """
    Progress callback for yt-dlp: print the download progress.

    The function runs eagerly. A callback containing ``yield`` is a generator
    function, and a caller that merely calls it and discards the result never
    executes its body, so nothing would be printed.

    :param d: status dictionary with a ``status`` key and, depending on the
        status, ``downloaded_bytes``, ``total_bytes`` / ``total_bytes_estimate``,
        ``_percent_str`` and ``filename``
    :return: list with the progress messages produced for this call (empty for
        any status other than ``downloading`` / ``finished``)
    """
    messages = []
    status = d.get('status')

    if status == 'downloading':
        downloaded = d.get('downloaded_bytes')
        # The exact size may be missing or None; fall back to the estimate.
        total = d.get('total_bytes') or d.get('total_bytes_estimate')
        percent = None
        if downloaded is not None and total:
            percent = round(float(downloaded) / float(total) * 100, 1)
            print("downloading " + str(percent) + "%")
        fallback = f"{percent}%" if percent is not None else "N/A"
        messages.append(f"{d.get('_percent_str', fallback)} downloaded")

    if status == 'finished':
        filename = d['filename']
        print(filename)
        messages.append(f"Downloaded {filename}")

    return messages
def download(url, tempdir, format="bestvideo[ext=mp4]+bestaudio/best", verbose=False, keepVideo=True, filename="%(id)s.%(ext)s"):
    """
    Download a video with yt-dlp and extract its audio track as mp3.

    :param url: URL to download
    :param tempdir: folder (str or Path) the files are written to
    :param format: yt-dlp format selector
    :param verbose: Bool - forwarded to yt-dlp's ``verbose`` option
    :param keepVideo: Bool - keep the video file next to the extracted audio
    :param filename: yt-dlp output template, relative to ``tempdir``
    :return: tuple ``(meta, video_path, audio_path)``; the paths are resolved
        strings and ``video_path`` is None when ``keepVideo`` is false
    :raises DownloadError: propagated from yt-dlp when the download fails
    """
    # Accept plain strings as well; the returned paths are built with "/".
    tempdir = Path(tempdir)
    ydl_opts = {
        "format": format,
        "keepvideo": keepVideo,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        "skip_download": False,
        "outtmpl": f"{tempdir}/{filename}",
        "noplaylist": True,
        "verbose": verbose,
        "quiet": False,
        "progress_hooks": [progress_hook],
    }
    # The context manager closes the downloader once the work is done.
    with YoutubeDL(ydl_opts) as ydl:
        meta = ydl.extract_info(url, download=True)

    # NOTE(review): these names match the default ``filename`` template only.
    audio = tempdir / f"{meta['id']}.mp3"
    if not keepVideo:
        return meta, None, str(audio.resolve())
    video = tempdir / f"{meta['id']}.{meta['ext']}"
    return meta, str(video.resolve()), str(audio.resolve())
def download_audio(url, tempdir, format="bestaudio/best", verbose=False, id=None):
    """
    Download only the audio of a URL with yt-dlp and convert it to mp3.

    :param url: URL to download
    :param tempdir: folder (str or Path) the audio file is written to
    :param format: yt-dlp format selector
    :param verbose: Bool - forwarded to yt-dlp's ``verbose`` option
    :param id: file name stem for the audio; when None the id reported by
        yt-dlp is used instead of the literal name ``None``
    :return: tuple ``(meta, audio_path)`` with ``audio_path`` a resolved string
    :raises DownloadError: propagated from yt-dlp when the download fails
    """
    # Accept plain strings as well; the returned path is built with "/".
    tempdir = Path(tempdir)
    filename = f"{id}.%(ext)s" if id is not None else "%(id)s.%(ext)s"
    ydl_opts = {
        "format": format,
        "keepvideo": False,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        "skip_download": False,
        "outtmpl": f"{tempdir}/{filename}",
        "noplaylist": True,
        "verbose": verbose,
        "quiet": False,
        "progress_hooks": [progress_hook],
    }
    # The context manager closes the downloader once the work is done.
    with YoutubeDL(ydl_opts) as ydl:
        meta = ydl.extract_info(url, download=True)

    stem = id if id is not None else meta['id']
    audio = tempdir / f"{stem}.mp3"
    return meta, str(audio.resolve())
def check_download(url):
    """
    Look up the metadata of a URL with yt-dlp without downloading anything.

    :param url: URL to inspect
    :return: the info dictionary returned by ``YoutubeDL.extract_info``
    :raises DownloadError: propagated unchanged from yt-dlp
    """
    probe_options = dict(
        format="bestvideo[ext=mp4]+bestaudio/best",
        skip_download=True,
        verbose=False,
    )
    downloader = YoutubeDL(probe_options)
    return downloader.extract_info(url, download=False)
def transcribe(audio, translate_action=True, language='Autodetect', override_model_size=''):
    """
    Transcribe audio file with whisper
    :param audio: - The audio file to transcribe (str or path-like object)
    :param translate_action: Bool - Whether to translate to English or keep original language
    :param language: String - Language name or code of the audio, default is Autodetect
    :param override_model_size: String - Model size to use instead of the module default, '' keeps the default
    :return: the whisper result dict, extended with a ``requested_language`` key
    """
    task = "translate" if translate_action else "transcribe"
    model_size_to_load = override_model_size if override_model_size else model_size
    print(f'Starting {task} with whisper size {model_size_to_load} on {audio}')

    # whisper only loads audio from str paths; convert Path objects up front.
    if isinstance(audio, os.PathLike):
        audio = os.fspath(audio)

    if preload_model and model_size_to_load == model_size:
        # Reuse the model loaded at import time. Comparing the *effective*
        # size keeps calls without an override from reloading it every time.
        whisper_model = model
    else:
        # Load into a local so the preloaded model is never replaced by one
        # of a different size.
        whisper_model = whisper.load_model(model_size_to_load)

    props = {
        "task": task,
    }
    if language != 'Autodetect':
        language_key = language.lower()
        if language_key in LANGUAGES:
            # Already a whisper language code (covers 3-letter codes too).
            props["language"] = language_key
        elif len(language) > 2:
            props["language"] = TO_LANGUAGE_CODE[language_key]
        else:
            props["language"] = language

    output = whisper_model.transcribe(audio, verbose=True, **props)
    output['requested_language'] = language.lower()
    print(f'Finished transcribe from {LANGUAGES[output["language"]].capitalize()}', output["text"])
    return output