|
import os |
|
import shutil |
|
import json |
|
import tempfile |
|
import gradio as gr |
|
from pydub import AudioSegment, silence |
|
from pydub.effects import low_pass_filter, high_pass_filter |
|
from tqdm import tqdm |
|
from groq import Groq |
|
import logging |
|
from langsmith import traceable |
|
from langchain.callbacks import LangChainTracer |
|
|
|
from dotenv import load_dotenv |
|
import random |
|
from temp_choose import * |
|
|
|
load_dotenv() |
|
|
|
logger = logging.getLogger(__name__) |
|
logger.setLevel(logging.INFO) |
|
logger.addHandler(logging.StreamHandler()) |
|
logger.addHandler(logging.FileHandler("log.txt")) |
|
|
|
|
|
@traceable(run_type="llm", name="groq_call") |
|
def make_groq_call(stems, song_name, p, section_type=None, bpm=120, bars=16): |
|
""" |
|
Make a call to the Groq API to get music production instructions. |
|
|
|
Args: |
|
stems (list): List of available stem files |
|
song_name (str): Name of the song |
|
p (float): Variation parameter (0-1) |
|
section_type (str, optional): Specific section to generate variants for |
|
bpm (int): Beats per minute |
|
bars (int): Number of bars |
|
|
|
Returns: |
|
dict: JSON response with production instructions |
|
""" |
|
client = Groq(api_key=os.getenv("GROQ_API_KEY")) |
|
|
|
|
|
if section_type: |
|
system_content = """You are a very experienced music producer and analyst, with a deep understanding of music theory and production techniques and arrangement, with specific expertise in creating popular EDM/ dance music. |
|
You are given a set of audio stems and asked to create multiple variants of a specific section of a track. |
|
Your task is to provide detailed instructions on how to arrange and process the stems for each variant. |
|
You will be given audio stems and asked to create multiple variants of a specific section of a track. |
|
For each variant, return detailed instructions on how to arrange and process the stems. |
|
Be creative and make each variant sound distinct while maintaining a coherent musical style.""" |
|
|
|
user_content = f"""I need 4 different variants for the {section_type} section of a track named "{song_name}". |
|
|
|
Available stems: {stems} |
|
|
|
BPM: {bpm} |
|
Length of the section: {bars} bars |
|
|
|
For each variant, please provide specific instructions on: |
|
1. Which stems to include |
|
2. What audio operations to apply (filters, fades, etc.) |
|
3. How the stems should be arranged |
|
|
|
Make the variants diverse but coherent, with variation level p={p} (0=minimal variation, 1=maximum variation). |
|
|
|
Return your response as a JSON object with this structure: |
|
{{ |
|
"variant1": {{ |
|
"stems": ["stem1.wav", "stem2.wav"], |
|
"operations": [ |
|
{{"stem": "stem1.wav", "operation": "low_pass_filter", "value": 500}}, |
|
{{"stem": "stem2.wav", "operation": "fade_in", "value": 1000}} |
|
], |
|
"overlay": true, |
|
"description": "A brief description of this variant" |
|
}}, |
|
"variant2": {{ ... }}, |
|
"variant3": {{ ... }}, |
|
"variant4": {{ ... }} |
|
}} |
|
""" |
|
else: |
|
system_content = """You are a very experienced music producer and analyst. For a given audio folder, that has the instruments, are combined to make a loop, and make some variations of it as well. After analyzing the code, you are supposed to return the code containing the different functions for producing the full track.""" |
|
|
|
user_content = f"""Now, you have a new song {song_name}, which has the following contents: |
|
{stems} |
|
|
|
BPM: {bpm} |
|
Bars: {bars} |
|
|
|
Return the code as per discussed in example. Make proper arrangements, for best groovy music. Create 3 variations and return code in JSON. And make sure to use as many instruments possible in each variation. |
|
|
|
NOTE: And at least once, the MAIN loop should have ALL stems. |
|
|
|
Like the variations should be like the main full loop, with some adjustments according to value of p={p} (p will remain in between 0-1; 0 means no variation in loop and 1 means high variation in loop), and then another variation can be a two times repeat of the full loop, with some effects in second time. Make intro better, with some more instruments. |
|
|
|
The main loop should have ALL wav files, don't exclude any please. Return proper JSON, with all the keys and values. |
|
""" |
|
|
|
completion = client.chat.completions.create( |
|
model="gemma2-9b-it", |
|
messages=[ |
|
{"role": "system", "content": system_content}, |
|
{"role": "user", "content": user_content}, |
|
], |
|
temperature=1, |
|
top_p=1, |
|
stream=False, |
|
response_format={"type": "json_object"}, |
|
stop=None, |
|
) |
|
|
|
print(completion.choices[0].message.content) |
|
|
|
return json.loads(completion.choices[0].message.content) |
|
|
|
|
|
def rename_files_remove_spaces(folder): |
|
"""Rename all files in the folder by removing spaces from filenames""" |
|
files_renamed = 0 |
|
for file in os.listdir(folder): |
|
if " " in file: |
|
old_path = os.path.join(folder, file) |
|
new_file = file.replace(" ", "") |
|
new_path = os.path.join(folder, new_file) |
|
|
|
|
|
if not os.path.exists(new_path): |
|
os.rename(old_path, new_path) |
|
print(f"Renamed: {file} β {new_file}") |
|
files_renamed += 1 |
|
|
|
print(f"Total files renamed: {files_renamed}") |
|
|
|
|
|
def load_audio_files(folder): |
|
"""Load all WAV files from a folder into memory""" |
|
files = sorted([f for f in os.listdir(folder) if f.endswith(".wav")]) |
|
stems = {} |
|
for file in tqdm(files, desc="Loading audio files"): |
|
path = os.path.join(folder, file) |
|
audio = AudioSegment.from_wav(path) |
|
stems[file] = audio |
|
return stems |
|
|
|
|
|
def get_stems(folder): |
|
"""Get a list of all WAV files in a folder""" |
|
files = sorted([f for f in os.listdir(folder) if f.endswith(".wav")]) |
|
return files |
|
|
|
|
|
def apply_audio_operation(audio, operation, value): |
|
"""Apply various audio operations to an AudioSegment""" |
|
if operation == "low_pass_filter" and isinstance(value, float): |
|
return low_pass_filter(audio, value) |
|
elif operation == "high_pass_filter" and isinstance(value, float): |
|
return high_pass_filter(audio, value) |
|
elif operation == "fade_in" and isinstance(value, int): |
|
return audio.fade_in(value) |
|
elif operation == "fade_out" and isinstance(value, int): |
|
return audio.fade_out(value) |
|
elif operation == "reverb" and isinstance(value, float): |
|
|
|
result = audio |
|
for delay in [50, 100, 150, 200]: |
|
attenuated = audio - (value * 10) |
|
delayed = AudioSegment.silent(duration=delay) + attenuated |
|
result = result.overlay(delayed) |
|
return result |
|
elif operation == "delay" and isinstance(value, int): |
|
|
|
result = audio |
|
delayed = AudioSegment.silent(duration=value) + (audio - 6) |
|
return result.overlay(delayed) |
|
elif operation == "distortion" and isinstance(value, float): |
|
|
|
gain = 1.0 + (value * 5) |
|
return audio + (gain) |
|
elif operation == "pitch_shift" and isinstance(value, float): |
|
|
|
print(f"Warning: Pitch shift not implemented, value: {value}") |
|
return audio |
|
elif operation == "volume" and isinstance(value, float): |
|
|
|
return audio + value |
|
return audio |
|
|
|
|
|
def create_section_from_json(section_config, stems): |
|
"""Create an audio section based on JSON configuration""" |
|
if not section_config: |
|
print("No configuration found for section") |
|
return AudioSegment.empty() |
|
|
|
section_stems = [] |
|
print(section_config) |
|
for stem_name in section_config["stems"]: |
|
|
|
if stem_name in stems: |
|
section_stems.append(stems[stem_name]) |
|
else: |
|
|
|
no_spaces_name = stem_name.replace(" ", "") |
|
if no_spaces_name in stems: |
|
section_stems.append(stems[no_spaces_name]) |
|
else: |
|
print(f"Warning: Stem {stem_name} not found (with or without spaces)") |
|
|
|
|
|
processed_stems = {name: audio for name, audio in stems.items()} |
|
for op in section_config.get("operations", []): |
|
stem_name = op["stem"] |
|
operation = op["operation"] |
|
value = op["value"] |
|
|
|
|
|
stem_key = None |
|
if stem_name in processed_stems: |
|
stem_key = stem_name |
|
else: |
|
no_spaces_name = stem_name.replace(" ", "") |
|
if no_spaces_name in processed_stems: |
|
stem_key = no_spaces_name |
|
|
|
if stem_key and operation != "overlay": |
|
processed_stems[stem_key] = apply_audio_operation( |
|
processed_stems[stem_key], operation, value |
|
) |
|
|
|
|
|
final_stems = [] |
|
for stem_name in section_config["stems"]: |
|
if stem_name in processed_stems: |
|
final_stems.append(processed_stems[stem_name]) |
|
else: |
|
no_spaces_name = stem_name.replace(" ", "") |
|
if no_spaces_name in processed_stems: |
|
final_stems.append(processed_stems[no_spaces_name]) |
|
|
|
|
|
|
|
result = final_stems[0] |
|
for stem in final_stems[1:]: |
|
result = result.overlay(stem) |
|
|
|
|
|
silence_thresh = result.dBFS - 16 |
|
silent_chunks = silence.detect_silence( |
|
result, min_silence_len=1500, silence_thresh=silence_thresh |
|
) |
|
|
|
segments = [] |
|
prev_end = 0 |
|
for start, end in silent_chunks: |
|
if prev_end < start: |
|
segments.append(result[prev_end:start]) |
|
prev_end = end |
|
segments.append(result[prev_end:]) |
|
|
|
result = sum(segments) |
|
|
|
return result |
|
|
|
return AudioSegment.empty() |
|
|
|
|
|
def generate_section_variants( |
|
stems_folder, audio_stems, section_type, bpm, bars,progress, p=0.5 |
|
): |
|
""" |
|
Generate multiple variants for a specific section |
|
|
|
Args: |
|
stems_folder (str): Path to folder containing stem files |
|
section_type (str): Type of section (intro, verse, chorus, etc.) |
|
bpm (int): Beats per minute |
|
bars (int): Number of bars |
|
p (float): Variation parameter (0-1) |
|
|
|
Returns: |
|
dict: Dictionary of variant audio segments and their descriptions |
|
""" |
|
stems = get_stems(stems_folder) |
|
try: |
|
llm_response = make_groq_call( |
|
stems, |
|
f"{section_type} section", |
|
p, |
|
section_type=section_type, |
|
bpm=bpm, |
|
bars=bars, |
|
) |
|
except Exception as e: |
|
logger.error(f"Error generating variants for {section_type} section: {e}") |
|
return {} |
|
|
|
|
|
if not audio_stems: |
|
print("No stems loaded.") |
|
return {} |
|
|
|
progress(0.2, desc=f"Generating structure and effects for variants of {section_type}...") |
|
|
|
|
|
if not isinstance(llm_response, dict): |
|
logger.error(f"Invalid LLM response: {llm_response}") |
|
|
|
|
|
|
|
try: |
|
if isinstance(llm_response, str): |
|
llm_response = json.loads(llm_response) |
|
elif isinstance(llm_response, dict): |
|
llm_response = llm_response |
|
except json.JSONDecodeError: |
|
logger.error(f"Tried Converting the LLM response to JSON, but failed: {llm_response}") |
|
|
|
variants = {} |
|
for variant_key in progress.tqdm(llm_response, desc="Getting variants as per AI arranegments..."): |
|
if variant_key.startswith("variant"): |
|
variant_config = llm_response[variant_key] |
|
logger.info(f" gsv: Variant config: {variant_config}") |
|
audio = create_section_from_json(variant_config, audio_stems) |
|
description = variant_config.get( |
|
"description", f"Variant {variant_key[-1]}" |
|
) |
|
list_stems = variant_config.get("stems", []) |
|
variants[variant_key] = { |
|
"audio": audio, |
|
"description": description, |
|
"config": variant_config, |
|
"stems": list_stems |
|
} |
|
|
|
|
|
|
|
return variants |
|
|
|
|
|
def create_full_track( |
|
sections_folder, audio_stems, selected_variants, crossfade_ms=500 |
|
): |
|
""" |
|
Create a full track from selected variants |
|
|
|
Args: |
|
sections_folder (dict): Dict mapping section names to their folder paths |
|
selected_variants (dict): Dict mapping section names to their selected variant configs |
|
crossfade_ms (int): Crossfade duration in milliseconds |
|
|
|
Returns: |
|
AudioSegment: The final track |
|
""" |
|
final_track = None |
|
|
|
|
|
section_order = [ |
|
"intro", |
|
"buildup", |
|
"full_loop", |
|
"breakdown", |
|
"bridge", |
|
"buildup2", |
|
"drop2", |
|
"breakdown2", |
|
"outro", |
|
] |
|
|
|
|
|
for section_name in section_order: |
|
if section_name not in selected_variants: |
|
continue |
|
|
|
|
|
variant_config = selected_variants[section_name] |
|
|
|
|
|
section_audio = create_section_from_json(variant_config, audio_stems) |
|
|
|
|
|
if final_track is None: |
|
final_track = section_audio |
|
else: |
|
final_track = final_track.append(section_audio, crossfade=crossfade_ms) |
|
|
|
return final_track |
|
|
|
|
|
def create_intro(llm_answer, stems): |
|
"""Create intro section from LLM answer""" |
|
return create_section_from_json(llm_answer.get("create_intro", {}), stems) |
|
|
|
|
|
def create_variation1(llm_answer, stems): |
|
"""Create variation1 section from LLM answer""" |
|
return create_section_from_json(llm_answer.get("create_variation1", {}), stems) |
|
|
|
|
|
def create_full_loop(llm_answer, stems): |
|
"""Create full loop section from LLM answer""" |
|
return create_section_from_json(llm_answer.get("create_full_loop", {}), stems) |
|
|
|
|
|
def create_variation2(llm_answer, stems): |
|
"""Create variation2 section from LLM answer""" |
|
return create_section_from_json(llm_answer.get("create_variation2", {}), stems) |
|
|
|
|
|
def create_variation3(llm_answer, stems): |
|
"""Create variation3 section from LLM answer""" |
|
return create_section_from_json(llm_answer.get("create_variation3", {}), stems) |
|
|
|
|
|
def create_outro(llm_answer, stems): |
|
"""Create outro section from LLM answer""" |
|
return create_section_from_json(llm_answer.get("create_outro", {}), stems) |
|
|
|
|
|
def calculate_duration(bpm, bars): |
|
"""Calculate duration in seconds for a given BPM and number of bars""" |
|
|
|
beats_per_bar = 4 |
|
duration_seconds = (bars * beats_per_bar * 60) / bpm |
|
return duration_seconds |
|
|
|
|
|
def get_formatted_duration(seconds): |
|
"""Format duration in seconds to MM:SS format""" |
|
minutes = int(seconds // 60) |
|
seconds = int(seconds % 60) |
|
return f"{minutes}:{seconds:02d}" |
|
|
|
|
|
def export_section_variants(variants, output_folder, section_name): |
|
"""Export section variants to audio files""" |
|
if not os.path.exists(output_folder): |
|
os.makedirs(output_folder) |
|
|
|
file_paths = {} |
|
for variant_key, variant_data in variants.items(): |
|
output_path = os.path.join(output_folder, f"{section_name}_{variant_key}.wav") |
|
variant_data["audio"].export(output_path, format="wav") |
|
file_paths[variant_key] = output_path |
|
|
|
return file_paths |
|
|
|
|
|
def edm_arrangement_tab(): |
|
|
|
with gr.Blocks() as iface: |
|
gr.Markdown("# π Interactive EDM Arrangement Tool") |
|
|
|
with gr.Row(): |
|
variation = gr.Radio( |
|
choices=list(arrangements.keys()), |
|
value="High Energy Flow", |
|
label="Choose Arrangement Variation", |
|
) |
|
|
|
out_plot = gr.Plot(label="Arrangement Diagram", value=load_variation("High Energy Flow")) |
|
variation.change(fn=load_variation, inputs=variation, outputs=out_plot) |
|
iface.load(fn=load_variation, inputs=[variation], outputs=[out_plot]) |
|
|
|
with gr.Accordion("π» Edit Section Parameters", open=False): |
|
for i, (bar, tempo, name, length, curve) in enumerate(arrangement): |
|
with gr.Row(): |
|
gr.Markdown(f"**{name}**") |
|
bar_slider = gr.Slider( |
|
minimum=0, maximum=300, value=bar, label="Start Bar" |
|
) |
|
tempo_slider = gr.Slider( |
|
minimum=20, maximum=100, value=tempo, label="Volume" |
|
) |
|
length_slider = gr.Slider( |
|
minimum=1, maximum=64, value=length, label="Length" |
|
) |
|
curve_selector = gr.Radio( |
|
choices=["Flat", "Linear", "-ve Linear"], |
|
value=curve, |
|
label="Curve Type", |
|
) |
|
update_btn = gr.Button("Update") |
|
update_btn.click( |
|
fn=update_section, |
|
inputs=[ |
|
gr.Number(value=i, visible=False), |
|
bar_slider, |
|
tempo_slider, |
|
length_slider, |
|
curve_selector, |
|
], |
|
outputs=[out_plot], |
|
) |
|
|
|
with gr.Accordion("β Insert New Section", open=False): |
|
new_index = gr.Number(value=0, label="Insert At Index") |
|
new_name = gr.Textbox(label="Section Name", value="New Section") |
|
new_bar = gr.Slider(minimum=0, maximum=300, value=0, label="Start Bar") |
|
new_tempo = gr.Slider(minimum=20, maximum=100, value=50, label="Volume") |
|
new_length = gr.Slider(minimum=1, maximum=64, value=8, label="Length") |
|
new_curve = gr.Radio( |
|
choices=["Flat", "Linear", "-ve Linear"], value="Flat", label="Curve Type" |
|
) |
|
insert_btn = gr.Button("Insert Section") |
|
insert_btn.click( |
|
fn=insert_section, |
|
inputs=[new_index, new_name, new_bar, new_tempo, new_length, new_curve], |
|
outputs=[out_plot], |
|
) |
|
|
|
gr.Markdown("## β
Finalise Your Arrangement") |
|
final_btn = gr.Button("Finalise and Export JSON") |
|
final_output = gr.Textbox(label="Final Arrangement JSON", lines=15) |
|
final_btn.click(fn=finalise, outputs=final_output) |
|
|
|
|
|
|
|
|