Spaces:
Running
Running
| import io | |
| import logging | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| from PIL import Image | |
| from scipy import signal | |
# soundfile is an optional dependency: when it cannot be imported the module
# still loads and `sf` is left as None so callers can test for it.
try:
    import soundfile as sf
except ImportError:
    sf = None

# True only when the optional soundfile import above succeeded.
SOUNDFILE_AVAILABLE = sf is not None

# Module-level logger named after this module.
LOGGER = logging.getLogger(__name__)
class RhythmaModulationEngine:
    """Dynamic rhythm-based audio modulation engine.

    An engine is configured with a base frequency (given directly or looked
    up from an emotional-state label), a rhythm pattern that selects one of
    the ``RHYTHM_CONFIGS`` entries, and a modulation type (``"sine"``,
    ``"pulse"`` or ``"chirp"``; any other value leaves the carrier
    unmodulated). It can render audio as NumPy arrays, export it through
    ``soundfile`` when that package is available, and draw matplotlib
    visualisations.
    """

    # Samples per second used for every rendered signal.
    SAMPLE_RATE = 44100

    # Largest distance (Hz) at which a manual frequency is still labelled
    # with the nearest emotional state.
    _STATE_MATCH_TOLERANCE_HZ = 10

    # Emotional-state label -> base frequency in Hz.
    EMOTIONAL_FREQUENCIES = {
        "anxious": 396,
        "stressed": 528,
        "calm": 741,
        "sad": 417,
        "angry": 852,
        "fearful": 639,
        "confused": 285,
        "happy": 432,
        "neutral": 440,
        "focused": 639,
        "relaxed": 741,
        "active": 528,
    }

    # Emotional-state label -> display name and advice text.
    EMOTIONAL_INFO = {
        "anxious": {
            "name": "Liberating Guilt and Fear",
            "advice": "The 396 Hz frequency may help release fear and guilt.",
        },
        "stressed": {
            "name": "Transformation and Miracles",
            "advice": "The 528 Hz frequency is associated with transformation.",
        },
        "calm": {
            "name": "Awakening Intuition",
            "advice": "The 741 Hz frequency is associated with awakening intuition.",
        },
        "sad": {
            "name": "Facilitating Change",
            "advice": "The 417 Hz frequency is linked to facilitating change.",
        },
        "angry": {
            "name": "Returning to Spiritual Order",
            "advice": "The 852 Hz frequency may aid in returning to inner strength.",
        },
        "fearful": {
            "name": "Connecting Relationships",
            "advice": "The 639 Hz frequency is associated with connecting relationships.",
        },
        "confused": {
            "name": "Quantum Cognition",
            "advice": "The 285 Hz frequency is believed to influence energy fields.",
        },
        "happy": {
            "name": "Harmonizing Vibrations",
            "advice": "The 432 Hz frequency is associated with natural harmony.",
        },
        "neutral": {
            "name": "Grounded Presence",
            "advice": "The 440 Hz frequency provides a stable reference point.",
        },
        "focused": {
            "name": "Clarity and Connection",
            "advice": "The 639 Hz frequency may support focus and understanding.",
        },
        "relaxed": {
            "name": "Intuitive Calm",
            "advice": "The 741 Hz frequency is linked to intuitive states and problem-solving.",
        },
        "active": {
            "name": "Dynamic Energy",
            "advice": "The 528 Hz frequency is associated with positive transformation.",
        },
    }

    # Rhythm-pattern label -> modulation parameters. "harmonics" lists the
    # amplitude of the 1st, 2nd, 3rd, ... multiple of the base frequency.
    RHYTHM_CONFIGS = {
        "calm": {
            "mod_depth": 0.15,
            "mod_freq": 0.5,
            "pulse_width": 0.7,
            "phase_shift": 0.1,
            "harmonics": [1.0, 0.5, 0.25, 0.125],
        },
        "active": {
            "mod_depth": 0.4,
            "mod_freq": 2.5,
            "pulse_width": 0.3,
            "phase_shift": 0.3,
            "harmonics": [1.0, 0.7, 0.5, 0.3],
        },
        "focused": {
            "mod_depth": 0.25,
            "mod_freq": 1.5,
            "pulse_width": 0.5,
            "phase_shift": 0.2,
            "harmonics": [1.0, 0.6, 0.3, 0.15],
        },
        "relaxed": {
            "mod_depth": 0.2,
            "mod_freq": 0.3,
            "pulse_width": 0.8,
            "phase_shift": 0.05,
            "harmonics": [1.0, 0.4, 0.2, 0.1],
        },
    }

    # Rhythm-pattern label -> descriptive text shown in analyses and figures.
    SYMBOLIC_MAPPING = {
        "calm": "Resonating in the Circle Archetype: completion, wholeness, presence",
        "active": "Resonating in the Spiral Archetype: flow, transition, emergence",
        "focused": "Resonating in the Triangle Archetype: clarity, direction, purpose",
        "relaxed": "Resonating in the Wave Archetype: fluidity, acceptance, surrender",
    }

    def __init__(
        self,
        base_freq=None,
        modulation_type="sine",
        rhythm_pattern=None,
        emotional_state=None,
    ):
        """Configure the engine.

        Args:
            base_freq: Manual base frequency in Hz. Only used when
                ``emotional_state`` is missing or unknown and the value is
                positive.
            modulation_type: ``"sine"``, ``"pulse"`` or ``"chirp"``; any other
                value makes ``generate_modulated_wave`` skip modulation.
            rhythm_pattern: Key of ``RHYTHM_CONFIGS``; unknown or missing
                values fall back to ``"calm"``.
            emotional_state: Key of ``EMOTIONAL_FREQUENCIES``; when valid it
                takes precedence over ``base_freq``.
        """
        self.modulation_type = modulation_type
        self.sample_rate = self.SAMPLE_RATE
        # These are references to the class-level tables, not copies.
        self.emotional_frequencies = self.EMOTIONAL_FREQUENCIES
        self.emotional_info = self.EMOTIONAL_INFO
        self.rhythm_configs = self.RHYTHM_CONFIGS
        self.symbolic_mapping = self.SYMBOLIC_MAPPING

        valid_emotional_state = (
            emotional_state
            if emotional_state and emotional_state in self.emotional_frequencies
            else None
        )
        self.emotional_state = valid_emotional_state
        if self.emotional_state:
            self.base_freq = self.emotional_frequencies[self.emotional_state]
        elif base_freq and base_freq > 0:
            self.base_freq = base_freq
            # May be None when no table frequency is close enough.
            self.emotional_state = self._find_closest_state(base_freq)
        else:
            self.emotional_state = "neutral"
            self.base_freq = self.emotional_frequencies[self.emotional_state]

        valid_rhythm_pattern = (
            rhythm_pattern
            if rhythm_pattern and rhythm_pattern in self.rhythm_configs
            else None
        )
        self.rhythm_pattern = valid_rhythm_pattern or "calm"
        self.config = self.rhythm_configs[self.rhythm_pattern]

    def _find_closest_state(self, base_freq):
        """Return the state whose frequency is nearest to ``base_freq``.

        Returns ``None`` when the nearest frequency is more than
        ``_STATE_MATCH_TOLERANCE_HZ`` away. On ties the state listed first in
        the frequency table wins.
        """
        closest = min(
            self.emotional_frequencies.items(),
            key=lambda item: abs(item[1] - base_freq),
            default=None,
        )
        if closest is None:
            return None
        state, freq = closest
        if abs(freq - base_freq) <= self._STATE_MATCH_TOLERANCE_HZ:
            return state
        return None

    def _time_axis(self, duration):
        """Return sample times for ``duration`` seconds at the engine rate.

        Raises:
            ValueError: If ``duration`` is not positive or is too short to
                yield a single sample.
        """
        if not duration > 0:
            raise ValueError("duration must produce at least one sample")
        sample_count = int(self.sample_rate * duration)
        if sample_count < 1:
            raise ValueError("duration must produce at least one sample")
        return np.linspace(0, duration, sample_count, endpoint=False)

    def _sum_harmonics(self, t, fundamental, amplitudes):
        """Sum sines at integer multiples of ``fundamental``.

        ``amplitudes[k]`` weights the ``(k + 1)``-th multiple. Multiples at or
        above half the sample rate are skipped.
        """
        total = np.zeros_like(t)
        nyquist = self.sample_rate / 2
        for index, harmonic_amp in enumerate(amplitudes, start=1):
            harmonic_freq = fundamental * index
            if harmonic_freq < nyquist:
                total += harmonic_amp * np.sin(2 * np.pi * harmonic_freq * t)
        return total

    @staticmethod
    def _scale_to_unit_peak(samples):
        """Divide ``samples`` by their peak magnitude when that peak is positive."""
        peak = np.max(np.abs(samples))
        if peak > 0:
            return samples / peak
        return samples

    def _generate_base_wave(self, duration):
        """Build the unmodulated carrier.

        Returns:
            ``(t, wave)`` where ``wave`` is the harmonic stack of the active
            rhythm config scaled to unit peak, or a plain sine at the base
            frequency when the stack is silent.

        Raises:
            ValueError: If ``duration`` yields no samples.
        """
        t = self._time_axis(duration)
        rich_wave = self._sum_harmonics(t, self.base_freq, self.config["harmonics"])
        peak = np.max(np.abs(rich_wave))
        if peak > 0:
            rich_wave = rich_wave / peak
        else:
            # Every harmonic was skipped or cancelled; fall back to the pure tone.
            rich_wave = np.sin(2 * np.pi * self.base_freq * t)
        return t, rich_wave

    def _apply_sine_modulation(self, t, carrier):
        """Amplitude-modulate ``carrier`` with a sine envelope from the config."""
        mod_env = 1.0 + self.config["mod_depth"] * np.sin(
            2 * np.pi * self.config["mod_freq"] * t + self.config["phase_shift"]
        )
        return carrier * mod_env

    def _apply_pulse_modulation(self, t, carrier):
        """Amplitude-modulate ``carrier`` with a square-wave gate.

        The gate switches the gain between ``1 - mod_depth`` and ``1`` with a
        duty cycle of ``pulse_width``.
        """
        # Map the square wave from [-1, 1] to [0, 1].
        pulse = 0.5 * (
            signal.square(
                2 * np.pi * self.config["mod_freq"] * t,
                duty=self.config["pulse_width"],
            )
            + 1
        )
        mod_env = 1.0 - self.config["mod_depth"] + self.config["mod_depth"] * pulse
        return carrier * mod_env

    def _apply_chirp_modulation(self, t, carrier):
        """Amplitude-modulate ``carrier`` with a sine whose rate sweeps upward.

        The modulation rate moves linearly from half of ``mod_freq`` (at least
        0.1) to twice ``mod_freq`` across the signal.
        """
        start_mod_freq = max(0.1, self.config["mod_freq"] / 2)
        end_mod_freq = self.config["mod_freq"] * 2
        instantaneous_mod_freq = np.linspace(start_mod_freq, end_mod_freq, len(t))
        # Accumulate the per-sample rate to obtain the envelope phase.
        phase = 2 * np.pi * np.cumsum(instantaneous_mod_freq) / self.sample_rate
        mod_env = 1.0 + self.config["mod_depth"] * np.sin(
            phase + self.config["phase_shift"]
        )
        return carrier * mod_env

    def _normalize_audio(self, audio):
        """Scale ``audio`` to a peak magnitude of 0.9; silent input is returned as is."""
        max_amp = np.max(np.abs(audio))
        if max_amp <= 0:
            return audio
        return 0.9 * audio / max_amp

    def _render_drone_layer(self, t, tone_center, density, config):
        """Render the sustained harmonic layer of a session.

        ``density`` is clipped to [0, 1]; it selects two harmonics below 0.5
        and three otherwise, and scales the output gain between 0.75 and 1.0.
        """
        density = float(np.clip(density, 0.0, 1.0))
        harmonic_count = 2 if density < 0.5 else 3
        drone = self._sum_harmonics(
            t, tone_center, config["harmonics"][:harmonic_count]
        )
        drone = self._scale_to_unit_peak(drone)
        return drone * (0.75 + 0.25 * density)

    def _render_breath_layer(self, t, tone_center, breath_rate, pattern):
        """Render a low tone gated on and off at ``breath_rate`` Hz.

        The tone sits at half of ``tone_center`` (at least 40 Hz). The gate's
        duty cycle comes from the ``pulse_width`` of ``pattern``'s config, or
        of the engine's own config when ``pattern`` is unknown.
        """
        breath_rate = max(0.02, float(breath_rate))
        breath_freq = max(40.0, tone_center * 0.5)
        carrier = np.sin(2 * np.pi * breath_freq * t)
        pattern_config = self.rhythm_configs.get(pattern, self.config)
        breath_env = 0.5 * (
            signal.square(
                2 * np.pi * breath_rate * t,
                duty=pattern_config["pulse_width"],
            )
            + 1.0
        )
        return carrier * breath_env

    def _render_shimmer_layer(self, t, tone_center, brightness, shimmer):
        """Render the upper-partial layer of a session.

        Sums the 2nd, 3rd and 4th multiples of ``tone_center`` (those below
        half the sample rate), scales the sum to unit peak, then applies a
        slow tremolo and a brightness-dependent gain. ``brightness`` and
        ``shimmer`` are clipped to [0, 1].
        """
        brightness = float(np.clip(brightness, 0.0, 1.0))
        shimmer = float(np.clip(shimmer, 0.0, 1.0))
        shimmer_layer = np.zeros_like(t)
        harmonic_levels = [
            (2.0, 0.35 + 0.25 * brightness),
            (3.0, 0.2 + 0.2 * shimmer),
            (4.0, 0.1 + 0.15 * brightness),
        ]
        for index, (multiplier, amplitude) in enumerate(harmonic_levels, start=1):
            harmonic_freq = tone_center * multiplier
            if harmonic_freq < self.sample_rate / 2:
                # Each partial gets a distinct phase offset.
                shimmer_layer += amplitude * np.sin(
                    2 * np.pi * harmonic_freq * t + (index * np.pi / 7)
                )
        shimmer_layer = self._scale_to_unit_peak(shimmer_layer)
        shimmer_motion = 0.6 + 0.4 * np.sin(
            2 * np.pi * max(0.1, 1.5 + shimmer * 3.0) * t
        )
        return shimmer_layer * shimmer_motion * (0.2 + 0.8 * brightness)

    def _build_session_envelope(self, sample_count):
        """Return a fade-in / sustain / fade-out gain curve of ``sample_count`` samples.

        The fade-in takes about 8% and the fade-out about 12% of the samples;
        for very short signals the two fades split the samples between them.
        """
        if sample_count <= 1:
            return np.ones(sample_count)
        attack_count = max(1, int(sample_count * 0.08))
        release_count = max(1, int(sample_count * 0.12))
        if attack_count + release_count >= sample_count:
            attack_count = max(1, sample_count // 2)
            release_count = sample_count - attack_count
        sustain_count = sample_count - attack_count - release_count
        attack = np.linspace(0.0, 1.0, attack_count, endpoint=False)
        sustain = np.ones(sustain_count)
        release = np.linspace(1.0, 0.0, release_count, endpoint=True)
        return np.concatenate([attack, sustain, release])

    def render_session(self, profile, duration):
        """Render a layered session (drone, breath and shimmer layers).

        Args:
            profile: Mapping that may provide ``tone_center``, ``pattern``,
                ``density``, ``breath_rate``, ``brightness`` and ``shimmer``.
                Missing keys fall back to engine defaults; ``None`` is treated
                as an empty mapping.
            duration: Length of the session in seconds.

        Returns:
            The mixed signal with the session envelope applied, normalised to
            a peak of 0.9.

        Raises:
            ValueError: If ``duration`` yields no samples.
        """
        profile = profile or {}
        t = self._time_axis(duration)
        tone_center = float(profile.get("tone_center", self.base_freq))
        pattern = profile.get("pattern", self.rhythm_pattern)
        config = self.rhythm_configs.get(pattern, self.config)

        drone = self._render_drone_layer(
            t, tone_center, profile.get("density", 0.5), config
        )
        pulse = self._render_breath_layer(
            t,
            tone_center,
            profile.get("breath_rate", config["mod_freq"] / 8),
            pattern,
        )
        shimmer = self._render_shimmer_layer(
            t,
            tone_center,
            profile.get("brightness", 0.25),
            profile.get("shimmer", 0.1),
        )
        combined = (0.62 * drone) + (0.25 * pulse) + (0.13 * shimmer)
        combined = combined * self._build_session_envelope(len(t))
        return self._normalize_audio(combined)

    def generate_modulated_wave(self, duration):
        """Return the carrier with the configured modulation, peak-normalised to 0.9.

        Unknown modulation types leave the carrier unmodulated.

        Raises:
            ValueError: If ``duration`` yields no samples.
        """
        t, base_carrier = self._generate_base_wave(duration)
        if self.modulation_type == "sine":
            modulated = self._apply_sine_modulation(t, base_carrier)
        elif self.modulation_type == "pulse":
            modulated = self._apply_pulse_modulation(t, base_carrier)
        elif self.modulation_type == "chirp":
            modulated = self._apply_chirp_modulation(t, base_carrier)
        else:
            modulated = base_carrier
        return self._normalize_audio(modulated)

    def save_audio(self, duration, file_path=None):
        """Write the modulated wave to an audio file.

        Args:
            duration: Length of the audio in seconds.
            file_path: Destination path; defaults to a name built from the
                base frequency and rhythm pattern.

        Returns:
            The path written, or ``None`` when ``soundfile`` is unavailable or
            writing fails (the failure is logged).

        Raises:
            ValueError: If ``duration`` yields no samples.
        """
        if not SOUNDFILE_AVAILABLE:
            LOGGER.error("soundfile is not installed; audio export is unavailable.")
            return None
        audio = self.generate_modulated_wave(duration)
        output_path = file_path or f"rhythma_{self.base_freq}Hz_{self.rhythm_pattern}.wav"
        try:
            sf.write(output_path, audio, self.sample_rate)
        except Exception:
            # Best-effort export: report the failure instead of propagating it.
            LOGGER.exception("Failed to save audio to %s", output_path)
            return None
        LOGGER.info("Audio saved to %s", output_path)
        return output_path

    def _build_waveform_title(self):
        """Return the title used for the waveform panel."""
        title = (
            f"Rhythma Waveform: {self.rhythm_pattern.capitalize()} "
            f"({self.modulation_type.capitalize()})"
        )
        if self.emotional_state:
            title += f" - {self.emotional_state.capitalize()} ({self.base_freq} Hz)"
        else:
            title += f" - {self.base_freq} Hz"
        return title

    def _build_figure_caption(self):
        """Return the caption printed at the bottom of the figure."""
        fig_text = self.get_symbolic_interpretation()
        emotion_info = self.emotional_info.get(self.emotional_state, {})
        if emotion_info:
            fig_text += (
                f"\n{self.base_freq} Hz - {emotion_info.get('name', '')}: "
                f"{emotion_info.get('advice', '')}"
            )
        elif not self.emotional_state:
            fig_text += f"\nBase Frequency: {self.base_freq} Hz"
        return fig_text

    def _draw_spectrogram(self, fig, ax, wave):
        """Draw a spectrogram of ``wave`` (bins below 2000 Hz) on ``ax``."""
        freqs, times, spectrogram = signal.spectrogram(
            wave, self.sample_rate, nperseg=1024
        )
        freq_limit_idx = np.where(freqs >= 2000)[0]
        if len(freq_limit_idx) > 0:
            cutoff = freq_limit_idx[0]
            freqs = freqs[:cutoff]
            spectrogram = spectrogram[:cutoff, :]
        pcm = ax.pcolormesh(
            times,
            freqs,
            # Small offset avoids log10(0) for empty bins.
            10 * np.log10(spectrogram + 1e-9),
            shading="gouraud",
            cmap="viridis",
        )
        fig.colorbar(pcm, ax=ax, label="Power (dB)")
        ax.set_ylabel("Frequency (Hz)")
        ax.set_xlabel("Time (s)")
        ax.set_title("Spectrogram")

    def visualize_waveform(self, duration):
        """Return a figure with a waveform preview and a spectrogram.

        The top panel shows at most the first 2000 samples of a preview that
        is at most 0.5 s long; the bottom panel shows a spectrogram of the
        wave rendered for the full ``duration``. If the spectrogram cannot be
        produced, the failure is logged and the panel shows an error note.
        The caller owns the returned figure.

        Raises:
            ValueError: If ``duration`` yields no samples.
        """
        vis_duration = min(duration, 0.5)
        t = self._time_axis(vis_duration)
        plot_samples = len(t)
        modulated = self.generate_modulated_wave(vis_duration)
        # Build the title before creating the figure so that a failure here
        # cannot leave an orphaned figure behind.
        title = self._build_waveform_title()

        fig, (ax1, ax2) = plt.subplots(
            2, 1, figsize=(10, 6), gridspec_kw={"height_ratios": [1, 1]}
        )
        zoom_samples = min(plot_samples, 2000)
        ax1.plot(t[:zoom_samples], modulated[:zoom_samples])
        ax1.set_title(title)
        ax1.set_xlabel("Time (s)")
        ax1.set_ylabel("Amplitude")
        ax1.grid(True)

        try:
            # The preview already is the full wave when nothing was truncated.
            if vis_duration == duration:
                full_wave = modulated
            else:
                full_wave = self.generate_modulated_wave(duration)
            self._draw_spectrogram(fig, ax2, full_wave)
        except Exception:
            LOGGER.exception("Failed to generate spectrogram.")
            ax2.set_title("Spectrogram (Error)")
            ax2.text(
                0.5,
                0.5,
                "Could not generate spectrogram",
                horizontalalignment="center",
                verticalalignment="center",
                transform=ax2.transAxes,
            )

        # Leave a strip at the bottom of the figure for the caption.
        fig.tight_layout(rect=[0, 0.05, 1, 1])
        fig.text(
            0.5,
            0.01,
            self._build_figure_caption(),
            ha="center",
            va="bottom",
            fontsize=9,
            style="italic",
            wrap=True,
        )
        return fig

    def get_waveform_image(self):
        """Return a PIL image plotting 0.05 s of a pure sine at the base frequency."""
        duration = 0.05
        t = np.linspace(0, duration, int(self.sample_rate * duration), False)
        tone = np.sin(2 * np.pi * self.base_freq * t)

        fig, ax = plt.subplots(figsize=(6, 2))
        buffer = io.BytesIO()
        try:
            ax.plot(t, tone)
            ax.set_xlabel("Time (s)")
            ax.set_ylabel("Amplitude")
            ax.set_ylim(-1.1, 1.1)
            ax.grid(True)
            fig.tight_layout()
            fig.savefig(buffer, format="png", bbox_inches="tight")
        finally:
            # Always release the figure, even when rendering fails.
            plt.close(fig)
        buffer.seek(0)
        return Image.open(buffer)

    def get_symbolic_interpretation(self):
        """Return the descriptive text for the active rhythm pattern."""
        return self.symbolic_mapping.get(
            self.rhythm_pattern, "Pattern Interpretation: Default"
        )

    def get_emotional_advice(self):
        """Return the advice text for the current emotional state, if any."""
        if not self.emotional_state:
            return "No specific emotional state identified."
        return self.emotional_info.get(self.emotional_state, {}).get(
            "advice", "General well-being advice applies."
        )

    def get_complete_analysis(self):
        """Return a multi-paragraph text summary of the engine's configuration."""
        analysis = []
        if self.emotional_state:
            emotion_info = self.emotional_info.get(self.emotional_state, {})
            analysis.append(
                f"Detected State/Intention: {self.emotional_state.capitalize()}"
            )
            analysis.append(
                f"Resonant Frequency: {self.base_freq} Hz - "
                f"{emotion_info.get('name', 'Frequency Information')}"
            )
            analysis.append(
                f"Guidance: {emotion_info.get('advice', 'Focus on the sound.')}"
            )
        else:
            analysis.append(f"Using Manual Frequency: {self.base_freq} Hz")
            analysis.append("Guidance: Tune into the custom frequency.")
        analysis.append(f"Rhythm Pattern: {self.rhythm_pattern.capitalize()}")
        analysis.append(
            f"Symbolic Interpretation: {self.get_symbolic_interpretation()}"
        )
        analysis.append(f"Modulation Type: {self.modulation_type.capitalize()}")
        return "\n\n".join(analysis)