| import torch |
| import torch.nn as nn |
| import torchvision.transforms as T |
| from torchvision.transforms import functional as TF |
| import torchaudio |
| import torchaudio.transforms as AT |
| import numpy as np |
| import random |
| from PIL import Image |
| import librosa |
|
|
| class AdvancedDataAugmentation: |
| """ |
| Advanced data augmentation pipeline for multi-modal training |
| """ |
|
|
| def __init__(self): |
| |
| self.vision_transforms = T.Compose([ |
| T.RandomApply([T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1)], p=0.3), |
| T.RandomApply([T.GaussianBlur(kernel_size=3)], p=0.1), |
| T.RandomApply([T.RandomAffine(degrees=10, translate=(0.1, 0.1), scale=(0.9, 1.1))], p=0.2), |
| T.RandomHorizontalFlip(p=0.1), |
| T.RandomApply([T.RandomErasing(p=0.1, scale=(0.02, 0.1), ratio=(0.3, 3.3))], p=0.1), |
| ]) |
|
|
| |
| self.audio_sample_rate = 16000 |
|
|
| def augment_vision(self, image): |
| """ |
| Apply advanced vision augmentations |
| """ |
| if isinstance(image, np.ndarray): |
| image = Image.fromarray(image) |
|
|
| |
| augmented = self.vision_transforms(image) |
|
|
| |
| if random.random() < 0.1: |
| |
| augmented = TF.adjust_gamma(augmented, random.uniform(0.8, 1.2)) |
|
|
| if random.random() < 0.1: |
| |
| img_array = np.array(augmented) |
| noise = np.random.normal(0, 5, img_array.shape) |
| img_array = np.clip(img_array + noise, 0, 255).astype(np.uint8) |
| augmented = Image.fromarray(img_array) |
|
|
| return augmented |
|
|
| def augment_audio(self, audio, sample_rate): |
| """ |
| Apply advanced audio augmentations |
| """ |
| if isinstance(audio, torch.Tensor): |
| audio = audio.numpy() |
|
|
| augmented_audios = [audio] |
|
|
| |
| if random.random() < 0.3: |
| rate = random.uniform(0.8, 1.2) |
| stretched = librosa.effects.time_stretch(audio, rate=rate) |
| augmented_audios.append(stretched) |
|
|
| |
| if random.random() < 0.3: |
| steps = random.randint(-2, 2) |
| pitched = librosa.effects.pitch_shift(audio, sr=sample_rate, n_steps=steps) |
| augmented_audios.append(pitched) |
|
|
| |
| if random.random() < 0.2: |
| noise = np.random.normal(0, 0.01, len(audio)) |
| noisy = audio + noise |
| augmented_audios.append(noisy) |
|
|
| |
| if random.random() < 0.3: |
| volume_factor = random.uniform(0.7, 1.3) |
| volume_aug = audio * volume_factor |
| augmented_audios.append(volume_aug) |
|
|
| |
| if random.random() < 0.2: |
| target_length = int(sample_rate * random.uniform(2.5, 4.0)) |
| if len(audio) > target_length: |
| start = random.randint(0, len(audio) - target_length) |
| cropped = audio[start:start + target_length] |
| else: |
| padding = target_length - len(audio) |
| cropped = np.pad(audio, (0, padding), 'constant') |
| augmented_audios.append(cropped) |
|
|
| |
| selected = random.choice(augmented_audios) |
|
|
| |
| target_length = sample_rate * 3 |
| if len(selected) > target_length: |
| selected = selected[:target_length] |
| elif len(selected) < target_length: |
| selected = np.pad(selected, (0, target_length - len(selected)), 'constant') |
|
|
| return torch.tensor(selected, dtype=torch.float32) |
|
|
| def augment_text(self, text, tokenizer): |
| """ |
| Apply text augmentations |
| """ |
| augmented_texts = [text] |
|
|
| |
| if random.random() < 0.2: |
| words = text.split() |
| if len(words) > 3: |
| |
| idx = random.randint(0, len(words) - 1) |
| |
| if random.random() < 0.5: |
| random.shuffle(words) |
| synonym_aug = ' '.join(words) |
| augmented_texts.append(synonym_aug) |
|
|
| |
|
|
| |
| if random.random() < 0.1: |
| words = text.split() |
| if len(words) > 3: |
| keep_prob = 0.9 |
| kept_words = [w for w in words if random.random() < keep_prob] |
| if kept_words: |
| deletion_aug = ' '.join(kept_words) |
| augmented_texts.append(deletion_aug) |
|
|
| selected_text = random.choice(augmented_texts) |
| return selected_text |
|
|
| class AdvancedPreprocessingPipeline: |
| """ |
| Advanced preprocessing pipeline with quality checks and normalization |
| """ |
|
|
| def __init__(self, target_face_size=(224, 224), target_audio_length=3.0): |
| self.target_face_size = target_face_size |
| self.target_audio_length = target_audio_length |
| self.sample_rate = 16000 |
|
|
| |
| self.min_face_confidence = 0.7 |
| self.min_audio_snr = 10.0 |
|
|
| def preprocess_face(self, face_image, bbox=None, landmarks=None): |
| """ |
| Advanced face preprocessing with alignment and quality checks |
| """ |
| |
| if not self._check_face_quality(face_image): |
| return None |
|
|
| |
| if isinstance(face_image, np.ndarray): |
| face_image = Image.fromarray(face_image) |
|
|
| |
| if landmarks is not None: |
| face_image = self._align_face(face_image, landmarks) |
|
|
| |
| face_image = face_image.resize(self.target_face_size, Image.BILINEAR) |
|
|
| |
| face_tensor = TF.to_tensor(face_image) |
|
|
| |
| normalize = T.Normalize(mean=[0.48145466, 0.4578275, 0.40821073], |
| std=[0.26862954, 0.26130258, 0.27577711]) |
| face_tensor = normalize(face_tensor) |
|
|
| return face_tensor |
|
|
| def preprocess_audio(self, audio_path_or_array, sample_rate=None): |
| """ |
| Advanced audio preprocessing with quality checks |
| """ |
| |
| if isinstance(audio_path_or_array, str): |
| audio, sr = librosa.load(audio_path_or_array, sr=self.sample_rate) |
| else: |
| audio = audio_path_or_array |
| sr = sample_rate or self.sample_rate |
|
|
| |
| if sr != self.sample_rate: |
| audio = librosa.resample(audio, orig_sr=sr, target_sr=self.sample_rate) |
|
|
| |
| if not self._check_audio_quality(audio): |
| return None |
|
|
| |
| audio = self._voice_activity_detection(audio) |
|
|
| |
| audio = self._normalize_audio(audio) |
|
|
| |
| target_samples = int(self.sample_rate * self.target_audio_length) |
| if len(audio) > target_samples: |
| |
| start = random.randint(0, len(audio) - target_samples) |
| audio = audio[start:start + target_samples] |
| elif len(audio) < target_samples: |
| |
| padding = target_samples - len(audio) |
| audio = np.pad(audio, (0, padding), 'constant') |
|
|
| return torch.tensor(audio, dtype=torch.float32) |
|
|
| def preprocess_text(self, text, tokenizer, max_length=128): |
| """ |
| Advanced text preprocessing |
| """ |
| |
| text = self._clean_text(text) |
|
|
| |
| encoding = tokenizer( |
| text, |
| max_length=max_length, |
| padding='max_length', |
| truncation=True, |
| return_tensors='pt' |
| ) |
|
|
| return encoding |
|
|
| def _check_face_quality(self, face_image): |
| """ |
| Check face image quality |
| """ |
| if isinstance(face_image, np.ndarray): |
| |
| if face_image.shape[0] < 64 or face_image.shape[1] < 64: |
| return False |
|
|
| |
| brightness = np.mean(face_image) |
| if brightness < 30 or brightness > 225: |
| return False |
|
|
| |
| contrast = np.std(face_image) |
| if contrast < 10: |
| return False |
|
|
| return True |
|
|
| def _check_audio_quality(self, audio): |
| """ |
| Check audio quality using SNR |
| """ |
| |
| signal_power = np.mean(audio ** 2) |
| noise_power = np.var(audio - np.convolve(audio, np.ones(100)/100, mode='same')) |
| snr = 10 * np.log10(signal_power / (noise_power + 1e-10)) |
|
|
| return snr >= self.min_audio_snr |
|
|
| def _align_face(self, face_image, landmarks): |
| """ |
| Align face using facial landmarks |
| """ |
| |
| |
| return face_image |
|
|
| def _voice_activity_detection(self, audio, threshold=0.01): |
| """ |
| Simple voice activity detection |
| """ |
| |
| energy = librosa.feature.rms(y=audio, frame_length=1024, hop_length=512)[0] |
|
|
| |
| active_segments = energy > threshold |
|
|
| if np.any(active_segments): |
| |
| active_indices = np.where(active_segments)[0] |
| start_idx = active_indices[0] * 512 |
| end_idx = (active_indices[-1] + 1) * 512 |
| return audio[start_idx:end_idx] |
|
|
| return audio |
|
|
| def _normalize_audio(self, audio): |
| """ |
| Normalize audio amplitude |
| """ |
| |
| max_val = np.max(np.abs(audio)) |
| if max_val > 0: |
| audio = audio / max_val |
|
|
| return audio |
|
|
| def _clean_text(self, text): |
| """ |
| Clean and normalize text |
| """ |
| import re |
|
|
| |
| text = ' '.join(text.split()) |
|
|
| |
| text = re.sub(r'[^\w\s.,!?\'"-]', '', text) |
|
|
| |
| text = text.replace('"', '"').replace('"', '"') |
| text = text.replace(''', "'").replace(''', "'") |
|
|
| return text.lower() |