"""
Audio processing utilities for EceMotion Pictures.
Enhanced text-to-speech generation with robust error handling and fallbacks.
"""
import numpy as np
import logging
import os
from typing import Tuple, Optional, Dict, Any
from config import (
MODEL_AUDIO, MODEL_CONFIGS, AUDIO_SAMPLE_RATE, get_device, get_safe_model_name
)
logger = logging.getLogger(__name__)
# Global model cache
_tts_pipe = None
_current_tts_model = None
def get_tts_pipe(model_name: str = MODEL_AUDIO, device: Optional[str] = None):
"""Get or create TTS pipeline with lazy loading and model switching."""
global _tts_pipe, _current_tts_model
if device is None:
device = get_device()
# Use safe model name
safe_model_name = get_safe_model_name(model_name, "audio")
if _tts_pipe is None or _current_tts_model != safe_model_name:
logger.info(f"Loading TTS model: {safe_model_name}")
try:
if "f5-tts" in safe_model_name.lower():
# Try F5-TTS first
_tts_pipe = _load_f5_tts(safe_model_name, device)
else:
# Use standard TTS pipeline
_tts_pipe = _load_standard_tts(safe_model_name, device)
if _tts_pipe is not None:
_current_tts_model = safe_model_name
logger.info(f"TTS model {safe_model_name} loaded successfully")
else:
raise RuntimeError("Failed to load any TTS model")
except Exception as e:
logger.error(f"Failed to load {safe_model_name}: {e}")
# Fallback to original model
_tts_pipe = _load_standard_tts("parler-tts/parler-tts-mini-v1", device)
_current_tts_model = "parler-tts/parler-tts-mini-v1"
return _tts_pipe
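# Illustrative usage (not executed on import): the pipeline is cached module-wide,
# so repeated calls with the same model name are cheap. The model name below is an
# example; the default comes from config.MODEL_AUDIO.
#
#     pipe = get_tts_pipe()                                        # default model, auto device
#     pipe = get_tts_pipe("parler-tts/parler-tts-mini-v1", "cpu")  # explicit fallback model on CPU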
def _load_f5_tts(model_name: str, device: str):
    """Load an F5-TTS model through the text-to-speech pipeline."""
    try:
        from transformers import pipeline
        import torch
        # Resolve "auto" to a concrete device string, mirroring _load_standard_tts
        if device == "auto":
            device = "cuda" if torch.cuda.is_available() else "cpu"
        pipe = pipeline(
            "text-to-speech",
            model=model_name,
            torch_dtype="auto",
            device=device
        )
        return pipe
    except Exception as e:
        logger.error(f"Failed to load F5-TTS: {e}")
        return None
def _load_standard_tts(model_name: str, device: str):
    """Load a standard text-to-speech pipeline (e.g. Parler-TTS)."""
    try:
        from transformers import pipeline
        import torch
        # Resolve "auto" to a concrete device string
        if device == "auto":
            device = "cuda" if torch.cuda.is_available() else "cpu"
        # Pass the device at construction time; transformers pipelines are not
        # reliably movable with .to() after the fact.
        pipe = pipeline(
            "text-to-speech",
            model=model_name,
            torch_dtype=torch.float16 if device == "cuda" else torch.float32,
            device=device
        )
        return pipe
    except Exception as e:
        logger.error(f"Failed to load standard TTS: {e}")
        return None
def synth_voice(text: str, voice_prompt: str, sr: int = AUDIO_SAMPLE_RATE,
                model_name: str = MODEL_AUDIO, device: Optional[str] = None) -> Tuple[int, np.ndarray]:
    """
    Generate speech from text with enhanced TTS support.

    Returns a (sample_rate, waveform) tuple. If synthesis fails, a synthetic
    fallback tone is returned instead of raising.
    """
if device is None:
device = get_device()
tts = get_tts_pipe(model_name, device)
model_config = MODEL_CONFIGS.get(_current_tts_model, {})
# Validate text length
max_length = model_config.get("max_text_length", 500)
min_length = model_config.get("min_text_length", 10)
if len(text) > max_length:
logger.warning(f"Text too long ({len(text)} chars), truncating to {max_length}")
text = text[:max_length]
elif len(text) < min_length:
logger.warning(f"Text too short ({len(text)} chars), padding")
text = text + " " * (min_length - len(text))
try:
if "f5-tts" in _current_tts_model.lower():
# F5-TTS specific generation
result = tts(
text=text,
voice_preset=voice_prompt,
return_tensors="pt"
)
wav = result["audio"].numpy().flatten()
else:
# Standard pipeline (Parler-TTS, etc.)
result = tts({"text": text, "voice_preset": voice_prompt})
wav = result["audio"]
# Ensure proper format
if hasattr(wav, 'numpy'):
wav = wav.numpy()
elif hasattr(wav, 'detach'):
wav = wav.detach().numpy()
        # Normalize audio
        wav = normalize_audio(wav)
        # Resample to the requested rate if needed. The transformers TTS pipeline
        # reports its own output rate; fall back to the configured default otherwise.
        model_sr = result.get("sampling_rate", AUDIO_SAMPLE_RATE) if isinstance(result, dict) else AUDIO_SAMPLE_RATE
        if sr != model_sr:
            wav = _resample_audio(wav, model_sr, sr)
logger.info(f"Generated audio: {len(wav)/sr:.2f}s at {sr}Hz")
return sr, wav.astype(np.float32)
except Exception as e:
logger.error(f"Voice synthesis failed: {e}")
# Return fallback audio
return _create_fallback_audio(text, sr)
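# Illustrative usage (sketch only): the voice prompt is a free-form style
# description forwarded to the underlying model as its voice preset; the output
# file name is arbitrary.
#
#     sr, wav = synth_voice("Coming up next on EceMotion Pictures...",
#                           "calm 1980s news anchor, slight tape warble")
#     write_wav("narration.wav", sr, wav)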
def _resample_audio(audio: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
"""Resample audio using available methods."""
try:
import librosa
return librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)
except ImportError:
        # Fallback: plain linear interpolation (no anti-aliasing filter when downsampling)
ratio = target_sr / orig_sr
new_length = int(len(audio) * ratio)
return np.interp(
np.linspace(0, len(audio), new_length),
np.arange(len(audio)),
audio
)
def _create_fallback_audio(text: str, sr: int) -> Tuple[int, np.ndarray]:
"""Create fallback audio when TTS fails."""
try:
# Create a simple tone based on text length
duration = max(1.0, len(text) / 20.0) # Rough estimate
t = np.linspace(0, duration, int(sr * duration), endpoint=False)
# Generate a simple tone
frequency = 440.0 # A4 note
wav = 0.1 * np.sin(2 * np.pi * frequency * t)
# Add some variation
wav += 0.05 * np.sin(2 * np.pi * frequency * 1.5 * t)
logger.info(f"Created fallback audio: {duration:.2f}s")
return sr, wav.astype(np.float32)
except Exception as e:
logger.error(f"Failed to create fallback audio: {e}")
# Last resort: silence
duration = 2.0
wav = np.zeros(int(sr * duration))
return sr, wav.astype(np.float32)
def normalize_audio(audio: np.ndarray, target_lufs: float = -23.0) -> np.ndarray:
    """Normalize audio toward broadcast levels.

    Note: this is a peak-normalization approximation; no true LUFS measurement
    is performed, so target_lufs is currently informational only.
    """
    # Simple peak normalization first
    if np.max(np.abs(audio)) > 0:
        audio = audio / np.max(np.abs(audio)) * 0.95
    # Apply gentle compression
    audio = apply_compression(audio)
    return audio
def apply_compression(audio: np.ndarray, ratio: float = 3.0, threshold: float = 0.7) -> np.ndarray:
"""Apply gentle compression for broadcast quality."""
    # Hard-knee compression: samples above the threshold are scaled toward it by the ratio
compressed = np.copy(audio)
# Above threshold, apply compression
above_threshold = np.abs(audio) > threshold
compressed[above_threshold] = np.sign(audio[above_threshold]) * (
threshold + (np.abs(audio[above_threshold]) - threshold) / ratio
)
return compressed
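# Worked example of the compression curve above with the defaults
# (threshold=0.7, ratio=3.0): a sample at 1.0 maps to 0.7 + (1.0 - 0.7) / 3.0 = 0.8,
# roughly 2 dB of gain reduction, while samples at or below 0.7 pass unchanged.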
def retro_bed(duration_s: float, sr: int = AUDIO_SAMPLE_RATE, bpm: int = 92):
"""Generate retro synth background music."""
try:
t = np.linspace(0, duration_s, int(sr * duration_s), endpoint=False)
# Chord progression root frequencies (A minor style)
freqs = [220.0, 174.61, 196.0, 146.83]
seg_len = int(len(t) / len(freqs)) if len(freqs) else len(t)
sig = np.zeros_like(t)
for i, f0 in enumerate(freqs):
tri_t = t[i * seg_len:(i + 1) * seg_len]
tri = 2 * np.abs(2 * ((tri_t * f0) % 1) - 1) - 1
sig[i * seg_len:(i + 1) * seg_len] = 0.15 * tri
# Add tape noise
noise = 0.01 * np.random.randn(len(t))
bed = sig + noise
# Apply gentle lowpass filter
try:
from scipy import signal
b, a = signal.butter(3, 3000, 'low', fs=sr)
bed = signal.lfilter(b, a, bed)
except ImportError:
# Simple averaging filter if scipy not available
bed = np.convolve(bed, np.ones(5)/5, mode='same')
return sr, bed.astype(np.float32)
except Exception as e:
logger.error(f"Failed to generate retro bed: {e}")
# Return silence
silence = np.zeros(int(sr * duration_s))
return sr, silence.astype(np.float32)
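# Illustrative usage: a 10-second bed at the configured sample rate. Note that
# the bpm argument is accepted for API symmetry but is not currently used by
# the generator.
#
#     sr, bed = retro_bed(10.0)
#     write_wav("bed.wav", sr, bed)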
def mix_to_stereo(sr1, a, sr2, b, bed_gain=0.5):
"""Mix two mono signals to stereo."""
assert sr1 == sr2, "Sample rates must match"
n = max(len(a), len(b))
def pad(x):
if len(x) < n:
if len(x.shape) > 1: # Stereo
padding = np.zeros((n - len(x), x.shape[1]))
else: # Mono
padding = np.zeros(n - len(x))
x = np.concatenate([x, padding])
return x
a = pad(a)
b = pad(b)
    # Slightly attenuate the right channel for a subtle stereo spread,
    # then stack the two mono signals into an (n, 2) stereo array
    left = a + bed_gain * b
    right = a * 0.9 + bed_gain * 0.9 * b
    stereo = np.stack([left, right], axis=1)
    return sr1, np.clip(stereo, -1.0, 1.0)
def write_wav(path: str, sr: int, wav: np.ndarray):
"""Write audio to WAV file."""
try:
import soundfile as sf
sf.write(path, wav, sr)
except ImportError:
# Fallback using scipy
try:
from scipy.io import wavfile
# Convert to 16-bit
wav_16bit = (wav * 32767).astype(np.int16)
wavfile.write(path, sr, wav_16bit)
except ImportError:
logger.error("No audio writing library available (soundfile or scipy)")
raise RuntimeError("Cannot write audio file - no audio library available")
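
# Minimal end-to-end sketch, assuming the config module resolves a usable device
# and a TTS backend (plus soundfile or scipy) is installed; model weights are
# downloaded on first use. The script text and output path are illustrative only.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    voice_sr, voice = synth_voice(
        "Tonight on EceMotion Pictures: a broadcast from the golden age of tape.",
        voice_prompt="warm late-night announcer, slight tape hiss",
    )
    bed_sr, bed = retro_bed(duration_s=len(voice) / voice_sr)
    mix_sr, stereo = mix_to_stereo(voice_sr, voice, bed_sr, bed, bed_gain=0.4)
    write_wav("demo_mix.wav", mix_sr, stereo)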