FriendlyUser's picture
Update app.py
fe8b1e3
raw
history blame
5.55 kB
import whisper
import gradio as gr
import ffmpeg
from yt_dlp import YoutubeDL
import os
import sys
from subprocess import PIPE, run
# YouTube format ids that this app treats as livestream formats.
# NOTE(review): presumably the HLS itags YouTube serves for live broadcasts — confirm.
youtube_livestream_codes = [91, 92, 93, 94, 95, 96, 300, 301]

# YouTube format ids that this app treats as regular (non-live) mp4 formats.
# NOTE(review): the exact resolution/bitrate behind each id is not visible here — confirm.
youtube_mp4_codes = [298, 18, 22, 140, 133, 134]
def second_to_timecode(x: float) -> str:
    """Format an offset in seconds as an SRT timecode ``HH:MM:SS,mmm``.

    Args:
        x: Non-negative offset in seconds (int or float).

    Returns:
        Zero-padded hours, minutes, seconds and milliseconds, e.g.
        ``3661.5`` -> ``"01:01:01,500"``.
    """
    # Round to whole milliseconds *before* splitting into fields. Truncating
    # the float remainder instead (1.001 % 1 == 0.000999...) silently loses a
    # millisecond, and rounding afterwards could not carry into the seconds.
    total_ms = int(round(x * 1000))
    total_seconds, millisecond = divmod(total_ms, 1000)
    total_minutes, second = divmod(total_seconds, 60)
    hour, minute = divmod(total_minutes, 60)
    return '%.2d:%.2d:%.2d,%.3d' % (hour, minute, second, millisecond)
def get_video_metadata(video_url: str = "https://www.youtube.com/watch?v=21X5lGlDOfg&ab_channel=NASA")-> dict:
    """Look up the yt-dlp info dict for a video without downloading it.

    Args:
        video_url: Page URL handed to ``YoutubeDL.extract_info``.

    Returns:
        The info dict returned by yt-dlp. Its title and uploader id are
        printed as a side effect.
    """
    ydl_options = {'outtmpl': '%(id)s.%(ext)s'}
    with YoutubeDL(ydl_options) as downloader:
        # download=False: only the metadata is fetched, no media is saved.
        metadata = downloader.extract_info(video_url, download=False)
    title = metadata.get('title', None)
    uploader = metadata.get('uploader_id', None)
    print(f"[youtube] {title}: {uploader}")
    return metadata
def parse_metadata(metadata) -> dict:
    """Pick which format id to download from a yt-dlp info dict.

    Only formats whose ``ext`` is ``"mp4"`` are considered. Selection order:

    1. the lowest id that appears in ``youtube_mp4_codes`` (regular video);
    2. otherwise the lowest id that appears in ``youtube_livestream_codes``;
    3. otherwise the ``format_id`` of the first mp4 format, whatever it is.

    After a livestream has finished recording, both kinds of format can be
    present; the regular video format is preferred in that case.

    Args:
        metadata: Info dict with a ``"formats"`` list of dicts, each carrying
            ``"ext"`` and ``"format_id"`` entries.

    Returns:
        ``{"selected_id": ..., "is_livestream": bool}``. ``selected_id`` is an
        ``int`` for cases 1 and 2 and the raw ``format_id`` value for case 3.

    Raises:
        ValueError: If the metadata contains no mp4 format at all.
    """
    formats = metadata.get("formats") or []
    mp4_formats = [f for f in formats if f.get("ext", "") == "mp4"]

    # Format ids are not always numeric. Skip the ones that are not instead of
    # letting a single odd id discard every match against the known codes.
    numeric_ids = set()
    for fmt in mp4_formats:
        try:
            numeric_ids.add(int(fmt.get("format_id", 0)))
        except (TypeError, ValueError):
            continue

    # Use the video format id over the livestream id if available.
    video_entries = sorted(numeric_ids.intersection(youtube_mp4_codes))
    if video_entries:
        return {
            "selected_id": video_entries[0],
            "is_livestream": False,
        }

    # Previously this path left selected_id unassigned and crashed on return.
    livestream_entries = sorted(numeric_ids.intersection(youtube_livestream_codes))
    if livestream_entries:
        return {
            "selected_id": livestream_entries[0],
            "is_livestream": True,
        }

    if not mp4_formats:
        raise ValueError("metadata contains no mp4 formats to select from")

    # Last resort: no known code matched, take the first mp4 format as-is.
    return {
        "selected_id": mp4_formats[0].get("format_id"),
        "is_livestream": False,
    }
def get_video(url: str, config: dict):
    """Save the beginning of the media at ``url`` to a local file with ffmpeg.

    Args:
        url: Media URL passed to ffmpeg as its input.
        config: Options dict. ``"filename"`` is the output path (default
            ``"livestream01.mp4"``); ``"end"`` is passed to ffmpeg as the
            input's ``t`` option (default ``"00:15:00"``).
    """
    filename = config.get("filename", "livestream01.mp4")
    end = config.get("end", "00:15:00")
    (
        ffmpeg
        .input(url, t=end)
        .output(filename)
        # Without overwrite_output ffmpeg refuses to replace an existing
        # file, so a repeated call with the same filename would fail.
        .run(overwrite_output=True)
    )
