| """ |
| Speaker detection using simple voice activity analysis. |
| No neural models needed - uses basic signal processing. |
| """ |
|
|
| import numpy as np |
| import soundfile as sf |
| import librosa |
| import os |
|
|
|
|
def _classify_gender(avg_f0: float) -> str:
    """Map a cluster's mean voiced F0 to a coarse gender label.

    Returns "male" below 140, "female" above 185, "ambiguous" in between,
    and "unknown" when no positive F0 is available (0 or NaN).
    """
    # `not (x > 0)` instead of `x <= 0` so that NaN also maps to "unknown".
    if not avg_f0 > 0:
        return "unknown"
    if avg_f0 < 140:
        return "male"
    if avg_f0 > 185:
        return "female"
    return "ambiguous"


def _summarize_clusters(segments_data: list, labels: list) -> list:
    """Aggregate per-cluster statistics, ordered by cluster id.

    Each summary dict holds the cluster id, its segment count, the mean
    segment energy, the mean of the positive per-segment F0 medians
    (0.0 if the cluster has none) and the derived gender label.
    """
    by_cluster = {}
    for seg, label in zip(segments_data, labels):
        by_cluster.setdefault(label, []).append(seg)

    summaries = []
    for cluster_id in sorted(by_cluster):
        cluster_segs = by_cluster[cluster_id]
        voiced_f0 = [s["f0_median"] for s in cluster_segs if s["f0_median"] > 0]
        # Guard the empty case: np.mean([]) returns NaN and emits a warning.
        avg_f0 = float(np.mean(voiced_f0)) if voiced_f0 else 0.0
        summaries.append(
            {
                "cluster_id": cluster_id,
                "n_segments": len(cluster_segs),
                "avg_energy": np.mean([s["energy"] for s in cluster_segs]),
                "avg_f0": avg_f0,
                "gender": _classify_gender(avg_f0),
            }
        )
    return summaries


