TaiYouWeb's picture
Update app.py
0012aa9 verified
raw
history blame
18.9 kB
import torch
import gradio as gr
import yt_dlp as youtube_dl
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, WhisperTokenizer, pipeline
from transformers.pipelines.audio_utils import ffmpeg_read
import tempfile
import os
import time
import requests
from playwright.sync_api import sync_playwright
from languages import get_language_names
from subtitle import text_output, subtitle_output
import datetime
import psutil
import subprocess
from gpustat import GPUStatCollection
import cpuinfo
# Optional dependency: `spaces` is only used below through `spaces.GPU(...)`.
# USING_SPACES records whether the import succeeded so that gpu_decorator can
# degrade to a no-op when the package is missing.
try:
    import spaces
    USING_SPACES = True
except ImportError:
    USING_SPACES = False
# Import-time side effect: install flash-attn through pip in a shell.
# NOTE(review): the original indentation was lost in this copy; the two install
# commands are assumed to run at module level regardless of the import result
# above — confirm against the upstream file.
# NOTE(review): `env=` replaces the child's whole environment (PATH and friends
# are not inherited) rather than adding one variable — confirm this is intended.
subprocess.run(
    "pip install flash-attn --no-build-isolation",
    env={"FLASH_ATTENTION_SKIP_CUDA_BUILD": "TRUE"},
    shell=True,
)
# Import-time side effect: run `playwright install` (used by _return_yt_info,
# which launches Chromium through Playwright).
os.system("playwright install")
# Longest accepted YouTube video, in seconds (checked in _download_yt_audio).
YT_LENGTH_LIMIT_S = 360
# `duration` value handed to gpu_decorator for the transcription functions.
SPACES_GPU_DURATION = 90
# Device passed to the model and pipeline: CUDA index 0 when torch sees a GPU,
# otherwise the string "cpu".
device = 0 if torch.cuda.is_available() else "cpu"
def gpu_decorator(duration=60):
    """Return a decorator that wraps a function with `spaces.GPU` when available.

    Args:
        duration: Value forwarded as `duration=` to `spaces.GPU`.

    Returns:
        A decorator. When the `spaces` package was imported successfully it
        applies `spaces.GPU(duration=duration)`; otherwise it returns the
        decorated function unchanged.
    """
    def actual_decorator(func):
        # Without `spaces` there is nothing to wrap with, so pass through.
        if not USING_SPACES:
            return func
        gpu_wrapper = spaces.GPU(duration=duration)
        return gpu_wrapper(func)

    return actual_decorator
def device_info():
    """Print disk, block-device, memory, CPU and NVIDIA reports to stdout.

    Runs `df -h`, `lsblk`, `free -h`, `lscpu` and `nvidia-smi` in that order,
    letting each command write straight to the process's stdout/stderr.

    Each command is attempted independently: a non-zero exit status or a
    missing executable (for example `nvidia-smi` on a CPU-only host) is
    reported with a "Command failed" line and the remaining commands still run.

    Returns:
        None.
    """
    commands = (
        ["df", "-h"],
        ["lsblk"],
        ["free", "-h"],
        ["lscpu"],
        ["nvidia-smi"],
    )
    for command in commands:
        try:
            subprocess.run(command, check=True)
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers FileNotFoundError/PermissionError raised when the
            # executable itself cannot be started.
            print(f"Command failed: {e}")
def update_gpu_status():
    """Return a Markdown status string for the first GPU reported by gpustat.

    Returns:
        "No Nvidia Device" when torch reports that CUDA is unavailable.
        Otherwise a multi-line string with the GPU index, name, utilization
        and memory figures taken from `GPUStatCollection.new_query()`. If the
        gpustat query raises or reports no GPUs, the result of
        `torch_update_gpu_status()` is returned instead, so the caller always
        receives a string.
    """
    if not torch.cuda.is_available():
        return "No Nvidia Device"
    try:
        # Only the first reported GPU is shown.
        gpu = next(iter(GPUStatCollection.new_query()), None)
        if gpu is not None:
            gpu_id = gpu.index
            gpu_name = gpu.name
            gpu_utilization = gpu.utilization
            memory_used = gpu.memory_used
            memory_total = gpu.memory_total
            memory_utilization = (memory_used / memory_total) * 100
            return (f"**GPU Name** {gpu_id}: {gpu_name}\nUtilization: {gpu_utilization}%\n**Memory Used**: {memory_used}MB\n**Memory Total**: {memory_total}MB\n**Memory Utilization**: {memory_utilization:.2f}%\n")
    except Exception as e:
        # Best-effort: any gpustat problem falls through to the torch report.
        print(f"gpustat query failed, falling back to torch: {e}")
    return torch_update_gpu_status()
