import os

import torch
import torch.nn as nn
import torch.nn.functional as F
import librosa
import numpy as np
# Basic building block for the ResNet-style CNN.
# Uses two 3x3 convolutional layers, each followed by batch normalization.
class BasicBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        # first conv layer with the specified stride
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        # second conv layer always has stride 1
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        # downsample is applied on the skip path when dimensions change
        self.downsample = downsample

    def forward(self, x):
        # save the input for the skip connection
        identity = x
        # first conv + batchnorm + relu
        out = F.relu(self.bn1(self.conv1(x)))
        # second conv + batchnorm (relu comes after the addition)
        out = self.bn2(self.conv2(out))
        # project the identity if needed to match dimensions
        if self.downsample is not None:
            identity = self.downsample(x)
        # add the skip connection and apply relu
        out += identity
        out = F.relu(out)
        return out
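
# Minimal shape-check sketch for BasicBlock (illustrative only; this helper is
# not part of the original module). A stride-2 block halves the spatial
# dimensions, so the skip path needs a matching 1x1 projection.
def _demo_basic_block():
    downsample = nn.Sequential(
        nn.Conv2d(64, 128, kernel_size=1, stride=2, bias=False),
        nn.BatchNorm2d(128),
    )
    block = BasicBlock(64, 128, stride=2, downsample=downsample)
    x = torch.randn(1, 64, 32, 32)
    print(block(x).shape)  # expected: torch.Size([1, 128, 16, 16])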
# Main CNN model for speech style classification.
# Architecture follows the ResNet-18 layer configuration (2-2-2-2 basic blocks).
class SpeechStyleCNN(nn.Module):
    def __init__(self, num_classes=2):
        super(SpeechStyleCNN, self).__init__()
        # initial convolution - takes 3-channel input (spectrogram stacked to RGB)
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        # stack of residual stages with increasing channel counts
        self.layer1 = self._make_layer(64, 64, 2, stride=1)
        self.layer2 = self._make_layer(64, 128, 2, stride=2)
        self.layer3 = self._make_layer(128, 256, 2, stride=2)
        self.layer4 = self._make_layer(256, 512, 2, stride=2)
        # global average pooling and final classification layer
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    # helper to create one stage of residual blocks
    def _make_layer(self, in_channels, out_channels, blocks, stride=1):
        downsample = None
        # a downsample projection is needed when the stride is not 1
        # or the channel counts differ
        if stride != 1 or in_channels != out_channels:
            downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )
        layers = []
        # the first block may change stride and channel count
        layers.append(BasicBlock(in_channels, out_channels, stride, downsample))
        # remaining blocks keep stride 1 and the same channel count
        for _ in range(1, blocks):
            layers.append(BasicBlock(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        # initial conv block
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.maxpool(x)
        # pass through all residual stages
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        # global pooling and classification
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x
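
# Minimal forward-pass sketch for SpeechStyleCNN (illustrative only). The input
# size below is an assumption: 128 mel bands x ~94 frames matches the default
# 3 s / 16 kHz / hop 512 settings used in AudioClassifier, and the adaptive
# pooling layer means other sizes work too.
def _demo_speech_style_cnn():
    model = SpeechStyleCNN(num_classes=2)
    model.eval()
    x = torch.randn(1, 3, 128, 94)
    with torch.no_grad():
        logits = model(x)
    print(logits.shape)  # expected: torch.Size([1, 2])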
# Main classifier class that combines the CNN with acoustic feature analysis
class AudioClassifier:
    # registry of available pre-trained models
    AVAILABLE_MODELS = {
        '3s_window': 'spectrogram_cnn_3s_window.pth',
    }

    @classmethod
    def get_model_path(cls, model_name='3s_window'):
        # returns the full path to a bundled model file, or None if unknown
        if model_name not in cls.AVAILABLE_MODELS:
            print(f"Model not found: {model_name}")
            return None
        return os.path.join(os.path.dirname(__file__), cls.AVAILABLE_MODELS[model_name])

    def __init__(self, model_path=None, device=None):
        # set up the device - use GPU if available
        if device is None:
            self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        else:
            self.device = torch.device(device)
        # initialize the CNN model
        self.model = SpeechStyleCNN().to(self.device)
        # use the default model path if none is specified
        if model_path is None:
            model_path = os.path.join(os.path.dirname(__file__), 'spectrogram_cnn_3s_window.pth')
        # load pre-trained weights
        try:
            print(f"Attempting to load model from: {model_path}")
            state_dict = torch.load(model_path, map_location=self.device, weights_only=False)
            self.model.load_state_dict(state_dict)
            print(f"✓ Successfully loaded trained model from: {model_path}")
        except FileNotFoundError:
            print(f"Could not find model file at {model_path}")
            print("Make sure the model file exists in the correct location")
        except Exception as e:
            print(f"Failed to load the model: {e}")
        # set the model to evaluation mode
        self.model.eval()
        # audio processing parameters
        self.sample_rate = 16000
        self.n_mels = 128
        self.n_fft = 2048
        self.hop_length = 512
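
    # Construction sketch (assumes the default checkpoint file sits next to
    # this module):
    #   clf = AudioClassifier()                     # auto-selects CUDA if available
    #   clf = AudioClassifier(device='cpu')         # force CPU inference
    #   clf = AudioClassifier(model_path='my.pth')  # hypothetical custom checkpoint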
    # extract a mel spectrogram suitable for the CNN from an audio file
    def extract_mel_spectrogram(self, audio_path, window_size=3.0):
        # load audio at the target sample rate
        audio, sr = librosa.load(audio_path, sr=self.sample_rate)
        # window size in samples
        window_samples = int(window_size * sr)
        # for longer audio, average over multiple 50%-overlapping windows
        if len(audio) > window_samples * 1.5:
            hop_samples = window_samples // 2
            windows = []
            # extract overlapping windows
            for start in range(0, len(audio) - window_samples, hop_samples):
                windows.append(audio[start:start + window_samples])
            # always include the final window so the tail is covered
            windows.append(audio[-window_samples:])
            # compute a mel spectrogram for each window
            mel_specs = []
            for window in windows[:5]:  # cap at 5 windows to bound compute
                mel_spec = librosa.feature.melspectrogram(
                    y=window,
                    sr=sr,
                    n_mels=self.n_mels,
                    n_fft=self.n_fft,
                    hop_length=self.hop_length
                )
                mel_specs.append(mel_spec)
            # average the spectrograms
            mel_spec = np.mean(mel_specs, axis=0)
        else:
            # for short audio, pad or truncate to exactly one window
            if len(audio) < window_samples:
                audio = np.pad(audio, (0, window_samples - len(audio)), mode='constant')
            else:
                audio = audio[:window_samples]
            mel_spec = librosa.feature.melspectrogram(
                y=audio,
                sr=sr,
                n_mels=self.n_mels,
                n_fft=self.n_fft,
                hop_length=self.hop_length
            )
        # convert to decibels
        mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)
        # normalize to the 0-1 range (epsilon guards against silent input)
        spec_range = mel_spec_db.max() - mel_spec_db.min()
        mel_spec_norm = (mel_spec_db - mel_spec_db.min()) / (spec_range + 1e-8)
        # stack into 3 identical channels for the CNN input
        mel_spec_3ch = np.stack([mel_spec_norm, mel_spec_norm, mel_spec_norm], axis=0)
        return mel_spec_3ch
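
    # Shape sketch (hypothetical file path): whatever the input length, the
    # method returns a (3, n_mels, T) float array normalized to [0, 1]; with
    # the defaults, T = 1 + 48000 // 512 = 94 frames for a 3 s window at 16 kHz.
    #   spec = classifier.extract_mel_spectrogram('sample.wav')
    #   spec.shape  # (3, 128, 94)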
    # extract summary acoustic features from an audio file
    def extract_acoustic_features(self, audio_path):
        audio, sr = librosa.load(audio_path, sr=self.sample_rate)
        features = {}
        # tempo/rhythm estimation
        onset_env = librosa.onset.onset_strength(y=audio, sr=sr)
        tempo, _ = librosa.beat.beat_track(onset_envelope=onset_env, sr=sr)
        # beat_track returns a scalar or a 1-element array depending on the librosa version
        features['tempo'] = float(np.atleast_1d(tempo)[0])
        # pitch tracking: keep the strongest pitch candidate per frame
        pitches, magnitudes = librosa.piptrack(y=audio, sr=sr)
        pitch_values = []
        for t in range(pitches.shape[1]):
            index = magnitudes[:, t].argmax()
            pitch = pitches[index, t]
            if pitch > 0:
                pitch_values.append(pitch)
        # pitch statistics (zeros when no voiced frames were found)
        if pitch_values:
            features['pitch_mean'] = float(np.mean(pitch_values))
            features['pitch_std'] = float(np.std(pitch_values))
            features['pitch_range'] = float(np.max(pitch_values) - np.min(pitch_values))
        else:
            features['pitch_mean'] = 0.0
            features['pitch_std'] = 0.0
            features['pitch_range'] = 0.0
        # energy/loudness features
        rms = librosa.feature.rms(y=audio)[0]
        features['energy_mean'] = float(np.mean(rms))
        features['energy_std'] = float(np.std(rms))
        # zero crossing rate - a rough indicator of voice quality
        zcr = librosa.feature.zero_crossing_rate(audio)[0]
        features['zcr_mean'] = float(np.mean(zcr))
        features['zcr_std'] = float(np.std(zcr))
        # spectral centroid - perceived brightness of the sound
        spectral_centroids = librosa.feature.spectral_centroid(y=audio, sr=sr)[0]
        features['spectral_centroid_mean'] = float(np.mean(spectral_centroids))
        features['spectral_centroid_std'] = float(np.std(spectral_centroids))
        return features
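
    # Output sketch (hypothetical file path): the returned dict always carries
    # the same ten keys, so downstream code can index it without guards.
    #   feats = classifier.extract_acoustic_features('sample.wav')
    #   sorted(feats)
    #   # ['energy_mean', 'energy_std', 'pitch_mean', 'pitch_range', 'pitch_std',
    #   #  'spectral_centroid_mean', 'spectral_centroid_std', 'tempo',
    #   #  'zcr_mean', 'zcr_std']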
    # compute prosody scores from acoustic features,
    # using thresholds calibrated on the training data
    def _compute_prosody_scores(self, features):
        individual_scores = {}
        # spectral centroid variability - the best discriminating feature
        sc_std = features['spectral_centroid_std']
        if sc_std >= 1080:
            spectral_score = 0.9  # strongly indicates read speech
        elif sc_std >= 1040:
            spectral_score = 0.7
        elif sc_std >= 1000:
            spectral_score = 0.5
        elif sc_std >= 970:
            spectral_score = 0.3
        else:
            spectral_score = 0.1  # strongly indicates spontaneous speech
        individual_scores['spectral_variability'] = {
            'score': spectral_score,
            'value': sc_std,
            'interpretation': ('high variability (read)' if spectral_score > 0.6
                               else 'low variability (spontaneous)' if spectral_score < 0.4
                               else 'moderate')
        }
        # zero crossing rate - the second-best feature
        zcr = features['zcr_mean']
        if zcr >= 0.125:
            zcr_score = 0.9
        elif zcr >= 0.110:
            zcr_score = 0.7
        elif zcr >= 0.100:
            zcr_score = 0.5
        elif zcr >= 0.092:
            zcr_score = 0.3
        else:
            zcr_score = 0.1
        individual_scores['zcr_mean'] = {
            'score': zcr_score,
            'value': zcr,
            'interpretation': ('high ZCR (read)' if zcr_score > 0.6
                               else 'low ZCR (spontaneous)' if zcr_score < 0.4
                               else 'moderate')
        }
        # energy level - read speech tends to be lower energy
        energy = features['energy_mean']
        if energy < 0.055:
            energy_score = 0.85
        elif energy < 0.062:
            energy_score = 0.65
        elif energy < 0.070:
            energy_score = 0.4
        else:
            energy_score = 0.15
        individual_scores['energy_level'] = {
            'score': energy_score,
            'value': energy,
            'interpretation': ('low energy (read)' if energy_score > 0.6
                               else 'high energy (spontaneous)' if energy_score < 0.4
                               else 'moderate')
        }
        # pitch range
        pitch_range = features.get('pitch_range', 3828)
        if pitch_range < 3815:
            pitch_range_score = 0.7
        elif pitch_range < 3828:
            pitch_range_score = 0.5
        else:
            pitch_range_score = 0.3
        individual_scores['pitch_range'] = {
            'score': pitch_range_score,
            'value': pitch_range,
            'interpretation': ('narrow (read)' if pitch_range_score > 0.6
                               else 'wide (spontaneous)' if pitch_range_score < 0.4
                               else 'moderate')
        }
        # energy variability
        energy_std = features.get('energy_std', 0.047)
        if energy_std < 0.042:
            energy_std_score = 0.7
        elif energy_std < 0.048:
            energy_std_score = 0.5
        else:
            energy_std_score = 0.3
        individual_scores['energy_std'] = {
            'score': energy_std_score,
            'value': energy_std,
            'interpretation': ('steady (read)' if energy_std_score > 0.6
                               else 'variable (spontaneous)' if energy_std_score < 0.4
                               else 'moderate')
        }
        # ZCR variability
        zcr_std = features.get('zcr_std', 0.111)
        if zcr_std >= 0.115:
            zcr_std_score = 0.7
        elif zcr_std >= 0.105:
            zcr_std_score = 0.5
        else:
            zcr_std_score = 0.3
        individual_scores['zcr_std'] = {
            'score': zcr_std_score,
            'value': zcr_std,
            'interpretation': ('variable ZCR (read)' if zcr_std_score > 0.6
                               else 'steady ZCR (spontaneous)' if zcr_std_score < 0.4
                               else 'moderate')
        }
        # weights based on feature importance from the calibration analysis
        weights = {
            'spectral_variability': 0.30,
            'zcr_mean': 0.25,
            'energy_level': 0.20,
            'pitch_range': 0.10,
            'energy_std': 0.08,
            'zcr_std': 0.07,
        }
        # weighted overall score
        overall_score = (
            spectral_score * weights['spectral_variability'] +
            zcr_score * weights['zcr_mean'] +
            energy_score * weights['energy_level'] +
            pitch_range_score * weights['pitch_range'] +
            energy_std_score * weights['energy_std'] +
            zcr_std_score * weights['zcr_std']
        )
        # classification thresholds, with a lower-confidence middle band
        if overall_score > 0.58:
            classification = 'read'
            confidence = 0.5 + (overall_score - 0.5) * 0.9
        elif overall_score < 0.42:
            classification = 'spontaneous'
            confidence = 0.5 + (0.5 - overall_score) * 0.9
        else:
            classification = 'read' if overall_score >= 0.50 else 'spontaneous'
            confidence = 0.5 + abs(overall_score - 0.5) * 0.6
        return {
            'classification': classification,
            'confidence': min(0.95, confidence),
            'overall_score': overall_score,
            'individual_scores': individual_scores
        }
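
    # Worked example with hypothetical per-feature scores: spectral=0.9,
    # zcr=0.7, energy=0.65, pitch_range=0.5, energy_std=0.5, zcr_std=0.5 gives
    #   0.9*0.30 + 0.7*0.25 + 0.65*0.20 + 0.5*0.10 + 0.5*0.08 + 0.5*0.07 = 0.70
    # Since 0.70 > 0.58, the clip is classified 'read' with confidence
    # 0.5 + (0.70 - 0.5) * 0.9 = 0.68.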
    # main classification method - combines CNN and prosody analysis
    def classify(self, audio_path):
        # extract the mel spectrogram for the CNN
        mel_spec = self.extract_mel_spectrogram(audio_path)
        # convert to a tensor and add a batch dimension
        mel_tensor = torch.FloatTensor(mel_spec).unsqueeze(0).to(self.device)
        # get CNN predictions
        with torch.no_grad():
            logits = self.model(mel_tensor)
            probabilities = F.softmax(logits, dim=1)
            predicted_class = torch.argmax(probabilities, dim=1).item()
            cnn_confidence = probabilities[0, predicted_class].item()
        print(f"CNN Logits: {logits[0].cpu().numpy()}")
        print(f"CNN Probabilities: Class 0 (read)={probabilities[0, 0].item():.3f}, Class 1 (spontaneous)={probabilities[0, 1].item():.3f}")
        print(f"CNN Prediction: Class {predicted_class} ({['read', 'spontaneous'][predicted_class]}) with confidence {cnn_confidence:.3f}")
        # extract acoustic features for prosody analysis
        acoustic_features = self.extract_acoustic_features(audio_path)
        # compute prosody-based scores
        prosody_scores = self._compute_prosody_scores(acoustic_features)
        prosody_classification = prosody_scores['classification']
        prosody_confidence = prosody_scores['confidence']
        # map the CNN class index to a label
        cnn_class_name = 'read' if predicted_class == 0 else 'spontaneous'
        read_prob = probabilities[0, 0].item()
        print(f"CNN classification: {cnn_class_name}")
        print(f"Prosody classification: {prosody_classification} (conf={prosody_confidence:.2f})")
        # combine CNN and prosody - prosody is treated as the more reliable signal
        final_classification = prosody_classification
        final_confidence = prosody_confidence
        # boost confidence when both methods agree
        if cnn_class_name == prosody_classification:
            final_confidence = min(0.95, prosody_confidence * 1.15)
        # otherwise, let a very confident CNN override a weak prosody decision
        elif read_prob > 0.85 and cnn_class_name == 'read':
            if prosody_confidence < 0.65:
                final_classification = 'read'
                final_confidence = 0.55
        elif read_prob < 0.10 and cnn_class_name == 'spontaneous':
            if prosody_confidence < 0.65:
                final_classification = 'spontaneous'
                final_confidence = 0.55
        return {
            'classification': final_classification,
            'confidence': float(final_confidence),
            'cnn_classification': cnn_class_name,
            'cnn_confidence': float(cnn_confidence),
            'prosody_classification': prosody_classification,
            'prosody_confidence': float(prosody_confidence),
            'prosody_scores': prosody_scores['individual_scores'],
            'acoustic_features': acoustic_features,
            'interpretation': self._interpret_classification(
                final_classification, final_confidence,
                cnn_class_name, cnn_confidence,
                prosody_classification, prosody_confidence,
                prosody_scores, acoustic_features
            )
        }
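
    # Usage sketch (hypothetical file path): classify() returns a plain dict,
    # so the result is easy to log or serialize.
    #   result = classifier.classify('sample.wav')
    #   print(result['classification'], result['confidence'])
    #   print(result['interpretation'])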
    # generate a human-readable interpretation of the classification
    def _interpret_classification(
        self,
        final_class,
        final_confidence,
        cnn_class,
        cnn_confidence,
        prosody_class,
        prosody_confidence,
        prosody_scores,
        features
    ):
        interpretation = f"## Classification: **{final_class.upper()}** SPEECH\n\n"
        interpretation += f"**Confidence:** {final_confidence*100:.1f}%\n\n"
        if final_class == 'read':
            interpretation += "**Description:** The speech exhibits characteristics of read or scripted content. "
            interpretation += "The audio shows consistent prosodic patterns typical of someone reading from prepared text, "
            interpretation += "with steady pacing, uniform intonation, and regular energy levels.\n\n"
        else:
            interpretation += "**Description:** The speech exhibits characteristics of spontaneous speaking. "
            interpretation += "The audio shows natural prosodic variation typical of extemporaneous speech, "
            interpretation += "with variable pacing, dynamic intonation, and natural energy fluctuations.\n\n"
        return interpretation
# test code - runs when the script is executed directly
if __name__ == "__main__":
    classifier = AudioClassifier()
    print("\nAvailable pre-trained models:")
    for name, filename in AudioClassifier.AVAILABLE_MODELS.items():
        print(f"  - {name}: {filename}")
    print("\nModel architecture:")
    print(classifier.model)