# Spaces: Sleeping  (page-status residue captured along with this file; not part of the program)
from fastapi import FastAPI, Request | |
from fastapi.responses import JSONResponse | |
from fastapi.middleware.cors import CORSMiddleware | |
import base64 | |
import sys | |
import os | |
import numpy as np | |
import io | |
import librosa | |
import torch | |
from scipy.signal import resample | |
import soundfile as sf | |
from logic import plot_waveforms, plot_data, plot_heatmap | |
from encoder import inference as encoder | |
from encoder.params_model import model_embedding_size as speaker_embedding_size | |
from synthesizer.inference import Synthesizer | |
from utils.argutils import print_args | |
from vocoder import inference as vocoder | |
# Application object; middleware is attached to it further down in this file.
app: FastAPI = FastAPI()
def read_root():
    """Return a fixed JSON status payload.

    NOTE(review): no route decorator is visible on this function in this
    view; confirm how it is registered with ``app``.
    """
    return JSONResponse(content={"Voice": "Cloning", "Status": "Success"})
# Cross-origin policy: every origin, method and header is accepted, and
# credentials are allowed.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# discouraged by the FastAPI/Starlette CORS documentation; confirm whether
# credentials are actually needed, or list the allowed origins explicitly.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
# Process-wide cache so the model weights are loaded once instead of on
# every request.
_SYNTHESIZER = None

# Sample rate (Hz) audio is resampled to before speaker-embedding extraction.
_ENCODER_SAMPLE_RATE = 16000


def _get_synthesizer():
    """Load the encoder, synthesizer and vocoder on first use; reuse afterwards."""
    global _SYNTHESIZER
    if _SYNTHESIZER is None:
        print("Preparing the encoder, the synthesizer and the vocoder...")
        encoder.load_model('saved_models/encoder.pt')
        loaded = Synthesizer('saved_models/synthesizer.pt')
        vocoder.load_model('saved_models/vocoder.pt')
        # Assigned last so a failed load is retried on the next request.
        _SYNTHESIZER = loaded
    return _SYNTHESIZER


def _speaker_embedding(audio_bytes, label):
    """Decode *audio_bytes* and compute its speaker embedding.

    The audio is loaded as mono at its native rate, resampled to
    ``_ENCODER_SAMPLE_RATE``, cast to float32, and passed through
    ``encoder.preprocess_wav`` and ``encoder.embed_utterance``.

    Returns a ``(waveform_at_native_rate, embedding)`` tuple. *label* is
    only used in the log line.
    """
    wav, native_sr = librosa.load(io.BytesIO(audio_bytes), sr=None, mono=True)
    print(f"{label} Sampling rate: {native_sr}")
    n_samples = int(len(wav) * _ENCODER_SAMPLE_RATE / native_sr)
    resampled = resample(wav, n_samples).astype(np.float32)
    embedding = encoder.embed_utterance(encoder.preprocess_wav(resampled))
    return wav, embedding