def torch_update_gpu_status():
    """Return a Markdown status string for CUDA device 0 using torch only.

    Returns:
        "No GPU available" when torch reports that CUDA is unavailable.
        Otherwise a multi-line string with the device name and its free,
        total and used memory, each divided by 1024**3 and shown with two
        decimals.
    """
    if not torch.cuda.is_available():
        return "No GPU available"

    bytes_per_unit = 1024 * 1024 * 1024
    device_name = torch.cuda.get_device_name(0)
    # mem_get_info yields (free, total); both are read from the same query.
    free_bytes, total_bytes = torch.cuda.mem_get_info(0)
    total_amount = total_bytes / bytes_per_unit
    free_amount = free_bytes / bytes_per_unit
    used_amount = (total_bytes - free_bytes) / bytes_per_unit
    return f"**GPU Name**: {device_name}\n**Free Memory**: {free_amount:.2f}GB\n**Total Memory**: {total_amount:.2f} GB\n**Used Memory**: {used_amount:.2f} GB\n"
def update_cpu_status():
    """Return a Markdown status string describing the CPU and system memory.

    The string starts with the current UTC timestamp, followed by the CPU
    name (from `cpuinfo`), usage percentage, frequency, logical core count
    and virtual-memory figures (from `psutil`). Memory sizes are divided by
    1024**3 and shown with two decimals.

    `psutil.cpu_freq()` can return None when the frequency cannot be
    determined; in that case the frequency line reads "Unavailable" instead
    of raising.

    Returns:
        The formatted multi-line string.
    """
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    current_time = datetime.datetime.now(datetime.timezone.utc)
    time_str = current_time.strftime("%Y-%m-%d %H:%M:%S")
    cpu_percent = psutil.cpu_percent()
    cpu_freq = psutil.cpu_freq()
    cpu_count = psutil.cpu_count(logical=True)
    cpu_name = cpuinfo.get_cpu_info().get("brand_raw", "Unknown CPU")
    virtual_memory = psutil.virtual_memory()

    if cpu_freq is None:
        frequency_line = "**CPU Frequency**: Unavailable\n"
    else:
        frequency_line = f"**CPU Frequency**: *Current*: {cpu_freq.current:.2f}MHz, *Max*: {cpu_freq.max:.2f}MHz, *Min*: {cpu_freq.min:.2f}MHz\n"

    bytes_per_unit = 1024 * 1024 * 1024
    lines = [
        f"**{time_str} (UTC+0)**\n\n",
        f"**CPU Name**: {cpu_name}\n",
        f"**CPU Usage**: {cpu_percent}%\n",
        frequency_line,
        f"**CPU Cores**: {cpu_count}\n",
        f"**Virtual Memory**: *Total*: {(virtual_memory.total / bytes_per_unit):.2f}GB, *Available*: {(virtual_memory.available / bytes_per_unit):.2f}GB, *Used*: {(virtual_memory.used / bytes_per_unit):.2f}GB, *Percentage*: {virtual_memory.percent}%\n\n",
    ]
    return "".join(lines)
def update_status():
    """Return the combined system status: CPU section followed by GPU section."""
    # The GPU report is gathered first, then the CPU report; the CPU text
    # nevertheless comes first in the result.
    gpu_section = update_gpu_status()
    cpu_section = update_cpu_status()
    return cpu_section + gpu_section
def refresh_status():
    """Return a freshly computed system status string (see `update_status`)."""
    latest_status = update_status()
    return latest_status
