#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Module for audio feature extraction and processing."""

import os
import subprocess
import time
from functools import reduce
from typing import Optional, Tuple

import librosa
import numpy as np
from pydub import AudioSegment
from pydub.silence import detect_nonsilent
from sklearn.preprocessing import StandardScaler

from chorus_detection.config import SR, HOP_LENGTH, AUDIO_TEMP_PATH
from chorus_detection.utils.logging import logger


def extract_audio(url: str, output_path: str = str(AUDIO_TEMP_PATH)) -> Tuple[Optional[str], Optional[str]]:
    """Download audio from a YouTube URL and save it as MP3 using yt-dlp.

    Args:
        url: YouTube URL of the audio file
        output_path: Directory in which to save the downloaded audio file

    Returns:
        Tuple of (path to the downloaded audio file, video title), or (None, None) if the download fails
    """
    try:
        # Create the output directory if it doesn't exist
        os.makedirs(output_path, exist_ok=True)

        # Create a unique filename using a timestamp
        timestamp = int(time.time())
        output_file = os.path.join(output_path, f"audio_{timestamp}.mp3")

        # Get the video title first, falling back to a timestamped placeholder
        video_title = get_video_title(url) or f"Video_{timestamp}"

        # Download the audio
        success, error_msg = download_audio(url, output_file)
        if not success:
            handle_download_error(error_msg)
            return None, None

        # Check that the file exists and is non-empty
        if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
            logger.info(f"Successfully downloaded: {video_title}")
            return output_file, video_title
        else:
            logger.error("Download completed but file not found or empty")
            return None, None
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        logger.error(f"An error occurred during YouTube download: {e}")
        logger.debug(f"Error details: {error_details}")
        check_yt_dlp_installation()
        return None, None
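
# A minimal usage sketch (assumes yt-dlp is installed and on PATH; the URL is
# illustrative):
#
#     path, title = extract_audio("https://www.youtube.com/watch?v=...")
#     if path is not None:
#         strip_silence(path)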


def get_video_title(url: str) -> Optional[str]:
    """Get the title of a YouTube video.

    Args:
        url: YouTube URL

    Returns:
        Video title if successful, None otherwise
    """
    try:
        title_command = ['yt-dlp', '--get-title', '--no-warnings', url]
        video_title = subprocess.check_output(title_command, universal_newlines=True).strip()
        return video_title
    except subprocess.CalledProcessError as e:
        logger.warning(f"Could not retrieve video title: {str(e)}")
        return None


def download_audio(url: str, output_file: str) -> Tuple[bool, str]:
    """Download audio from a YouTube URL using yt-dlp.

    Args:
        url: YouTube URL
        output_file: Output file path

    Returns:
        Tuple of (success, error_message)
    """
    command = [
        'yt-dlp',
        '-f', 'bestaudio',
        '--extract-audio',
        '--audio-format', 'mp3',
        '--audio-quality', '0',  # Best quality
        '--output', output_file,
        '--no-playlist',
        '--verbose',
        url
    ]
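    # Note: --extract-audio re-encodes via ffmpeg, so ffmpeg must also be
    # installed and on PATH for the MP3 conversion to succeed.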
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True
    )
    stdout, stderr = process.communicate()

    if process.returncode != 0:
        error_msg = f"Error downloading from YouTube (code {process.returncode}): {stderr}"
        return False, error_msg
    return True, ""


def handle_download_error(error_msg: str) -> None:
    """Handle common YouTube download errors with helpful messages.

    Args:
        error_msg: Error message from yt-dlp
    """
    logger.error(error_msg)

    if "Sign in to confirm you're not a bot" in error_msg:
        logger.error("YouTube is detecting automated access. Try using a local file instead.")
    elif any(x in error_msg.lower() for x in ["video unavailable", "private video"]):
        logger.error("The video appears to be private or unavailable. Please try another URL.")
    elif "copyright" in error_msg.lower():
        logger.error("The video may be blocked due to copyright restrictions.")
    elif any(x in error_msg.lower() for x in ["rate limit", "429"]):
        logger.error("YouTube rate limit reached. Please try again later.")


def check_yt_dlp_installation() -> None:
    """Check if yt-dlp is installed and provide guidance if it's not."""
    try:
        subprocess.run(['yt-dlp', '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except FileNotFoundError:
        logger.error("yt-dlp is not installed or not in PATH. Please install it with: pip install yt-dlp")


def strip_silence(audio_path: str) -> None:
    """Remove silent parts from an audio file, rewriting it in place.

    Args:
        audio_path: Path to the audio file
    """
    try:
        sound = AudioSegment.from_file(audio_path)
        # min_silence_len is in milliseconds; silence_thresh is in dBFS
        nonsilent_ranges = detect_nonsilent(
            sound, min_silence_len=500, silence_thresh=-50)

        if not nonsilent_ranges:
            logger.warning("No non-silent parts detected in the audio. Using original file.")
            return

        # Concatenate the non-silent segments (ranges are [start_ms, end_ms])
        stripped = reduce(lambda acc, val: acc + sound[val[0]:val[1]],
                          nonsilent_ranges, AudioSegment.empty())
        stripped.export(audio_path, format='mp3')
    except Exception as e:
        logger.error(f"Error stripping silence: {e}")
        logger.info("Proceeding with original audio file")
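
# For example, strip_silence("/tmp/audio_123.mp3") drops any pause longer than
# half a second that stays below -50 dBFS and overwrites the file in place
# (the path is illustrative).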


class AudioFeature:
    """Class for extracting and processing audio features."""

    def __init__(self, audio_path: str, sr: int = SR, hop_length: int = HOP_LENGTH):
        """Initialize the AudioFeature class.

        Args:
            audio_path: Path to the audio file
            sr: Sample rate for audio processing
            hop_length: Hop length for feature extraction
        """
        self.audio_path: str = audio_path
        self.sr: int = sr
        self.hop_length: int = hop_length
        self.time_signature: int = 4

        # All features start as None and are populated by extract_features()
        # and create_meter_grid()
        self.y: Optional[np.ndarray] = None
        self.y_harm: Optional[np.ndarray] = None
        self.y_perc: Optional[np.ndarray] = None
        self.beats: Optional[np.ndarray] = None
        self.chroma_acts: Optional[np.ndarray] = None
        self.chromagram: Optional[np.ndarray] = None
        self.combined_features: Optional[np.ndarray] = None
        self.key: Optional[str] = None
        self.mode: Optional[str] = None
        self.mel_acts: Optional[np.ndarray] = None
        self.melspectrogram: Optional[np.ndarray] = None
        self.meter_grid: Optional[np.ndarray] = None
        self.mfccs: Optional[np.ndarray] = None
        self.mfcc_acts: Optional[np.ndarray] = None
        self.n_frames: Optional[int] = None
        self.onset_env: Optional[np.ndarray] = None
        self.rms: Optional[np.ndarray] = None
        self.spectrogram: Optional[np.ndarray] = None
        self.tempo: Optional[float] = None
        self.tempogram: Optional[np.ndarray] = None
        self.tempogram_acts: Optional[np.ndarray] = None

    def detect_key(self, chroma_vals: np.ndarray) -> Tuple[str, str]:
        """Detect the key and mode (major or minor) of the audio segment.

        Args:
            chroma_vals: Summed chromagram values, one per pitch class

        Returns:
            Tuple containing the detected key and mode
        """
        note_names = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']

        # Key profiles (Krumhansl-Kessler profiles)
        major_profile = np.array([6.35, 2.23, 3.48, 2.33, 4.38, 4.09, 2.52, 5.19, 2.39, 3.66, 2.29, 2.88])
        minor_profile = np.array([6.33, 2.68, 3.52, 5.38, 2.60, 3.53, 2.54, 4.75, 3.98, 2.69, 3.34, 3.17])

        # Normalize profiles
        major_profile /= np.linalg.norm(major_profile)
        minor_profile /= np.linalg.norm(minor_profile)

        # Calculate correlations for all possible rotations
        major_correlations = [np.corrcoef(chroma_vals, np.roll(major_profile, i))[0, 1] for i in range(12)]
        minor_correlations = [np.corrcoef(chroma_vals, np.roll(minor_profile, i))[0, 1] for i in range(12)]
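        # Rolling the C-rooted profile by i moves its tonic to pitch class i,
        # so the best-correlating rotation names the tonic. For example, a
        # chroma vector peaked at G, B, and D should correlate best with the
        # major profile rolled to i = 7, i.e. G major.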

        # Find the best-matching rotation for each mode
        max_major_idx = np.argmax(major_correlations)
        max_minor_idx = np.argmax(minor_correlations)

        # Pick whichever mode correlates better, then read off the tonic
        self.mode = 'major' if major_correlations[max_major_idx] > minor_correlations[max_minor_idx] else 'minor'
        self.key = note_names[max_major_idx if self.mode == 'major' else max_minor_idx]
        return self.key, self.mode

    def calculate_ki_chroma(self, waveform: np.ndarray, sr: int, hop_length: int) -> np.ndarray:
        """Calculate a normalized, key-invariant chromagram for the given audio waveform.

        Args:
            waveform: Audio waveform to analyze
            sr: Sample rate of the waveform
            hop_length: Hop length for feature extraction

        Returns:
            The key-invariant chromagram as a numpy array
        """
        # Calculate the chromagram
        chromagram = librosa.feature.chroma_cqt(
            y=waveform, sr=sr, hop_length=hop_length, bins_per_octave=24)

        # Normalize to [0, 1]
        chromagram = (chromagram - chromagram.min()) / (chromagram.max() - chromagram.min() + 1e-8)

        # Detect the key from the summed per-pitch-class energy
        chroma_vals = np.sum(chromagram, axis=1)
        key, mode = self.detect_key(chroma_vals)

        # Make the chromagram key-invariant
        key_idx = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B'].index(key)
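        # For major keys the tonic is rolled down to row 0; for minor keys the
        # roll of -(key_idx + 3) % 12 puts the relative major's root at row 0
        # instead. E.g. E minor (key_idx 4): -(4 + 3) % 12 == 5, and rolling
        # up by 5 moves G (row 7) to row (7 + 5) % 12 == 0.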
        shift_amount = -key_idx if mode == 'major' else -(key_idx + 3) % 12
        return librosa.util.normalize(np.roll(chromagram, shift_amount, axis=0), axis=1)

    def extract_features(self) -> None:
        """Extract various audio features from the loaded audio."""
        # Load audio
        self.y, self.sr = librosa.load(self.audio_path, sr=self.sr)

        # Harmonic-percussive source separation
        self.y_harm, self.y_perc = librosa.effects.hpss(self.y)

        # Extract the magnitude spectrogram
        self.spectrogram, _ = librosa.magphase(librosa.stft(self.y, hop_length=self.hop_length))

        # RMS energy
        self.rms = librosa.feature.rms(S=self.spectrogram, hop_length=self.hop_length).astype(np.float32)

        # Mel spectrogram and activations (decompose returns
        # (components, activations); [1] keeps the per-frame activations)
        self.melspectrogram = librosa.feature.melspectrogram(
            y=self.y, sr=self.sr, n_mels=128, hop_length=self.hop_length).astype(np.float32)
        self.mel_acts = librosa.decompose.decompose(self.melspectrogram, n_components=3, sort=True)[1].astype(np.float32)

        # Chromagram and activations
        self.chromagram = self.calculate_ki_chroma(self.y_harm, self.sr, self.hop_length).astype(np.float32)
        self.chroma_acts = librosa.decompose.decompose(self.chromagram, n_components=4, sort=True)[1].astype(np.float32)

        # Onset detection and tempogram
        self.onset_env = librosa.onset.onset_strength(y=self.y_perc, sr=self.sr, hop_length=self.hop_length)
        self.tempogram = np.clip(librosa.feature.tempogram(
            onset_envelope=self.onset_env, sr=self.sr, hop_length=self.hop_length), 0, None)
        self.tempogram_acts = librosa.decompose.decompose(self.tempogram, n_components=3, sort=True)[1]

        # MFCCs and activations: shift the MFCCs to be non-negative, since
        # librosa's default decomposition is NMF, which requires
        # non-negative input
        self.mfccs = librosa.feature.mfcc(y=self.y, sr=self.sr, n_mfcc=20, hop_length=self.hop_length)
        self.mfccs -= min(np.min(self.mfccs), 0)
        self.mfcc_acts = librosa.decompose.decompose(self.mfccs, n_components=4, sort=True)[1].astype(np.float32)

        # Combine features with weighted normalization
        self._combine_features()

    def _combine_features(self) -> None:
        """Combine all extracted features with balanced weights."""
        features = [self.rms, self.mel_acts, self.chroma_acts, self.tempogram_acts, self.mfcc_acts]
        feature_names = ['rms', 'mel_acts', 'chroma_acts', 'tempogram_acts', 'mfcc_acts']

        # Calculate dimension-based weights
        dims = {name: feature.shape[0] for feature, name in zip(features, feature_names)}
        total_inv_dim = sum(1 / dim for dim in dims.values())
        weights = {name: 1 / (dims[name] * total_inv_dim) for name in feature_names}
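        # Each group then contributes equal total mass, since
        # rows * weight == 1 / total_inv_dim for every group. With the row
        # counts produced above ({rms: 1, mel: 3, chroma: 4, tempogram: 3,
        # mfcc: 4}), total_inv_dim = 1 + 1/3 + 1/4 + 1/3 + 1/4 ≈ 2.17, so the
        # single RMS row gets weight ≈ 0.46 while each of the three mel rows
        # gets ≈ 0.15 (3 × 0.15 ≈ 0.46).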

        # Standardize each feature row-wise, then apply its weight
        std_weighted_features = [
            StandardScaler().fit_transform(feature.T).T * weights[name]
            for feature, name in zip(features, feature_names)
        ]

        # Stack the feature rows and transpose to (frames, features)
        self.combined_features = np.concatenate(std_weighted_features, axis=0).T.astype(np.float32)
        self.n_frames = len(self.combined_features)

    def create_meter_grid(self) -> np.ndarray:
        """Create a grid based on the meter of the song using tempo and beats.

        Returns:
            Numpy array containing the meter grid frame positions
        """
        # Extract tempo and beat information
        self.tempo, self.beats = librosa.beat.beat_track(
            onset_envelope=self.onset_env, sr=self.sr, hop_length=self.hop_length)

        # Adjust the tempo if it's implausibly slow or fast
        self.tempo = self._adjust_tempo(self.tempo)

        # Create the meter grid
        self.meter_grid = self._create_meter_grid()
        return self.meter_grid

    def _adjust_tempo(self, tempo: float) -> float:
        """Adjust tempo to a reasonable range by octave correction.

        Args:
            tempo: Detected tempo in BPM

        Returns:
            Adjusted tempo in BPM
        """
        if tempo < 70:
            return tempo * 2
        elif tempo > 140:
            return tempo / 2
        return tempo
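
    # E.g. a detected 65 BPM is doubled to 130 BPM and 150 BPM is halved to
    # 75 BPM, keeping tempo estimates within a common 70-140 BPM range.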

    def _create_meter_grid(self) -> np.ndarray:
        """Helper function to create a meter grid for the song.

        Returns:
            Numpy array containing the meter grid frame positions
        """
        # Calculate the beat interval in frames
        seconds_per_beat = 60 / self.tempo
        beat_interval = int(librosa.time_to_frames(seconds_per_beat, sr=self.sr, hop_length=self.hop_length))

        # Anchor the grid at a beat whose local spacing best matches the
        # expected interval (i.e. where the tracker is locked to the tempo)
        if len(self.beats) >= 3:
            best_match = max(
                (1 - abs(np.mean(np.diff(self.beats[i:i + 3])) - beat_interval) / beat_interval, self.beats[i])
                for i in range(len(self.beats) - 2)
            )
            anchor_frame = best_match[1] if best_match[0] > 0.95 else self.beats[0]
        else:
            anchor_frame = self.beats[0] if len(self.beats) > 0 else 0
        first_beat_time = librosa.frames_to_time(anchor_frame, sr=self.sr, hop_length=self.hop_length)

        # Count how many whole beats fit before and after the anchor
        time_duration = librosa.frames_to_time(self.n_frames, sr=self.sr, hop_length=self.hop_length)
        num_beats_forward = int((time_duration - first_beat_time) / seconds_per_beat)
        num_beats_backward = int(first_beat_time / seconds_per_beat) + 1

        # Lay out beat times on both sides of the anchor
        beat_times_forward = first_beat_time + np.arange(num_beats_forward) * seconds_per_beat
        beat_times_backward = first_beat_time - np.arange(1, num_beats_backward) * seconds_per_beat

        # Combine, then keep every time_signature-th beat as a measure boundary
        beat_grid = np.concatenate((np.array([0.0]), beat_times_backward[::-1], beat_times_forward))
        meter_indices = np.arange(0, len(beat_grid), self.time_signature)
        meter_grid = beat_grid[meter_indices]

        # Ensure the grid starts at time 0
        if meter_grid[0] != 0.0:
            meter_grid = np.insert(meter_grid, 0, 0.0)

        # Convert times to frames
        meter_grid = librosa.time_to_frames(meter_grid, sr=self.sr, hop_length=self.hop_length)

        # Ensure the grid ends at the last frame
        if meter_grid[-1] != self.n_frames:
            meter_grid = np.append(meter_grid, self.n_frames)
        return meter_grid
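

# A minimal end-to-end sketch (the path is hypothetical; assumes the file
# exists and that SR/HOP_LENGTH come from chorus_detection.config):
#
#     feature = AudioFeature("/tmp/audio_123.mp3")
#     feature.extract_features()
#     meter_grid = feature.create_meter_grid()
#     print(feature.key, feature.mode, feature.tempo, len(meter_grid))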