|
import fastapi |
|
import shutil |
|
import os |
|
import zipfile |
|
import io |
|
import uvicorn |
|
import threading |
|
import glob |
|
from typing import List |
|
import torch |
|
import gdown |
|
from soundfile import write |
|
from torchaudio import load |
|
from librosa import resample |
|
import logging |
|
logging.basicConfig(level=logging.DEBUG) |
|
|
|
from sgmse import ScoreModel |
|
from sgmse.util.other import pad_spec |
|
|
|
class ModelAPI:
    """HTTP service that wraps a speech-enhancement model.

    The FastAPI application exposes routes to report status, prepare the
    model, upload noisy ``.wav`` files, enhance them, and download the
    enhanced files as a zip archive. All working files live under
    ``~/.modelapi``.
    """

    def __init__(self, host: str, port: int) -> None:
        """Create the working directories, sampler settings and FastAPI app.

        Anything already present in the noisy/enhanced audio directories is
        deleted so that every instance starts from an empty state.

        Args:
            host: Host handed to ``uvicorn.run`` by :meth:`run`.
            port: Port handed to ``uvicorn.run`` by :meth:`run`.

        Raises:
            OSError: If a working directory cannot be created or emptied.
        """
        self.host = host
        self.port = port

        self.base_path = os.path.join(os.path.expanduser("~"), ".modelapi")
        self.noisy_audio_path = os.path.join(self.base_path, "noisy_audio")
        self.enhanced_audio_path = os.path.join(self.base_path, "enhanced_audio")
        self.ckpt_path = None
        # Set by _prepare(); None means the model has not been loaded yet.
        self.model = None
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        # Sampler settings forwarded to the model's get_pc_sampler() in _enhance().
        self.corrector = "ald"
        self.corrector_steps = 1
        self.snr = 0.33
        self.N = 50

        for audio_path in (self.noisy_audio_path, self.enhanced_audio_path):
            os.makedirs(audio_path, exist_ok=True)

            # Remove every file, symlink and sub-directory left from earlier runs.
            for entry in os.listdir(audio_path):
                entry_path = os.path.join(audio_path, entry)
                if os.path.isfile(entry_path) or os.path.islink(entry_path):
                    os.unlink(entry_path)
                elif os.path.isdir(entry_path):
                    shutil.rmtree(entry_path)

        self.app = fastapi.FastAPI()
        self._setup_routes()

    def _prepare(self) -> None:
        """Miners should modify this function to fit their fine-tuned models.

        This function will make any preparations necessary to initialize the
        speech enhancement model (i.e. downloading checkpoint files, etc.)

        Raises:
            FileNotFoundError: If the checkpoint is still missing after the
                download attempt.
        """
        self.ckpt_path = os.path.join(self.base_path, "train_wsj0_2cta4cov_epoch=159.ckpt")

        if not os.path.exists(self.ckpt_path):
            file_id = "1ZENQY9WaRIZXu44lPBrPPfCAbx0Lub88"
            url = f"https://drive.google.com/uc?id={file_id}"
            gdown.download(url, self.ckpt_path)

            # Fail with a clear message instead of an opaque checkpoint-loading
            # error when the download did not produce the file.
            if not os.path.exists(self.ckpt_path):
                raise FileNotFoundError(
                    f"Checkpoint download failed; {self.ckpt_path} does not exist."
                )

        # NOTE(review): the second positional argument is presumably the
        # map_location of load_from_checkpoint -- confirm against sgmse/Lightning.
        self.model = ScoreModel.load_from_checkpoint(self.ckpt_path, self.device)
        self.model.t_eps = 0.03
        self.model.eval()

    def _enhance(self) -> None:
        """Miners should modify this function to fit their fine-tuned models.

        This function will:
        1. Open each noisy .wav file
        2. Enhance the audio with the model
        3. Save the enhanced audio in .wav format to ModelAPI.enhanced_audio_path

        Raises:
            RuntimeError: If the model has not been loaded by ``_prepare``.
        """
        if self.model is None:
            raise RuntimeError(
                "Model is not loaded; call the /prepare/ route before /enhance/."
            )

        # The backbone decides the working sample rate and spectrogram padding.
        if self.model.backbone == 'ncsnpp_48k':
            target_sr = 48000
            pad_mode = "reflection"
        else:
            target_sr = 16000
            pad_mode = "zero_pad"

        # Loop-invariant: make sure the output directory exists once up front.
        os.makedirs(self.enhanced_audio_path, exist_ok=True)

        noisy_files = sorted(glob.glob(os.path.join(self.noisy_audio_path, '*.wav')))
        for noisy_file in noisy_files:
            # The glob is non-recursive, so the base name is the relative path.
            filename = os.path.basename(noisy_file)

            y, sr = load(noisy_file)

            # Bring the audio to the sample rate selected above.
            if sr != target_sr:
                y = torch.tensor(resample(y.numpy(), orig_sr=sr, target_sr=target_sr))

            T_orig = y.size(1)

            # Peak-normalize; a silent file has a peak of 0, which would turn
            # the whole signal into NaNs, so fall back to a factor of 1.
            norm_factor = y.abs().max()
            if norm_factor == 0:
                norm_factor = torch.ones_like(norm_factor)
            y = y / norm_factor

            # Transform to the model's spectrogram representation, add a
            # leading dimension and pad.
            Y = torch.unsqueeze(self.model._forward_transform(self.model._stft(y.to(self.device))), 0)
            Y = pad_spec(Y, mode=pad_mode)

            # Build the sampler with the settings from __init__ and draw a sample.
            sampler = self.model.get_pc_sampler(
                'reverse_diffusion',
                self.corrector,
                Y.to(self.device),
                N=self.N,
                corrector_steps=self.corrector_steps,
                snr=self.snr,
            )
            sample, _ = sampler()

            # Back to a waveform of the original length, undoing the normalization.
            x_hat = self.model.to_audio(sample.squeeze(), T_orig)
            x_hat = x_hat * norm_factor

            write(os.path.join(self.enhanced_audio_path, filename), x_hat.cpu().numpy(), target_sr)

    def _setup_routes(self) -> None:
        """
        Setup API routes:

        /status/ : Communicates API status
        /prepare/ : Prepare the model (download checkpoint, load weights)
        /upload-audio/ : Upload audio files, save to noisy audio directory
        /enhance/ : Enhance audio files, save to enhanced audio directory
        /download-enhanced/ : Download enhanced audio files
        """
        self.app.get("/status/")(self.get_status)
        self.app.post("/prepare/")(self.prepare)
        self.app.post("/upload-audio/")(self.upload_audio)
        self.app.post("/enhance/")(self.enhance_audio)
        self.app.get("/download-enhanced/")(self.download_enhanced)

    def get_status(self):
        """Report that the API is reachable.

        Returns:
            ``{"container_running": True}``.
        """
        return {"container_running": True}

    def prepare(self):
        """Run :meth:`_prepare` and report the outcome.

        Returns:
            ``{'preparations': True}`` on success.

        Raises:
            fastapi.HTTPException: Status 500 if preparation fails.
        """
        try:
            self._prepare()
        except Exception as exc:
            logging.exception("Model preparation failed")
            # The exception must be raised (not returned) for FastAPI to turn
            # it into an HTTP 500 response.
            raise fastapi.HTTPException(
                status_code=500,
                detail="An error occurred while preparing the model.",
            ) from exc
        return {'preparations': True}

    def upload_audio(self, files: List[fastapi.UploadFile] = fastapi.File(...)):
        """Save the uploaded files into the noisy audio directory.

        Args:
            files: Uploaded files taken from the multipart request body.

        Returns:
            A dict with the stored file names (``uploaded_files``) and
            ``status`` set to ``True``.

        Raises:
            fastapi.HTTPException: Status 400 for an unusable file name,
                status 500 if a file cannot be written.
        """
        uploaded_files = []

        for file in files:
            try:
                # The file name is client-controlled: keep only its final
                # component so the upload cannot escape the noisy audio
                # directory (e.g. "../../x.wav").
                safe_name = os.path.basename(file.filename or "")
                if safe_name in ("", ".", ".."):
                    raise fastapi.HTTPException(
                        status_code=400,
                        detail="Uploaded file has an invalid file name.",
                    )

                file_path = os.path.join(self.noisy_audio_path, safe_name)

                # Copy in 1 MiB chunks so large uploads are not held in memory.
                with open(file_path, "wb") as f:
                    while contents := file.file.read(1024 * 1024):
                        f.write(contents)

                uploaded_files.append(safe_name)

            except fastapi.HTTPException:
                raise
            except Exception as exc:
                logging.exception("Failed to store uploaded file")
                raise fastapi.HTTPException(
                    status_code=500,
                    detail="An error occurred while uploading the noisy files.",
                ) from exc
            finally:
                file.file.close()

        print(f"uploaded files: {uploaded_files}")

        return {"uploaded_files": uploaded_files, "status": True}

    def enhance_audio(self):
        """Run :meth:`_enhance` on the uploaded files.

        Returns:
            ``{"status": True}`` on success.

        Raises:
            fastapi.HTTPException: Status 500 if enhancement fails.
        """
        try:
            self._enhance()
            return {"status": True}

        except Exception as e:
            print(f"Exception occured during enhancement: {e}")
            raise fastapi.HTTPException(
                status_code=500,
                detail="An error occurred while enhancing the noisy files.",
            ) from e

    def download_enhanced(self):
        """Return every enhanced ``.wav`` file bundled in one zip archive.

        Returns:
            A streaming response with media type ``application/zip``.

        Raises:
            fastapi.HTTPException: Status 500 if the archive cannot be built.
        """
        try:
            # Build the archive in memory; entries are stored under their base name.
            zip_buffer = io.BytesIO()
            with zipfile.ZipFile(zip_buffer, "w") as zip_file:
                for wav_file in glob.glob(os.path.join(self.enhanced_audio_path, '*.wav')):
                    zip_file.write(wav_file, arcname=os.path.basename(wav_file))

            return fastapi.responses.StreamingResponse(
                iter([zip_buffer.getvalue()]),
                media_type="application/zip",
                headers={"Content-Disposition": "attachment; filename=enhanced_audio_files.zip"},
            )

        except Exception as e:
            raise fastapi.HTTPException(
                status_code=500,
                detail=f"An error occurred while creating the download file: {str(e)}",
            ) from e

    def run(self) -> None:
        """Serve the FastAPI app with uvicorn on the configured host and port."""
        uvicorn.run(self.app, host=self.host, port=self.port)