@gpu_decorator(duration=SPACES_GPU_DURATION)
def transcribe(inputs, model, language, batch_size, chunk_length_s, stride_length_s, task, timestamp_mode, progress=gr.Progress(track_tqdm=True)):
    """Transcribe (or translate) an audio/video input with a Whisper-style model.

    Args:
        inputs: The audio input handed to the ASR pipeline; None means nothing
            was submitted.
        model: Model identifier passed to `from_pretrained`.
        language: Language name, or "Automatic Detection" to leave it unset.
        batch_size: Batch size for the pipeline call.
        chunk_length_s: Chunk length in seconds for the pipeline.
        stride_length_s: Stride length in seconds for the pipeline.
        task: Value for the generation `task` argument.
        timestamp_mode: Passed as `return_timestamps`; a falsy value selects
            plain-text output, anything else selects subtitle output.
        progress: Gradio progress tracker (unused directly; tracks tqdm).

    Returns:
        Whatever `text_output(inputs, text)` or `subtitle_output(inputs, chunks)`
        returns.

    Raises:
        gr.Error: For a missing input or any failure while loading the model
            or running the pipeline; the message is the original error text.
    """
    try:
        if inputs is None:
            raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.")

        # Half precision is only dependable on a GPU; use float32 on CPU.
        torch_dtype = torch.float32 if device == "cpu" else torch.float16
        model_gen = AutoModelForSpeechSeq2Seq.from_pretrained(
            model, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
        )
        model_gen.to(device)
        processor = AutoProcessor.from_pretrained(model)
        tokenizer = WhisperTokenizer.from_pretrained(model)
        pipe = pipeline(
            task="automatic-speech-recognition",
            model=model_gen,
            chunk_length_s=chunk_length_s,
            stride_length_s=stride_length_s,
            tokenizer=tokenizer,
            feature_extractor=processor.feature_extractor,
            torch_dtype=torch_dtype,
            model_kwargs={"attn_implementation": "flash_attention_2"},
            device=device,
        )

        # Models whose name ends in ".en" get neither a language nor a task.
        generate_kwargs = {}
        if not model.endswith(".en"):
            if language != "Automatic Detection":
                generate_kwargs["language"] = language
            generate_kwargs["task"] = task

        output = pipe(inputs, batch_size=batch_size, generate_kwargs=generate_kwargs, return_timestamps=timestamp_mode)
        print(output)
        print({"inputs": inputs, "model": model, "language": language, "batch_size": batch_size, "chunk_length_s": chunk_length_s, "stride_length_s": stride_length_s, "task": task, "timestamp_mode": timestamp_mode})

        if not timestamp_mode:
            return text_output(inputs, output['text'])
        return subtitle_output(inputs, output['chunks'])
    except Exception as e:
        # Surface every failure in the UI while keeping the original cause
        # attached for server-side tracebacks.
        raise gr.Error(str(e), duration=20) from e
def _download_yt_audio(yt_url, filename):
    """Download the audio of a YouTube video to `filename`, enforcing a length cap.

    Args:
        yt_url: URL handed to yt-dlp.
        filename: Output template/path for the downloaded file.

    Raises:
        gr.Error: If metadata extraction fails, the duration is missing, the
            video is longer than YT_LENGTH_LIMIT_S seconds, or the download
            raises an ExtractorError.
    """
    probe = youtube_dl.YoutubeDL()
    try:
        info = probe.extract_info(yt_url, download=False)
    except youtube_dl.utils.DownloadError as err:
        raise gr.Error(str(err))

    duration_text = info.get("duration_string")
    if not duration_text:
        raise gr.Error("Video duration is unavailable.")

    # "S", "M:S" or "H:M:S" -> left-pad with zeros to [hours, minutes, seconds].
    parts = [int(piece) for piece in duration_text.split(":")]
    parts = [0] * (3 - len(parts)) + parts
    duration_s = parts[0] * 3600 + parts[1] * 60 + parts[2]

    if duration_s > YT_LENGTH_LIMIT_S:
        hms_format = "%HH:%MM:%SS"
        limit_hms = time.strftime(hms_format, time.gmtime(YT_LENGTH_LIMIT_S))
        actual_hms = time.strftime(hms_format, time.gmtime(duration_s))
        raise gr.Error(f"Maximum YouTube length is {limit_hms}, got {actual_hms} YouTube video.", duration=20)

    download_options = {
        "outtmpl": filename,
        "format": "bestaudio[ext=m4a]/best",
    }
    try:
        with youtube_dl.YoutubeDL(download_options) as downloader:
            downloader.download([yt_url])
    except youtube_dl.utils.ExtractorError as err:
        # Re-query the metadata so the error can list what is available.
        available_formats = probe.extract_info(yt_url, download=False)['formats']
        raise gr.Error(f"Requested format not available. Available formats: {available_formats}", duration=20)