def analyze_speakers(audio_path: str, output_dir: str = None) -> dict:
    """
    Analyze audio to estimate the number of distinct speakers.

    The signal is cut into non-overlapping 1-second segments. Segments with
    mean squared amplitude below 0.001 are skipped. For every remaining
    segment the median pitch (pYIN, searched between 70 and 400), the mean
    spectral centroid and the energy are collected, and the segments are
    grouped with average-linkage hierarchical clustering.

    Args:
        audio_path: Path of the audio file to read with soundfile.
        output_dir: Optional directory (created if missing) that receives
            a "speaker_analysis.txt" summary. Nothing is written when falsy.

    Returns:
        A dict with:
            "n_speakers": number of distinct cluster labels,
            "segments": list of per-segment feature dicts,
            "cluster_labels": one cluster label per entry of "segments".
        When fewer than two segments pass the energy gate no clustering is
        possible; every segment (if any) then gets label 1.
    """
    print(f"Loading audio: {audio_path}")
    audio, sr = sf.read(audio_path)

    # Multi-channel input is averaged down to a single channel.
    audio_mono = audio.mean(axis=1) if audio.ndim > 1 else audio

    print(f"Audio: {len(audio_mono) / sr:.1f}s at {sr}Hz")

    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    print("\nAnalyzing speaker segments...")

    segment_duration = 1.0
    segment_samples = int(segment_duration * sr)
    # Trailing audio shorter than one full segment is ignored.
    n_segments = len(audio_mono) // segment_samples

    print(f"  Splitting into {n_segments} segments of {segment_duration}s each")

    segments_data = []

    for seg_idx in range(n_segments):
        start = seg_idx * segment_samples
        segment = audio_mono[start : start + segment_samples]

        # Energy gate: skip segments that are too quiet to analyze.
        seg_energy = np.mean(segment**2)
        if seg_energy < 0.001:
            continue

        f0, _, _ = librosa.pyin(
            segment, fmin=70, fmax=400, sr=sr, frame_length=2048
        )
        # pYIN marks unvoiced frames with NaN.
        f0_valid = f0[~np.isnan(f0)]

        # Require more than 10 voiced frames before trusting pitch statistics.
        if len(f0_valid) > 10:
            f0_median = np.median(f0_valid)
            f0_std = np.std(f0_valid)
        else:
            f0_median = 0
            f0_std = 0

        spectral_centroid = np.mean(
            librosa.feature.spectral_centroid(y=segment, sr=sr)[0]
        )

        segments_data.append(
            {
                "segment": seg_idx,
                "start_time": start / sr,
                "energy": seg_energy,
                "f0_median": f0_median,
                "f0_std": f0_std,
                "spectral_centroid": spectral_centroid,
            }
        )

    print(f"Analyzed {len(segments_data)} speech segments")

    print("\nClustering segments by voice characteristics...")

    if len(segments_data) < 2:
        # Hierarchical clustering needs at least two observations; put any
        # single segment into cluster 1 instead of raising.
        print("  Fewer than two speech segments; skipping clustering")
        labels = np.ones(len(segments_data), dtype=int)
    else:
        from scipy.cluster.hierarchy import fcluster, linkage

        features = np.array(
            [
                [
                    # Segments without a reliable pitch fall back to 150.
                    seg["f0_median"] if seg["f0_median"] > 0 else 150,
                    seg["spectral_centroid"],
                    np.log10(seg["energy"] + 1e-10) * 100,
                ]
                for seg in segments_data
            ]
        )

        # Rough rescaling so the columns have comparable magnitudes.
        features[:, 0] = features[:, 0] / 300
        features[:, 1] = features[:, 1] / 5000
        # NOTE(review): log10(energy) * 100 is below -2 for any energy under
        # ~0.95, so after clipping this column is effectively constant and
        # barely influences the distances - confirm the intended scaling.
        features[:, 2] = np.clip(features[:, 2], -2, 2)

        Z = linkage(features, method="average")

        # Request roughly one cluster per three segments, bounded to [2, 8].
        n_clusters = max(min(8, len(segments_data) // 3), 2)
        labels = fcluster(Z, n_clusters, criterion="maxclust")

    label_list = labels.tolist()
    unique_speakers = len(set(label_list))
    summaries = _summarize_clusters(segments_data, label_list)

    print("\nResults:")
    print(f"  Total segments analyzed: {len(segments_data)}")
    print(f"  Estimated unique speakers: {unique_speakers}")

    for info in summaries:
        cluster_id = info["cluster_id"]
        n_segs = info["n_segments"]
        gender = info["gender"]
        avg_energy = info["avg_energy"]
        distance = "near" if avg_energy > 0.03 else "far"
        print(
            f"  Speaker {cluster_id}: {n_segs} segments, {gender}, {distance} (energy: {avg_energy:.4f})"
        )

    result = {
        "n_speakers": unique_speakers,
        "segments": segments_data,
        "cluster_labels": label_list,
    }

    if output_dir:
        report_path = os.path.join(output_dir, "speaker_analysis.txt")
        with open(report_path, "w", encoding="utf-8") as f:
            f.write(f"Estimated unique speakers: {unique_speakers}\n\n")
            for info in summaries:
                cluster_id = info["cluster_id"]
                n_segs = info["n_segments"]
                # Same gender label as the console report.
                gender = info["gender"]
                f.write(
                    f"Speaker {cluster_id}: {n_segs} segments, gender: {gender}\n"
                )

    return result
|
|
|
|
if __name__ == "__main__":
    import sys

    # Command line: [audio_path] [output_dir]; both are optional and fall
    # back to the defaults below when omitted.
    cli_args = sys.argv[1:]

    if cli_args:
        audio_file = cli_args[0]
    else:
        audio_file = "../data/mixture.wav"

    if len(cli_args) >= 2:
        output = cli_args[1]
    else:
        output = "speaker_analysis_output"

    analyze_speakers(audio_file, output)
|
|