# Source: EceMotion_Pictures / utils_audio.py
# Uploader: drvsbrkcn
# Commit message: "Upload 3 files"
# Commit: 869d082 (verified)
"""
Audio processing utilities for EceMotion Pictures.
Enhanced text-to-speech generation with robust error handling and fallbacks.
"""
import numpy as np
import logging
import os
from typing import Tuple, Optional, Dict, Any
from config import (
MODEL_AUDIO, MODEL_CONFIGS, AUDIO_SAMPLE_RATE, get_device, get_safe_model_name
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Global model cache: the loaded TTS pipeline and the model name it was
# loaded for. Both start empty and are rebound by get_tts_pipe() via ``global``.
_tts_pipe = None
_current_tts_model = None
def get_tts_pipe(model_name: str = MODEL_AUDIO, device: Optional[str] = None):
    """Return the cached text-to-speech pipeline, loading it when needed.

    The pipeline lives in the module-level ``_tts_pipe``. It is (re)loaded when
    nothing is cached yet or when the resolved model name differs from
    ``_current_tts_model``.

    Args:
        model_name: Requested model identifier. It is resolved through
            ``get_safe_model_name(model_name, "audio")`` before use.
        device: Target device string; ``None`` means "use ``get_device()``".

    Returns:
        The loaded pipeline, or ``None`` when neither the requested model nor
        the fallback model could be loaded. In that case
        ``_current_tts_model`` is reset to ``None`` so the cache never claims
        a model that is not actually loaded.
    """
    global _tts_pipe, _current_tts_model

    fallback_model = "parler-tts/parler-tts-mini-v1"

    if device is None:
        device = get_device()

    # Use safe model name
    safe_model_name = get_safe_model_name(model_name, "audio")

    # Cache hit: the requested model is already loaded.
    if _tts_pipe is not None and _current_tts_model == safe_model_name:
        return _tts_pipe

    logger.info(f"Loading TTS model: {safe_model_name}")
    try:
        if "f5-tts" in safe_model_name.lower():
            pipe = _load_f5_tts(safe_model_name, device)
        else:
            pipe = _load_standard_tts(safe_model_name, device)
        if pipe is None:
            raise RuntimeError("Failed to load any TTS model")
    except Exception as e:
        logger.error(f"Failed to load {safe_model_name}: {e}")
        pipe = None

    if pipe is not None:
        _tts_pipe = pipe
        _current_tts_model = safe_model_name
        logger.info(f"TTS model {safe_model_name} loaded successfully")
        return _tts_pipe

    # Fallback to the original model. Skip it when it is the very model that
    # just failed through the same loader: the retry would be identical.
    if safe_model_name != fallback_model:
        pipe = _load_standard_tts(fallback_model, device)

    _tts_pipe = pipe
    if pipe is not None:
        # NOTE(review): the cache key is now the fallback name, so a later call
        # that requests the failing model again will retry the load each time.
        _current_tts_model = fallback_model
    else:
        _current_tts_model = None
        logger.error("No TTS model could be loaded (requested and fallback both failed)")
    return _tts_pipe
def _load_f5_tts(model_name: str, device: str):
    """Build a ``text-to-speech`` pipeline for an F5-TTS model.

    ``transformers`` is imported lazily inside the function. Any exception
    (including a failed import) is logged and turned into a ``None`` return.

    Args:
        model_name: Model identifier handed to ``transformers.pipeline``.
        device: Device string; only the exact value ``"cuda"`` is forwarded as
            ``device_map``, every other value results in ``device_map=None``.

    Returns:
        The pipeline object, or ``None`` on failure.
    """
    try:
        from transformers import pipeline as build_pipeline

        device_map = None
        if device == "cuda":
            device_map = device

        options = {
            "model": model_name,
            "torch_dtype": "auto",
            "device_map": device_map,
        }
        return build_pipeline("text-to-speech", **options)
    except Exception as load_error:
        logger.error(f"Failed to load F5-TTS: {load_error}")
        return None
def _load_standard_tts(model_name: str, device: str):
    """Build a standard ``text-to-speech`` pipeline.

    ``transformers`` and ``torch`` are imported lazily. Any exception
    (including a failed import) is logged and turned into a ``None`` return.

    Args:
        model_name: Model identifier handed to ``transformers.pipeline``.
        device: Device string. ``"auto"`` resolves to ``"cuda"`` when
            ``torch.cuda.is_available()`` and to ``"cpu"`` otherwise.

    Returns:
        The pipeline object, or ``None`` on failure.
    """
    try:
        from transformers import pipeline
        import torch

        # Fix device string - convert "auto" to proper device
        if device == "auto":
            device = "cuda" if torch.cuda.is_available() else "cpu"

        # Also treat indexed devices such as "cuda:0" as CUDA.
        on_cuda = str(device).startswith("cuda")

        pipeline_kwargs = {
            "model": model_name,
            "torch_dtype": torch.float16 if on_cuda else torch.float32,
        }
        if on_cuda:
            # Place the model through the ``device`` argument of pipeline().
            # The previous ``pipe.to(device)`` call is not part of the
            # transformers Pipeline API, so the CUDA path raised and this
            # loader always returned None on GPU.
            pipeline_kwargs["device"] = device

        return pipeline("text-to-speech", **pipeline_kwargs)
    except Exception as e:
        logger.error(f"Failed to load standard TTS: {e}")
        return None
def synth_voice(text: str, voice_prompt: str, sr: int = AUDIO_SAMPLE_RATE,
                model_name: str = MODEL_AUDIO, device: Optional[str] = None) -> Tuple[int, np.ndarray]:
    """Generate speech for *text* and return ``(sample_rate, samples)``.

    The text is truncated or space-padded to the length limits found in
    ``MODEL_CONFIGS`` for the active model (defaults: 10..500 characters).
    The pipeline output is converted to a flat numpy array, normalized, and
    resampled to ``sr`` when its source rate differs.

    Args:
        text: Text to speak.
        voice_prompt: Passed to the pipeline as ``voice_preset``.
        sr: Sample rate of the returned audio.
        model_name: Requested TTS model, forwarded to ``get_tts_pipe``.
        device: Device string; ``None`` means "use ``get_device()``".

    Returns:
        ``(sr, wav)`` with ``wav`` as ``float32``. When no model is available
        or synthesis raises, the result of ``_create_fallback_audio`` is
        returned instead; this function does not propagate synthesis errors.
    """
    if device is None:
        device = get_device()

    tts = get_tts_pipe(model_name, device)
    active_model = _current_tts_model or ""
    model_config = MODEL_CONFIGS.get(_current_tts_model, {})

    # Validate text length
    max_length = model_config.get("max_text_length", 500)
    min_length = model_config.get("min_text_length", 10)

    if len(text) > max_length:
        logger.warning(f"Text too long ({len(text)} chars), truncating to {max_length}")
        text = text[:max_length]
    elif len(text) < min_length:
        logger.warning(f"Text too short ({len(text)} chars), padding")
        text = text + " " * (min_length - len(text))

    if tts is None:
        # Nothing could be loaded: say so explicitly instead of relying on the
        # TypeError that calling ``None`` would raise below.
        logger.error("Voice synthesis failed: no TTS model available")
        return _create_fallback_audio(text, sr)

    try:
        if "f5-tts" in active_model.lower():
            # F5-TTS specific generation
            result = tts(
                text=text,
                voice_preset=voice_prompt,
                return_tensors="pt"
            )
        else:
            # Standard pipeline (Parler-TTS, etc.)
            # NOTE(review): dict-style input is kept from the original code;
            # confirm it matches the calling convention of the loaded pipeline.
            result = tts({"text": text, "voice_preset": voice_prompt})

        # Both branches go through one conversion so the output is always a
        # flat numpy array (the standard branch used to skip flattening).
        wav = _as_flat_numpy(result["audio"])

        # Normalize audio
        wav = normalize_audio(wav)

        # Resample from the rate the pipeline reports. Only fall back to
        # AUDIO_SAMPLE_RATE when the result carries no "sampling_rate".
        source_sr = AUDIO_SAMPLE_RATE
        if isinstance(result, dict) and result.get("sampling_rate"):
            source_sr = int(result["sampling_rate"])
        if source_sr != sr:
            wav = _resample_audio(wav, source_sr, sr)

        logger.info(f"Generated audio: {len(wav)/sr:.2f}s at {sr}Hz")
        return sr, wav.astype(np.float32)

    except Exception as e:
        logger.error(f"Voice synthesis failed: {e}")
        # Return fallback audio
        return _create_fallback_audio(text, sr)


def _as_flat_numpy(wav) -> np.ndarray:
    """Convert pipeline audio (tensor-like or array-like) to a 1-D numpy array."""
    if hasattr(wav, "detach"):
        # Tensor-like object: detach and move to CPU before converting, since
        # a plain ``.numpy()`` fails for tensors on GPU or requiring grad.
        wav = wav.detach().cpu().numpy()
    elif hasattr(wav, "numpy"):
        wav = wav.numpy()
    return np.asarray(wav).flatten()
def _resample_audio(audio: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
"""Resample audio using available methods."""
try:
import librosa
return librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)
except ImportError:
# Simple resampling without librosa
ratio = target_sr / orig_sr
new_length = int(len(audio) * ratio)
return np.interp(
np.linspace(0, len(audio), new_length),
np.arange(len(audio)),
audio
)
def _create_fallback_audio(text: str, sr: int) -> Tuple[int, np.ndarray]:
    """Produce placeholder audio for when speech synthesis is unavailable.

    The placeholder is a 440 Hz sine at amplitude 0.1 plus a 660 Hz sine at
    amplitude 0.05. Its duration is ``len(text) / 20`` seconds, but never less
    than one second. If building the tone raises, two seconds of silence are
    returned instead.

    Returns:
        ``(sr, samples)`` with ``samples`` as ``float32``.
    """
    try:
        # Rough estimate of speaking time, floored at one second.
        estimate = len(text) / 20.0
        duration = estimate if estimate > 1.0 else 1.0

        sample_count = int(sr * duration)
        timeline = np.linspace(0, duration, sample_count, endpoint=False)

        # A4 note plus a quieter component at 1.5x its frequency.
        frequency = 440.0
        base_phase = 2 * np.pi * frequency
        tone = 0.1 * np.sin(base_phase * timeline)
        tone = tone + 0.05 * np.sin(base_phase * 1.5 * timeline)

        logger.info(f"Created fallback audio: {duration:.2f}s")
        return sr, tone.astype(np.float32)
    except Exception as e:
        logger.error(f"Failed to create fallback audio: {e}")
        # Last resort: silence
        silent_seconds = 2.0
        return sr, np.zeros(int(sr * silent_seconds), dtype=np.float32)
def normalize_audio(audio: np.ndarray, target_lufs: float = -23.0) -> np.ndarray:
    """Peak-normalize *audio* to 0.95 and then apply ``apply_compression``.

    Args:
        audio: Samples to normalize.
        target_lufs: Currently unused; kept for interface compatibility. No
            loudness (LUFS) measurement is performed here.

    Returns:
        The normalized and compressed samples. Empty input is returned as an
        empty copy, and all-zero input skips the scaling step.
    """
    audio = np.asarray(audio)
    # np.max raises on a zero-size array, so handle empty input up front.
    if audio.size == 0:
        return audio.copy()

    # Simple peak normalization first
    peak = np.max(np.abs(audio))
    if peak > 0:
        audio = audio / peak * 0.95

    # Apply gentle compression
    return apply_compression(audio)
def apply_compression(audio: np.ndarray, ratio: float = 3.0, threshold: float = 0.7) -> np.ndarray:
    """Reduce the level of samples whose magnitude exceeds ``threshold``.

    For those samples the part of the magnitude above ``threshold`` is divided
    by ``ratio`` and the sign is kept. Samples at or below the threshold are
    left untouched.

    Args:
        audio: Input samples; not modified.
        ratio: Divisor applied to the overshoot above the threshold.
        threshold: Magnitude above which compression applies.

    Returns:
        A new array with the same shape and dtype as ``audio``.
    """
    result = np.array(audio, copy=True)

    loud = np.abs(audio) > threshold
    loud_samples = audio[loud]

    overshoot = np.abs(loud_samples) - threshold
    result[loud] = np.sign(loud_samples) * (threshold + overshoot / ratio)
    return result
def retro_bed(duration_s: float, sr: int = AUDIO_SAMPLE_RATE, bpm: int = 92):
    """Generate a simple synth background bed.

    The bed is four equal-length triangle-wave segments at 220.0, 174.61,
    196.0 and 146.83 Hz (amplitude 0.15) plus Gaussian noise (scale 0.01),
    followed by a low-pass filter. The filter is a 3rd-order Butterworth when
    scipy is available, otherwise a 5-tap moving average.

    Args:
        duration_s: Length of the bed in seconds. Zero or negative values
            yield an empty array.
        sr: Sample rate of the generated audio.
        bpm: Currently unused; kept for interface compatibility.

    Returns:
        ``(sr, samples)`` with ``samples`` as ``float32``. If generation
        raises, silence of the same length is returned. The noise comes from
        ``np.random.randn``, so the output is not deterministic.
    """
    # Clamp so a negative duration cannot produce a negative array size, which
    # previously raised from inside the error handler itself.
    n_samples = max(0, int(sr * duration_s))
    if n_samples == 0:
        return sr, np.zeros(0, dtype=np.float32)

    try:
        t = np.linspace(0, duration_s, n_samples, endpoint=False)

        # Chord progression root frequencies (A minor style)
        freqs = [220.0, 174.61, 196.0, 146.83]
        seg_len = n_samples // len(freqs)
        sig = np.zeros_like(t)

        last = len(freqs) - 1
        for i, f0 in enumerate(freqs):
            start = i * seg_len
            # The last segment runs to the end so the samples left over by the
            # integer division also carry a tone.
            stop = n_samples if i == last else start + seg_len
            seg_t = t[start:stop]
            tri = 2 * np.abs(2 * ((seg_t * f0) % 1) - 1) - 1
            sig[start:stop] = 0.15 * tri

        # Add tape noise
        bed = sig + 0.01 * np.random.randn(n_samples)

        # Apply gentle lowpass filter
        try:
            from scipy import signal
            # butter() needs the cutoff below Nyquist (sr / 2). Keep 3000 Hz
            # whenever it is valid and scale it down for low sample rates.
            cutoff = 3000 if sr > 6000 else 0.45 * sr
            b, a = signal.butter(3, cutoff, 'low', fs=sr)
            bed = signal.lfilter(b, a, bed)
        except ImportError:
            # Simple averaging filter if scipy not available
            bed = np.convolve(bed, np.ones(5)/5, mode='same')

        return sr, bed.astype(np.float32)

    except Exception as e:
        logger.error(f"Failed to generate retro bed: {e}")
        # Return silence
        return sr, np.zeros(n_samples, dtype=np.float32)
def mix_to_stereo(sr1, a, sr2, b, bed_gain=0.5):
    """Mix a foreground signal and a background bed into one stereo signal.

    Both inputs are brought to mono, zero-padded at the end to the longer
    length, and combined as ``left = a + bed_gain * b`` and
    ``right = 0.9 * a + 0.9 * bed_gain * b``. The result is clipped to
    [-1.0, 1.0].

    Args:
        sr1: Sample rate of ``a``.
        a: Foreground samples, 1-D, or multi-dimensional with samples on the
            first axis (all other axes are averaged to mono).
        sr2: Sample rate of ``b``; must equal ``sr1``.
        b: Background samples, same layout rules as ``a``.
        bed_gain: Gain applied to ``b``.

    Returns:
        ``(sr1, stereo)`` where ``stereo`` has shape ``(n, 2)``.

    Raises:
        AssertionError: If the sample rates differ.
    """
    # Raised explicitly (not via ``assert``) so the check survives ``python -O``
    # while callers still see the same exception type.
    if sr1 != sr2:
        raise AssertionError("Sample rates must match")

    def to_mono(x):
        x = np.asarray(x)
        if x.ndim > 1:
            # Average the channels. Stacking multi-channel input directly
            # would yield an (n, 2, 2) array rather than stereo.
            x = x.reshape(len(x), -1).mean(axis=1)
        return x

    a = to_mono(a)
    b = to_mono(b)
    n = max(len(a), len(b))

    def pad(x):
        if len(x) < n:
            x = np.concatenate([x, np.zeros(n - len(x))])
        return x

    a = pad(a)
    b = pad(b)

    left = a + bed_gain * b
    right = a * 0.9 + bed_gain * 0.9 * b

    stereo = np.stack([left, right], axis=1)
    return sr1, np.clip(stereo, -1.0, 1.0)
def write_wav(path: str, sr: int, wav: np.ndarray):
    """Write *wav* to ``path`` as an audio file.

    Uses ``soundfile.write`` when soundfile can be imported. Otherwise the
    samples are converted to 16-bit PCM and written with
    ``scipy.io.wavfile.write``.

    Args:
        path: Destination file path.
        sr: Sample rate to store in the file.
        wav: Samples; the scipy fallback treats them as floats in [-1.0, 1.0].

    Raises:
        RuntimeError: If neither soundfile nor scipy can be imported.
    """
    try:
        import soundfile as sf
    except ImportError:
        sf = None

    if sf is not None:
        sf.write(path, wav, sr)
        return

    # Fallback using scipy
    try:
        from scipy.io import wavfile
    except ImportError:
        logger.error("No audio writing library available (soundfile or scipy)")
        raise RuntimeError("Cannot write audio file - no audio library available")

    # Convert to 16-bit. Clip first: samples outside [-1, 1] would otherwise
    # overflow int16 and wrap around.
    wav_16bit = (np.clip(wav, -1.0, 1.0) * 32767).astype(np.int16)
    wavfile.write(path, sr, wav_16bit)