def _return_yt_video_id(yt_url):
if "youtube.com/watch?v=" in yt_url:
video_id = yt_url.split("?v=")[1].split("&")[0]
elif "youtu.be/" in yt_url:
video_id = yt_url.split("youtu.be/")[1].split("?")[0]
return video_id
def _return_yt_html_embed(yt_url):
    """Return a centered `<iframe>` HTML snippet embedding the given video.

    The video ID comes from `_return_yt_video_id(yt_url)` and is interpolated
    into a `https://www.youtube.com/embed/<id>` source URL.
    """
    video_id = _return_yt_video_id(yt_url)
    opening = f'<center> <iframe width="500" height="320" src="https://www.youtube.com/embed/{video_id}"> </iframe>'
    closing = " </center>"
    return opening + closing
def _return_yt_thumbnail(yt_url):
    """Download the video's `maxresdefault.jpg` thumbnail to a temp file.

    Args:
        yt_url: YouTube URL; its video ID comes from `_return_yt_video_id`.

    Returns:
        Path of the saved `.jpg` temp file (not auto-deleted), or None when
        the download or the write fails. Failures are printed, not raised.

    Raises:
        ValueError: If no video ID can be extracted from `yt_url`.
    """
    video_id = _return_yt_video_id(yt_url)
    if not video_id:
        raise ValueError("Invalid YouTube URL: Unable to extract video ID.")

    thumbnail_url = f"https://img.youtube.com/vi/{video_id}/maxresdefault.jpg"

    # Fetch first so that a failed download never leaves an empty temp file
    # behind; the timeout keeps a stalled connection from hanging the request.
    try:
        response = requests.get(thumbnail_url, timeout=10)
    except requests.RequestException as e:
        print(f"Error occurred: {e}")
        return None

    if response.status_code != 200:
        print(f"Error occurred: Failed to retrieve thumbnail. Status code: {response.status_code}")
        return None

    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file:
            temp_file.write(response.content)
    except OSError as e:
        print(f"Error occurred: {e}")
        return None
    return temp_file.name
def _yt_meta_content(page, name):
    """Return the `content` of `<meta name=...>` on the page, or None if absent."""
    element = page.query_selector(f"meta[name='{name}']")
    if element is None:
        return None
    return element.get_attribute("content")


def _return_yt_info(yt_url):
    """Load the page in headless Chromium and read its title and meta tags.

    Args:
        yt_url: URL opened with Playwright.

    Returns:
        A 3-tuple of `gr.Textbox` objects (title, description, keywords).
        On success they are visible and carry the page title and the content
        of the `description` / `keywords` meta tags (None when a tag is
        missing, so one absent tag no longer discards the other fields).
        If anything raises, the error is printed and three hidden textboxes
        are returned.
    """
    try:
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            try:
                page = browser.new_page()
                page.goto(yt_url)
                page.wait_for_load_state("networkidle")
                title = page.title()
                description = _yt_meta_content(page, "description")
                keywords = _yt_meta_content(page, "keywords")
            finally:
                # Close the browser even when navigation or scraping fails.
                browser.close()
        gr_title = gr.Textbox(label="YouTube Title", visible=True, value=title)
        gr_description = gr.Textbox(label="YouTube Description", visible=True, value=description)
        gr_keywords = gr.Textbox(label="YouTube Keywords", visible=True, value=keywords)
        return gr_title, gr_description, gr_keywords
    except Exception as e:
        # Metadata is optional decoration: report the problem and hide the boxes.
        print(e)
        return gr.Textbox(visible=False), gr.Textbox(visible=False), gr.Textbox(visible=False)
def return_youtube(yt_url):
    """Build the Gradio components that present a YouTube video.

    Returns:
        A 5-tuple: HTML embed, thumbnail image, and the title, description
        and keywords textboxes produced by `_return_yt_info`.
    """
    embed_markup = _return_yt_html_embed(yt_url)
    thumbnail_path = _return_yt_thumbnail(yt_url)

    video_component = gr.HTML(label="Youtube Video", visible=True, value=embed_markup)
    thumbnail_component = gr.Image(label="Youtube Thumbnail", visible=True, value=thumbnail_path)

    title_box, description_box, keywords_box = _return_yt_info(yt_url)
    return video_component, thumbnail_component, title_box, description_box, keywords_box
