# voxtral-studio / core.py
# Last commit by mehdilaalali (e5362b5, verified):
#   fix(core): migrate extraction to PCM-WAV to bypass libmp3lame missing codec and enforce 25s limit on cloning endpoints
import os
import base64
import tempfile
import requests
import subprocess
from pathlib import Path
from mistralai.client import Mistral
# ─── Client ───────────────────────────────────────────────────────────────────
def get_client():
    """Build a Mistral client from the ``MISTRAL_API_KEY`` environment variable.

    Returns:
        A ``Mistral`` client constructed with the configured API key.

    Raises:
        RuntimeError: If ``MISTRAL_API_KEY`` is unset, empty, or whitespace-only.
    """
    # A whitespace-only secret is as unusable as a missing one, so strip first.
    api_key = (os.environ.get("MISTRAL_API_KEY") or "").strip()
    if not api_key:
        # RuntimeError is still an ``Exception``, so broad handlers keep working.
        raise RuntimeError(
            "MISTRAL_API_KEY secret is not set. Please set it as an environment variable."
        )
    return Mistral(api_key=api_key)
# ─── Utility ──────────────────────────────────────────────────────────────────
def trim_audio_if_needed(audio_path, max_seconds=25):
    """Re-encode ``audio_path`` as 16-bit PCM WAV, capped at ``max_seconds``.

    Despite the name, no duration check is made: ffmpeg is always invoked and
    a new temporary ``.wav`` file is produced on success.

    Args:
        audio_path: Path of the input audio file handed to ffmpeg.
        max_seconds: Maximum duration, in seconds, kept from the start of the input.

    Returns:
        The path of the new temporary WAV file on success (the caller owns it
        and should delete it), or ``audio_path`` unchanged if trimming failed
        for any reason (ffmpeg missing, unreadable input, ...).
    """
    out_path = None
    try:
        # mkstemp creates the file atomically; the deprecated mktemp only
        # returns a name and is open to races between naming and creation.
        fd, out_path = tempfile.mkstemp(suffix=".wav")
        os.close(fd)  # ffmpeg reopens the path itself; "-y" lets it overwrite the placeholder.
        subprocess.run(
            ["ffmpeg", "-y", "-i", audio_path, "-t", str(max_seconds), "-c:a", "pcm_s16le", out_path],
            check=True,
        )
        return out_path
    except Exception as e:
        # Deliberate best-effort: any failure falls back to the untouched input.
        print(f"Warning: Failed to trim audio, returning original: {e}")
        if out_path is not None:
            # Do not leave the empty/partial output behind on the fallback path.
            try:
                os.remove(out_path)
            except OSError:
                pass
        return audio_path
def list_user_voices():
    """Return a Markdown summary of the voices available to the account.

    Requests a single page of up to 100 voices and formats one bullet per
    voice with its name, ID and languages.

    Returns:
        str: The Markdown listing, ``"No voices found in your account."`` when
        the reported total is zero, or ``"Error fetching voices: ..."`` if
        anything goes wrong. This function never raises.
    """
    try:
        client = get_client()
        result = client.audio.voices.list(limit=100, offset=0)
        if result.total == 0:
            return "No voices found in your account."
        parts = [f"**Total Voices:** {result.total}\n\n"]
        for voice in result.items:
            # A missing, None or empty ``languages`` all render as "unknown"
            # instead of aborting the whole listing.
            languages = getattr(voice, "languages", None)
            language_text = ", ".join(languages) if languages else "unknown"
            parts.append(
                f"- **{voice.name}**\n - ID: `{voice.id}`\n - Languages: {language_text}\n"
            )
        return "".join(parts)
    except Exception as e:
        # UI-facing boundary: report the failure as text rather than raising.
        return f"Error fetching voices: {str(e)}"
def get_voice_choices():
    """Return ``(label, voice_id)`` pairs for the official Mistral voices.

    Requests a single page of up to 100 voices and keeps only those whose name
    starts with one of the official names and contains ``" - "``, so that
    user-cloned voices are hidden.

    Returns:
        list[tuple[str, str]]: ``(voice name, voice id)`` pairs, or an empty
        list if the voices could not be fetched.
    """
    # Official Mistral Voices (Paul, Oliver, Jane, Marie).
    official_names = ("Paul", "Oliver", "Jane", "Marie")
    try:
        client = get_client()
        res = client.audio.voices.list(limit=100, offset=0)
        return [
            (v.name, v.id)
            for v in res.items
            if v.name.startswith(official_names) and " - " in v.name
        ]
    except Exception as e:
        # Best-effort: fall back to no choices, but leave a trace of the cause.
        # ``Exception`` (not a bare except) lets KeyboardInterrupt/SystemExit through.
        print(f"Warning: Failed to load voice choices: {e}")
        return []
# ─── STT ──────────────────────────────────────────────────────────────────────
def transcribe_audio(audio_path, language):
    """Convert audio file → text using Voxtral Mini Transcribe.

    Args:
        audio_path: Path of the audio file to transcribe.
        language: Language to pass to the API. ``"Auto-detect"`` or any empty
            value means no language is sent.

    Returns:
        The ``text`` attribute of the transcription response.

    Raises:
        ValueError: If ``audio_path`` is ``None`` or empty.
    """
    # An empty string is as unusable as None; reject it here rather than
    # letting open() fail with a less helpful error.
    if not audio_path:
        raise ValueError("Please provide an audio file path.")
    client = get_client()
    kwargs = {"model": "voxtral-mini-latest"}
    if language and language != "Auto-detect":
        kwargs["language"] = language
    # The file handle must stay open until the request has completed.
    with open(audio_path, "rb") as f:
        kwargs["file"] = {"content": f, "file_name": Path(audio_path).name}
        response = client.audio.transcriptions.complete(**kwargs)
    return response.text
