| import fastapi
|
| import shutil
|
| import os
|
| import zipfile
|
| import io
|
| import uvicorn
|
| import threading
|
| import glob
|
| from typing import List
|
| import torch
|
| import gdown
|
| from soundfile import write
|
| from torchaudio import load
|
| from librosa import resample
|
| import logging
|
| logging.basicConfig(level=logging.DEBUG)
|
|
|
| from sgmse import ScoreModel
|
| from sgmse.util.other import pad_spec
|
|
|
| class ModelAPI:
|
|
|
| def __init__(self, host, port):
|
|
|
| self.host = host
|
| self.port = port
|
|
|
| self.base_path = os.path.join(os.path.expanduser("~"), ".modelapi")
|
| self.noisy_audio_path = os.path.join(self.base_path, "noisy_audio")
|
| self.enhanced_audio_path = os.path.join(self.base_path, "enhanced_audio")
|
| app_dir = os.path.dirname(os.path.abspath(__file__))
|
| self.ckpt_path = os.path.join(app_dir, "epoch=326-step=408750.ckpt")
|
| self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
|
| self.corrector = "ald"
|
| self.corrector_steps = 1
|
| self.snr = 0.33
|
| self.N = 50
|
|
|
|
|
| for audio_path in [self.noisy_audio_path, self.enhanced_audio_path]:
|
| if not os.path.exists(audio_path):
|
| os.makedirs(audio_path)
|
|
|
|
|
| for filename in os.listdir(audio_path):
|
| file_path = os.path.join(audio_path, filename)
|
|
|
|
|
| try:
|
| if os.path.isfile(file_path) or os.path.islink(file_path):
|
| os.unlink(file_path)
|
| elif os.path.isdir(file_path):
|
| shutil.rmtree(file_path)
|
| except Exception as e:
|
| raise e
|
|
|
| self.app = fastapi.FastAPI()
|
| self._setup_routes()
|
|
|
| def _prepare(self):
|
| """Miners should modify this function to fit their fine-tuned models.
|
|
|
| This function will make any preparations necessary to initialize the
|
| speech enhancement model (i.e. downloading checkpoint files, etc.)
|
| """
|
|
|
| self.model = ScoreModel.load_from_checkpoint(self.ckpt_path, self.device)
|
| self.model.t_eps = 0.03
|
| self.model.eval()
|
|
|
| def _enhance(self):
|
| """
|
| Miners should modify this function to fit their fine-tuned models.
|
|
|
| This function will:
|
| 1. Open each noisy .wav file
|
| 2. Enhance the audio with the model
|
| 3. Save the enhanced audio in .wav format to ModelAPI.enhanced_audio_path
|
| """
|
|
|
|
|
| if self.model.backbone == 'ncsnpp_48k':
|
| target_sr = 48000
|
| pad_mode = "reflection"
|
| else:
|
| target_sr = 16000
|
| pad_mode = "zero_pad"
|
|
|
|
|
| noisy_files = sorted(glob.glob(os.path.join(self.noisy_audio_path, '*.wav')))
|
| for noisy_file in noisy_files:
|
|
|
| filename = noisy_file.replace(self.noisy_audio_path, "")
|
| filename = filename[1:] if filename.startswith("/") else filename
|
|
|
|
|
| y, sr = load(noisy_file)
|
|
|
|
|
| if sr != target_sr:
|
| y = torch.tensor(resample(y.numpy(), orig_sr=sr, target_sr=target_sr))
|
|
|
| T_orig = y.size(1)
|
|
|
|
|
| norm_factor = y.abs().max()
|
| y = y / norm_factor
|
|
|
|
|
| Y = torch.unsqueeze(self.model._forward_transform(self.model._stft(y.to(self.device))), 0)
|
| Y = pad_spec(Y, mode=pad_mode)
|
|
|
|
|
| sampler = self.model.get_pc_sampler('reverse_diffusion', self.corrector, Y.to(self.device), N=self.N, corrector_steps=self.corrector_steps, snr=self.snr)
|
|
|
| sample, _ = sampler()
|
|
|
| x_hat = self.model.to_audio(sample.squeeze(), T_orig)
|
|
|
| x_hat = x_hat * norm_factor
|
|
|
| os.makedirs(os.path.dirname(os.path.join(self.enhanced_audio_path, filename)), exist_ok=True)
|
| write(os.path.join(self.enhanced_audio_path, filename), x_hat.cpu().numpy(), target_sr)
|
|
|
| def _setup_routes(self):
|
| """
|
| Setup API routes:
|
|
|
| /status/ : Communicates API status
|
| /upload-audio/ : Upload audio files, save to noisy audio directory
|
| /enhance/ : Enhance audio files, save to enhanced audio directory
|
| /download-enhanced/ : Download enhanced audio files
|
| """
|
| self.app.get("/status/")(self.get_status)
|
| self.app.post("/prepare/")(self.prepare)
|
| self.app.post("/upload-audio/")(self.upload_audio)
|
| self.app.post("/enhance/")(self.enhance_audio)
|
| self.app.get("/download-enhanced/")(self.download_enhanced)
|
|
|
| def get_status(self):
|
| try:
|
| return {"container_running": True}
|
| except:
|
| raise fastapi.HTTPException(status_code=500, detail="An error occurred while fetching API status.")
|
|
|
| def prepare(self):
|
| try:
|
| self._prepare()
|
| return {'preparations': True}
|
| except:
|
| return fastapi.HTTPException(status_code=500, detail="An error occurred while fetching API status.")
|
|
|
| def upload_audio(self, files: List[fastapi.UploadFile] = fastapi.File(...)):
|
|
|
| uploaded_files = []
|
|
|
| for file in files:
|
| try:
|
|
|
| file_path = os.path.join(self.noisy_audio_path, file.filename)
|
|
|
|
|
| with open(file_path, "wb") as f:
|
| while contents := file.file.read(1024*1024):
|
| f.write(contents)
|
|
|
|
|
| uploaded_files.append(file.filename)
|
|
|
| except:
|
| raise fastapi.HTTPException(status_code=500, detail="An error occurred while uploading the noisy files.")
|
| finally:
|
| file.file.close()
|
|
|
| print(f"uploaded files: {uploaded_files}")
|
|
|
| return {"uploaded_files": uploaded_files, "status": True}
|
|
|
| def enhance_audio(self):
|
| try:
|
|
|
| self._enhance()
|
|
|
| wav_files = glob.glob(os.path.join(self.enhanced_audio_path, '*.wav'))
|
|
|
| enhanced_files = [os.path.basename(file) for file in wav_files]
|
| return {"status": True}
|
|
|
| except Exception as e:
|
| print(f"Exception occured during enhancement: {e}")
|
| raise fastapi.HTTPException(status_code=500, detail="An error occurred while enhancing the noisy files.")
|
|
|
| def download_enhanced(self):
|
| try:
|
|
|
| zip_buffer = io.BytesIO()
|
|
|
| with zipfile.ZipFile(zip_buffer, "w") as zip_file:
|
|
|
| for wav_file in glob.glob(os.path.join(self.enhanced_audio_path, '*.wav')):
|
| zip_file.write(wav_file, arcname=os.path.basename(wav_file))
|
|
|
|
|
| zip_buffer.seek(0)
|
|
|
|
|
| return fastapi.responses.StreamingResponse(
|
| iter([zip_buffer.getvalue()]),
|
| media_type="application/zip",
|
| headers={"Content-Disposition": "attachment; filename=enhanced_audio_files.zip"}
|
| )
|
|
|
| except Exception as e:
|
|
|
| raise fastapi.HTTPException(status_code=500, detail=f"An error occurred while creating the download file: {str(e)}")
|
|
|
| def run(self):
|
|
|
| uvicorn.run(self.app, host=self.host, port=self.port) |