@gpu_decorator(duration=SPACES_GPU_DURATION)
def yt_transcribe(yt_url, model, language, batch_size, chunk_length_s, stride_length_s, task, timestamp_mode):
    """Download a YouTube video's audio and transcribe (or translate) it.

    Args:
        yt_url: YouTube URL to fetch.
        model: Model identifier passed to `from_pretrained`.
        language: Language name, or "Automatic Detection" to leave it unset.
        batch_size: Batch size for the pipeline call.
        chunk_length_s: Chunk length in seconds for the pipeline.
        stride_length_s: Stride length in seconds for the pipeline.
        task: Value for the generation `task` argument.
        timestamp_mode: Passed as `return_timestamps`; a falsy value selects
            plain-text output, anything else selects subtitle output.

    Returns:
        A 7-tuple `(subtitle, files, title, html, thumbnail, description,
        keywords)`. If downloading, loading or transcription fails, a Gradio
        warning is shown and the first two items are hidden textboxes.
    """
    gr_html, gr_thumbnail, gr_title, gr_description, gr_keywords = return_youtube(yt_url)
    try:
        # Download first so that a rejected video (too long, unavailable)
        # fails before the model is loaded.
        with tempfile.TemporaryDirectory() as tmpdirname:
            filepath = os.path.join(tmpdirname, "video.mp4")
            _download_yt_audio(yt_url, filepath)
            with open(filepath, "rb") as f:
                audio_bytes = f.read()

        # Half precision is only dependable on a GPU; use float32 on CPU.
        torch_dtype = torch.float32 if device == "cpu" else torch.float16
        model_gen = AutoModelForSpeechSeq2Seq.from_pretrained(
            model, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
        )
        model_gen.to(device)
        processor = AutoProcessor.from_pretrained(model)
        tokenizer = WhisperTokenizer.from_pretrained(model)
        pipe = pipeline(
            task="automatic-speech-recognition",
            model=model_gen,
            chunk_length_s=chunk_length_s,
            stride_length_s=stride_length_s,
            tokenizer=tokenizer,
            feature_extractor=processor.feature_extractor,
            torch_dtype=torch_dtype,
            model_kwargs={"attn_implementation": "flash_attention_2"},
            device=device,
        )

        # The pipeline must exist before its sampling rate can be read; the
        # audio is decoded only now for that reason.
        sampling_rate = pipe.feature_extractor.sampling_rate
        inputs = ffmpeg_read(audio_bytes, sampling_rate)
        inputs = {"array": inputs, "sampling_rate": sampling_rate}

        # Models whose name ends in ".en" get neither a language nor a task.
        generate_kwargs = {}
        if not model.endswith(".en"):
            if language != "Automatic Detection":
                generate_kwargs["language"] = language
            generate_kwargs["task"] = task

        output = pipe(inputs, batch_size=batch_size, generate_kwargs=generate_kwargs, return_timestamps=timestamp_mode)
        print(output)
        print({"inputs": yt_url, "model": model, "language": language, "batch_size": batch_size, "chunk_length_s": chunk_length_s, "stride_length_s": stride_length_s, "task": task, "timestamp_mode": timestamp_mode})

        # NOTE(review): `inputs` is a dict here, whereas `transcribe` passes
        # its raw input to these helpers — confirm they accept both.
        if not timestamp_mode:
            text = output['text']
            subtitle, files = text_output(inputs, text)
        else:
            chunks = output['chunks']
            subtitle, files = subtitle_output(inputs, chunks)
        return subtitle, files, gr_title, gr_html, gr_thumbnail, gr_description, gr_keywords
    except Exception as e:
        error_message = str(e)
        gr.Warning(error_message, duration=20)
        return gr.Textbox(visible=False),gr.Textbox(visible=False), gr_title, gr_html, gr_thumbnail, gr_description, gr_keywords
# Top-level container; the tabs and status panel are added to it further down.
demo = gr.Blocks()