# ─── TTS ──────────────────────────────────────────────────────────────────────
def synthesize_speech(text, voice_id=None, ref_audio_path=None, audio_format="mp3"):
    """Convert text → speech using Voxtral Mini TTS.

    Args:
        text: The text to speak.
        voice_id: ID of an existing voice to use, if any.
        ref_audio_path: Path to a reference clip for zero-shot tone/voice
            cloning, if any. It is trimmed to 25 seconds before being sent.
        audio_format: Value passed as ``response_format``; also used as the
            suffix of the output file.

    Returns:
        tuple[str, int]: Path of a temporary file holding the generated audio
        (not deleted automatically) and the size of that audio in bytes.

    Raises:
        ValueError: If ``text`` is missing or blank, or if neither ``voice_id``
            nor ``ref_audio_path`` is provided.
    """
    if not text or not text.strip():
        raise ValueError("Please enter some text.")
    # Check inputs before creating the client or touching any files.
    if not voice_id and not ref_audio_path:
        raise ValueError("Mistral API requires a voice! Please provide either a reference audio or a valid Voice ID.")

    client = get_client()
    kwargs = dict(
        model="voxtral-mini-tts-2603",
        input=text,
        response_format=audio_format,
    )
    if voice_id:
        kwargs["voice_id"] = voice_id

    # Add Reference Audio for Zero-shot tone/voice cloning
    if ref_audio_path:
        clipped_ref_path = trim_audio_if_needed(ref_audio_path, max_seconds=25)
        try:
            with open(clipped_ref_path, "rb") as f:
                kwargs["ref_audio"] = base64.b64encode(f.read()).decode("utf-8")
        finally:
            # Only delete the temporary trimmed copy, never the caller's file,
            # and do so even when reading it failed.
            if clipped_ref_path != ref_audio_path:
                try:
                    os.remove(clipped_ref_path)
                except OSError:
                    pass

    response = client.audio.speech.complete(**kwargs)
    audio_bytes = base64.b64decode(response.audio_data)

    # delete=False: the file must outlive this function so the caller can use it.
    with tempfile.NamedTemporaryFile(delete=False, suffix=f".{audio_format}") as tmp:
        tmp.write(audio_bytes)
    return tmp.name, len(audio_bytes)
# ─── Voice Cloning ────────────────────────────────────────────────────────────
def _download_voice_sample(url, dest_dir):
    """Fetch the audio behind ``url`` into ``dest_dir`` and return the file path."""
    base_out = os.path.join(dest_dir, "sample")

    # If it's a direct audio file link, bypass yt-dlp and download it directly
    if url.lower().endswith(('.mp3', '.wav', '.flac', '.ogg', '.m4a')):
        ext = url.rsplit('.', 1)[-1]
        download_path = f"{base_out}.{ext}"
        with requests.get(url, stream=True, timeout=15) as r:
            r.raise_for_status()
            with open(download_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        return download_path

    # Otherwise use yt-dlp. Imported here so it is only required for this branch.
    import yt_dlp
    # NOTE(review): extraction still targets mp3 (needs an mp3 encoder in
    # ffmpeg); confirm whether this branch should produce WAV instead.
    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': base_out + '.%(ext)s',
        'quiet': True,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '128',
        }],
        'postprocessor_args': [
            '-t', '25'  # Hard Limit to 25 seconds to bypass API 30s limit
        ],
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.extract_info(url, download=True)
    return base_out + '.mp3'


def clone_voice(audio_path, url_input, voice_name, gender, languages_str):
    """Upload a sample audio or provide a URL to create a reusable cloned voice.

    When a URL is given it takes precedence over ``audio_path``. The sample is
    trimmed to 25 seconds before being uploaded.

    Args:
        audio_path: Path of an uploaded audio clip, or a falsy value.
        url_input: Direct audio link or other media URL, or empty/``None``.
        voice_name: Name for the new voice.
        gender: Gender label; sent lower-cased.
        languages_str: Comma-separated language codes; defaults to ``["en"]``
            when empty.

    Returns:
        The object returned by ``client.audio.voices.create``.

    Raises:
        ValueError: If neither an audio clip nor a URL is provided, or if the
            voice name is blank.
    """
    url = (url_input or "").strip()
    name = (voice_name or "").strip()
    if not audio_path and not url:
        raise ValueError("Please upload an audio clip or provide a media URL.")
    if not name:
        # ValueError rather than KeyError: str(KeyError) wraps the message in
        # quotes, and this matches the validation error above.
        raise ValueError("Please enter a name for the voice.")
    langs = [code.strip() for code in (languages_str or "").split(",") if code.strip()] or ["en"]

    # Downloads live in a private directory that is removed on every exit path.
    with tempfile.TemporaryDirectory() as work_dir:
        source_path = _download_voice_sample(url, work_dir) if url else audio_path

        # Ensure any direct MP3 or uploaded file is ALSO strictly trimmed
        final_audio_path = trim_audio_if_needed(source_path, max_seconds=25)
        try:
            client = get_client()
            sample_b64 = base64.b64encode(Path(final_audio_path).read_bytes()).decode()
            voice = client.audio.voices.create(
                name=name,
                sample_audio=sample_b64,
                sample_filename=Path(final_audio_path).name,
                languages=langs,
                gender=gender.lower(),
            )
        finally:
            # Remove only the temporary trimmed copy, never the user's upload.
            if final_audio_path != source_path:
                try:
                    os.remove(final_audio_path)
                except OSError:
                    pass
    return voice