File size: 7,886 Bytes
5dcd53e 4980f4b 5dcd53e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
import gradio as gr
import httpx
import asyncio
from typing import Dict, Any
import os
from config import CONFIG
# Connection settings for the remote transcription API, read from the project config.
API_BASE_URL = CONFIG["api"]["base_url"]
API_TIMEOUT = CONFIG["api"]["timeout"]

# Fail at import time rather than on the first request: every endpoint URL
# used below is built by prefixing API_BASE_URL.
if API_BASE_URL is None:
    raise ValueError("API_BASE_URL is not set")
def _failure(message: str, output_format: str) -> Dict[str, Any]:
    """Build the error-shaped result returned when a transcription attempt fails."""
    return {"success": False, "error": message, "format": output_format}


def _segments_to_text(result: Any) -> str:
    """
    Flatten a parsed JSON transcription payload into newline-separated text.

    The payload is used only when it reports ``"success": true`` and carries a
    non-empty ``"segments"`` value; otherwise a fixed placeholder message is
    returned. Each segment contributes only its ``"text"`` value, one per
    line; no other segment field (e.g. timestamps) is included.
    """
    usable = (
        "success" in result
        and result["success"] is True
        and "segments" in result
        and result["segments"]
    )
    if not usable:
        return "No transcription text found in response"

    texts = (segment.get("text", "") for segment in result["segments"])
    # A JSON null text becomes an empty line (same as a missing key) instead
    # of being rendered as the literal string "None".
    return "\n".join("" if text is None else str(text) for text in texts)


async def transcribe_audio(audio_file: str, output_format: str = "text") -> Dict[str, Any]:
    """
    Upload an audio file to the transcription API and return the outcome.

    Args:
        audio_file (str): Path to the audio file to transcribe
        output_format (str): Output format - "srt" selects the SRT endpoint;
            any other value selects the plain-text (JSON) endpoint

    Returns:
        On success, a dict with "success" (True), "transcription" and
        "format" ("srt" or "text"); the text variant additionally carries
        the parsed JSON response under "metadata". Exceptions raised while
        building the request, uploading, or parsing the response are not
        propagated; they produce a dict with "success" (False), "error"
        and "format" (the requested output_format).
    """
    wants_srt = output_format == "srt"
    try:
        # The config lookup stays inside the try so a missing endpoint entry
        # is reported through the error dict like any other failure.
        endpoints = CONFIG["api"]["endpoints"]
        route = endpoints["transcribe_srt" if wants_srt else "transcribe"]
        endpoint = f"{API_BASE_URL}{route}"

        async with httpx.AsyncClient(timeout=API_TIMEOUT) as client:
            with open(audio_file, "rb") as audio:
                # The part is always labelled "audio/wav", whatever the
                # actual file type is.
                upload = {"file": (os.path.basename(audio_file), audio, "audio/wav")}
                response = await client.post(endpoint, files=upload)
        response.raise_for_status()

        if wants_srt:
            # The SRT endpoint's body is returned as-is.
            return {
                "success": True,
                "transcription": response.text,
                "format": "srt",
            }

        result = response.json()
        return {
            "success": True,
            "transcription": _segments_to_text(result),
            "format": "text",
            "metadata": result,
        }
    except httpx.TimeoutException:
        return _failure(
            "Request timed out. The audio file might be too long or the server is busy.",
            output_format,
        )
    except httpx.HTTPStatusError as e:
        return _failure(
            f"HTTP error {e.response.status_code}: {e.response.text}",
            output_format,
        )
    except Exception as e:
        # Catch-all boundary: callers only ever inspect the result dict.
        return _failure(f"Unexpected error: {str(e)}", output_format)
def transcribe_audio_sync(audio_file: str, output_format: str = "text") -> Dict[str, Any]:
    """
    Blocking entry point around the ``transcribe_audio`` coroutine.

    Runs the coroutine to completion via ``asyncio.run`` and returns its
    result dict unchanged.
    """
    pending = transcribe_audio(audio_file, output_format)
    return asyncio.run(pending)
def transcribe_to_text(audio_file: str) -> str:
    """
    Transcribe the audio file to plain text.

    Args:
        audio_file (str): The URL to the audio file.

    Returns:
        str: Transcribed text
    """
    # NOTE(review): docstring wording intentionally left unchanged - the app
    # is launched with mcp_server=True, so this text is presumably surfaced
    # as the MCP tool description; confirm before rewording.

    # Nothing supplied (empty string / None): answer with a prompt instead
    # of calling the API.
    if not audio_file:
        return "Please provide an audio file."

    outcome = transcribe_audio_sync(audio_file, "text")
    if not outcome["success"]:
        return f"Error: {outcome['error']}"
    return outcome["transcription"]
