csukuangfj's picture
small fixes
298d6c8
raw
history blame
7.75 kB
#!/usr/bin/env python3
#
# Copyright 2023 Xiaomi Corp. (authors: Fangjun Kuang)
#
# See LICENSE for clarification regarding multiple authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# References:
# https://gradio.app/docs/#dropdown
import logging
import tempfile
import time
import urllib.request
from datetime import datetime
import gradio as gr
import torch
from pydub import AudioSegment
from separate import load_audio, load_model, separate
def build_html_output(s: str, style: str = "result_item_success"):
    """Wrap ``s`` in the nested ``<div>`` markup used for status messages.

    Args:
      s: Text (or HTML fragment) placed inside the inner ``<div>``. It is
        inserted verbatim, without any escaping.
      style: Extra CSS class added next to ``result_item`` on the inner
        ``<div>``.

    Returns:
      The HTML snippet as a string, starting and ending with a newline.
    """
    pieces = (
        "",
        "<div class='result'>",
        f"<div class='result_item {style}'>",
        f"{s}",
        "</div>",
        "</div>",
        "",
    )
    return "\n".join(pieces)
def process_url(url: str):
    """Download the audio file at ``url`` and run source separation on it.

    Args:
      url: Address of an audio file, as entered by the user.

    Returns:
      On success, the 3-tuple returned by ``process``. On failure,
      ``(None, None, html_error)``: the handler is wired to three output
      components (two audio players and one HTML box), so the error path
      must also produce three values.
    """
    import html  # local import; only needed to escape the error text

    logging.info(f"Processing URL: {url}")
    # The download goes into a temporary file that is removed when the
    # ``with`` block exits; ``process`` writes its results next to it under
    # different names.
    with tempfile.NamedTemporaryFile() as f:
        try:
            # NOTE(review): ``url`` is user input and urlretrieve accepts any
            # scheme urllib supports (including file://) -- consider
            # restricting it to http/https.
            urllib.request.urlretrieve(url, f.name)
            return process(in_filename=f.name)
        except Exception as e:
            # Top-level UI boundary: log with traceback and show the error
            # to the user instead of crashing the handler.
            logging.exception("Failed to process URL")
            # The message may echo the user-supplied URL, so escape it
            # before embedding it in HTML.
            return (
                None,
                None,
                build_html_output(html.escape(str(e)), "result_item_error"),
            )
def process_uploaded_file(in_filename: str):
    """Run source separation on a file uploaded through the UI.

    Args:
      in_filename: Path of the uploaded audio file; ``None`` or ``""`` when
        nothing has been uploaded yet.

    Returns:
      On success, the 3-tuple returned by ``process``. Otherwise
      ``(None, None, html_error)``: the handler is wired to three output
      components, so every path must produce three values.
    """
    import html  # local import; only needed to escape the error text

    if not in_filename:
        return (
            None,
            None,
            build_html_output(
                "Please first upload a file and then click "
                'the button "submit for separation"',
                "result_item_error",
            ),
        )

    logging.info(f"Processing uploaded file: {in_filename}")
    try:
        return process(in_filename=in_filename)
    except Exception as e:
        # Top-level UI boundary: log with traceback and report the error to
        # the user instead of crashing the handler.
        logging.exception("Failed to process uploaded file")
        return (
            None,
            None,
            build_html_output(html.escape(str(e)), "result_item_error"),
        )
