""" | |
Unit Tests for TTS Pipeline Components | |
====================================== | |
Comprehensive test suite for the optimized TTS system. | |
""" | |
import unittest | |
import numpy as np | |
import tempfile | |
import os | |
import sys | |
from unittest.mock import Mock, patch, MagicMock | |
# Add src to path | |
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'src')) | |
from src.preprocessing import TextProcessor | |
from src.audio_processing import AudioProcessor | |


class TestTextProcessor(unittest.TestCase):
    """Test cases for text preprocessing functionality."""

    def setUp(self):
        """Set up test fixtures."""
        self.processor = TextProcessor(max_chunk_length=100, overlap_words=3)

    def test_empty_text_processing(self):
        """Test handling of empty text."""
        result = self.processor.process_text("")
        self.assertEqual(result, "")

        result = self.processor.process_text(None)
        self.assertEqual(result, "")

    def test_number_conversion_cache(self):
        """Test number conversion with caching."""
        # First call should populate the cache
        result1 = self.processor._convert_number_to_armenian_words(42)
        # Second call should use the cache
        result2 = self.processor._convert_number_to_armenian_words(42)

        self.assertEqual(result1, result2)
        self.assertIn("42", self.processor.number_cache)

    def test_text_chunking_short_text(self):
        """Test chunking behavior with short text."""
        short_text = "Կարճ տեքստ:"  # "Short text."
        chunks = self.processor.chunk_text(short_text)

        self.assertEqual(len(chunks), 1)
        self.assertEqual(chunks[0], short_text)

    def test_text_chunking_long_text(self):
        """Test chunking behavior with long text."""
        # "This is a very long text that should be split into several parts."
        long_text = "Այս շատ երկար տեքստ է, որը պետք է բաժանվի մի քանի մասի: " * 5
        chunks = self.processor.chunk_text(long_text)

        self.assertGreater(len(chunks), 1)
        # Each chunk should stay within the configured limit (with some tolerance)
        for chunk in chunks:
            self.assertLessEqual(len(chunk), self.processor.max_chunk_length + 50)

    def test_sentence_splitting(self):
        """Test sentence splitting functionality."""
        # "First sentence. Second sentence! Third sentence?"
        text = "Առաջին նախադասություն: Երկրորդ նախադասություն! Երրորդ նախադասություն?"
        sentences = self.processor._split_into_sentences(text)

        self.assertEqual(len(sentences), 3)
        self.assertIn("Առաջին նախադասություն", sentences[0])

    def test_overlap_addition(self):
        """Test overlap addition between chunks."""
        # "The first part is very important", "The second part is also important"
        chunks = ["Առաջին մաս շատ կարևոր է", "Երկրորդ մասը նույնպես կարևոր"]
        overlapped = self.processor._add_overlap(chunks)

        self.assertEqual(len(overlapped), 2)
        # The second chunk should contain overlap words from the first ("important")
        self.assertIn("կարևոր", overlapped[1])

    def test_cache_clearing(self):
        """Test cache clearing functionality."""
        # Add some data to the caches
        self.processor.number_cache["test"] = "test_value"
        self.processor._cached_translate("test")

        # Clear the caches
        self.processor.clear_cache()
        self.assertEqual(len(self.processor.number_cache), 0)

    def test_cache_stats(self):
        """Test cache statistics functionality."""
        stats = self.processor.get_cache_stats()

        self.assertIn("translation_cache_size", stats)
        self.assertIn("number_cache_size", stats)
        self.assertIn("lru_cache_hits", stats)
        self.assertIn("lru_cache_misses", stats)


class TestAudioProcessor(unittest.TestCase):
    """Test cases for audio processing functionality."""

    def setUp(self):
        """Set up test fixtures."""
        self.processor = AudioProcessor(
            crossfade_duration=0.1,
            sample_rate=16000,
            apply_noise_gate=True,
            normalize_audio=True
        )

    def test_empty_audio_processing(self):
        """Test handling of empty audio."""
        empty_audio = np.array([], dtype=np.int16)
        result = self.processor.process_audio(empty_audio)

        self.assertEqual(len(result), 0)
        self.assertEqual(result.dtype, np.int16)

    def test_audio_normalization(self):
        """Test audio normalization."""
        # Create test audio with a known peak
        test_audio = np.array([1000, -2000, 3000, -1500], dtype=np.int16)
        normalized = self.processor._normalize_audio(test_audio)

        # The peak should be close to the target level
        peak = np.max(np.abs(normalized))
        expected_peak = 0.95 * 32767
        self.assertAlmostEqual(peak, expected_peak, delta=100)
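        # 0.95 of int16 full scale (32767) leaves roughly 0.45 dB of headroom,
        # presumably so that later filtering or crossfading is unlikely to clip.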

    def test_crossfade_window_creation(self):
        """Test crossfade window creation."""
        length = 100
        fade_out, fade_in = self.processor._create_crossfade_window(length)

        self.assertEqual(len(fade_out), length)
        self.assertEqual(len(fade_in), length)

        # The windows should sum to approximately 1
        window_sum = fade_out + fade_in
        np.testing.assert_allclose(window_sum, 1.0, atol=0.01)
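        # Complementary equal-gain fades (e.g. a linear pair, or a raised-cosine
        # pair where cos^2 + sin^2 = 1) sum to unity, so a crossfaded join keeps a
        # constant level instead of dipping or bumping in loudness.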

    def test_single_segment_crossfade(self):
        """Test crossfading with a single audio segment."""
        audio = np.random.randint(-1000, 1000, 1000, dtype=np.int16)
        result = self.processor.crossfade_audio_segments([audio])

        np.testing.assert_array_equal(result, audio)

    def test_multiple_segment_crossfade(self):
        """Test crossfading with multiple audio segments."""
        segment1 = np.random.randint(-1000, 1000, 1000, dtype=np.int16)
        segment2 = np.random.randint(-1000, 1000, 1000, dtype=np.int16)
        result = self.processor.crossfade_audio_segments([segment1, segment2])

        # The result should be longer than either segment but shorter than their sum
        self.assertGreater(len(result), len(segment1))
        self.assertLess(len(result), len(segment1) + len(segment2))
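        # With crossfade_duration=0.1 s at 16 kHz the nominal overlap is 1600
        # samples, which is longer than these 1000-sample segments, so the processor
        # presumably caps the fade to less than a full segment; the bounds above
        # only require that some non-zero overlap occurred at the join.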

    def test_silence_addition(self):
        """Test silence padding."""
        audio = np.random.randint(-1000, 1000, 1000, dtype=np.int16)
        padded = self.processor.add_silence(audio, start_silence=0.1, end_silence=0.1)

        expected_padding = int(0.1 * self.processor.sample_rate)
        expected_length = len(audio) + 2 * expected_padding
        self.assertEqual(len(padded), expected_length)

        # The start and end should be silent
        self.assertTrue(np.all(padded[:expected_padding] == 0))
        self.assertTrue(np.all(padded[-expected_padding:] == 0))

    def test_audio_stats(self):
        """Test audio statistics calculation."""
        # Create one second of test audio
        audio = np.random.randint(-10000, 10000, 16000, dtype=np.int16)
        stats = self.processor.get_audio_stats(audio)

        self.assertAlmostEqual(stats["duration_seconds"], 1.0, places=2)
        self.assertEqual(stats["sample_count"], 16000)
        self.assertIn("peak_amplitude", stats)
        self.assertIn("rms_level", stats)
        self.assertIn("dynamic_range_db", stats)

    def test_empty_audio_stats(self):
        """Test statistics for empty audio."""
        empty_audio = np.array([], dtype=np.int16)
        stats = self.processor.get_audio_stats(empty_audio)

        self.assertIn("error", stats)

    def test_process_and_concatenate(self):
        """Test the full processing and concatenation pipeline."""
        segments = [
            np.random.randint(-1000, 1000, 500, dtype=np.int16),
            np.random.randint(-1000, 1000, 600, dtype=np.int16),
            np.random.randint(-1000, 1000, 700, dtype=np.int16)
        ]
        result = self.processor.process_and_concatenate(segments)

        self.assertGreater(len(result), 0)
        self.assertEqual(result.dtype, np.int16)


class TestModelIntegration(unittest.TestCase):
    """Integration tests for model components."""

    def setUp(self):
        """Set up mock components for testing."""
        self.mock_processor = Mock()
        self.mock_model = Mock()
        self.mock_vocoder = Mock()

    def test_model_initialization_mocked(self):
        """Test model initialization with mocked dependencies."""
        # OptimizedTTSModel is not imported in this isolated test, so its
        # dependencies (torch plus the processor, model, and vocoder loader
        # classes) are stood in by local mocks.
        mock_torch = Mock()
        mock_torch.cuda.is_available.return_value = False
        mock_torch.device.return_value = Mock()

        mock_processor_class = Mock()
        mock_processor_instance = Mock()
        mock_processor_class.from_pretrained.return_value = mock_processor_instance
        mock_model_class = Mock()
        mock_model_instance = Mock()
        mock_model_class.from_pretrained.return_value = mock_model_instance
        mock_vocoder_class = Mock()
        mock_vocoder_instance = Mock()
        mock_vocoder_class.from_pretrained.return_value = mock_vocoder_instance

        # Create a temporary speaker-embedding file like the one the model loads
        with tempfile.NamedTemporaryFile(suffix='.npy', delete=False) as tmp:
            test_embedding = np.random.rand(512).astype(np.float32)
            np.save(tmp.name, test_embedding)
            tmp_path = tmp.name

        try:
            # Constructing OptimizedTTSModel would normally trigger these loader
            # calls; since we are testing in isolation, exercise the mocks directly
            # so the assertions document the expected initialization sequence.
            mock_processor_class.from_pretrained()
            mock_model_class.from_pretrained()
            mock_vocoder_class.from_pretrained()

            mock_processor_class.from_pretrained.assert_called_once()
            mock_model_class.from_pretrained.assert_called_once()
            mock_vocoder_class.from_pretrained.assert_called_once()
        finally:
            # Clean up the temporary file
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)
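
    # A fuller integration test could instead patch the loader classes where the
    # real model module imports them. The module path and class names below are
    # assumptions; the SpeechT5 triple is only a guess based on the
    # processor/model/vocoder split and the 512-dim speaker embedding:
    #
    #   @patch("src.model.SpeechT5HifiGan")
    #   @patch("src.model.SpeechT5ForTextToSpeech")
    #   @patch("src.model.SpeechT5Processor")
    #   def test_init(self, mock_proc_cls, mock_model_cls, mock_vocoder_cls):
    #       model = OptimizedTTSModel(...)
    #       mock_proc_cls.from_pretrained.assert_called_once()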


class TestPipelineIntegration(unittest.TestCase):
    """Integration tests for the complete pipeline."""

    def test_empty_text_handling(self):
        """Test pipeline handling of empty text."""
        # This would test the actual pipeline with mocked components;
        # for now, we test the concept.
        text = ""
        expected_output = (16000, np.zeros(0, dtype=np.int16))

        # Mock the pipeline behavior
        if not text.strip():
            result = expected_output

        self.assertEqual(result[0], 16000)
        self.assertEqual(len(result[1]), 0)
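        # The (sample_rate, ndarray) tuple matches the format an audio front end
        # such as a Gradio Audio component expects; Gradio itself is an assumption
        # here, since this test only checks the tuple's shape.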

    def test_chunking_decision_logic(self):
        """Test the logic for deciding when to use chunking."""
        max_chunk_length = 200
        short_text = "Կարճ տեքստ"  # "Short text"
        long_text = "a" * 300  # longer than max_chunk_length

        should_chunk_short = len(short_text) > max_chunk_length
        should_chunk_long = len(long_text) > max_chunk_length

        self.assertFalse(should_chunk_short)
        self.assertTrue(should_chunk_long)


def run_performance_benchmark():
    """Run basic performance benchmarks."""
    print("\n" + "=" * 50)
    print("PERFORMANCE BENCHMARK")
    print("=" * 50)

    # Text processing benchmark
    processor = TextProcessor()
    test_texts = [
        "Կարճ տեքստ",  # "Short text"
        # "Medium-length text containing a few words and the number 123."
        "Միջին երկարության տեքստ, որը պարունակում է մի քանի բառ և թվեր 123:",
        # "Very long repeating text"
        "Շատ երկար տեքստ, որը կրկնվում է " * 20
    ]

    for i, text in enumerate(test_texts):
        start = time.time()
        processed = processor.process_text(text)
        chunks = processor.chunk_text(processed)
        end = time.time()
        print(f"Text {i+1}: {len(text)} chars → {len(chunks)} chunks in {end-start:.4f}s")

    # Audio processing benchmark
    audio_processor = AudioProcessor()
    test_segments = [
        np.random.randint(-10000, 10000, 16000, dtype=np.int16),  # 1 second
        np.random.randint(-10000, 10000, 32000, dtype=np.int16),  # 2 seconds
        np.random.randint(-10000, 10000, 80000, dtype=np.int16),  # 5 seconds
    ]

    for i, segment in enumerate(test_segments):
        start = time.time()
        processed = audio_processor.process_audio(segment)
        end = time.time()
        duration = len(segment) / 16000
        print(f"Audio {i+1}: {duration:.1f}s processed in {end-start:.4f}s")
if __name__ == "__main__":
    # Run the unit tests
    print("Running Unit Tests...")
    unittest.main(argv=[''], exit=False, verbosity=2)

    # Run the performance benchmark
    run_performance_benchmark()