# Uploaded by: asahi417
# Commit message: Update app.py
# Commit: 4dcbad1 (verified)
import os
import time
import tempfile
from math import floor
from typing import Optional, List, Dict, Any
import torch
import gradio as gr
import yt_dlp as youtube_dl
from transformers import pipeline
from transformers.pipelines.audio_utils import ffmpeg_read
# configuration
# Hugging Face Hub id of the checkpoint loaded into the pipeline below.
MODEL_NAME = "kotoba-tech/kotoba-whisper-v1.1"
# Passed to the pipeline as `batch_size`.
BATCH_SIZE = 16
# Passed to the pipeline as `chunk_length_s`.
CHUNK_LENGTH_S = 15
# NOTE(review): not referenced anywhere in the visible part of this file.
FILE_LIMIT_MB = 1000
# Videos longer than this are rejected by download_yt_audio before downloading.
YT_LENGTH_LIMIT_S = 3600 # limit to 1 hour YouTube files
# device setting
# Choose dtype, device and extra model kwargs in one place, keyed on whether
# CUDA is available: bfloat16 + SDPA attention on GPU, float32 on CPU.
_cuda_available = torch.cuda.is_available()
torch_dtype = torch.bfloat16 if _cuda_available else torch.float32
device = "cuda:0" if _cuda_available else "cpu"
model_kwargs = {'attn_implementation': 'sdpa'} if _cuda_available else {}
# define the pipeline
# The options are gathered in a dict first so each setting is visible on its
# own line; the pipeline is built once at import time and shared by all
# request handlers below.
_pipeline_options = {
    "model": MODEL_NAME,
    "chunk_length_s": CHUNK_LENGTH_S,
    "batch_size": BATCH_SIZE,
    "torch_dtype": torch_dtype,
    "device": device,
    "model_kwargs": model_kwargs,
    # NOTE(review): presumably required because the checkpoint ships its own
    # pipeline code -- confirm before removing.
    "trust_remote_code": True,
}
pipe = pipeline(**_pipeline_options)
def format_time(start: Optional[float], end: Optional[float]):
    """Render a chunk's (start, end) timestamps as ``[HH:MM:SS.mmm-> HH:MM:SS.mmm]:``.

    Args:
        start: Chunk start in seconds, or ``None`` if unknown.
        end: Chunk end in seconds, or ``None`` if unknown.

    Returns:
        The formatted range string. A ``None`` bound is rendered as
        ``"complete "`` instead of a clock value.
    """

    def _format_time(seconds: Optional[float]):
        if seconds is None:
            return "complete "
        # Work in whole milliseconds so that rounding carries correctly into
        # seconds/minutes/hours (e.g. 59.9996 s -> 00:01:00.000) and the
        # millisecond field can never reach 1000.
        total_ms = int(round(seconds * 1000))
        total_seconds, m_seconds = divmod(total_ms, 1000)
        total_minutes, secs = divmod(total_seconds, 60)
        # Minutes must be reduced modulo 60; otherwise timestamps past one
        # hour show minutes >= 60 and a negative seconds field.
        hours, minutes = divmod(total_minutes, 60)
        return f'{hours:02}:{minutes:02}:{secs:02}.{m_seconds:03}'

    return f"[{_format_time(start)}-> {_format_time(end)}]:"
def get_prediction(inputs, prompt: Optional[str]):
    """Run the module-level pipeline on ``inputs`` and format the result.

    Args:
        inputs: Audio payload forwarded unchanged to ``pipe``.
        prompt: Optional text; when non-empty it is tokenized and passed to
            generation as ``prompt_ids``.

    Returns:
        A ``(text, text_timestamped)`` tuple: the chunk texts concatenated,
        and one ``"<time range> <text>"`` line per chunk joined by newlines.
    """
    generate_kwargs = {"language": "japanese", "task": "transcribe"}
    if prompt:
        prompt_ids = pipe.tokenizer.get_prompt_ids(prompt, return_tensors='pt')
        generate_kwargs['prompt_ids'] = prompt_ids.to(device)

    prediction = pipe(inputs, return_timestamps=True, generate_kwargs=generate_kwargs)
    chunks = prediction['chunks']

    text = "".join(chunk['text'] for chunk in chunks)
    timestamped_lines = []
    for chunk in chunks:
        time_range = format_time(*chunk['timestamp'])
        timestamped_lines.append(f"{time_range} {chunk['text']}")
    text_timestamped = "\n".join(timestamped_lines)
    return text, text_timestamped
