# Audio chunking/splitting/merging logic
import subprocess
from typing import List

import numpy as np
import soundfile as sf

from app.core.audio_utils import get_audio_info, make_temp_path

# Optional webrtcvad for speech-based splitting; callers fall back to
# fixed-window splitting when it is not installed.
try:
    import webrtcvad
    _HAS_VAD = True
except Exception:
    _HAS_VAD = False
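
# Input constraints for the VAD path (per the py-webrtcvad docs): 16-bit mono
# PCM at 8, 16, 32, or 48 kHz, in frames of 10, 20, or 30 ms.
# split_audio_with_vad below standardizes on 16 kHz mono with 30 ms frames.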

def ffmpeg_extract_segment(src: str, start: float, duration: float, dst: str) -> str:
    """
    Extract segment [start, start + duration) from src into dst as 16 kHz
    mono pcm_s16le WAV using ffmpeg.
    """
    # Pass the arguments as a list rather than shlex-splitting a formatted
    # string, so paths containing spaces or quotes survive intact.
    cmd = [
        "ffmpeg", "-v", "error", "-y",
        "-ss", f"{start:.3f}", "-i", src,
        "-t", f"{duration:.3f}",
        "-ar", "16000", "-ac", "1", "-acodec", "pcm_s16le",
        dst,
    ]
    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if proc.returncode != 0:
        raise RuntimeError(f"ffmpeg extract failed: {proc.stderr.decode(errors='ignore')}")
    return dst
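
# Illustrative usage (assumes ffmpeg is on PATH; the file names here are
# placeholders, not part of the module):
#   ffmpeg_extract_segment("input.wav", start=12.0, duration=30.0, dst="seg.wav")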

def split_audio_to_chunks(src_wav: str, chunk_length_s: float = 30.0, overlap_s: float = 5.0) -> List[str]:
    """
    Split src_wav into fixed-length chunks of chunk_length_s seconds that
    overlap by overlap_s seconds. Returns the list of chunk file paths.
    """
    info = get_audio_info(src_wav)
    if not info:
        raise RuntimeError("Cannot read audio info")
    duration = info["duration"]
    step = chunk_length_s - overlap_s
    if step <= 0:
        raise ValueError("chunk_length_s must be > overlap_s")
    starts = []
    t = 0.0
    while t < duration:
        starts.append(t)
        t += step
    chunks = []
    for i, s in enumerate(starts):
        chunk_path = make_temp_path(suffix=f"_chunk{i}.wav")
        # The final chunk may be shorter than chunk_length_s.
        ffmpeg_extract_segment(src_wav, s, min(chunk_length_s, duration - s), chunk_path)
        chunks.append(chunk_path)
    return chunks
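
# Worked example: a 70 s file with the defaults (chunk_length_s=30.0,
# overlap_s=5.0) gives step=25, so starts are [0, 25, 50] and the extracted
# chunk durations are [30, 30, 20].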

def split_audio_with_vad(
    src_wav: str,
    aggressiveness: int = 2,
    frame_ms: int = 30,
    padding_ms: int = 300,
) -> List[str]:
    """
    Split audio using webrtcvad speech detection. Returns a list of chunk
    file paths. Falls back to fixed-window splitting if webrtcvad is not
    available or the audio is not 16 kHz mono.
    """
    if not _HAS_VAD:
        return split_audio_to_chunks(src_wav)
    info = get_audio_info(src_wav)
    if not info:
        raise RuntimeError("Cannot read audio info for VAD split")
    sr = int(info.get("samplerate", 0))
    channels = int(info.get("channels", 0))
    if sr != 16000 or channels != 1:
        # webrtcvad needs mono 16-bit PCM at a supported rate; this module
        # standardizes on 16 kHz, so anything else falls back to fixed windows.
        return split_audio_to_chunks(src_wav)
    # Read raw PCM samples; webrtcvad consumes 16-bit PCM frames.
    data, _ = sf.read(src_wav, dtype="int16")
    if data.ndim > 1:
        data = data[:, 0]
    pcm_bytes = data.tobytes()
    vad = webrtcvad.Vad(aggressiveness)
    frame_size = int(sr * frame_ms / 1000)  # samples per frame
    frame_bytes = frame_size * 2  # 2 bytes per int16 sample
    total_frames = (len(pcm_bytes) + frame_bytes - 1) // frame_bytes
    speech_frames = []
    for i in range(total_frames):
        start = i * frame_bytes
        end = start + frame_bytes
        frame = pcm_bytes[start:end]
        if len(frame) < frame_bytes:
            # Zero-pad the trailing partial frame to a full frame.
            frame = frame.ljust(frame_bytes, b"\x00")
        try:
            is_speech = vad.is_speech(frame, sr)
        except Exception:
            is_speech = False
        speech_frames.append(bool(is_speech))
    # Group contiguous speech frames into (start, end) frame-index segments.
    segments = []
    in_speech = False
    seg_start = 0
    for idx, val in enumerate(speech_frames):
        if val and not in_speech:
            in_speech = True
            seg_start = idx
        elif not val and in_speech:
            in_speech = False
            segments.append((seg_start, idx - 1))
    if in_speech:
        segments.append((seg_start, len(speech_frames) - 1))
    # Merge neighboring segments whose gap is shorter than padding_ms, so a
    # brief pause does not split one utterance into several chunks.
    merged = []
    pad_frames = int(padding_ms / frame_ms)
    for seg in segments:
        if not merged:
            merged.append(seg)
            continue
        prev = merged[-1]
        if seg[0] - prev[1] <= pad_frames:
            merged[-1] = (prev[0], seg[1])
        else:
            merged.append(seg)
    # Convert frame indices to seconds and extract each segment with ffmpeg.
    chunks = []
    for i, (s_idx, e_idx) in enumerate(merged):
        start_s = s_idx * frame_ms / 1000.0
        dur = (e_idx - s_idx + 1) * frame_ms / 1000.0
        chunk_path = make_temp_path(suffix=f"_vad_chunk{i}.wav")
        ffmpeg_extract_segment(src_wav, start_s, dur, chunk_path)
        chunks.append(chunk_path)
    # If VAD found no speech at all, fall back to fixed windows.
    if not chunks:
        return split_audio_to_chunks(src_wav)
    return chunks
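
# The header comment mentions merging, but no merge helper appears above. The
# sketch below is hypothetical (merge_wav_chunks is not part of the original
# module): it assumes each chunk is the 16 kHz mono pcm16 WAV produced by
# ffmpeg_extract_segment and simply concatenates samples, so it suits
# non-overlapping VAD chunks; fixed-window chunks would repeat their
# overlap_s seconds of shared audio.
def merge_wav_chunks(chunk_paths: List[str], dst: str) -> str:
    parts = [sf.read(p, dtype="int16")[0] for p in chunk_paths]
    sf.write(dst, np.concatenate(parts), 16000, subtype="PCM_16")
    return dst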