Spaces:
Runtime error
Runtime error
import whisper | |
import gradio as gr | |
import ffmpeg | |
from yt_dlp import YoutubeDL | |
import os | |
import sys | |
from subprocess import PIPE, run | |
# YouTube format ids (itags) that this script treats as livestream formats.
youtube_livestream_codes = [
    91, 92, 93, 94, 95, 96,
    300, 301,
]
# YouTube format ids (itags) that this script treats as regular (non-live)
# mp4 downloads.
youtube_mp4_codes = [298, 18, 22, 140, 133, 134]
def second_to_timecode(x: float) -> str:
    """Format a duration in seconds as an SRT timecode ``HH:MM:SS,mmm``.

    The value is rounded to the nearest millisecond once and then split with
    integer arithmetic. Splitting the float directly and truncating the
    fractional part loses a millisecond on values such as ``2.3`` (whose
    fractional remainder is ``0.2999...``), and cannot carry a rounded-up
    ``,1000`` into the seconds field.

    Args:
        x: Non-negative duration in seconds.

    Returns:
        The timecode string, e.g. ``"01:01:01,500"`` for ``3661.5``.
    """
    total_ms = int(round(x * 1000))
    total_seconds, millisecond = divmod(total_ms, 1000)
    total_minutes, second = divmod(total_seconds, 60)
    hour, minute = divmod(total_minutes, 60)
    return '%.2d:%.2d:%.2d,%.3d' % (hour, minute, second, millisecond)
def get_video_metadata(video_url: str = "https://www.youtube.com/watch?v=21X5lGlDOfg&ab_channel=NASA")-> dict:
    """
    Look up the metadata for ``video_url`` without downloading the media.

    The video's title and uploader id are printed, and the full info dict
    returned by ``YoutubeDL.extract_info`` is handed back to the caller.
    """
    ydl_options = {'outtmpl': '%(id)s.%(ext)s'}
    with YoutubeDL(ydl_options) as downloader:
        # download=False: only the metadata is requested here.
        info = downloader.extract_info(video_url, download=False)
        title = info.get('title', None)
        uploader = info.get('uploader_id', None)
        print(f"[youtube] {title}: {uploader}")
    return info
def parse_metadata(metadata) -> dict:
    """
    Pick the format id to download from a video metadata dict.

    Only formats whose ``ext`` is ``"mp4"`` are considered. After a video is
    done recording it can expose both livestream and regular mp4 formats, so
    the regular ones take priority:

    1. the lowest numeric format id listed in ``youtube_mp4_codes``;
    2. otherwise the lowest numeric format id listed in
       ``youtube_livestream_codes``;
    3. otherwise the ``format_id`` of the first mp4 format, or ``None`` when
       there is no mp4 format at all.

    Args:
        metadata: Mapping with an optional ``"formats"`` list of dicts, each
            carrying ``"ext"`` and ``"format_id"`` keys.

    Returns:
        ``{"selected_id": ..., "is_livestream": bool}``. ``selected_id`` is an
        ``int`` for cases 1 and 2, and the raw ``format_id`` value (or
        ``None``) for case 3.
    """
    formats = metadata.get("formats", [])
    mp4_formats = [f for f in formats if f.get("ext", "") == "mp4"]

    # Format ids are not guaranteed to be numeric. A non-numeric id can never
    # match either code table, so skip it instead of abandoning the lookup.
    numeric_ids = set()
    for fmt in mp4_formats:
        try:
            numeric_ids.add(int(fmt.get("format_id", 0)))
        except (TypeError, ValueError):
            continue

    # use video format id over livestream id if available
    video_entries = sorted(numeric_ids.intersection(youtube_mp4_codes))
    if video_entries:
        return {"selected_id": video_entries[0], "is_livestream": False}

    livestream_entries = sorted(numeric_ids.intersection(youtube_livestream_codes))
    if livestream_entries:
        return {"selected_id": livestream_entries[0], "is_livestream": True}

    # Nothing matched a known code: fall back to the first mp4 format, if any.
    fallback_id = mp4_formats[0].get("format_id") if mp4_formats else None
    return {"selected_id": fallback_id, "is_livestream": False}
def get_video(url: str, config: dict):
    """
    Copy the stream at ``url`` into a local file with ffmpeg.

    Args:
        url: Direct media URL handed to ffmpeg as its input.
        config: Optional settings:
            ``"filename"`` - output path (default ``"livestream01.mp4"``);
            ``"end"`` - value passed as ffmpeg's ``-t`` input option, i.e.
            the maximum duration to read (default ``"00:15:00"``).

    Raises:
        ffmpeg.Error: if the ffmpeg process exits with a non-zero status.
    """
    # NOTE: the start could be delayed by a few seconds to sync up and capture
    # the full video length, but that would require timing how long the fetch
    # takes and starting slightly early.
    filename = config.get("filename", "livestream01.mp4")
    end = config.get("end", "00:15:00")
    (
        ffmpeg
        .input(url, t=end)
        .output(filename)
        # Without -y ffmpeg asks on stdin whether to overwrite an existing
        # file, which stalls or aborts a non-interactive run.
        .run(overwrite_output=True)
    )
