import os from pathlib import Path from typing import List from loguru import logger as log import requests import streamlit as st from streamlit_option_menu import option_menu from footer import footer from header import header from helpers import ( load_audio_segment, load_list_of_songs, plot_audio, st_local_audio, url_is_valid, file_size_is_valid, delete_old_files, ) from service.demucs_runner import separator from service.vocal_remover.runner import load_model, separate from style import CSS_TABS label_sources = { "no_vocals.mp3": "🎶 Instrumental", "vocals.mp3": "🎤 Vocals", "drums.mp3": "🥁 Drums", "bass.mp3": "🎸 Bass", "guitar.mp3": "🎸 Guitar", "piano.mp3": "🎹 Piano", "other.mp3": "🎶 Other", } separation_mode_to_model = { "Vocals & Instrumental (Faster)": ("vocal_remover", ["vocals.mp3", "no_vocals.mp3"]), "Vocals & Instrumental (High Quality, Slower)": ("htdemucs", ["vocals.mp3", "no_vocals.mp3"]), "Vocals, Drums, Bass & Other (Slower)": ( "htdemucs", ["vocals.mp3", "drums.mp3", "bass.mp3", "other.mp3"], ), "Vocal, Drums, Bass, Guitar, Piano & Other (Slowest)": ( "htdemucs_6s", ["vocals.mp3", "drums.mp3", "bass.mp3", "guitar.mp3", "piano.mp3", "other.mp3"], ), } extensions = ["mp3", "wav", "ogg", "flac"] out_path = Path("/tmp") in_path = Path("/tmp") @st.cache_data(show_spinner=False) def get_sources(path, file_sources): sources = {} for file in file_sources: fullpath = path / file if fullpath.exists(): sources[file] = fullpath return sources def reset_execution(): st.session_state.executed = False def show_results(model_name: str, dir_name_output: str, file_sources: List): sources = get_sources(out_path / Path(model_name) / dir_name_output, file_sources) tab_sources = st.tabs([f"**{label_sources.get(k)}**" for k in sources.keys()]) for i, (file, pathname) in enumerate(sources.items()): with tab_sources[i]: cols = st.columns(2) with cols[0]: auseg = load_audio_segment(pathname, "mp3") st.image( plot_audio( auseg, 32767, file=file, model_name=model_name, dir_name_output=dir_name_output, ), use_column_width="always", ) with cols[1]: st_local_audio(pathname, key=f"output_{file}_{dir_name_output}") log.info(f"Displaying results for {dir_name_output} - {model_name}") def body(): filename = None name_song = None st.markdown( "

Extract Vocals & Instrumental from any song

", unsafe_allow_html=True, ) st.markdown(CSS_TABS, unsafe_allow_html=True) cols = st.columns([1, 4, 1, 3, 1]) with cols[1]: with st.columns([1, 8, 1])[1]: option = option_menu( menu_title=None, options=["Examples", "Upload File", "From URL"], icons=["cloud-upload-fill", "link-45deg", "music-note-list"], orientation="horizontal", styles={ "container": { "width": "100%", "height": "3.5rem", "margin": "0px", "padding": "0px", }, "icon": {"font-size": "1rem"}, "nav-link": { "display": "flex", "height": "3rem", "justify-content": "center", "align-items": "center", "text-align": "center", "flex-direction": "column", "font-size": "1rem", "padding-left": "0px", "padding-right": "0px", }, }, key="option_separate", ) if option == "Examples": samples_song = load_list_of_songs(path="separate_songs.json") if samples_song is not None: name_song = st.selectbox( label="Select a sample song and listen to sources separated", options=list(samples_song.keys()) + [""], format_func=lambda x: x.replace("_", " "), index=len(samples_song), key="select_example", ) full_path = f"{in_path}/{name_song}" if name_song != "" and (full_path).exists(): st.audio(full_path) else: name_song = None elif option == "Upload File": uploaded_file = st.file_uploader( "Choose a file", type=extensions, key="file", help="Supported formats: mp3, wav, ogg, flac.", ) if uploaded_file is not None: with st.spinner("Loading audio..."): with open(in_path / uploaded_file.name, "wb") as f: f.write(uploaded_file.getbuffer()) filename = uploaded_file.name st.audio(uploaded_file) elif option == "From URL": url = st.text_input( "Paste the URL of the audio file", key="url_input", help="Supported formats: mp3, wav, ogg, flac.", ) if url != "" and url_is_valid(url): with st.spinner("Downloading audio..."): filename = url.split("/")[-1] response = requests.get(url, stream=True) if response.status_code == 200 and file_size_is_valid( response.headers.get("Content-Length") ): file_size = 0 with open(in_path / filename, "wb") as audio_file: for chunk in response.iter_content(chunk_size=1024): if chunk: audio_file.write(chunk) file_size += len(chunk) if not file_size_is_valid(file_size): audio_file.close() os.remove(in_path / filename) filename = None return st.audio(f"{in_path}/{filename}") else: st.error( "Failed to download audio file. Try to download it manually and upload it." ) filename = None with cols[3]: separation_mode = st.selectbox( "Choose the separation mode", [ "Vocals & Instrumental (Faster)", "Vocals & Instrumental (High Quality, Slower)", "Vocals, Drums, Bass & Other (Slower)", "Vocal, Drums, Bass, Guitar, Piano & Other (Slowest)", ], on_change=reset_execution(), key="separation_mode", ) if separation_mode == "Vocals & Instrumental (Faster)": max_duration = 30 else: max_duration = 15 model_name, file_sources = separation_mode_to_model[separation_mode] if filename is not None: song = load_audio_segment(in_path / filename, filename.split(".")[-1]) n_secs = round(len(song) / 1000) if os.environ.get("ENV_LIMITATION", False): with cols[3]: start_time = st.number_input( "Choose the start time", min_value=0, max_value=n_secs, step=1, value=0, help=f"Maximum duration is {max_duration} seconds for this separation mode.\nDuplicate this space to [remove any limit](https://github.com/fabiogra/moseca#are-there-any-limitations).", format="%d", ) st.session_state.start_time = start_time end_time = min(start_time + max_duration, n_secs) song = song[start_time * 1000 : end_time * 1000] st.info( f"Audio source will be processed from {start_time} to {end_time} seconds.\nDuplicate this space to [remove any limit](https://github.com/fabiogra/moseca#are-there-any-limitations).", icon="⏱", ) else: start_time = 0 end_time = n_secs with st.columns([2, 1, 2])[1]: execute = st.button( "Separate Music Sources 🎶", type="primary", use_container_width=True ) if execute or st.session_state.executed: if execute: st.session_state.executed = False if not st.session_state.executed: log.info(f"{option} - Separating {filename} with {separation_mode}...") song.export(in_path / filename, format=filename.split(".")[-1]) with st.columns([1, 1, 1])[1]: with st.spinner("Separating source audio, it will take a while..."): if model_name == "vocal_remover": model, device = load_model(pretrained_model="baseline.pth") separate( input=in_path / filename, model=model, device=device, output_dir=out_path, ) else: stem = None if separation_mode == "Vocals & Instrumental (High Quality, Slower)": stem = "vocals" separator( tracks=[in_path / filename], out=out_path, model=model_name, shifts=1, overlap=0.5, stem=stem, int24=False, float32=False, clip_mode="rescale", mp3=True, mp3_bitrate=320, verbose=True, start_time=start_time, end_time=end_time, ) dir_name_output = ".".join(filename.split(".")[:-1]) filename = None st.session_state.executed = True show_results(model_name, dir_name_output, file_sources) elif name_song is not None and option == "Examples": show_results(model_name, name_song, file_sources) if __name__ == "__main__": header() body() footer() delete_old_files("/tmp", 60 * 30)