"""
Google Cloud Speech-to-Text Implementation
"""
import os
import asyncio
from typing import AsyncIterator, Optional, List
from google.cloud import speech_v1p1beta1 as speech
from google.api_core import exceptions
from utils import log
from stt_interface import STTInterface, STTConfig, TranscriptionResult


class GoogleCloudSTT(STTInterface):
    """Google Cloud Speech-to-Text implementation"""
    
    def __init__(self, credentials_path: Optional[str]):
        if credentials_path and os.path.exists(credentials_path):
            os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_path
            log(f"β
 Google credentials set from: {credentials_path}")
        else:
            log("β οΈ Google credentials path not found, using default credentials")
            
        self.client = speech.SpeechAsyncClient()
        self.streaming_config = None
        self.is_streaming = False
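        # Buffers audio chunks between stream_audio() callers and the gRPC request generator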
        self.audio_queue = asyncio.Queue()
        
    async def start_streaming(self, config: STTConfig) -> None:
        """Initialize streaming session"""
        try:
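            # Map STTConfig fields directly onto Google's RecognitionConfig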
            recognition_config = speech.RecognitionConfig(
                encoding=self._get_encoding(config.encoding),
                sample_rate_hertz=config.sample_rate,
                language_code=config.language,
                enable_automatic_punctuation=config.enable_punctuation,
                enable_word_time_offsets=config.enable_word_timestamps,
                model=config.model,
                use_enhanced=config.use_enhanced,
                metadata=speech.RecognitionMetadata(
                    interaction_type=speech.RecognitionMetadata.InteractionType.VOICE_SEARCH,
                    recording_device_type=speech.RecognitionMetadata.RecordingDeviceType.PC,
                    audio_topic="general"
                )
            )
            
            self.streaming_config = speech.StreamingRecognitionConfig(
                config=recognition_config,
                interim_results=config.interim_results,
                single_utterance=config.single_utterance
            )
            
            self.is_streaming = True
            log("β
 Google STT streaming session started")
            
        except Exception as e:
            log(f"β Failed to start Google STT streaming: {e}")
            raise
            
    async def stream_audio(self, audio_chunk: bytes) -> AsyncIterator[TranscriptionResult]:
        """Stream audio chunk and get transcription results"""
        if not self.is_streaming:
            log("β οΈ STT streaming not started")
            return
            
        try:
            # Queue the incoming chunk for the request generator below
            await self.audio_queue.put(audio_chunk)

            # Build the gRPC request stream: the first request must carry the
            # streaming config, all later requests carry audio bytes only
            async def request_generator():
                yield speech.StreamingRecognizeRequest(
                    streaming_config=self.streaming_config
                )
                while self.is_streaming:
                    chunk = await self.audio_queue.get()
                    yield speech.StreamingRecognizeRequest(audio_content=chunk)

            # Open the bidirectional stream and iterate over its responses.
            # Note: each call opens a fresh stream; a long-lived session would
            # keep a single response iterator alive across chunks.
            responses = await self.client.streaming_recognize(
                requests=request_generator()
            )
            
            async for response in responses:
                for result in response.results:
                    if result.alternatives:
                        yield TranscriptionResult(
                            text=result.alternatives[0].transcript,
                            is_final=result.is_final,
                            confidence=result.alternatives[0].confidence,
                            timestamp=asyncio.get_running_loop().time()
                        )
                        
        except exceptions.OutOfRange:
            log("β οΈ Google STT: Exceeded maximum audio duration")
            self.is_streaming = False
        except Exception as e:
            log(f"β Google STT streaming error: {e}")
            raise
            
    async def stop_streaming(self) -> Optional[TranscriptionResult]:
        """Stop streaming and get final result"""
        self.is_streaming = False
        log("π Google STT streaming stopped")
        
        # Process any remaining audio in queue
        if not self.audio_queue.empty():
            # TODO: Process remaining audio
            pass
            
        return None
        
    def supports_realtime(self) -> bool:
        """Google Cloud Speech supports real-time streaming"""
        return True
        
    def get_supported_languages(self) -> List[str]:
        """Get list of supported language codes"""
        return [
            "tr-TR",  # Turkish
            "en-US",  # English (US)
            "en-GB",  # English (UK)
            "de-DE",  # German
            "fr-FR",  # French
            "es-ES",  # Spanish
            "it-IT",  # Italian
            "pt-BR",  # Portuguese (Brazil)
            "ru-RU",  # Russian
            "ja-JP",  # Japanese
            "ko-KR",  # Korean
            "zh-CN",  # Chinese (Simplified)
        ]
        
    def _get_encoding(self, encoding: str):
        """Convert encoding string to Google Cloud Speech encoding"""
        encoding_map = {
            "LINEAR16": speech.RecognitionConfig.AudioEncoding.LINEAR16,
            "FLAC": speech.RecognitionConfig.AudioEncoding.FLAC,
            "MULAW": speech.RecognitionConfig.AudioEncoding.MULAW,
            "AMR": speech.RecognitionConfig.AudioEncoding.AMR,
            "AMR_WB": speech.RecognitionConfig.AudioEncoding.AMR_WB,
            "OGG_OPUS": speech.RecognitionConfig.AudioEncoding.OGG_OPUS,
            "SPEEX_WITH_HEADER_BYTE": speech.RecognitionConfig.AudioEncoding.SPEEX_WITH_HEADER_BYTE,
            "WEBM_OPUS": speech.RecognitionConfig.AudioEncoding.WEBM_OPUS,
        }
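        # Fall back to WEBM_OPUS, the typical codec for browser MediaRecorder audio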
        return encoding_map.get(encoding, speech.RecognitionConfig.AudioEncoding.WEBM_OPUS)
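

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the class): drives a short
# streaming session with synthetic silence and stops after a timeout.
# Assumes STTConfig accepts the keyword arguments this class reads
# (encoding, sample_rate, language, ...); check stt_interface.py for the
# real definition. The credentials path below is a placeholder.
# ---------------------------------------------------------------------------
async def _demo() -> None:
    stt = GoogleCloudSTT(credentials_path="credentials.json")  # placeholder path
    config = STTConfig(
        encoding="LINEAR16",
        sample_rate=16000,
        language="tr-TR",
        enable_punctuation=True,
        enable_word_timestamps=False,
        model="default",
        use_enhanced=False,
        interim_results=True,
        single_utterance=False,
    )
    await stt.start_streaming(config)

    async def consume() -> None:
        # 100 ms of 16 kHz 16-bit mono PCM silence; stream_audio() keeps
        # pulling from the internal queue, so one call yields results for
        # as long as the session stays open
        silence = b"\x00" * 3200
        async for result in stt.stream_audio(silence):
            log(f"{'FINAL' if result.is_final else 'interim'}: {result.text}")

    try:
        await asyncio.wait_for(consume(), timeout=10.0)  # end the demo after 10 s
    except asyncio.TimeoutError:
        pass
    await stt.stop_streaming()


if __name__ == "__main__":
    asyncio.run(_demo())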
