"""Gradio app that transcribes or translates speech with Whisper.

The input is either a video URL (resolved with yt-dlp and recorded with
ffmpeg) or an uploaded audio file. The app writes an SRT transcript and
attempts to burn it into the media as subtitles.
"""
import functools
import os
import sys
from subprocess import PIPE, run

import ffmpeg
import gradio as gr
import whisper
from yt_dlp import YoutubeDL

# Numeric format ids that ``parse_metadata`` treats as livestream formats.
youtube_livestream_codes = [
    91,
    92,
    93,
    94,
    95,
    96,
    300,
    301,
]
# Numeric format ids that ``parse_metadata`` treats as regular mp4 formats.
youtube_mp4_codes = [
    298,
    18,
    22,
    140,
    133,
    134,
]


def second_to_timecode(x: float) -> str:
    """Format a duration in seconds as an SRT timecode ``HH:MM:SS,mmm``.

    The value is rounded to whole milliseconds once and then split with
    integer arithmetic, so float representation error cannot drop a
    millisecond the way truncating the fractional part can.
    """
    total_ms = int(round(x * 1000))
    hour, remainder = divmod(total_ms, 3_600_000)
    minute, remainder = divmod(remainder, 60_000)
    second, millisecond = divmod(remainder, 1000)
    return '%.2d:%.2d:%.2d,%.3d' % (hour, minute, second, millisecond)


def get_video_metadata(video_url: str = "https://www.youtube.com/watch?v=21X5lGlDOfg&ab_channel=NASA") -> dict:
    """Return the yt-dlp info dict for ``video_url`` without downloading it.

    The video title and uploader id are printed as a progress message.
    """
    with YoutubeDL({'outtmpl': '%(id)s.%(ext)s'}) as ydl:
        info_dict = ydl.extract_info(video_url, download=False)
        video_title = info_dict.get('title', None)
        uploader_id = info_dict.get('uploader_id', None)
        print(f"[youtube] {video_title}: {uploader_id}")
    return info_dict


def parse_metadata(metadata) -> dict:
    """Choose which format of a video to fetch.

    After a video is done recording, it will have both the livestream
    format and the mp4 format; a regular mp4 format is preferred over a
    livestream format when both are available.

    Only formats whose ``ext`` is ``"mp4"`` are considered. The lowest
    matching id is chosen from ``youtube_mp4_codes``, then from
    ``youtube_livestream_codes``. If neither list matches, the first mp4
    format's ``format_id`` is used as-is (``None`` when there is no mp4
    format at all).

    Returns:
        A dict with ``"selected_id"`` (int for a known code, otherwise the
        raw ``format_id`` or ``None``) and ``"is_livestream"`` (bool).
    """
    formats = metadata.get("formats") or []
    mp4_formats = [f for f in formats if f.get("ext", "") == "mp4"]

    # Format ids are not always numeric; skip the ones that are not instead
    # of letting a single odd id abort the whole selection.
    format_ids = set()
    for fmt in mp4_formats:
        try:
            format_ids.add(int(fmt.get("format_id", 0)))
        except (TypeError, ValueError):
            continue

    video_entries = sorted(format_ids.intersection(youtube_mp4_codes))
    if video_entries:
        # use video format id over livestream id if available
        return {
            "selected_id": video_entries[0],
            "is_livestream": False,
        }

    livestream_entries = sorted(format_ids.intersection(youtube_livestream_codes))
    if livestream_entries:
        return {
            "selected_id": livestream_entries[0],
            "is_livestream": True,
        }

    # No known code matched: fall back to whatever mp4 format is listed first.
    fallback_id = mp4_formats[0].get("format_id") if mp4_formats else None
    return {
        "selected_id": fallback_id,
        "is_livestream": False,
    }


def get_video(url: str, config: dict):
    """Record media from ``url`` into a local file with ffmpeg.

    Args:
        url: Direct media URL handed to ffmpeg as its input.
        config: Optional settings. ``"filename"`` is the output path
            (default ``"livestream01.mp4"``) and ``"end"`` is passed to
            ffmpeg as the input option ``t`` (default ``"00:15:00"``).
    """
    # NOTE: the start could be delayed by a few seconds to sync up and capture
    # the full video length, but that would need timing how long the metadata
    # fetch takes and starting a bit before.
    filename = config.get("filename", "livestream01.mp4")
    end = config.get("end", "00:15:00")
    (
        ffmpeg
        .input(url, t=end)
        .output(filename)
        .run()
    )