def transcribe(inputs: str, prompt):
    """Transcribe the audio file at path ``inputs``.

    Args:
        inputs: Path to the audio file, or ``None`` when nothing was submitted.
        prompt: Optional prompt text forwarded to ``get_prediction``.

    Returns:
        The ``(text, text_timestamped)`` tuple from ``get_prediction``.

    Raises:
        gr.Error: If ``inputs`` is ``None``.
    """
    if inputs is None:
        raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.")

    with open(inputs, "rb") as audio_file:
        raw_bytes = audio_file.read()

    # Decode to a waveform at the sampling rate the feature extractor expects.
    sampling_rate = pipe.feature_extractor.sampling_rate
    waveform = ffmpeg_read(raw_bytes, sampling_rate)
    payload = {"array": waveform, "sampling_rate": pipe.feature_extractor.sampling_rate}
    return get_prediction(payload, prompt)
def _return_yt_html_embed(yt_url):
video_id = yt_url.split("?v=")[-1]
return f'<center> <iframe width="500" height="320" src="https://www.youtube.com/embed/{video_id}"> </iframe> </center>'
def download_yt_audio(yt_url, filename):
    """Download the YouTube video at ``yt_url`` to ``filename``.

    The video's metadata is fetched first so that videos longer than
    ``YT_LENGTH_LIMIT_S`` are rejected before any download starts.

    Args:
        yt_url: URL of the video.
        filename: Output path handed to yt-dlp as ``outtmpl``.

    Raises:
        gr.Error: If metadata extraction fails, the video exceeds the length
            limit, or the download fails.
    """
    # Context manager so the metadata-only YoutubeDL instance is closed too.
    with youtube_dl.YoutubeDL() as info_loader:
        try:
            info = info_loader.extract_info(yt_url, download=False)
        except youtube_dl.utils.DownloadError as err:
            raise gr.Error(str(err)) from err

    # "duration_string" is colon-separated ("S", "M:S" or "H:M:S");
    # left-pad with zeros so it always reads as hours, minutes, seconds.
    file_h_m_s = [int(sub_length) for sub_length in info["duration_string"].split(":")]
    while len(file_h_m_s) < 3:
        file_h_m_s.insert(0, 0)
    file_length_s = file_h_m_s[0] * 3600 + file_h_m_s[1] * 60 + file_h_m_s[2]

    if file_length_s > YT_LENGTH_LIMIT_S:
        yt_length_limit_hms = time.strftime("%HH:%MM:%SS", time.gmtime(YT_LENGTH_LIMIT_S))
        file_length_hms = time.strftime("%HH:%MM:%SS", time.gmtime(file_length_s))
        raise gr.Error(f"Maximum YouTube length is {yt_length_limit_hms}, got {file_length_hms} YouTube video.")

    ydl_opts = {"outtmpl": filename, "format": "worstvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"}
    with youtube_dl.YoutubeDL(ydl_opts) as ydl:
        try:
            ydl.download([yt_url])
        # yt-dlp surfaces download failures as DownloadError; ExtractorError
        # is still caught so anything handled before keeps being handled.
        except (youtube_dl.utils.DownloadError, youtube_dl.utils.ExtractorError) as err:
            raise gr.Error(str(err)) from err