def _shared_transcription_inputs():
    """Create a fresh set of the option widgets common to every tab.

    Returns:
        A new list of components, in the order the transcription functions
        take them after their first argument: model, language, batch size,
        chunk length, stride length, task, timestamp mode. A new list is
        built on every call so that each interface owns its own widgets.
    """
    return [
        gr.Dropdown(
            choices=[
                "openai/whisper-tiny",
                "openai/whisper-base",
                "openai/whisper-small",
                "openai/whisper-medium",
                "openai/whisper-large",
                "openai/whisper-large-v1",
                "openai/whisper-large-v2", "distil-whisper/distil-large-v2",
                "openai/whisper-large-v3", "openai/whisper-large-v3-turbo", "distil-whisper/distil-large-v3", "xaviviro/whisper-large-v3-catalan-finetuned-v2",
            ],
            value="openai/whisper-large-v3-turbo",
            label="Model Name",
            allow_custom_value=True,
        ),
        gr.Dropdown(choices=["Automatic Detection"] + sorted(get_language_names()), value="Automatic Detection", label="Language", interactive=True),
        gr.Slider(label="Batch Size", minimum=1, maximum=32, value=16, step=1),
        gr.Slider(label="Chunk Length (s)", minimum=1, maximum=60, value=17.5, step=0.1),
        gr.Slider(label="Stride Length (s)", minimum=1, maximum=30, value=1, step=0.1),
        gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"),
        gr.Dropdown(
            choices=[True, False, "word"],
            value=True,
            label="Timestamp Mode"
        ),
    ]


# Audio tab: uploaded or recorded audio, delivered to `transcribe` as a file path.
file_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Audio(sources=['upload', 'microphone'], type="filepath", label="Audio file"),
        *_shared_transcription_inputs(),
    ],
    outputs=[gr.Textbox(label="Output"), gr.File(label="Download Files")],
    title="Whisper: Transcribe Audio",
    flagging_mode="auto",
)

# Video tab: same function as the audio tab, fed from a video component.
video_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Video(sources=["upload", "webcam"], label="Video file", show_label=False, show_download_button=False, show_share_button=False, streaming=True),
        *_shared_transcription_inputs(),
    ],
    outputs=[gr.Textbox(label="Output"), gr.File(label="Download Files")],
    title="Whisper: Transcribe Video",
    flagging_mode="auto",
)

# YouTube tab. The name `yt_transcribe` is rebound from the function to the
# interface here; `fn=` is evaluated before the assignment, so the interface
# still wraps the function defined above.
yt_transcribe = gr.Interface(
    fn=yt_transcribe,
    inputs=[
        gr.Textbox(lines=1, placeholder="Paste the URL to a YouTube video here", label="YouTube URL"),
        *_shared_transcription_inputs(),
    ],
    outputs=[
        gr.Textbox(label="Output"),
        gr.File(label="Download Files"),
        gr.Textbox(label="Youtube Title"),
        gr.HTML(label="Youtube Video"),
        gr.Image(label="Youtube Thumbnail"),
        gr.Textbox(label="Youtube Description"),
        gr.Textbox(label="Youtube Keywords"),
    ],
    title="Whisper: Transcribe YouTube",
    flagging_mode="auto",
)
# Assemble the page: the three interfaces as tabs, then a system-status panel.
# NOTE(review): the original indentation was lost in this copy; the nesting
# below (which statements sit inside the Group) is reconstructed — confirm
# against the upstream file.
with demo:
    gr.TabbedInterface(
        interface_list=[file_transcribe, video_transcribe, yt_transcribe],
        tab_names=["Audio", "Video", "YouTube"]
    )
    with gr.Group():
        # The Markdown value is the `refresh_status` callable with `every=30`;
        # presumably Gradio re-evaluates it on that interval — confirm.
        sys_status_output = gr.Markdown(value=refresh_status, label="System Status", container=True, line_breaks=True, show_copy_button=True, every=30)
        refresh_button = gr.Button("Refresh System Status")
    # Manual refresh: write the result of refresh_status into the Markdown panel.
    refresh_button.click(refresh_status, None, sys_status_output)
    # Compute one status string immediately and store it as the panel's value.
    sys_status_output.value = refresh_status()
# Script entry point: enable the queue on the Blocks app and launch it with
# `ssr_mode=False`.
if __name__ == "__main__":
    demo.queue().launch(ssr_mode=False)