def process_microphone(in_filename: str):
    """Run source separation on a recording made with the microphone.

    Args:
      in_filename: Path of the recorded audio file; ``None`` or ``""`` when
        nothing has been recorded yet.

    Returns:
      On success, the 3-tuple returned by ``process``. Otherwise
      ``(None, None, html_error)``: the handler is wired to three output
      components, so every path must produce three values.
    """
    import html  # local import; only needed to escape the error text

    if not in_filename:
        return (
            None,
            None,
            build_html_output(
                "Please first click 'Record from microphone', speak, "
                "click 'Stop recording', and then "
                "click the button 'submit for separation'",
                "result_item_error",
            ),
        )

    logging.info(f"Processing microphone: {in_filename}")
    try:
        return process(in_filename=in_filename)
    except Exception as e:
        # Top-level UI boundary: log with traceback and report the error to
        # the user instead of crashing the handler.
        logging.exception("Failed to process microphone recording")
        return (
            None,
            None,
            build_html_output(html.escape(str(e)), "result_item_error"),
        )
def _export_mp3(wave, filename: str, sample_rate: int = 44100) -> str:
    """Write a float waveform to ``filename`` as a 128k stereo MP3.

    Args:
      wave: Float tensor as returned by ``separate``. Presumably shaped
        (channels, num_samples) with values in [-1, 1], so that ``.t()``
        yields interleaved stereo frames -- TODO confirm against
        ``separate``.
      filename: Destination path.
      sample_rate: Frame rate written into the audio segment.

    Returns:
      ``filename``, for convenience.
    """
    # Clamp before casting: a sample of exactly 1.0 scales to 32768, which
    # does not fit into int16 and would otherwise wrap around or be
    # undefined.
    pcm = (wave.t() * 32768).clamp(-32768, 32767).to(torch.int16)
    sound = AudioSegment(
        data=pcm.numpy().tobytes(),
        sample_width=2,  # bytes per sample (int16)
        frame_rate=sample_rate,
        channels=2,
    )
    sound.export(filename, format="mp3", bitrate="128k")
    return filename


@torch.no_grad()
def process(in_filename: str):
    """Separate an audio file into vocals and accompaniment.

    Loads the audio and both models, runs ``separate``, and writes the two
    results as MP3 files next to the input
    (``<in_filename>-vocals.mp3`` and ``<in_filename>-accompaniment.mp3``).

    Args:
      in_filename: Path to the input audio file.

    Returns:
      A 3-tuple ``(vocals_filename, accompaniment_filename, html_info)``
      where ``html_info`` reports input duration, processing time and the
      real-time factor.

    Exceptions raised by the helpers are not caught here; the callers in
    this file handle them.
    """
    sample_rate = 44100  # the duration and the exported files assume 44.1 kHz

    logging.info(f"in_filename: {in_filename}")

    waveform = load_audio(in_filename)
    duration = waveform.shape[0] / sample_rate  # in seconds

    vocals = load_model("vocals.pt")
    accompaniment = load_model("accompaniment.pt")

    started_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    logging.info(f"Started at {started_at}")

    start = time.time()
    vocals_wave, accompaniment_wave = separate(vocals, accompaniment, waveform)
    end = time.time()
    # Take a fresh timestamp: reusing the one captured before separation
    # would report the start time as the finish time.
    finished_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    elapsed = end - start

    vocals_filename = _export_mp3(
        vocals_wave, in_filename + "-vocals.mp3", sample_rate
    )
    accompaniment_filename = _export_mp3(
        accompaniment_wave, in_filename + "-accompaniment.mp3", sample_rate
    )

    # Real-time factor: processing time divided by audio duration.
    rtf = elapsed / duration

    logging.info(f"Finished at {finished_at}. Elapsed: {elapsed: .3f} s")

    info = f"""
Input duration : {duration: .3f} s <br/>
Processing time: {elapsed: .3f} s <br/>
RTF: {elapsed: .3f}/{duration: .3f} = {rtf:.3f} <br/>
"""
    logging.info(info)

    return vocals_filename, accompaniment_filename, build_html_output(info)
# Page heading; it is rendered through gr.Markdown, hence the leading "# ".
title = "# Music source separation with Spleeter in PyTorch"