def yt_transcribe(yt_url, prompt):
    """Download a YouTube video and transcribe its audio.

    Args:
        yt_url: URL of the video.
        prompt: Optional prompt text forwarded to ``get_prediction``.

    Returns:
        A ``(html_embed, text, text_timestamped)`` tuple: the embed snippet
        for the video followed by the two transcription strings.
    """
    html_embed_str = _return_yt_html_embed(yt_url)

    # The download lives in a temporary directory; its bytes are read into
    # memory before the directory is removed on leaving the `with` block.
    with tempfile.TemporaryDirectory() as tmpdirname:
        video_path = os.path.join(tmpdirname, "video.mp4")
        download_yt_audio(yt_url, video_path)
        with open(video_path, "rb") as video_file:
            raw_bytes = video_file.read()

    waveform = ffmpeg_read(raw_bytes, pipe.feature_extractor.sampling_rate)
    payload = {"array": waveform, "sampling_rate": pipe.feature_extractor.sampling_rate}
    transcription = get_prediction(payload, prompt)
    text, text_timestamped = transcription
    return html_embed_str, text, text_timestamped
# Top-level container; the tabbed interface is attached to it further down
# and it is the object that gets launched.
demo = gr.Blocks()
# Microphone tab: records audio to a file path and passes it, together with
# the optional prompt, to `transcribe`. The two text outputs are the plain
# and the timestamped transcription.
# The description previously contained a mis-decoded emoji; it is restored
# to the intended character here.
mf_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.inputs.Audio(source="microphone", type="filepath", optional=True),
        gr.inputs.Textbox(lines=1, placeholder="Prompt", optional=True),
    ],
    outputs=["text", "text"],
    layout="horizontal",
    theme="huggingface",
    title=f"Transcribe Audio with {os.path.basename(MODEL_NAME)}",
    description=f"Transcribe long-form microphone or audio inputs with the click of a button! Demo uses the Kotoba-Whisper checkpoint [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) and 🤗 Transformers to transcribe audio files of arbitrary length.",
    allow_flagging="never",
)
# Audio-file tab: same handler as the microphone tab, but the audio comes
# from an uploaded file.
# The description previously contained a mis-decoded emoji; it is restored
# to the intended character here.
file_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.inputs.Audio(source="upload", type="filepath", optional=True, label="Audio file"),
        gr.inputs.Textbox(lines=1, placeholder="Prompt", optional=True),
    ],
    outputs=["text", "text"],
    layout="horizontal",
    theme="huggingface",
    title=f"Transcribe Audio with {os.path.basename(MODEL_NAME)}",
    description=f"Transcribe long-form microphone or audio inputs with the click of a button! Demo uses Kotoba-Whisper checkpoint [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) and 🤗 Transformers to transcribe audio files of arbitrary length.",
    allow_flagging="never",
)
# YouTube tab: takes a video URL and an optional prompt; outputs the embed
# HTML plus the plain and timestamped transcription.
# `fn=yt_transcribe` is evaluated before the assignment, so it still refers
# to the handler function; afterwards the name is rebound to this Interface.
# The description previously contained a mis-decoded emoji; it is restored
# to the intended character here.
yt_transcribe = gr.Interface(
    fn=yt_transcribe,
    inputs=[
        gr.inputs.Textbox(lines=1, placeholder="Paste the URL to a YouTube video here", label="YouTube URL"),
        gr.inputs.Textbox(lines=1, placeholder="Prompt", optional=True),
    ],
    outputs=["html", "text", "text"],
    layout="horizontal",
    theme="huggingface",
    title=f"Transcribe YouTube with {os.path.basename(MODEL_NAME)}",
    description=f"Transcribe long-form YouTube videos with the click of a button! Demo uses Kotoba-Whisper checkpoint [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) and 🤗 Transformers to transcribe video files of arbitrary length.",
    allow_flagging="never",
)
# Assemble the three interfaces into tabs inside the Blocks container, then
# start the app with request queueing enabled.
_tab_interfaces = [mf_transcribe, file_transcribe, yt_transcribe]
_tab_titles = ["Microphone", "Audio file", "YouTube"]
with demo:
    gr.TabbedInterface(_tab_interfaces, _tab_titles)
demo.launch(enable_queue=True)