""" |
|
|
Audio processing utilities for EceMotion Pictures. |
|
|
Enhanced text-to-speech generation with robust error handling and fallbacks. |
|
|
""" |
|
|
|
|
|
import logging
from typing import Optional, Tuple

import numpy as np

from config import (
    MODEL_AUDIO, MODEL_CONFIGS, AUDIO_SAMPLE_RATE, get_device, get_safe_model_name
)

logger = logging.getLogger(__name__)

# Lazily created TTS pipeline, cached at module level together with the name
# of the model it currently holds.
_tts_pipe = None
_current_tts_model = None


def get_tts_pipe(model_name: str = MODEL_AUDIO, device: Optional[str] = None):
    """Get or create the TTS pipeline, lazily loading and switching models as needed."""
    global _tts_pipe, _current_tts_model

    if device is None:
        device = get_device()

    safe_model_name = get_safe_model_name(model_name, "audio")

    if _tts_pipe is None or _current_tts_model != safe_model_name:
        logger.info(f"Loading TTS model: {safe_model_name}")
        try:
            if "f5-tts" in safe_model_name.lower():
                _tts_pipe = _load_f5_tts(safe_model_name, device)
            else:
                _tts_pipe = _load_standard_tts(safe_model_name, device)

            if _tts_pipe is not None:
                _current_tts_model = safe_model_name
                logger.info(f"TTS model {safe_model_name} loaded successfully")
            else:
                raise RuntimeError("Failed to load any TTS model")

        except Exception as e:
            logger.error(f"Failed to load {safe_model_name}: {e}")
            # Fall back to a small, widely available model; fail loudly if even
            # that cannot be loaded.
            _tts_pipe = _load_standard_tts("parler-tts/parler-tts-mini-v1", device)
            if _tts_pipe is None:
                raise RuntimeError("Failed to load fallback TTS model") from e
            _current_tts_model = "parler-tts/parler-tts-mini-v1"

    return _tts_pipe
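
# The pipeline is cached at module level, so repeated calls are cheap. A usage
# sketch (the model name shown for the switch is the fallback model above):
#
#     pipe = get_tts_pipe()   # loads MODEL_AUDIO on first call
#     pipe = get_tts_pipe()   # returns the cached pipeline
#     pipe = get_tts_pipe("parler-tts/parler-tts-mini-v1")  # switches models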
|
|
|
|
|
def _load_f5_tts(model_name: str, device: str):
    """Load an F5-TTS model via the transformers pipeline."""
    try:
        from transformers import pipeline

        pipe = pipeline(
            "text-to-speech",
            model=model_name,
            torch_dtype="auto",
            device_map=device if device == "cuda" else None,
        )
        return pipe

    except Exception as e:
        logger.error(f"Failed to load F5-TTS: {e}")
        return None


def _load_standard_tts(model_name: str, device: str):
    """Load a standard text-to-speech model via the transformers pipeline."""
    try:
        from transformers import pipeline
        import torch

        if device == "auto":
            device = "cuda" if torch.cuda.is_available() else "cpu"

        # Hand the device to the pipeline directly; pipeline objects do not
        # expose a .to() method of their own.
        pipe = pipeline(
            "text-to-speech",
            model=model_name,
            torch_dtype=torch.float16 if device == "cuda" else torch.float32,
            device=device,
        )
        return pipe

    except Exception as e:
        logger.error(f"Failed to load standard TTS: {e}")
        return None


def synth_voice(text: str, voice_prompt: str, sr: int = AUDIO_SAMPLE_RATE,
                model_name: str = MODEL_AUDIO, device: Optional[str] = None) -> Tuple[int, np.ndarray]:
    """
    Generate speech from text with enhanced TTS support.

    Returns a (sample_rate, waveform) tuple; on synthesis failure, returns
    placeholder audio instead of raising.
    """
    if device is None:
        device = get_device()

    tts = get_tts_pipe(model_name, device)
    model_config = MODEL_CONFIGS.get(_current_tts_model, {})

    # Clamp the text to the length range the current model supports.
    max_length = model_config.get("max_text_length", 500)
    min_length = model_config.get("min_text_length", 10)

    if len(text) > max_length:
        logger.warning(f"Text too long ({len(text)} chars), truncating to {max_length}")
        text = text[:max_length]
    elif len(text) < min_length:
        logger.warning(f"Text too short ({len(text)} chars), padding")
        text = text + " " * (min_length - len(text))

    try:
        if "f5-tts" in _current_tts_model.lower():
            result = tts(
                text=text,
                voice_preset=voice_prompt,
                return_tensors="pt"
            )
        else:
            result = tts({"text": text, "voice_preset": voice_prompt})
        wav = result["audio"]

        # Convert torch tensors to numpy, detaching and moving to CPU first in
        # case the tensor carries gradients or lives on the GPU.
        if hasattr(wav, 'detach'):
            wav = wav.detach().cpu().numpy()
        elif hasattr(wav, 'numpy'):
            wav = wav.numpy()
        wav = np.asarray(wav).flatten()

        wav = normalize_audio(wav)

        # Resample if the caller asked for a rate other than what the model
        # produced (the pipeline reports its rate alongside the audio).
        model_sr = result.get("sampling_rate", AUDIO_SAMPLE_RATE)
        if sr != model_sr:
            wav = _resample_audio(wav, model_sr, sr)

        logger.info(f"Generated audio: {len(wav)/sr:.2f}s at {sr}Hz")
        return sr, wav.astype(np.float32)

    except Exception as e:
        logger.error(f"Voice synthesis failed: {e}")
        # Keep the show going: synthesize a placeholder tone instead.
        return _create_fallback_audio(text, sr)
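
# Example usage (a sketch; assumes MODEL_AUDIO from config resolves to a
# loadable checkpoint, and the prompt and filename are illustrative):
#
#     sr, wav = synth_voice(
#         "Tonight, on EceMotion Pictures...",
#         voice_prompt="warm, deep movie-trailer announcer",
#     )
#     write_wav("voiceover.wav", sr, wav)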
|
|
|
|
|
def _resample_audio(audio: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
    """Resample audio with librosa if available, else linear interpolation."""
    try:
        import librosa
        return librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)
    except ImportError:
        # Crude linear-interpolation fallback; adequate for speech previews.
        ratio = target_sr / orig_sr
        new_length = int(len(audio) * ratio)
        return np.interp(
            np.linspace(0, len(audio) - 1, new_length),
            np.arange(len(audio)),
            audio,
        )


def _create_fallback_audio(text: str, sr: int) -> Tuple[int, np.ndarray]:
    """Create placeholder audio when TTS fails."""
    try:
        # Estimate duration from a rough speaking rate of ~20 chars/second.
        duration = max(1.0, len(text) / 20.0)
        t = np.linspace(0, duration, int(sr * duration), endpoint=False)

        # Quiet A4 tone plus a fifth above it: audible but not jarring.
        frequency = 440.0
        wav = 0.1 * np.sin(2 * np.pi * frequency * t)
        wav += 0.05 * np.sin(2 * np.pi * frequency * 1.5 * t)

        logger.info(f"Created fallback audio: {duration:.2f}s")
        return sr, wav.astype(np.float32)

    except Exception as e:
        logger.error(f"Failed to create fallback audio: {e}")
        # Last resort: two seconds of silence.
        duration = 2.0
        wav = np.zeros(int(sr * duration))
        return sr, wav.astype(np.float32)


def normalize_audio(audio: np.ndarray, target_lufs: float = -23.0) -> np.ndarray:
    """Normalize audio toward broadcast levels.

    Note: this is peak normalization plus gentle compression; target_lufs is
    kept for API compatibility, but no true LUFS metering is performed.
    """
    # Peak-normalize to 95% of full scale to leave headroom.
    if np.max(np.abs(audio)) > 0:
        audio = audio / np.max(np.abs(audio)) * 0.95

    audio = apply_compression(audio)

    return audio


def apply_compression(audio: np.ndarray, ratio: float = 3.0, threshold: float = 0.7) -> np.ndarray:
    """Apply gentle compression for broadcast quality."""
    compressed = np.copy(audio)

    # Above the threshold, divide the overshoot by the ratio while keeping
    # each sample's sign; samples at or below the threshold pass unchanged.
    above_threshold = np.abs(audio) > threshold
    compressed[above_threshold] = np.sign(audio[above_threshold]) * (
        threshold + (np.abs(audio[above_threshold]) - threshold) / ratio
    )

    return compressed
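
# Worked example: with threshold=0.7 and ratio=3.0, a sample at 0.9 overshoots
# by 0.2, which compresses to 0.2 / 3 ≈ 0.067, giving roughly 0.767:
#
#     >>> apply_compression(np.array([0.9]))
#     array([0.76666667])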
|
|
|
|
|
def retro_bed(duration_s: float, sr: int = AUDIO_SAMPLE_RATE, bpm: int = 92):
    """Generate a retro synth background music bed.

    Note: bpm is currently unused; the progression simply splits the duration
    into equal segments.
    """
    try:
        t = np.linspace(0, duration_s, int(sr * duration_s), endpoint=False)

        # Bass-note progression: A3, F3, G3, D3.
        freqs = [220.0, 174.61, 196.0, 146.83]
        seg_len = int(len(t) / len(freqs)) if len(freqs) else len(t)
        sig = np.zeros_like(t)

        # Triangle wave for each segment of the progression.
        for i, f0 in enumerate(freqs):
            tri_t = t[i * seg_len:(i + 1) * seg_len]
            tri = 2 * np.abs(2 * ((tri_t * f0) % 1) - 1) - 1
            sig[i * seg_len:(i + 1) * seg_len] = 0.15 * tri

        # A little noise for analog character.
        noise = 0.01 * np.random.randn(len(t))
        bed = sig + noise

        # Low-pass for warmth; fall back to a moving average without scipy.
        try:
            from scipy import signal
            b, a = signal.butter(3, 3000, 'low', fs=sr)
            bed = signal.lfilter(b, a, bed)
        except ImportError:
            bed = np.convolve(bed, np.ones(5) / 5, mode='same')

        return sr, bed.astype(np.float32)

    except Exception as e:
        logger.error(f"Failed to generate retro bed: {e}")
        # Fall back to silence of the requested length.
        silence = np.zeros(int(sr * duration_s))
        return sr, silence.astype(np.float32)


def mix_to_stereo(sr1, a, sr2, b, bed_gain=0.5):
    """Mix two mono signals (voice and bed) to stereo."""
    if sr1 != sr2:
        raise ValueError("Sample rates must match")

    n = max(len(a), len(b))

    def pad(x):
        # Zero-pad the shorter signal so both have length n.
        if len(x) < n:
            if len(x.shape) > 1:
                padding = np.zeros((n - len(x), x.shape[1]))
            else:
                padding = np.zeros(n - len(x))
            x = np.concatenate([x, padding])
        return x

    a = pad(a)
    b = pad(b)

    # Attenuate the right channel slightly for a touch of stereo width.
    left = a + bed_gain * b
    right = a * 0.9 + bed_gain * 0.9 * b

    stereo = np.stack([left, right], axis=1)

    return sr1, np.clip(stereo, -1.0, 1.0)
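
# Example: lay a retro bed under narration at reduced gain (a sketch; `sr` and
# `wav` as returned by synth_voice above):
#
#     sr_bed, bed = retro_bed(len(wav) / sr, sr=sr)
#     sr_mix, stereo = mix_to_stereo(sr, wav, sr_bed, bed, bed_gain=0.4)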
|
|
|
|
|
def write_wav(path: str, sr: int, wav: np.ndarray):
    """Write audio to a WAV file using whichever backend is available."""
    try:
        import soundfile as sf
        sf.write(path, wav, sr)
    except ImportError:
        # Fall back to scipy, clipping before the 16-bit conversion so that
        # out-of-range samples cannot wrap around.
        try:
            from scipy.io import wavfile
            wav_16bit = (np.clip(wav, -1.0, 1.0) * 32767).astype(np.int16)
            wavfile.write(path, sr, wav_16bit)
        except ImportError:
            logger.error("No audio writing library available (soundfile or scipy)")
            raise RuntimeError("Cannot write audio file - no audio library available")
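

# Minimal smoke test (a sketch): exercises the synthesis-free helpers so it
# runs without downloading any TTS weights. The output filename is
# illustrative.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Placeholder narration tone plus a retro music bed, mixed to stereo.
    sr_voice, voice = _create_fallback_audio("Coming soon to a theater near you.", AUDIO_SAMPLE_RATE)
    sr_bed, bed = retro_bed(len(voice) / sr_voice, sr=sr_voice)
    sr_mix, stereo = mix_to_stereo(sr_voice, voice, sr_bed, bed, bed_gain=0.4)
    write_wav("demo_mix.wav", sr_mix, stereo)
    logger.info(f"Wrote demo_mix.wav ({len(stereo) / sr_mix:.2f}s at {sr_mix}Hz)")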
|
|
|
|
|
|