# Style sheet passed to gr.Blocks. The class names match those emitted by
# build_html_output (result, result_item, result_item_success,
# result_item_error).
#
# css style is copied from
# https://huggingface.co/spaces/alphacep/asr/blob/main/app.py#L113
css = """
.result {display:flex;flex-direction:column}
.result_item {padding:15px;margin-bottom:8px;border-radius:15px;width:100%}
.result_item_success {background-color:mediumaquamarine;color:white;align-self:start}
.result_item_error {background-color:#ff7070;color:white;align-self:start}
"""
# Build the web UI: one tab per input method (upload, microphone, URL).
# Every tab has a submit button, an HTML info box and two audio players
# (vocals and accompaniment).
demo = gr.Blocks(css=css)

with demo:
    gr.Markdown(title)

    with gr.Tabs():
        with gr.TabItem("Upload from disk"):
            uploaded_file = gr.Audio(
                source="upload",  # Choose between "microphone", "upload"
                type="filepath",
                optional=False,
                label="Upload from disk",
            )
            upload_button = gr.Button("Submit for separation")
            uploaded_html_info = gr.HTML(label="Info")
            uploaded_vocals = gr.Audio()
            uploaded_accompaniment = gr.Audio()

            # Bundled sample input for this tab.
            gr.Examples(
                examples=["./yesterday-once-more-Carpenters.wav"],
                inputs=[uploaded_file],
                outputs=[uploaded_vocals, uploaded_accompaniment, uploaded_html_info],
                fn=process_uploaded_file,
            )

        with gr.TabItem("Record from microphone"):
            microphone = gr.Audio(
                source="microphone",  # Choose between "microphone", "upload"
                type="filepath",
                optional=False,
                label="Record from microphone",
            )

            record_button = gr.Button("Submit for separation")
            recorded_html_info = gr.HTML(label="Info")
            recorded_vocals = gr.Audio()
            recorded_accompaniment = gr.Audio()

            # Same bundled sample as in the upload tab.
            gr.Examples(
                examples=["./yesterday-once-more-Carpenters.wav"],
                inputs=[microphone],
                outputs=[recorded_vocals, recorded_accompaniment, recorded_html_info],
                fn=process_microphone,
            )

        with gr.TabItem("From URL"):
            url_textbox = gr.Textbox(
                max_lines=1,
                placeholder="URL to an audio file",
                label="URL",
                interactive=True,
            )

            url_button = gr.Button("Submit for separation")
            url_html_info = gr.HTML(label="Info")
            url_vocals = gr.Audio()
            url_accompaniment = gr.Audio()

        # Wire each submit button to its handler. Every handler feeds three
        # output components: vocals audio, accompaniment audio, HTML info.
        upload_button.click(
            process_uploaded_file,
            inputs=[uploaded_file],
            outputs=[uploaded_vocals, uploaded_accompaniment, uploaded_html_info],
        )

        record_button.click(
            process_microphone,
            inputs=[microphone],
            outputs=[recorded_vocals, recorded_accompaniment, recorded_html_info],
        )

        url_button.click(
            process_url,
            inputs=[url_textbox],
            outputs=[url_vocals, url_accompaniment, url_html_info],
        )
# Restrict PyTorch to a single intra-op and a single inter-op thread.
torch.set_num_threads(1)
torch.set_num_interop_threads(1)

# NOTE(review): the calls below use private torch._C hooks. Going by their
# names they switch off the JIT profiling executor, the profiling mode and
# graph-executor optimization -- presumably for the TorchScript models
# returned by load_model; confirm against the PyTorch version in use.
torch._C._jit_set_profiling_executor(False)
torch._C._jit_set_profiling_mode(False)
torch._C._set_graph_executor_optimize(False)
if __name__ == "__main__":
    # Script entry point: set up INFO-level logging with timestamp, level and
    # source location, then start the Gradio app.
    formatter = (
        "%(asctime)s %(levelname)s "
        "[%(filename)s:%(lineno)d] %(message)s"
    )
    logging.basicConfig(level=logging.INFO, format=formatter)

    demo.launch()