async def synthesize(request: Request):
    """Clone a voice: synthesize ``input_text`` in the voice of ``cloning_audio``.

    The request body must be a JSON object with:
      * ``input_text``    - the text to synthesize.
      * ``cloning_audio`` - base64-encoded audio of the reference speaker.

    Returns a ``JSONResponse`` whose keys are ``mel_spectrogram``,
    ``original_mel_spectrogram``, ``cloned_audio_data`` (base64 WAV),
    ``cloned_waveform``, ``original_waveform``,
    ``original_embeddings_heatmap``, ``cloned_embeddings_heatmap`` and
    ``alignment``. The plot values come from the ``plot_*`` helpers
    (presumably base64-encoded images - confirm in ``logic``).

    A malformed body (invalid JSON, missing keys, undecodable base64) yields
    a 400 response with an ``error`` message instead of an unhandled error.

    NOTE(review): no route decorator is visible on this handler in this
    view; confirm how it is registered with ``app``.
    NOTE(review): the model inference below runs synchronously inside this
    async handler, so other requests wait while it runs.
    """
    print("API call successful")

    # Validate the request up front so client mistakes surface as 400s.
    try:
        payload = await request.json()
        input_text = payload['input_text']
        cloning_audio = payload['cloning_audio']
        # binascii.Error (bad padding) is a ValueError subclass.
        audio_data = base64.b64decode(cloning_audio)
    except (KeyError, TypeError, ValueError) as exc:
        return JSONResponse(
            status_code=400,
            content={"error": f"Invalid request body: {exc!r}"},
        )
    # Log only the text; the payload also carries the full base64 audio.
    print(f"Input text: {input_text!r}")

    synthesizer = _get_synthesizer()

    # Speaker Encoder Pipeline
    print("Extracting Speaker Embeddings...")
    original_wav, embed = _speaker_embedding(audio_data, "Original")
    print(f"Speaker Embeddings of size {np.shape(embed)} extracted successfully.")
    original_heatmap_base64 = plot_heatmap(embed)

    # Synthesizer Pipeline
    print("Generating Mel-spectrogram...")
    specs, alignments = synthesizer.synthesize_spectrograms([input_text], [embed], True)
    spec = specs[0]
    alignment = alignments[0].detach().cpu().numpy()
    print(f"Number of Mel channels, Number of timesteps: {spec.shape}")
    mel_output_base64 = plot_data(spec, index=0)
    alignment_base64 = plot_data(alignment, index=1)

    # Mel-spectrogram of the reference audio, for side-by-side display.
    # NOTE(review): original_wav is at its native sample rate, not resampled;
    # confirm make_spectrogram does not assume the synthesizer's rate.
    mel = synthesizer.make_spectrogram(original_wav)
    print(f"Shape of original: {mel.shape}")
    original_mel_base64 = plot_data(mel, index=0)

    # Vocoder pipeline
    print("Starting audio synthesis...")
    generated_wav = vocoder.infer_waveform(spec)
    print("\nPost-processing generated audio...")
    # Trim excess silences to compensate for gaps in spectrograms.
    generated_wav = encoder.preprocess_wav(generated_wav)
    print("Audio generated successfully.")

    # Encode the generated waveform as 16-bit PCM WAV entirely in memory.
    audio_buffer = io.BytesIO()
    with sf.SoundFile(
        audio_buffer,
        mode='w',
        format='WAV',
        samplerate=synthesizer.sample_rate,
        channels=1,
        subtype='PCM_16',
    ) as wav_file:
        wav_file.write(generated_wav)
    cloned_audio_data = audio_buffer.getvalue()
    audio_base64 = base64.b64encode(cloned_audio_data).decode('utf-8')

    print("Creating time-domain waveform...")
    cloned_wave_base64 = plot_waveforms(cloned_audio_data)
    original_wave_base64 = plot_waveforms(audio_data)

    # Cloned Audio Analysis Pipeline: embed the generated audio so the
    # frontend can compare it with the reference speaker's embedding.
    print("Extracting Speaker Embeddings from cloned audio...")
    _, cloned_embed = _speaker_embedding(cloned_audio_data, "Cloned")
    print(f"Speaker Embeddings of size {np.shape(cloned_embed)} extracted successfully from Cloned Audio.")
    cloned_heatmap_base64 = plot_heatmap(cloned_embed)

    response_data = {
        'mel_spectrogram': mel_output_base64,
        'original_mel_spectrogram': original_mel_base64,
        'cloned_audio_data': audio_base64,
        'cloned_waveform': cloned_wave_base64,
        'original_waveform': original_wave_base64,
        'original_embeddings_heatmap': original_heatmap_base64,
        'cloned_embeddings_heatmap': cloned_heatmap_base64,
        'alignment': alignment_base64,
    }
    print("Response generated successfully...")
    return JSONResponse(content=response_data)