import os

# Point every cache directory at /tmp so model downloads land in a writable
# path inside the Spaces container.
os.environ["HF_HOME"] = "/tmp"
os.environ["TRANSFORMERS_CACHE"] = "/tmp"
os.environ["TORCH_HOME"] = "/tmp"
os.environ["XDG_CACHE_HOME"] = "/tmp"
import io
import re
import math

import numpy as np
import scipy.io.wavfile
import torch
from fastapi import FastAPI, Query
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from transformers import VitsModel, AutoTokenizer
app = FastAPI()

# VITS weights and the MMS Somali tokenizer are loaded from separate Hub repos.
model = VitsModel.from_pretrained("Somali-tts/somali_tts_model")
tokenizer = AutoTokenizer.from_pretrained("saleolow/somali-mms-tts")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval()
number_words = {
    0: "eber", 1: "koow", 2: "labo", 3: "seddex", 4: "afar", 5: "shan",
    6: "lix", 7: "todobo", 8: "sideed", 9: "sagaal", 10: "toban",
    11: "toban iyo koow", 12: "toban iyo labo", 13: "toban iyo seddex",
    14: "toban iyo afar", 15: "toban iyo shan", 16: "toban iyo lix",
    17: "toban iyo todobo", 18: "toban iyo sideed", 19: "toban iyo sagaal",
    20: "labaatan", 30: "sodon", 40: "afartan", 50: "konton",
    60: "lixdan", 70: "todobaatan", 80: "sideetan", 90: "sagaashan",
    100: "boqol", 1000: "kun",
}
def number_to_words(number: int) -> str:
    """Spell out an integer in Somali words; values of a billion or more are left as digits."""
    if number < 20:
        return number_words[number]
    elif number < 100:
        tens, unit = divmod(number, 10)
        return number_words[tens * 10] + (" iyo " + number_words[unit] if unit else "")
    elif number < 1000:
        hundreds, remainder = divmod(number, 100)
        part = (number_words[hundreds] + " boqol") if hundreds > 1 else "boqol"
        if remainder:
            part += " iyo " + number_to_words(remainder)
        return part
    elif number < 1000000:
        thousands, remainder = divmod(number, 1000)
        words = []
        if thousands == 1:
            words.append("kun")
        else:
            words.append(number_to_words(thousands) + " kun")
        if remainder:
            words.append("iyo " + number_to_words(remainder))
        return " ".join(words)
    elif number < 1000000000:
        millions, remainder = divmod(number, 1000000)
        words = []
        if millions == 1:
            words.append("milyan")
        else:
            words.append(number_to_words(millions) + " milyan")
        if remainder:
            words.append(number_to_words(remainder))
        return " ".join(words)
    else:
        return str(number)
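
# Illustrative sanity checks (values follow directly from the mapping above):
#   number_to_words(25)   -> "labaatan iyo shan"
#   number_to_words(145)  -> "boqol iyo afartan iyo shan"
#   number_to_words(2024) -> "labo kun iyo labaatan iyo afar"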
def normalize_text(text: str) -> str:
    # Replace each digit sequence with its Somali spelling in a single pass;
    # substituting match by match keeps overlapping numbers (e.g. "1" vs "10") intact.
    text = re.sub(r"\d+", lambda m: number_to_words(int(m.group())), text)
    # Heuristic letter substitutions for combinations the voice renders poorly.
    # "ZamZam" is handled before the generic "Z" -> "S" rule so it can still match.
    text = text.replace("ZamZam", "SamSam")
    text = text.replace("KH", "qa").replace("Z", "S")
    text = text.replace("SH", "SHa'a").replace("DH", "Dha'a")
    return text
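
# Example (illustrative): normalize_text("25 sano") -> "labaatan iyo shan sano"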
def waveform_to_wav_bytes(waveform: torch.Tensor, sample_rate: int = 22050) -> bytes:
    np_waveform = waveform.cpu().numpy()
    # Collapse any batch/channel dimensions down to a 1-D signal.
    if np_waveform.ndim == 3:
        np_waveform = np_waveform[0]
    if np_waveform.ndim == 2:
        np_waveform = np_waveform.mean(axis=0)
    # Clip to [-1, 1] and convert to 16-bit PCM for the WAV container.
    np_waveform = np.clip(np_waveform, -1.0, 1.0).astype(np.float32)
    pcm_waveform = (np_waveform * 32767).astype(np.int16)
    buf = io.BytesIO()
    scipy.io.wavfile.write(buf, rate=sample_rate, data=pcm_waveform)
    buf.seek(0)
    return buf.read()
class TextIn(BaseModel):
    inputs: str


# NOTE: the route path "/synthesize" is an assumption; adjust it to whatever
# path your client expects.
@app.post("/synthesize")
async def synthesize_post(data: TextIn):
    text = normalize_text(data.inputs)
    inputs = tokenizer(text, return_tensors="pt").to(device)
    with torch.no_grad():
        output = model(**inputs)
    # The VITS forward pass may expose the audio under different names
    # depending on the transformers version, so probe a few possibilities.
    if hasattr(output, "waveform"):
        waveform = output.waveform
    elif isinstance(output, dict) and "waveform" in output:
        waveform = output["waveform"]
    elif isinstance(output, (tuple, list)):
        waveform = output[0]
    else:
        return {"error": "Waveform not found in model output"}
    sample_rate = getattr(model.config, "sampling_rate", 22050)
    wav_bytes = waveform_to_wav_bytes(waveform, sample_rate=sample_rate)
    return StreamingResponse(io.BytesIO(wav_bytes), media_type="audio/wav")
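
# Example client call (assumes the app is served on port 7860, the Spaces
# default, and the "/synthesize" path declared above):
#   curl -X POST http://localhost:7860/synthesize \
#        -H "Content-Type: application/json" \
#        -d '{"inputs": "Subax wanaagsan"}' \
#        --output speech.wav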
# NOTE: the GET route path is likewise an assumption.
@app.get("/synthesize")
async def synthesize_get(text: str = Query(..., description="Text to synthesize"), test: bool = Query(False)):
    # ?test=true returns a 2-second 440 Hz sine tone so the audio pipeline can
    # be checked without running the model.
    if test:
        duration_s = 2.0
        sample_rate = 22050
        t = np.linspace(0, duration_s, int(sample_rate * duration_s), endpoint=False)
        freq = 440
        waveform = 0.5 * np.sin(2 * math.pi * freq * t).astype(np.float32)
        pcm_waveform = (waveform * 32767).astype(np.int16)
        buf = io.BytesIO()
        scipy.io.wavfile.write(buf, rate=sample_rate, data=pcm_waveform)
        buf.seek(0)
        return StreamingResponse(buf, media_type="audio/wav")
    normalized = normalize_text(text)
    inputs = tokenizer(normalized, return_tensors="pt").to(device)
    with torch.no_grad():
        output = model(**inputs)
    if hasattr(output, "waveform"):
        waveform = output.waveform
    elif isinstance(output, dict) and "waveform" in output:
        waveform = output["waveform"]
    elif isinstance(output, (tuple, list)):
        waveform = output[0]
    else:
        return {"error": "Waveform not found in model output"}
    sample_rate = getattr(model.config, "sampling_rate", 22050)
    wav_bytes = waveform_to_wav_bytes(waveform, sample_rate=sample_rate)
    return StreamingResponse(io.BytesIO(wav_bytes), media_type="audio/wav")
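
# The GET route can be exercised the same way, e.g.
#   curl "http://localhost:7860/synthesize?text=Subax%20wanaagsan" --output speech.wav
#   curl "http://localhost:7860/synthesize?text=tijaabo&test=true" --output tone.wav
# (the second form returns the 440 Hz test tone without touching the model).

# Local development entry point. On Spaces the container normally starts the
# server itself (e.g. a Dockerfile CMD running uvicorn), so this block is only
# a convenience for running locally; port 7860 mirrors the Spaces default.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)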