def get_all_files(url: str, end: str = "00:15:00"):
    """
    Download the selected format of the video at ``url`` to ``temp.mp4``.

    The metadata is fetched, ``parse_metadata`` chooses a format id, the
    first format entry with that id supplies the direct media URL, and
    ``get_video`` saves up to ``end`` of it. Returns the output filename.
    """
    info = get_video_metadata(url)
    wanted_id = str(parse_metadata(info).get("selected_id", 0))

    candidates = [
        fmt for fmt in info.get("formats", [])
        if fmt.get("format_id", "") == wanted_id
    ]
    # Indexing raises IndexError when no format carries the selected id.
    stream_url = candidates[0].get("url", "")

    filename = "temp.mp4"
    get_video(stream_url, {"filename": filename, "end": end})
    return filename
def get_text_from_mp3_whisper(inputType:str, mp3_file: str, url_path: str, taskName: str, srcLanguage: str)->tuple:
    """
    Transcribe or translate a media file with Whisper and burn in subtitles.

    Args:
        inputType: ``"url"`` to download the media from ``url_path``; any
            other value uses the local file ``mp3_file``.
        mp3_file: Path of a local media file (used when ``inputType`` is not
            ``"url"``).
        url_path: Video URL (used when ``inputType`` is ``"url"``).
        taskName: Whisper task, passed through as ``task``.
        srcLanguage: Source language, passed through as ``language``.

    Returns:
        A tuple ``(segments, srt_text, output_path)``: the Whisper segment
        list, the generated SRT text, and the path of the subtitled video.
        If burning the subtitles fails, ``output_path`` is the media file
        that was transcribed. When the required input is missing the result
        is ``([], "", None)``.

    Side effects:
        Deletes and recreates ``transcript.srt``, ``temp.mp4`` and
        ``subtitled.mp4`` in the current working directory.
    """
    # Remove leftovers from a previous run.
    for stale in ("transcript.srt", "temp.mp4", "subtitled.mp4"):
        if os.path.exists(stale):
            os.remove(stale)

    # Bail out before loading the model when the selected input is not set.
    if inputType == "url":
        if not url_path:
            return [], "", None
    elif not mp3_file:
        return [], "", None

    model = whisper.load_model("medium")
    transcribe_options = dict(task=taskName, language=srcLanguage)

    if inputType == "url":
        media_file = get_all_files(url_path)
        print("Retrieved the file")
        result = model.transcribe(media_file, **transcribe_options)
        print("transcribing the file")
    else:
        media_file = mp3_file
        result = model.transcribe(media_file, **transcribe_options)

    # Guard against a missing/None segment list so the loop cannot crash.
    segments = result.get("segments") or []

    # Build the SRT document; cue numbers start at 1 per the SubRip format.
    lines = []
    for count, segment in enumerate(segments, start=1):
        start = segment.get("start")
        end = segment.get("end")
        lines.append(f"{count}")
        lines.append(f"{second_to_timecode(start)} --> {second_to_timecode(end)}")
        lines.append(segment.get("text", "").strip())
        lines.append('')
    words = '\n'.join(lines)

    # Explicit UTF-8: the locale default may be unable to encode non-ASCII
    # (e.g. Japanese) text.
    with open("transcript.srt", "w", encoding="utf-8") as f:
        f.write(words)
    print("done transcribing")

    subtitles_file = 'transcript.srt'
    output_file = 'subtitled.mp4'
    try:
        print("attempt to output file")
        # Burn subtitles into the media that was actually transcribed; in
        # file mode there is no temp.mp4 to read from.
        video = ffmpeg.input(media_file)
        audio = video.audio
        ffmpeg.concat(video.filter("subtitles", subtitles_file), audio, v=1, a=1).output(output_file).run(overwrite_output=True)
    except Exception as e:
        # Best effort: any failure here (e.g. an audio-only input) falls back
        # to returning the unsubtitled media file.
        print("failed to output file")
        print(e)
        output_file = media_file
    return segments, words, output_file
# Input widgets, in the order of get_text_from_mp3_whisper's parameters:
# inputType, mp3_file, url_path, taskName, srcLanguage.
interface_inputs = [
    gr.Dropdown(["url", "file"], value="url"),
    gr.inputs.Audio(type="filepath"),
    gr.inputs.Textbox(),
    gr.Dropdown(["translate", "transcribe"], value="translate"),
    gr.Dropdown(["Japanese", "English"], value="Japanese"),
]

# Output widgets for the (segments, SRT text, output file) result tuple.
interface_outputs = ["json", "text", "file"]

# Build the web UI around the transcription function and start serving it.
demo = gr.Interface(
    title='Download Video From url and extract text from audio',
    fn=get_text_from_mp3_whisper,
    inputs=interface_inputs,
    button_text="Go!",
    button_color="#333333",
    outputs=interface_outputs,
    live=True,
)
demo.launch()