def get_all_files(url: str, end: str = "00:15:00"):
    """Fetch the media behind ``url`` into ``temp.mp4`` and return that name.

    Args:
        url: Page URL understood by yt-dlp.
        end: Forwarded to ``get_video`` as its ``"end"`` setting.

    Raises:
        IndexError: If the format chosen by ``parse_metadata`` is not among
            the formats listed in the metadata.
    """
    metadata = get_video_metadata(url)
    temp_dict = parse_metadata(metadata)
    selected_id = temp_dict.get("selected_id", 0)
    formats = metadata.get("formats") or []
    selected_format = next(
        (f for f in formats if f.get("format_id", "") == str(selected_id)),
        None,
    )
    if selected_format is None:
        raise IndexError(f"no format with id {selected_id!r} found for {url!r}")
    format_url = selected_format.get("url", "")
    filename = "temp.mp4"
    get_video(format_url, {"filename": filename, "end": end})
    return filename


@functools.lru_cache(maxsize=1)
def _load_whisper_model(name: str = "medium"):
    """Load a Whisper model once and reuse it across calls."""
    return whisper.load_model(name)


def _segments_to_srt(segments) -> str:
    """Render Whisper segments as SRT text, numbering cues from 1."""
    lines = []
    for count, segment in enumerate(segments, start=1):
        start = segment.get("start")
        end = segment.get("end")
        lines.append(f"{count}")
        lines.append(f"{second_to_timecode(start)} --> {second_to_timecode(end)}")
        lines.append(segment.get("text", "").strip())
        lines.append('')
    return '\n'.join(lines)


def get_text_from_mp3_whisper(inputType: str, mp3_file: str, url_path: str, taskName: str, srcLanguage: str) -> tuple:
    """Transcribe or translate media with Whisper and try to burn subtitles.

    Args:
        inputType: ``"url"`` to fetch the media from ``url_path``; any other
            value uses ``mp3_file``.
        mp3_file: Path to a local media file (used when not in url mode).
        url_path: Video page URL (used in url mode).
        taskName: Whisper task, passed through as ``task``.
        srcLanguage: Source language, passed through as ``language``.

    Returns:
        A tuple ``(segments, srt_text, output_file)``. ``output_file`` is
        ``"subtitled.mp4"`` when burning the subtitles succeeds, otherwise
        the path of the media that was transcribed.

    Raises:
        ValueError: If the input required by ``inputType`` is missing.
    """
    # Remove leftovers from a previous run.
    for stale in ("transcript.srt", "temp.mp4", "subtitled.mp4"):
        if os.path.exists(stale):
            os.remove(stale)

    # Fail early with a clear message; with a live interface the function can
    # be invoked before every field has been filled in.
    if inputType == "url":
        if not url_path:
            raise ValueError("A video URL is required when the input type is 'url'.")
    elif not mp3_file:
        raise ValueError("An audio file is required when the input type is 'file'.")

    model = _load_whisper_model("medium")
    transcribe_options = dict(task=taskName, language=srcLanguage)

    if inputType == "url":
        media_file = get_all_files(url_path)
        print("Retrieved the file")
    else:
        media_file = mp3_file
    print("transcribing the file")
    result = model.transcribe(media_file, **transcribe_options)

    segments = result.get("segments") or []
    words = _segments_to_srt(segments)

    subtitles_file = 'transcript.srt'
    output_file = 'subtitled.mp4'
    # Write UTF-8 explicitly so non-ASCII text does not depend on the
    # platform's default encoding.
    with open(subtitles_file, "w", encoding="utf-8") as f:
        f.write(words)
    print("done transcribing")

    try:
        print("attempt to output file")
        video = ffmpeg.input(media_file)
        audio = video.audio
        ffmpeg.concat(video.filter("subtitles", subtitles_file), audio, v=1, a=1).output(output_file).run()
    except Exception as e:
        # Best effort: if the subtitles cannot be burned in, hand back the
        # media that was transcribed so the returned path always exists.
        print("failed to output file")
        print(e)
        output_file = media_file
    return segments, words, output_file


gr.Interface(
    title='Download Video From url and extract text from audio',
    fn=get_text_from_mp3_whisper,
    inputs=[
        gr.Dropdown(["url", "file"], value="url"),
        gr.inputs.Audio(type="filepath"),
        gr.inputs.Textbox(),
        gr.Dropdown(["translate", "transcribe"], value="translate"),
        gr.Dropdown(["Japanese", "English"], value="Japanese"),
    ],
    button_text="Go!",
    button_color="#333333",
    outputs=[
        "json", "text", "file"
    ],
    live=True).launch()