def get_all_files(url: str, end: str = "00:15:00"):
    """Download the selected format of ``url`` to ``temp.mp4`` and return that path.

    Args:
        url: Video page URL to look up.
        end: Forwarded to ``get_video`` as the ``"end"`` option.

    Returns:
        The local filename the media was written to (always ``"temp.mp4"``).
    """
    info = get_video_metadata(url)
    chosen_id = parse_metadata(info).get("selected_id", 0)

    # format_id values are compared as strings, so normalise the chosen id once.
    wanted = str(chosen_id)
    candidates = []
    for entry in info.get("formats", []):
        if entry.get("format_id", "") == wanted:
            candidates.append(entry)
    stream_url = candidates[0].get("url", "")

    target = "temp.mp4"
    get_video(stream_url, {"filename": target, "end": end})
    return target
def _remove_stale_outputs(paths) -> None:
    """Delete leftover files from a previous run so they are never reused."""
    for path in paths:
        if os.path.exists(path):
            os.remove(path)


def _segments_to_srt(segments) -> str:
    """Render transcription segments as SubRip text with 1-based cue numbers."""
    lines = []
    for index, segment in enumerate(segments, start=1):
        start = segment.get("start")
        end = segment.get("end")
        lines.append(f"{index}")
        lines.append(f"{second_to_timecode(start)} --> {second_to_timecode(end)}")
        lines.append(segment.get("text", "").strip())
        # Blank line terminates the cue.
        lines.append('')
    return '\n'.join(lines)


def get_text_from_mp3_whisper(inputType:str, mp3_file: str, url_path: str, taskName: str, srcLanguage: str) -> tuple:
    """Transcribe or translate media with Whisper and try to burn in subtitles.

    Args:
        inputType: ``"url"`` to download the media from ``url_path``; any
            other value uses the local file ``mp3_file``.
        mp3_file: Path of a local media file (used when ``inputType`` is not
            ``"url"``).
        url_path: Video page URL (used when ``inputType`` is ``"url"``).
        taskName: Passed to Whisper as ``task``.
        srcLanguage: Passed to Whisper as ``language``.

    Returns:
        A 3-tuple ``(segments, srt_text, output_file)``. ``output_file`` is
        ``"subtitled.mp4"`` when ffmpeg succeeded in burning the subtitles in,
        otherwise the path of the unmodified input media.

    Raises:
        ValueError: If the media source required by ``inputType`` is empty.
    """
    use_url = inputType == "url"
    # Fail early with a clear message instead of deep inside yt-dlp/whisper.
    if use_url and not url_path:
        raise ValueError("url_path must be set when inputType is 'url'")
    if not use_url and not mp3_file:
        raise ValueError("mp3_file must be set when inputType is not 'url'")

    subtitles_file = 'transcript.srt'
    output_file = 'subtitled.mp4'
    _remove_stale_outputs([subtitles_file, "temp.mp4", output_file])

    model = whisper.load_model("medium")
    transcribe_options = dict(task=taskName, language=srcLanguage)

    if use_url:
        input_file = get_all_files(url_path)
        print("Retrieved the file")
        result = model.transcribe(input_file, **transcribe_options)
        print("transcribing the file")
    else:
        # Use the uploaded file itself as the media source; temp.mp4 only
        # exists on the url path.
        input_file = mp3_file
        result = model.transcribe(input_file, **transcribe_options)

    segments = result.get("segments") or []
    words = _segments_to_srt(segments)

    # Explicit UTF-8: the transcript may contain non-ASCII text (e.g.
    # Japanese) that the platform default encoding cannot represent.
    with open(subtitles_file, "w", encoding="utf-8") as f:
        f.write(words)
    print("done transcribing")

    try:
        print("attempt to output file")
        video = ffmpeg.input(input_file)
        audio = video.audio
        ffmpeg.concat(video.filter("subtitles", subtitles_file), audio, v=1, a=1).output(output_file).run()
    except Exception as e:
        # Best effort: burning subtitles is optional (it fails for audio-only
        # input, for instance), so fall back to the media that does exist.
        print("failed to output file")
        print(e)
        output_file = input_file
    return segments, words, output_file
# Build the Gradio UI around get_text_from_mp3_whisper and start serving it.
# The inputs map positionally onto the handler's parameters; the three outputs
# match the handler's (segments, srt_text, output_file) return tuple.
# NOTE(review): live=True presumably re-runs the handler on every input
# change — confirm that is intended for a slow transcription job.
gr.Interface(
    fn=get_text_from_mp3_whisper,
    title='Download Video From url and extract text from audio',
    inputs=[
        gr.Dropdown(["url", "file"], value="url"),  # inputType
        gr.inputs.Audio(type="filepath"),  # mp3_file
        gr.inputs.Textbox(),  # url_path
        gr.Dropdown(["translate", "transcribe"], value="translate"),  # taskName
        gr.Dropdown(["Japanese", "English"], value="Japanese"),  # srcLanguage
    ],
    outputs=["json", "text", "file"],
    button_text="Go!",
    button_color="#333333",
    live=True,
).launch()