"""
BeigeTTS - Standard Inference Script

Research release for high-quality neural speech synthesis.

Based on BlandAI's production Khaki TTS system.
"""
import argparse
import sys
from typing import List, Optional, Tuple

import numpy as np
import soundfile as sf
import torch
from neucodec import NeuCodec
from transformers import AutoModelForCausalLM, AutoTokenizer


class TTSConfig:
    """Configuration for BeigeTTS inference"""

    # Special tokens delimiting the audio span in the LM output
    AUDIO_START_TOKEN = 262145
    AUDIO_END_TOKEN = 262146

    # NeuCodec FSQ codes occupy a contiguous block of the LM vocabulary
    NEUCODEC_BASE_OFFSET = 262154
    NEUCODEC_VOCABULARY_SIZE = 65536
    AUDIO_TOKEN_MIN = NEUCODEC_BASE_OFFSET
    AUDIO_TOKEN_MAX = NEUCODEC_BASE_OFFSET + NEUCODEC_VOCABULARY_SIZE

    # Sampling defaults
    DEFAULT_TEMPERATURE = 0.1
    DEFAULT_TOP_P = 0.97
    DEFAULT_MAX_TOKENS = 500
    SAMPLE_RATE = 24000

    # Hard cap on extracted audio tokens per generation
    MAX_AUDIO_TOKENS = 1000
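
    # Mapping arithmetic: an LM token id t in [AUDIO_TOKEN_MIN, AUDIO_TOKEN_MAX)
    # corresponds to NeuCodec code (t - NEUCODEC_BASE_OFFSET), e.g. 262154 -> code 0.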


class BeigeTTS:
    """BeigeTTS synthesis engine - research version of Khaki TTS"""

    def __init__(self, model_path: str = "BlandAI/BeigeTTS", device: str = "auto"):
        """Initialize BeigeTTS engine

        Args:
            model_path: HuggingFace model path or local directory
            device: Device for inference ("auto", "cuda", "cpu")
        """
        self.config = TTSConfig()
        self.device = self._setup_device(device)

        print("Loading BeigeTTS model (research release)...")
        self.model = AutoModelForCausalLM.from_pretrained(
            model_path,
            torch_dtype=torch.float16 if self.device.type == "cuda" else torch.float32,
            device_map="auto" if device == "auto" else None,
            trust_remote_code=True,
        )
        if device != "auto":
            self.model = self.model.to(self.device)
        self.model.eval()

        print("Loading tokenizer...")
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        print("Loading NeuCodec...")
        self.neucodec = NeuCodec.from_pretrained("neuphonic/neucodec")
        self.neucodec.eval()
        if self.device.type == "cuda":
            self.neucodec = self.neucodec.to(self.device)

    def _setup_device(self, device: str) -> torch.device:
        """Setup compute device"""
        if device == "auto":
            return torch.device("cuda" if torch.cuda.is_available() else "cpu")
        return torch.device(device)
    def synthesize(
        self,
        text: str,
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        max_tokens: Optional[int] = None,
        voice_prompt: Optional[str] = None,
    ) -> Tuple[np.ndarray, int]:
        """Synthesize speech from text

        Note: The full Khaki system supports 57 languages, voice cloning,
        and unlimited duration. This research release is English-only.

        Args:
            text: Input text to synthesize
            temperature: Sampling temperature (lower = more deterministic)
            top_p: Nucleus sampling parameter
            max_tokens: Maximum tokens to generate
            voice_prompt: Optional voice conditioning (limited in BeigeTTS)

        Returns:
            Tuple of (audio_array, sample_rate)
        """
        # Fall back to config defaults; explicit `is None` checks so a
        # legitimate falsy value (e.g. temperature=0.0) is not overridden
        temperature = self.config.DEFAULT_TEMPERATURE if temperature is None else temperature
        top_p = self.config.DEFAULT_TOP_P if top_p is None else top_p
        max_tokens = self.config.DEFAULT_MAX_TOKENS if max_tokens is None else max_tokens

        prompt = self._format_prompt(text, voice_prompt)
        audio_tokens = self._generate_tokens(prompt, temperature, top_p, max_tokens)

        if not audio_tokens:
            raise ValueError("No audio tokens generated")

        audio = self._decode_audio(audio_tokens)
        return audio, self.config.SAMPLE_RATE
    def _format_prompt(self, text: str, voice_prompt: Optional[str] = None) -> str:
        """Format text into model prompt"""
        base_prompt = (
            f"<start_of_turn>user\n{text}<end_of_turn>\n"
            f"<start_of_turn>model\n<start_of_speech>"
        )

        if voice_prompt:
            base_prompt = f"[Voice: {voice_prompt}]\n{base_prompt}"

        return base_prompt
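
    # For reference, _format_prompt("Hello") with no voice prompt produces:
    #   <start_of_turn>user\nHello<end_of_turn>\n<start_of_turn>model\n<start_of_speech>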

    def _generate_tokens(
        self,
        prompt: str,
        temperature: float,
        top_p: float,
        max_tokens: int,
    ) -> List[int]:
        """Generate audio tokens from prompt"""
        inputs = self.tokenizer(prompt, return_tensors="pt")
        input_ids = inputs.input_ids.to(self.model.device)

        print(f"Generating audio tokens (temp={temperature}, top_p={top_p})...")

        with torch.no_grad():
            outputs = self.model.generate(
                input_ids,
                max_new_tokens=max_tokens,
                temperature=temperature,
                top_p=top_p,
                do_sample=True,
                pad_token_id=self.tokenizer.pad_token_id,
                eos_token_id=[self.tokenizer.eos_token_id, self.config.AUDIO_END_TOKEN],
            )

        # Keep only tokens in the audio range, shifted back to raw NeuCodec codes
        generated = outputs[0, input_ids.shape[1]:].cpu().tolist()
        audio_tokens = []

        for token_id in generated:
            if self.config.AUDIO_TOKEN_MIN <= token_id < self.config.AUDIO_TOKEN_MAX:
                audio_tokens.append(token_id - self.config.NEUCODEC_BASE_OFFSET)
            elif token_id == self.config.AUDIO_END_TOKEN:
                break

            if len(audio_tokens) >= self.config.MAX_AUDIO_TOKENS:
                print(f"Reached maximum audio length ({self.config.MAX_AUDIO_TOKENS} tokens)")
                break

        print(f"Generated {len(audio_tokens)} audio tokens")
        return audio_tokens

    def _decode_audio(self, audio_tokens: List[int]) -> np.ndarray:
        """Decode audio tokens to waveform"""
        # Clamp codes into the valid NeuCodec range as a safety net
        audio_array = np.array(audio_tokens, dtype=np.int32)
        audio_array = np.clip(audio_array, 0, self.config.NEUCODEC_VOCABULARY_SIZE - 1)

        # Shape to (1, 1, T): the batch and codebook dimensions decode_code expects here
        fsq_codes = torch.tensor(audio_array, dtype=torch.long)
        fsq_codes = fsq_codes.unsqueeze(0).unsqueeze(1)
        if self.device.type == "cuda":
            fsq_codes = fsq_codes.to(self.device)

        print(f"Decoding audio (shape: {fsq_codes.shape})...")

        with torch.no_grad():
            wav = self.neucodec.decode_code(fsq_codes).cpu()

        # Squeeze batch/channel dimensions down to a 1-D waveform
        if wav.dim() == 3:
            wav = wav[0, 0]
        elif wav.dim() == 2:
            wav = wav[0]
        wav = wav.numpy()

        # Peak-normalize to 95% full scale to avoid clipping
        if np.abs(wav).max() > 0:
            wav = wav / np.abs(wav).max() * 0.95

        return wav


def main():
    parser = argparse.ArgumentParser(description="BeigeTTS Synthesis (Research Release)")
    parser.add_argument("text", type=str, help="Text to synthesize")
    parser.add_argument("-o", "--output", type=str, default="output.wav", help="Output WAV file")
    parser.add_argument("-m", "--model", type=str, default="BlandAI/BeigeTTS", help="Model path")
    parser.add_argument("-t", "--temperature", type=float, default=0.1, help="Sampling temperature")
    parser.add_argument("-p", "--top-p", type=float, default=0.97, help="Top-p sampling")
    parser.add_argument("--max-tokens", type=int, default=500, help="Maximum tokens to generate")
    parser.add_argument("--voice", type=str, help="Voice conditioning prompt")
    parser.add_argument("--device", type=str, default="auto", help="Device (auto/cuda/cpu)")

    args = parser.parse_args()

    tts = BeigeTTS(model_path=args.model, device=args.device)

    try:
        audio, sr = tts.synthesize(
            text=args.text,
            temperature=args.temperature,
            top_p=args.top_p,
            max_tokens=args.max_tokens,
            voice_prompt=args.voice,
        )

        sf.write(args.output, audio, sr)
        duration = len(audio) / sr
        print(f"✅ Saved {duration:.1f}s of audio to {args.output}")
        print("Note: This is a research release. Production Khaki TTS supports 57 languages and unlimited duration.")
    except Exception as e:
        print(f"❌ Synthesis failed: {e}")
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())
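
# Programmatic use (a sketch, using only the API defined above):
#   tts = BeigeTTS(model_path="BlandAI/BeigeTTS", device="auto")
#   audio, sr = tts.synthesize("Hello from BeigeTTS")
#   sf.write("hello.wav", audio, sr)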