# Challenge_Task/app/utils.py
import asyncio
import subprocess
from pathlib import Path
from typing import List

import torch
import torchaudio
import webrtcvad
from yt_dlp import YoutubeDL

from .config import AUDIO_CACHE

# ---------------------------------------------------------------------------
# ffmpeg helpers
# ---------------------------------------------------------------------------
def _run(cmd: List[str]):
    """Run a blocking command and raise with its stderr output on failure."""
    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.decode())

# ---------------------------------------------------------------------------
# Video → Audio
# ---------------------------------------------------------------------------
async def download_video(url: str, out_dir: Path) -> Path:
    """Async wrapper around yt-dlp to pull remote video assets."""
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "outtmpl": str(out_dir / "download.%(ext)s"),
        "format": "bestvideo+bestaudio/best",
    }
    loop = asyncio.get_running_loop()

    def _job():
        with YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

    # yt-dlp is blocking, so run it in the default thread-pool executor.
    await loop.run_in_executor(None, _job)
    return next(out_dir.glob("download.*"))

async def extract_audio(video_path: Path, wav_path: Path, sr: int = 16000):
    """Extract a mono WAV track at `sr` Hz from a video file via ffmpeg."""
    cmd = [
        "ffmpeg", "-y", "-i", str(video_path),
        "-vn", "-ac", "1", "-ar", str(sr), str(wav_path),
    ]
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _run, cmd)

# ---------------------------------------------------------------------------
# VAD trimming (WebRTC)
# ---------------------------------------------------------------------------
def _frame_gen(frame_ms, pcm16, sr):
    """Yield fixed-size PCM16 frames of `frame_ms` ms, dropping the incomplete tail.

    WebRTC VAD only accepts complete 10/20/30 ms frames, so a short last frame
    would raise an error if passed through.
    """
    n = int(sr * (frame_ms / 1000.0) * 2)  # bytes per frame (16-bit mono)
    for i in range(0, len(pcm16) - n + 1, n):
        yield pcm16[i : i + n]

def trim_silence(wav_path: Path, aggressiveness: int = 3) -> Path:
    """Trim leading/trailing non-speech from a WAV file using WebRTC VAD."""
    sig, sr = torchaudio.load(str(wav_path))
    sig = sig.squeeze(0).numpy()
    vad = webrtcvad.Vad(aggressiveness)

    frame_ms = 30
    frame_len = int(sr * frame_ms / 1000)  # samples per frame (480 at 16 kHz)
    pcm16 = (sig * 32767).clip(-32768, 32767).astype("int16").tobytes()
    frames = list(_frame_gen(frame_ms, pcm16, sr))
    voiced = [vad.is_speech(f, sr) for f in frames]
    if not any(voiced):
        return wav_path

    first = voiced.index(True)
    last = len(voiced) - 1 - voiced[::-1].index(True)
    kept = sig[first * frame_len : (last + 1) * frame_len]
    out = wav_path.with_name(wav_path.stem + "_trim.wav")
    torchaudio.save(str(out), torch.from_numpy(kept).unsqueeze(0), sr)
    return out
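
# ---------------------------------------------------------------------------
# Example pipeline (illustrative sketch)
# ---------------------------------------------------------------------------
# A minimal sketch of how the helpers above compose end-to-end: download the
# source video, extract 16 kHz mono audio, then VAD-trim it. `prepare_audio`
# and the use of AUDIO_CACHE as the working directory (assumed to be a Path)
# are illustrative assumptions, not part of the original application flow.
async def prepare_audio(url: str) -> Path:
    """Download `url`, extract mono 16 kHz audio, and trim silence."""
    AUDIO_CACHE.mkdir(parents=True, exist_ok=True)
    video = await download_video(url, AUDIO_CACHE)
    wav = AUDIO_CACHE / "audio.wav"
    await extract_audio(video, wav)
    return trim_silence(wav)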