|
import io |
|
import logging |
|
import os |
|
import tempfile |
|
import wave |
|
import uuid |
|
import zipfile |
|
from zipfile import ZipFile |
|
|
|
import numpy as np |
|
from scipy.io import wavfile |
|
|
|
import api.base |
|
from service.tool import audio_normalize, read_wav_file_to_numpy_array |
|
from vextract.vocal_extract import VEX |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
class VocalRemoverHandler(api.base.ApiHandler):
    """Split an uploaded audio file into vocals and accompaniment.

    The audio is read from the uploaded file field ``srcaudio``.  On success
    the response body is a ZIP archive (served as ``output.zip``) holding
    ``vocals.wav`` and ``accompaniment.wav``.  On any failure the handler
    answers with HTTP 500 and the JSON body
    ``{"code": 500, "msg": "system_error", "data": None}``.

    NOTE(review): the request/response calls used here (``request.files``,
    ``set_header``, ``write``, ``flush``) look like Tornado's
    ``RequestHandler`` API -- confirm against ``api.base.ApiHandler``.
    """

    @staticmethod
    def _remove_quietly(path):
        """Delete the file at ``path``; log instead of raising on failure.

        Cleanup must never turn an otherwise successful request into an
        error, so a missing file is ignored and other OS errors are only
        logged.
        """
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        except OSError:
            logger.warning("could not remove temporary file %s", path, exc_info=True)

    async def post(self):
        """Handle the upload, run the separation and stream back the ZIP.

        Steps:
          1. Normalise the upload with ``audio_normalize`` (awaited; its
             result is used as the path of a WAV file, which is deleted as
             soon as it has been loaded).
          2. Load the WAV via ``read_wav_file_to_numpy_array`` and pass
             ``(sampling_rate, audio_array)`` to ``VEX().separate``.
          3. Write both stems under ``output/<uuid>/``, zip them and send
             the archive.
          4. Always remove the per-request scratch files afterwards.
        """
        output_dir = None
        try:
            uploaded_file = self.request.files['srcaudio'][0]
            audio_filebody = uploaded_file['body']
            audio_filename = uploaded_file['filename']
            audio_fileext = os.path.splitext(audio_filename)[-1].lower()

            needs_conversion = audio_fileext != ".wav"
            if needs_conversion:
                logger.debug(f"file format is {audio_fileext}, not wav\n"
                             f"converting to standard wav data...")

            # A single normalisation pass is enough: the original upload is
            # converted once and the result is loaded directly, instead of
            # converting twice and parking the frames in a temp file that was
            # never read or deleted.
            converted_file = await audio_normalize(full_filename=audio_filename, file_data=audio_filebody)
            try:
                sampling_rate, audio_array = read_wav_file_to_numpy_array(converted_file)
            finally:
                # Remove the intermediate file even when loading fails.
                self._remove_quietly(converted_file)

            if needs_conversion:
                logger.debug("wav conversion completed.")

            logger.debug("Input Audio Shape: %s\nInput Audio Data Type: %s",
                         audio_array.shape, audio_array.dtype)

            # NOTE(review): ``separate`` is called synchronously inside this
            # coroutine; if it is CPU-heavy it will stall the event loop for
            # the duration of the call -- consider an executor.
            v = VEX()
            [(vocal_sampling_rate, vocal_audio),
             (accompaniment_sampling_rate, accompaniment_audio)] = v.separate((sampling_rate, audio_array))

            # One scratch directory per request so concurrent requests never
            # collide.  makedirs also creates the parent "output" directory
            # when it does not exist yet (plain mkdir would fail there).
            output_dir = f"output/{uuid.uuid4()}"
            os.makedirs(output_dir, exist_ok=True)

            vocals_path = f"{output_dir}/vocals.wav"
            accompaniment_path = f"{output_dir}/accompaniment.wav"
            wavfile.write(vocals_path, vocal_sampling_rate, vocal_audio)
            wavfile.write(accompaniment_path, accompaniment_sampling_rate, accompaniment_audio)

            # NOTE: entries keep their "output/<uuid>/..." path inside the
            # archive, exactly as before, so existing clients see the same
            # layout.
            zipfilename = f"{output_dir}/output.zip"
            with ZipFile(zipfilename, 'w', zipfile.ZIP_DEFLATED) as zip_obj:
                zip_obj.write(vocals_path)
                zip_obj.write(accompaniment_path)

            # Read the archive before touching the response headers, so a
            # failure here still yields a clean JSON error response rather
            # than one carrying attachment headers.
            with open(zipfilename, "rb") as file:
                zip_bytes = file.read()

            logger.debug("start output data.")

            self.set_header("Content-Type", "application/zip")
            self.set_header("Content-Disposition", "attachment; filename=output.zip")
            self.write(zip_bytes)
            await self.flush()
            logger.debug("response completed.")
        except Exception as e:
            # Top-level boundary: log the full traceback and report a
            # generic error to the client.
            logger.exception(e)
            self.set_status(500)
            self.write({
                "code": 500,
                "msg": "system_error",
                "data": None
            })
        finally:
            # Best-effort cleanup on success and failure alike; errors are
            # logged, never raised, so they cannot corrupt a response that
            # has already been sent.
            if output_dir is not None:
                for leftover in ("vocals.wav", "accompaniment.wav", "output.zip"):
                    self._remove_quietly(f"{output_dir}/{leftover}")
                try:
                    os.rmdir(output_dir)
                except FileNotFoundError:
                    pass
                except OSError:
                    logger.warning("could not remove output directory %s", output_dir, exc_info=True)
|
|