Spaces:
Running
on
Zero
Running
on
Zero
| from app.logger_config import logger as logging | |
| from fastrtc.utils import AdditionalOutputs | |
| from pydub import AudioSegment | |
| import asyncio | |
| import os | |
| import time | |
| import numpy as np | |
| # -------------------------------------------------------- | |
| # Utility functions | |
| # -------------------------------------------------------- | |
def read_and_stream_audio(filepath_to_stream: str, session_id: str, stop_streaming_flags: dict):
    """
    Read an audio file and stream it chunk by chunk (1s per chunk).

    This is a generator. For every chunk it yields
    ``((frame_rate, samples), AdditionalOutputs(progress))`` where ``samples``
    is a NumPy array reshaped to ``(1, -1)`` and ``progress`` is a percentage
    rounded to two decimals. The stream always ends with exactly one
    ``(None, AdditionalOutputs("STREAM_DONE"))`` item; on failure that item is
    preceded by a structured error payload produced by ``handle_stream_error``.

    Args:
        filepath_to_stream: Path of the audio file to decode and stream.
        session_id: Identifier used to prefix log messages.
        stop_streaming_flags: Mutable dict whose ``"stop"`` entry is polled
            before each chunk; it is reset to ``False`` when streaming ends.
    """
    if not session_id:
        yield from handle_stream_error("unknown", "No session_id provided.", stop_streaming_flags)
        return
    if not filepath_to_stream or not os.path.exists(filepath_to_stream):
        yield from handle_stream_error(session_id, f"Audio file not found: {filepath_to_stream}", stop_streaming_flags)
        return

    chunk_duration_ms = 1000
    # handle_stream_error already emits STREAM_DONE; remember when it ran so
    # the sentinel is not sent a second time below.
    error_reported = False
    try:
        segment = AudioSegment.from_file(filepath_to_stream)
        # Ceiling division: slicing with a step yields ceil(len / step) chunks,
        # so a plain "// step + 1" overcounts when the duration is an exact
        # multiple of the chunk size and progress would never reach 100%.
        # max(1, ...) keeps the denominator non-zero for empty audio.
        total_chunks = max(1, -(-len(segment) // chunk_duration_ms))
        logging.info(f"[{session_id}] Starting audio streaming ({total_chunks} chunks).")
        for i, chunk in enumerate(segment[::chunk_duration_ms]):
            if _is_stop_requested(stop_streaming_flags):
                logging.info(f"[{session_id}] Stop signal received. Terminating stream.")
                break
            frame_rate = chunk.frame_rate
            # NOTE(review): for multi-channel audio the samples are presumably
            # interleaved, so this produces one row of mixed channels - confirm
            # the consumer expects that or down-mix to mono first.
            samples = np.array(chunk.get_array_of_samples()).reshape(1, -1)
            progress = round(((i + 1) / total_chunks) * 100, 2)
            yield ((frame_rate, samples), AdditionalOutputs(progress))
            logging.debug(f"[{session_id}] Sent chunk {i+1}/{total_chunks} ({progress}%).")
            # Pace the stream at real time: one chunk per chunk duration.
            time.sleep(chunk_duration_ms / 1000)
        else:
            # Only reached when the loop was not interrupted by a stop request.
            logging.info(f"[{session_id}] Audio streaming completed successfully.")
    except asyncio.CancelledError:
        error_reported = True
        yield from handle_stream_error(session_id, "Streaming cancelled by user.", stop_streaming_flags)
    except Exception as e:
        # Covers FileNotFoundError as well; both were handled identically.
        error_reported = True
        yield from handle_stream_error(session_id, e, stop_streaming_flags)
    finally:
        # No yield in here: if the consumer closes the generator early, a yield
        # during cleanup raises "RuntimeError: generator ignored GeneratorExit".
        if isinstance(stop_streaming_flags, dict):
            stop_streaming_flags["stop"] = False
            logging.info(f"[{session_id}] Stop flag reset.")

    if not error_reported:
        yield (None, AdditionalOutputs("STREAM_DONE"))
def handle_stream_error(session_id: str, error: Exception | str, stop_streaming_flags: dict | None = None):
    """
    Report a streaming failure and close the stream.

    The error is logged (with traceback info requested when ``error`` is an
    exception), the ``"stop"`` flag is cleared when a flags dict is supplied,
    and two items are yielded: a structured error payload followed by the
    ``"STREAM_DONE"`` sentinel.
    """
    is_exception = isinstance(error, Exception)
    msg = f"{type(error).__name__}: {str(error)}" if is_exception else str(error)
    logging.error(f"[{session_id}] Streaming error: {msg}", exc_info=is_exception)

    if isinstance(stop_streaming_flags, dict):
        stop_streaming_flags["stop"] = False

    # Error details first, then the end-of-stream sentinel.
    for payload in ({"error": True, "message": msg}, "STREAM_DONE"):
        yield (None, AdditionalOutputs(payload))
| def _is_stop_requested(stop_streaming_flags: dict) -> bool: | |
| """Check if the stop signal was requested.""" | |
| if not isinstance(stop_streaming_flags, dict): | |
| return False | |
| return bool(stop_streaming_flags.get("stop", False)) | |
def stop_streaming(session_id: str, stop_streaming_flags: dict):
    """
    Raise the stop flag for the active stream.

    A dict argument is mutated in place and returned; any other value is
    replaced by a fresh ``{"stop": True}`` dict.
    """
    logging.info(f"[{session_id}] Stop button clicked — sending stop signal.")
    if isinstance(stop_streaming_flags, dict):
        stop_streaming_flags["stop"] = True
        return stop_streaming_flags
    return {"stop": True}