Spaces:
Build error
Build error
| """Integration tests for performance and error scenario testing.""" | |
| import time | |
| import pytest | |
| import threading | |
| import queue | |
| import psutil | |
| import os | |
| from unittest.mock import Mock, patch, MagicMock | |
| from typing import List, Dict, Any, Optional | |
| from src.application.services.audio_processing_service import AudioProcessingApplicationService | |
| from src.application.dtos.audio_upload_dto import AudioUploadDto | |
| from src.application.dtos.processing_request_dto import ProcessingRequestDto | |
| from src.application.dtos.processing_result_dto import ProcessingResultDto | |
| from src.infrastructure.config.dependency_container import DependencyContainer | |
| from src.infrastructure.config.app_config import AppConfig | |
| from src.domain.models.audio_content import AudioContent | |
| from src.domain.models.text_content import TextContent | |
| from src.domain.exceptions import ( | |
| SpeechRecognitionException, | |
| TranslationFailedException, | |
| SpeechSynthesisException, | |
| AudioProcessingException, | |
| ProviderNotAvailableException | |
| ) | |
class TestPerformanceAndErrors:
    """Integration tests for performance and error scenarios.

    Every test drives ``AudioProcessingApplicationService.process_audio_pipeline``
    against mocked STT, translation and TTS providers, so the timings measured
    here reflect the service's own orchestration rather than real model work.
    """

    # ------------------------------------------------------------------
    # Fixtures
    # ------------------------------------------------------------------

    @pytest.fixture
    def mock_config(self, tmp_path):
        """Create mock configuration for testing.

        Returns a ``Mock(spec=AppConfig)`` whose ``get_*_config`` methods return
        plain dictionaries; temp and log paths point into pytest's ``tmp_path``.
        """
        config = Mock(spec=AppConfig)

        # Processing configuration
        config.get_processing_config.return_value = {
            'max_file_size_mb': 100,
            'supported_audio_formats': ['wav', 'mp3', 'flac'],
            'temp_dir': str(tmp_path),
            'cleanup_temp_files': True,
            'processing_timeout': 300,  # 5 minutes
            'max_concurrent_requests': 10,
        }

        # Logging configuration
        config.get_logging_config.return_value = {
            'level': 'INFO',
            'enable_file_logging': False,
            'log_file_path': str(tmp_path / 'test.log'),
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        }

        # STT configuration
        config.get_stt_config.return_value = {
            'preferred_providers': ['whisper-small', 'whisper-medium', 'parakeet'],
            'provider_timeout': 60.0,
            'max_retries': 2,
        }

        # TTS configuration
        config.get_tts_config.return_value = {
            'preferred_providers': ['kokoro', 'dia', 'cosyvoice2', 'dummy'],
            'provider_timeout': 30.0,
            'max_retries': 3,
        }

        # Translation configuration
        config.get_translation_config.return_value = {
            'provider_timeout': 45.0,
            'max_retries': 2,
            'chunk_size': 512,
        }

        return config

    @pytest.fixture
    def mock_container(self, mock_config):
        """Create mock dependency container.

        ``resolve`` always returns ``mock_config``; the provider getters return
        the mocks installed by ``_setup_mock_providers``.
        """
        container = Mock(spec=DependencyContainer)
        container.resolve.return_value = mock_config

        # Mock providers with configurable behavior
        self._setup_mock_providers(container)

        return container

    def _setup_mock_providers(self, container):
        """Attach successful STT, translation and TTS provider mocks to *container*.

        Tests that need failures override ``side_effect`` on the provider
        obtained from ``container.get_*_provider.return_value``.
        """
        # Mock STT provider
        mock_stt_provider = Mock()
        mock_stt_provider.transcribe.return_value = TextContent(
            text="Performance test transcription",
            language="en",
        )
        container.get_stt_provider.return_value = mock_stt_provider

        # Mock translation provider
        mock_translation_provider = Mock()
        mock_translation_provider.translate.return_value = TextContent(
            text="Transcripción de prueba de rendimiento",
            language="es",
        )
        container.get_translation_provider.return_value = mock_translation_provider

        # Mock TTS provider
        mock_tts_provider = Mock()
        mock_tts_provider.synthesize.return_value = AudioContent(
            data=b"performance_test_audio_data",
            format="wav",
            sample_rate=22050,
            duration=3.0,
        )
        container.get_tts_provider.return_value = mock_tts_provider

    @pytest.fixture
    def audio_service(self, mock_container, mock_config):
        """Create the audio processing service under test."""
        return AudioProcessingApplicationService(mock_container, mock_config)

    @pytest.fixture
    def sample_request(self):
        """Create a small, valid processing request (wav -> Spanish, kokoro voice)."""
        return self._build_request(
            "performance_test.wav",
            b"performance_test_audio_data",
        )

    # ------------------------------------------------------------------
    # Private helpers (not collected by pytest: names do not start with "test")
    # ------------------------------------------------------------------

    @staticmethod
    def _build_request(filename, content, content_type="audio/wav"):
        """Build a ``ProcessingRequestDto`` around an upload with the given payload.

        All non-audio fields use the values shared by every test in this class.
        """
        audio_upload = AudioUploadDto(
            filename=filename,
            content=content,
            content_type=content_type,
            size=len(content),
        )
        return ProcessingRequestDto(
            audio=audio_upload,
            asr_model="whisper-small",
            target_language="es",
            voice="kokoro",
            speed=1.0,
            requires_translation=True,
        )

    @staticmethod
    def _run_timed(service, request):
        """Run the pipeline once and return ``(result, elapsed_seconds)``.

        Uses ``time.perf_counter`` because wall-clock ``time.time`` can jump
        (NTP adjustments) and is not meant for measuring intervals.
        """
        started = time.perf_counter()
        result = service.process_audio_pipeline(request)
        return result, time.perf_counter() - started

    # ------------------------------------------------------------------
    # Performance tests
    # ------------------------------------------------------------------

    def test_processing_time_performance(self, audio_service, sample_request):
        """Test processing time performance benchmarks."""
        # Warm up so one-time initialisation does not skew the measurement.
        audio_service.process_audio_pipeline(sample_request)

        result, processing_time = self._run_timed(audio_service, sample_request)

        assert result.success is True
        assert result.processing_time > 0
        # The service's self-reported time cannot exceed what we observed
        # from outside, apart from a small margin.
        assert result.processing_time <= processing_time + 0.1

        # Performance benchmark: mock providers should finish within 5 seconds.
        assert processing_time < 5.0

    def test_memory_usage_performance(self, audio_service, sample_request):
        """Test memory usage during processing."""
        process = psutil.Process(os.getpid())

        # Measure initial memory
        initial_memory = process.memory_info().rss

        # Process multiple requests
        for _ in range(10):
            result = audio_service.process_audio_pipeline(sample_request)
            assert result.success is True

        # Measure final memory
        memory_increase = process.memory_info().rss - initial_memory

        # Memory increase should be reasonable (less than 100MB for test data)
        assert memory_increase < 100 * 1024 * 1024

    def test_concurrent_processing_performance(self, audio_service, sample_request):
        """Test performance under concurrent load."""
        num_threads = 5
        results_queue = queue.Queue()

        def process_request():
            # Exceptions raised in a worker thread would otherwise be lost, so
            # they are forwarded through the queue and reported by the main thread.
            try:
                results_queue.put(self._run_timed(audio_service, sample_request))
            except Exception as e:
                results_queue.put(e)

        # Start concurrent processing
        overall_start = time.perf_counter()
        threads = [threading.Thread(target=process_request) for _ in range(num_threads)]
        for thread in threads:
            thread.start()

        # Wait for completion
        for thread in threads:
            thread.join()
        total_time = time.perf_counter() - overall_start

        # Collect results; all producers have finished, so empty() is reliable here.
        results = []
        processing_times = []
        while not results_queue.empty():
            item = results_queue.get()
            if isinstance(item, Exception):
                pytest.fail(f"Concurrent processing failed: {item}")
            result, proc_time = item
            results.append(result)
            processing_times.append(proc_time)

        # Verify all succeeded
        assert len(results) == num_threads
        for result in results:
            assert result.success is True

        # Performance checks
        avg_processing_time = sum(processing_times) / len(processing_times)
        assert avg_processing_time < 10.0  # Average should be reasonable
        assert total_time < 15.0  # Total concurrent time should be reasonable

    def test_large_file_performance(self, audio_service):
        """Test performance with large audio files."""
        # Create large audio file (10MB)
        large_content = b"x" * (10 * 1024 * 1024)
        request = self._build_request("large_performance_test.wav", large_content)

        result, processing_time = self._run_timed(audio_service, request)

        assert result.success is True
        # Large files should still complete within reasonable time
        assert processing_time < 30.0

    # ------------------------------------------------------------------
    # Provider failure and recovery tests
    # ------------------------------------------------------------------

    def test_stt_provider_failure_recovery(self, audio_service, sample_request, mock_container):
        """Test recovery from STT provider failures."""
        mock_stt_provider = mock_container.get_stt_provider.return_value

        # Mock first call to fail, second to succeed
        mock_stt_provider.transcribe.side_effect = [
            SpeechRecognitionException("STT provider temporarily unavailable"),
            TextContent(text="Recovered transcription", language="en"),
        ]

        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is True
        assert "Recovered transcription" in result.original_text

    def test_translation_provider_failure_recovery(self, audio_service, sample_request, mock_container):
        """Test recovery from translation provider failures."""
        mock_translation_provider = mock_container.get_translation_provider.return_value

        # Mock first call to fail, second to succeed
        mock_translation_provider.translate.side_effect = [
            TranslationFailedException("Translation service temporarily unavailable"),
            TextContent(text="Traducción recuperada", language="es"),
        ]

        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is True
        assert "Traducción recuperada" in result.translated_text

    def test_tts_provider_failure_recovery(self, audio_service, sample_request, mock_container):
        """Test recovery from TTS provider failures."""
        mock_tts_provider = mock_container.get_tts_provider.return_value

        # Mock first call to fail, second to succeed
        mock_tts_provider.synthesize.side_effect = [
            SpeechSynthesisException("TTS provider temporarily unavailable"),
            AudioContent(
                data=b"recovered_audio_data",
                format="wav",
                sample_rate=22050,
                duration=2.5,
            ),
        ]

        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is True
        assert result.audio_path is not None

    def test_multiple_provider_failures(self, audio_service, sample_request, mock_container):
        """Test handling of multiple provider failures."""
        # Every provider raises on every call, so no retry can succeed.
        mock_stt_provider = mock_container.get_stt_provider.return_value
        mock_translation_provider = mock_container.get_translation_provider.return_value
        mock_tts_provider = mock_container.get_tts_provider.return_value

        mock_stt_provider.transcribe.side_effect = SpeechRecognitionException("STT failed")
        mock_translation_provider.translate.side_effect = TranslationFailedException("Translation failed")
        mock_tts_provider.synthesize.side_effect = SpeechSynthesisException("TTS failed")

        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is False
        assert result.error_message is not None
        assert result.error_code is not None

    def test_timeout_handling(self, audio_service, sample_request, mock_container):
        """Test that a slow (but within-timeout) provider still yields success."""
        mock_stt_provider = mock_container.get_stt_provider.return_value
        delay_seconds = 2.0

        def slow_transcribe(*args, **kwargs):
            time.sleep(delay_seconds)  # Simulate slow processing
            return TextContent(text="Slow transcription", language="en")

        mock_stt_provider.transcribe.side_effect = slow_transcribe

        result, processing_time = self._run_timed(audio_service, sample_request)

        # Should complete despite slow provider
        assert result.success is True
        assert processing_time >= delay_seconds  # Should include the delay

    # ------------------------------------------------------------------
    # Invalid input tests
    # ------------------------------------------------------------------

    def test_invalid_input_handling(self, audio_service):
        """Test handling of invalid input data."""
        # Test with invalid audio format
        request = self._build_request(
            "invalid.xyz",
            b"invalid_audio_data",
            content_type="audio/xyz",
        )

        result = audio_service.process_audio_pipeline(request)

        assert result.success is False
        assert result.error_code is not None
        # Fail with a clear assertion instead of AttributeError on None.lower().
        assert result.error_message is not None
        message = result.error_message.lower()
        assert "format" in message or "unsupported" in message

    def test_oversized_file_handling(self, audio_service, mock_config):
        """Test handling of oversized files."""
        # Mock config to have small file size limit.
        # NOTE(review): the service fixture is already constructed at this point;
        # this only takes effect if the service reads the processing config per
        # request (or keeps a reference to this same dict) - confirm.
        mock_config.get_processing_config.return_value['max_file_size_mb'] = 1

        # Create file larger than limit
        large_content = b"x" * (2 * 1024 * 1024)  # 2MB
        request = self._build_request("oversized.wav", large_content)

        result = audio_service.process_audio_pipeline(request)

        assert result.success is False
        assert result.error_code is not None
        # Fail with a clear assertion instead of AttributeError on None.lower().
        assert result.error_message is not None
        message = result.error_message.lower()
        assert "size" in message or "large" in message

    def test_corrupted_audio_handling(self, audio_service):
        """Test handling of corrupted audio data."""
        request = self._build_request("corrupted.wav", b"corrupted_data_not_audio")

        result = audio_service.process_audio_pipeline(request)

        # Should handle gracefully (success depends on implementation)
        assert result.error_message is None or "audio" in result.error_message.lower()

    # ------------------------------------------------------------------
    # Resilience tests
    # ------------------------------------------------------------------

    def test_network_error_simulation(self, audio_service, sample_request, mock_container):
        """Test handling of network-related errors."""
        mock_translation_provider = mock_container.get_translation_provider.return_value

        # Simulate network errors: two failures, then a successful translation.
        mock_translation_provider.translate.side_effect = [
            ConnectionError("Network connection failed"),
            TimeoutError("Request timed out"),
            TextContent(text="Network recovered translation", language="es"),
        ]

        result = audio_service.process_audio_pipeline(sample_request)

        # Should recover from network errors
        assert result.success is True
        assert "Network recovered translation" in result.translated_text

    def test_resource_exhaustion_handling(self, audio_service, sample_request):
        """Test handling of resource exhaustion scenarios."""
        num_requests = 20
        results = []

        # Simulate pressure by processing many requests back to back; assert
        # inside the loop so the first failing iteration is reported.
        for attempt in range(num_requests):
            result = audio_service.process_audio_pipeline(sample_request)
            assert result.success is True, f"request {attempt} failed"
            results.append(result)

        # Verify all completed
        assert len(results) == num_requests

    def test_error_correlation_tracking(self, audio_service, sample_request, mock_container):
        """Test error correlation tracking across pipeline stages."""
        mock_stt_provider = mock_container.get_stt_provider.return_value
        mock_stt_provider.transcribe.side_effect = SpeechRecognitionException("STT correlation test error")

        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is False
        assert result.metadata is not None
        assert 'correlation_id' in result.metadata

        # Verify the correlation ID is a non-empty string
        correlation_id = result.metadata['correlation_id']
        assert isinstance(correlation_id, str)
        assert len(correlation_id) > 0

    def test_graceful_degradation(self, audio_service, sample_request, mock_container):
        """Test graceful degradation when some features fail."""
        # Mock translation to fail but allow STT and TTS to succeed
        mock_translation_provider = mock_container.get_translation_provider.return_value
        mock_translation_provider.translate.side_effect = TranslationFailedException("Translation unavailable")

        # Modify request to not require translation
        sample_request.requires_translation = False
        sample_request.target_language = "en"  # Same as source

        result = audio_service.process_audio_pipeline(sample_request)

        # Should succeed without translation
        assert result.success is True
        assert result.translated_text is None  # No translation performed

    def test_circuit_breaker_behavior(self, audio_service, sample_request, mock_container):
        """Test behavior under repeated TTS failures.

        Only verifies that each attempt fails with an error code; it does not
        inspect how many provider calls were made.
        """
        mock_tts_provider = mock_container.get_tts_provider.return_value

        # Mock repeated failures to trigger circuit breaker
        mock_tts_provider.synthesize.side_effect = SpeechSynthesisException("Repeated TTS failure")

        results = [
            audio_service.process_audio_pipeline(sample_request)
            for _ in range(5)  # Multiple attempts
        ]

        # All should fail
        for result in results:
            assert result.success is False
            assert result.error_code is not None

    def test_performance_metrics_collection(self, audio_service, sample_request):
        """Test collection of performance metrics."""
        result = audio_service.process_audio_pipeline(sample_request)

        assert result.success is True
        assert result.processing_time > 0
        assert result.metadata is not None

        # Verify performance-related metadata
        metadata = result.metadata
        for key in ('correlation_id', 'asr_model', 'target_language', 'voice'):
            assert key in metadata

    def test_stress_testing(self, audio_service, sample_request):
        """Test system behavior under stress conditions."""
        num_requests = 50

        started = time.perf_counter()
        results = [
            audio_service.process_audio_pipeline(sample_request)
            for _ in range(num_requests)
        ]
        total_time = time.perf_counter() - started

        # Verify all requests completed
        assert len(results) == num_requests

        # Calculate success rate
        successful_results = [r for r in results if r.success]
        success_rate = len(successful_results) / len(results)

        # Should maintain high success rate under stress
        assert success_rate >= 0.95  # At least 95% success rate

        # Performance should remain reasonable
        avg_time_per_request = total_time / num_requests
        assert avg_time_per_request < 1.0  # Average less than 1 second per request