def transcribe_to_srt(audio_file: str) -> str:
    """
    Transcribe the audio file to SRT subtitle format.

    Args:
        audio_file (str): The URL to the audio file.

    Returns:
        str: SRT formatted subtitles
    """
    # NOTE(review): docstring wording intentionally left unchanged - the app
    # is launched with mcp_server=True, so this text is presumably surfaced
    # as the MCP tool description; confirm before rewording.

    # Nothing supplied (empty string / None): answer with a prompt instead
    # of calling the API.
    if not audio_file:
        return "Please provide an audio file."

    outcome = transcribe_audio_sync(audio_file, "srt")
    if not outcome["success"]:
        return f"Error: {outcome['error']}"
    return outcome["transcription"]
# Landing-page copy shown at the top of the app (passed to gr.Markdown below).
_INTRO_MARKDOWN = """
# 🎙️ Parakeet ASR MCP Server
A Model Context Protocol (MCP) server built with Gradio interfaces with a speech-to-text API, serving the model [Parakeet TDT 0.6B V2](https://huggingface.co/nvidia/parakeet-tdt-0.6b-v2) open-sourced by NVIDIA and hosted on Novita AI (https://novita.ai/templates-library/105929).
This server is free to use and provides high-quality English transcription capabilities, supporting both plain text and SRT subtitle formats.
## MCP Server URL
```
https://viktor-hu-parakeet-asr-mcp-server.hf.space/gradio_api/mcp/sse
```
## Available MCP Tools
- `transcribe_to_text`: Transcribe the audio file to plain text.
- `transcribe_to_srt`: Transcribe the audio file to SRT subtitle format.
## Integration
To add this MCP to clients that support SSE (e.g. Cursor, Windsurf, Cline), simply add the following configuration to your MCP config:
```
{
"mcpServers": {
"parakeet-asr": {
"url": "https://viktor-hu-parakeet-asr-mcp-server.hf.space/gradio_api/mcp/sse"
}
}
}
```
"""

# Build the Gradio UI: an intro panel plus one tab per output format.
# Each tab has an audio input and a button on the left, a result box on the
# right, and wires the button to the matching transcribe_* function.
with gr.Blocks(title="Parakeet ASR MCP Server") as demo:
    gr.Markdown(_INTRO_MARKDOWN)

    # Tab 1: plain-text transcription.
    with gr.Tab("Transcribe to text"):
        with gr.Row():
            with gr.Column():
                audio_input_text = gr.Audio(
                    sources=["upload", "microphone"],
                    # "filepath" hands the handler a path string, which is
                    # what transcribe_audio opens for upload.
                    type="filepath",
                    label="Upload Audio File",
                )
                transcribe_text_btn = gr.Button("Transcribe to Text", variant="primary")
            with gr.Column():
                text_output = gr.Textbox(
                    placeholder="Transcribed text will appear here...",
                    lines=10,
                    label="Transcription Result",
                )
        transcribe_text_btn.click(
            fn=transcribe_to_text,
            inputs=[audio_input_text],
            outputs=[text_output],
        )

    # Tab 2: SRT subtitle transcription.
    with gr.Tab("Transcribe to SRT Subtitles"):
        with gr.Row():
            with gr.Column():
                audio_input_srt = gr.Audio(
                    sources=["upload", "microphone"],
                    type="filepath",
                    label="Upload Audio File",
                )
                transcribe_srt_btn = gr.Button("Transcribe to SRT", variant="primary")
            with gr.Column():
                srt_output = gr.Textbox(
                    placeholder="SRT formatted subtitles will appear here...",
                    lines=15,
                    label="SRT Subtitles",
                )
        transcribe_srt_btn.click(
            fn=transcribe_to_srt,
            inputs=[audio_input_srt],
            outputs=[srt_output],
        )
if __name__ == "__main__":
    # Script entry point: serve the UI with the MCP server enabled, bound to
    # the host/port from the project config. Launch failures are reported on
    # stdout rather than propagated.
    try:
        server_settings = CONFIG["server"]
        demo.launch(
            mcp_server=True,
            share=False,
            server_name=server_settings["host"],
            server_port=server_settings["port"],
        )
    except Exception as launch_error:
        print(f"Error launching server: {launch_error}")
|