# videolink-to-transcript / transcriber.py
# NOTE(review): the following Hugging Face upload-page residue was not valid
# Python and has been commented out: "algorembrant's picture",
# "Upload 4 files", "04850df verified".
#!/usr/bin/env python3
"""
Universal Media Transcriber
Supports: YouTube, YouTube Music, Spotify, Direct Audio/Video URLs
Blazing fast: uses native captions when available, falls back to faster-whisper
"""
import os
import sys
import re
import json
import time
import shutil
import hashlib
import argparse
import tempfile
import subprocess
from pathlib import Path
from datetime import timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse, parse_qs
# Prepend this script's own directory to PATH so a bundled ffmpeg.exe (or
# ffmpeg binary) sitting next to this file is found by the yt-dlp/spotdl
# subprocesses spawned below.
script_dir = str(Path(__file__).parent.absolute())
if script_dir not in os.environ["PATH"]:
    os.environ["PATH"] = script_dir + os.pathsep + os.environ["PATH"]
# ─────────────────────────────────────────────────────────────────────────────
# AUTO-INSTALLER β€” installs missing deps silently on first run
# ─────────────────────────────────────────────────────────────────────────────
# Maps importable module name -> pip distribution name, consumed by
# ensure_deps() below to auto-install anything missing on first run.
REQUIRED = {
    "yt_dlp": "yt-dlp",
    "youtube_transcript_api": "youtube-transcript-api",
    "faster_whisper": "faster-whisper",
    "rich": "rich",
    "spotdl": "spotdl",
    "requests": "requests",
}
def ensure_deps():
    """Install any missing third-party dependencies with pip.

    Probes every module in REQUIRED via ``__import__`` and pip-installs the
    absent ones. The install is first attempted with
    ``--break-system-packages`` (required on PEP 668 "externally managed"
    system Pythons); older pip releases (<23) reject that flag outright, so
    on failure the install is retried without it.
    """
    missing = []
    for module, pkg in REQUIRED.items():
        try:
            __import__(module)
        except ImportError:
            missing.append(pkg)
    if not missing:
        return
    print(f"[setup] Installing: {', '.join(missing)} ...")
    base_cmd = [sys.executable, "-m", "pip", "install", "--quiet"]
    try:
        subprocess.check_call(base_cmd + ["--break-system-packages"] + missing)
    except subprocess.CalledProcessError:
        # Older pip does not know --break-system-packages -- retry plain.
        subprocess.check_call(base_cmd + missing)
    print("[setup] Done. Reloading...\n")
# Run the dependency check at import time, before the third-party imports below.
ensure_deps()
# ─────────────────────────────────────────────────────────────────────────────
# IMPORTS (after install)
# ─────────────────────────────────────────────────────────────────────────────
import yt_dlp
import requests
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn
from rich.panel import Panel
from rich.table import Table
from rich import print as rprint
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
console = Console()
# ─────────────────────────────────────────────────────────────────────────────
# CONSTANTS & CONFIG
# ─────────────────────────────────────────────────────────────────────────────
WHISPER_MODEL = "base"            # tiny / base / small / medium / large-v3
WHISPER_DEVICE = "auto"           # auto / cpu / cuda
WHISPER_THREADS = os.cpu_count()  # use all cores
AUDIO_FORMAT = "mp3"              # intermediate format for downloaded audio
MAX_WORKERS = 4                   # parallel jobs (default; CLI --workers overrides)
CACHE_DIR = Path.home() / ".transcriber_cache"  # per-URL transcript cache
CACHE_DIR.mkdir(exist_ok=True)
# Language preference order for YouTube captions (find_transcript prefers
# manual tracks; "a.en" is the auto-generated English track).
LANG_PREF = ["en", "en-US", "en-GB", "en-AU", "en-CA", "en-IN", "en-IE", "en-NZ", "en-PH", "en-ZA", "en-orig", "a.en"]
# ─────────────────────────────────────────────────────────────────────────────
# URL DETECTION
# ─────────────────────────────────────────────────────────────────────────────
def detect_source(url: str) -> str:
    """Classify a media URL by provider.

    Returns one of: ``youtube`` | ``youtube_music`` | ``spotify`` |
    ``audio`` (direct media file) | ``unknown``.
    """
    parsed = urlparse(url)
    # removeprefix only strips a *leading* "www." (str.replace would mangle
    # a "www." appearing anywhere in the host).
    host = parsed.netloc.lower().removeprefix("www.")
    if host in ("youtube.com", "youtu.be", "m.youtube.com"):
        return "youtube"
    if host in ("music.youtube.com",):
        return "youtube_music"
    if host in ("open.spotify.com", "spotify.com"):
        return "spotify"
    # Check the URL *path*, not the whole URL, so signed links such as
    # ".../file.mp3?token=abc" are still recognised as direct audio.
    media_exts = (".mp3", ".mp4", ".wav", ".ogg", ".flac", ".m4a", ".webm",
                  ".aac", ".opus", ".mkv", ".avi", ".mov")
    if parsed.path.lower().endswith(media_exts):
        return "audio"
    # Last resort: sniff the Content-Type with a cheap HEAD request.
    try:
        r = requests.head(url, timeout=5, allow_redirects=True)
        ct = r.headers.get("content-type", "")
        if "audio" in ct or "video" in ct:
            return "audio"
    except Exception:
        pass
    return "unknown"
def extract_youtube_id(url: str) -> str | None:
"""Extract video ID from any YouTube URL format."""
patterns = [
r"(?:v=|youtu\.be/|embed/|shorts/)([A-Za-z0-9_-]{11})",
]
for p in patterns:
m = re.search(p, url)
if m:
return m.group(1)
return None
def extract_spotify_type(url: str) -> tuple[str, str]:
    """Return (resource_type, resource_id), e.g. ('track', 'abc123').

    Yields ('unknown', '') when the URL is not a recognised Spotify link.
    """
    found = re.search(
        r"spotify\.com/(track|album|playlist|episode|show)/([A-Za-z0-9]+)", url
    )
    if found is None:
        return "unknown", ""
    return found.group(1), found.group(2)
# ─────────────────────────────────────────────────────────────────────────────
# CACHE
# ─────────────────────────────────────────────────────────────────────────────
def cache_key(url: str) -> str:
    """Derive a stable cache filename stem for *url* (MD5 hex digest).

    MD5 is fine here: the hash is only a cache key, not security-sensitive.
    """
    digest = hashlib.md5(url.encode())
    return digest.hexdigest()
def cache_get(url: str) -> str | None:
    """Return the cached transcript for *url*, or None on a cache miss."""
    cached_file = CACHE_DIR / f"{cache_key(url)}.txt"
    if not cached_file.exists():
        return None
    return cached_file.read_text(encoding="utf-8")
def cache_set(url: str, text: str):
    """Persist *text* as the cached transcript for *url*."""
    target = CACHE_DIR / f"{cache_key(url)}.txt"
    target.write_text(text, encoding="utf-8")
# ─────────────────────────────────────────────────────────────────────────────
# WHISPER ENGINE (lazy-loaded, singleton)
# ─────────────────────────────────────────────────────────────────────────────
# Module-level singleton: the Whisper model is expensive to load, so it is
# created once on first use and shared by every transcription call.
_whisper_model = None
def get_whisper():
    """Return the shared faster-whisper model, loading it lazily on first call.

    Device resolution: when WHISPER_DEVICE is "auto", CUDA is used if torch
    reports a GPU; torch is optional, so ImportError falls back to CPU.
    Compute type is float16 on GPU, int8 (quantized) on CPU.

    NOTE(review): not thread-safe -- two threads racing past the None check
    could both load a model; in practice the first transcribe call happens
    before the worker pool fans out.
    """
    global _whisper_model
    if _whisper_model is None:
        from faster_whisper import WhisperModel
        device = WHISPER_DEVICE
        if device == "auto":
            try:
                import torch
                device = "cuda" if torch.cuda.is_available() else "cpu"
            except ImportError:
                # torch not installed -- assume no usable GPU.
                device = "cpu"
        console.log(f"[cyan]Loading Whisper [{WHISPER_MODEL}] on {device}...[/cyan]")
        compute = "float16" if device == "cuda" else "int8"
        _whisper_model = WhisperModel(WHISPER_MODEL, device=device, compute_type=compute,
                                      num_workers=WHISPER_THREADS, cpu_threads=WHISPER_THREADS)
    return _whisper_model
def transcribe_audio_file(audio_path: str, lang: str | None = None) -> str:
    """Transcribe a local audio file with faster-whisper.

    Args:
        audio_path: Path to an audio file on disk.
        lang: Optional ISO language code; language is auto-detected when None.
              (Annotation fixed: the default is None, so the type is str | None.)

    Returns:
        The full transcript, one "[0H:MM:SS] text" line per segment.
    """
    model = get_whisper()
    opts = dict(beam_size=5, word_timestamps=False, vad_filter=True,
                vad_parameters=dict(min_silence_duration_ms=500))
    if lang:
        opts["language"] = lang
    segments, _info = model.transcribe(audio_path, **opts)
    lines = []
    for seg in segments:
        # str(timedelta) renders "H:MM:SS"; zfill(8) left-pads to "0H:MM:SS".
        ts = str(timedelta(seconds=int(seg.start))).zfill(8)
        lines.append(f"[{ts}] {seg.text.strip()}")
    return "\n".join(lines)
# ─────────────────────────────────────────────────────────────────────────────
# YOUTUBE / YOUTUBE MUSIC β€” caption-first, whisper fallback
# ─────────────────────────────────────────────────────────────────────────────
def fetch_youtube_captions(video_id: str) -> str | None:
    """Fetch native YouTube captions (instant -- no audio download).

    Prefers languages in LANG_PREF order, otherwise takes the first
    available track. Returns the formatted transcript, or None when
    captions are disabled/unavailable.
    """
    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        # prefer manual > auto-generated, english first
        transcript = None
        for lang in LANG_PREF:
            try:
                transcript = transcript_list.find_transcript([lang])
                break
            except Exception:
                pass
        if transcript is None:
            # grab whatever is first
            transcript = next(iter(transcript_list))
        entries = transcript.fetch()
        lines = []
        for e in entries:
            # youtube-transcript-api < 1.0 yields dicts; >= 1.0 yields
            # FetchedTranscriptSnippet objects with .start/.text attributes.
            # Support both so an upgrade does not break caption fetching.
            start = e["start"] if isinstance(e, dict) else e.start
            text = e["text"] if isinstance(e, dict) else e.text
            ts = str(timedelta(seconds=int(start))).zfill(8)
            lines.append(f"[{ts}] {text.strip()}")
        return "\n".join(lines)
    except (TranscriptsDisabled, NoTranscriptFound):
        return None
    except Exception as exc:
        console.log(f"[yellow]Caption fetch warning: {exc}[/yellow]")
        return None
def download_audio_yt(url: str, out_dir: str) -> str:
    """Download the best available audio via yt-dlp, extract to AUDIO_FORMAT,
    and return the path of the resulting file inside *out_dir*."""
    options = {
        "format": "bestaudio/best",
        "outtmpl": os.path.join(out_dir, "%(id)s.%(ext)s"),
        "postprocessors": [{
            "key": "FFmpegExtractAudio",
            "preferredcodec": AUDIO_FORMAT,
            "preferredquality": "128",
        }],
        "quiet": True,
        "no_warnings": True,
        "concurrent_fragment_downloads": 8,
    }
    with yt_dlp.YoutubeDL(options) as downloader:
        info = downloader.extract_info(url, download=True)
    video_id = info.get("id", "audio")
    return os.path.join(out_dir, f"{video_id}.{AUDIO_FORMAT}")
def get_video_metadata(url: str) -> dict:
    """Fetch title/uploader/duration/description/upload_date without downloading.

    On any extraction failure, returns placeholder values with the SAME key
    set as the success path (the original fallback omitted "description"
    and "upload_date"), so callers can rely on every key being present.
    """
    ydl_opts = {"quiet": True, "no_warnings": True, "skip_download": True}
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
        return {
            "title": info.get("title", "Unknown"),
            "uploader": info.get("uploader", "Unknown"),
            "duration": info.get("duration", 0),
            "description": info.get("description", ""),
            "upload_date": info.get("upload_date", ""),
        }
    except Exception:
        return {"title": "Unknown", "uploader": "Unknown", "duration": 0,
                "description": "", "upload_date": ""}
def transcribe_youtube(url: str, force_whisper: bool = False) -> dict:
    """Full pipeline for YouTube / YouTube Music: captions first, Whisper fallback."""
    video_id = extract_youtube_id(url) or "unknown"
    meta = get_video_metadata(url)
    text = None
    method = "unknown"

    # Fast path: native captions need no download at all.
    if not force_whisper:
        console.log(f"[cyan]Trying native captions for[/cyan] [bold]{meta['title']}[/bold]")
        text = fetch_youtube_captions(video_id)
        if text:
            method = "native_captions"
            console.log("[green]βœ“ Got captions instantly (no download needed)[/green]")

    # Slow path: pull the audio and transcribe it locally.
    if text is None:
        console.log("[yellow]No captions β†’ downloading audio for Whisper...[/yellow]")
        with tempfile.TemporaryDirectory() as tmpdir:
            audio_path = download_audio_yt(url, tmpdir)
            console.log(f"[cyan]Transcribing with Whisper [{WHISPER_MODEL}]...[/cyan]")
            text = transcribe_audio_file(audio_path)
        method = f"whisper_{WHISPER_MODEL}"

    return {
        "url": url,
        "source": "youtube",
        "method": method,
        "meta": meta,
        "transcript": text,
    }
# ─────────────────────────────────────────────────────────────────────────────
# SPOTIFY
# ─────────────────────────────────────────────────────────────────────────────
def transcribe_spotify(url: str) -> dict:
    """Download a Spotify track/album/playlist/episode and transcribe it.

    Episodes are attempted through yt-dlp first; music content goes through
    spotdl (which resolves tracks via YouTube). Every resulting audio file
    is transcribed with Whisper and the transcripts are concatenated.
    """
    sp_type, _ = extract_spotify_type(url)  # the resource id is not needed here
    # Podcasts/episodes: yt-dlp can handle them sometimes
    if sp_type == "episode":
        console.log("[cyan]Spotify episode β€” trying yt-dlp...[/cyan]")
        try:
            with tempfile.TemporaryDirectory() as tmpdir:
                audio_path = download_audio_yt(url, tmpdir)
                meta = get_video_metadata(url)
                transcript_text = transcribe_audio_file(audio_path)
                return {
                    "url": url,
                    "source": "spotify_episode",
                    "method": f"whisper_{WHISPER_MODEL}",
                    "meta": meta,
                    "transcript": transcript_text,
                }
        except Exception as e:
            # Fall through to the spotdl path below.
            console.log(f"[yellow]yt-dlp failed for Spotify episode: {e}[/yellow]")
    # Music tracks / albums / playlists β†’ spotdl
    console.log("[cyan]Spotify music β€” downloading via spotdl...[/cyan]")
    with tempfile.TemporaryDirectory() as tmpdir:
        result = subprocess.run(
            [sys.executable, "-m", "spotdl", url, "--output", tmpdir,
             "--format", "mp3", "--bitrate", "128k", "--print-errors"],
            capture_output=True, text=True
        )
        # spotdl's exit code is unreliable -- trust the filesystem instead.
        audio_files = list(Path(tmpdir).glob("*.mp3")) + list(Path(tmpdir).glob("*.m4a"))
        if not audio_files:
            # Include stdout too: --print-errors writes its diagnostics there.
            raise RuntimeError(f"spotdl produced no files.\n{result.stdout}\n{result.stderr}")
        transcripts = []
        for af in sorted(audio_files):
            console.log(f"[cyan]Transcribing:[/cyan] {af.name}")
            t = transcribe_audio_file(str(af))
            transcripts.append(f"=== {af.stem} ===\n{t}")
    return {
        "url": url,
        "source": f"spotify_{sp_type}",
        "method": f"spotdl+whisper_{WHISPER_MODEL}",
        "meta": {"title": f"Spotify {sp_type.title()}", "uploader": "Spotify"},
        "transcript": "\n\n".join(transcripts),
    }
# ─────────────────────────────────────────────────────────────────────────────
# DIRECT AUDIO / VIDEO URL
# ─────────────────────────────────────────────────────────────────────────────
def transcribe_direct_audio(url: str) -> dict:
    """Download a direct audio/video URL with yt-dlp and transcribe it."""
    console.log(f"[cyan]Downloading direct audio:[/cyan] {url}")
    with tempfile.TemporaryDirectory() as tmpdir:
        dl_opts = {
            "outtmpl": os.path.join(tmpdir, "audio.%(ext)s"),
            "quiet": True,
            "no_warnings": True,
            "concurrent_fragment_downloads": 8,
        }
        with yt_dlp.YoutubeDL(dl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
        # Prefer the extractor's title; fall back to the URL's basename.
        title = info.get("title", Path(url).stem) if info else Path(url).stem
        downloaded = list(Path(tmpdir).iterdir())
        if not downloaded:
            raise RuntimeError("No file downloaded")
        first = downloaded[0]
        console.log(f"[cyan]Transcribing:[/cyan] {first.name}")
        transcript_text = transcribe_audio_file(str(first))
    return {
        "url": url,
        "source": "audio",
        "method": f"whisper_{WHISPER_MODEL}",
        "meta": {"title": title, "uploader": "Direct"},
        "transcript": transcript_text,
    }
# ─────────────────────────────────────────────────────────────────────────────
# PLAYLIST / BATCH EXPANSION
# ─────────────────────────────────────────────────────────────────────────────
def expand_playlist(url: str) -> list[str]:
    """Expand a playlist/album/channel URL into individual video URLs.

    Non-playlist URLs (or any extraction failure) come back as [url].
    """
    flat_opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": True,
        "skip_download": True,
    }
    try:
        with yt_dlp.YoutubeDL(flat_opts) as ydl:
            info = ydl.extract_info(url, download=False)
        if "entries" in info:
            urls = []
            for entry in info["entries"]:
                if not entry:
                    continue
                if entry.get("url"):
                    urls.append(entry["url"])
                elif entry.get("id"):
                    # Flat extraction sometimes yields only the video id.
                    urls.append(f"https://www.youtube.com/watch?v={entry['id']}")
            return urls
    except Exception as exc:
        console.log(f"[yellow]Playlist expansion warning: {exc}[/yellow]")
    return [url]
# ─────────────────────────────────────────────────────────────────────────────
# MAIN ROUTER
# ─────────────────────────────────────────────────────────────────────────────
def transcribe_url(url: str, force_whisper: bool = False, use_cache: bool = True) -> dict:
    """Dispatch a URL to the matching transcription pipeline and cache the result."""
    url = url.strip()

    # Cache fast-path: skip all network work on a hit.
    if use_cache:
        cached = cache_get(url)
        if cached:
            console.log(f"[green]βœ“ Cache hit:[/green] {url[:60]}")
            return {"url": url, "source": "cache", "method": "cache",
                    "meta": {"title": "Cached"}, "transcript": cached}

    source = detect_source(url)
    console.log(f"[bold blue]Source detected:[/bold blue] {source} β†’ {url[:70]}")

    if source in ("youtube", "youtube_music"):
        result = transcribe_youtube(url, force_whisper=force_whisper)
    elif source == "spotify":
        result = transcribe_spotify(url)
    elif source == "audio":
        result = transcribe_direct_audio(url)
    else:
        # yt-dlp understands 1000+ sites, so reuse the direct-audio path.
        console.log("[yellow]Unknown source β€” trying yt-dlp generic handler...[/yellow]")
        result = transcribe_direct_audio(url)

    if use_cache:
        cache_set(url, result["transcript"])
    return result
# ─────────────────────────────────────────────────────────────────────────────
# OUTPUT FORMATTING
# ─────────────────────────────────────────────────────────────────────────────
def format_transcript(result: dict, include_header: bool = True) -> str:
    """Render a result dict as plain text, optionally prefixed by a metadata header."""
    body = result["transcript"] + "\n"
    if not include_header:
        return body

    meta = result.get("meta", {})
    duration = meta.get("duration", 0)
    dur_str = str(timedelta(seconds=int(duration))) if duration else "N/A"
    bar = "=" * 70
    header_lines = [
        bar,
        f"TITLE : {meta.get('title', 'Unknown')}",
        f"UPLOADER : {meta.get('uploader', 'Unknown')}",
        f"DURATION : {dur_str}",
        f"SOURCE : {result.get('source','')}",
        f"METHOD : {result.get('method', 'unknown')}",
        f"URL : {result.get('url', '')}",
        bar,
        "",
        "",
    ]
    return "\n".join(header_lines) + body
def safe_filename(title: str) -> str:
    """Sanitise *title* into a Windows-safe filename stem, capped at 80 chars.

    Illegal filesystem characters become underscores; leading/trailing dots
    and spaces are stripped. An empty result falls back to "transcript".
    """
    cleaned = re.sub(r'[<>:"/\\|?*]', "_", title)
    cleaned = cleaned.strip(". ")[:80]
    if not cleaned:
        return "transcript"
    return cleaned
# ─────────────────────────────────────────────────────────────────────────────
# BATCH PROCESSING
# ─────────────────────────────────────────────────────────────────────────────
def process_batch(urls: list[str], output_dir: Path, force_whisper: bool,
                  use_cache: bool, merge: bool, workers: int):
    """Transcribe *urls* in a thread pool and write the results to disk.

    Each URL is processed by transcribe_url(); failures are collected rather
    than raised so one bad URL cannot abort the batch. With merge=True all
    transcripts go into one file, otherwise one file per URL (collisions get
    a short URL-hash suffix). Finishes by printing a rich summary table.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    results = []
    errors = []
    console.rule("[bold green]Universal Media Transcriber[/bold green]")
    console.print(f"[dim]URLs: {len(urls)} | Workers: {workers} | Model: {WHISPER_MODEL}[/dim]\n")
    def job(url):
        # Worker body: never raises -- failures come back as error dicts.
        t0 = time.time()
        try:
            r = transcribe_url(url, force_whisper=force_whisper, use_cache=use_cache)
            r["elapsed"] = round(time.time() - t0, 1)
            return r
        except Exception as exc:
            return {"url": url, "error": str(exc), "elapsed": round(time.time() - t0, 1)}
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
        TimeElapsedColumn(),
        console=console,
    ) as progress:
        task = progress.add_task("Transcribing...", total=len(urls))
        with ThreadPoolExecutor(max_workers=workers) as pool:
            futures = {pool.submit(job, u): u for u in urls}
            # as_completed yields in finish order, so output order may differ
            # from input order.
            for fut in as_completed(futures):
                result = fut.result()
                if "error" in result:
                    errors.append(result)
                    console.log(f"[red]βœ— Error:[/red] {result['url'][:60]} β†’ {result['error']}")
                else:
                    results.append(result)
                    console.log(f"[green]βœ“[/green] {result['meta'].get('title','?')[:50]} [{result['elapsed']}s]")
                progress.advance(task)
    # ── Write files ─────────────────────────────────────────────────────
    if merge and results:
        merged_path = output_dir / "merged_transcript.txt"
        with open(merged_path, "w", encoding="utf-8") as f:
            for r in results:
                f.write(format_transcript(r))
                f.write("\n" + "─" * 70 + "\n\n")
        console.print(f"\n[bold green]βœ“ Merged transcript:[/bold green] {merged_path}")
    else:
        for r in results:
            title = r["meta"].get("title", "transcript")
            fname = safe_filename(title) + ".txt"
            out_path = output_dir / fname
            # avoid collisions: append a short hash of the source URL
            if out_path.exists():
                stem = out_path.stem
                out_path = output_dir / f"{stem}_{cache_key(r['url'])[:6]}.txt"
            out_path.write_text(format_transcript(r), encoding="utf-8")
            console.print(f"[green]βœ“ Saved:[/green] {out_path}")
    # ── Summary table ────────────────────────────────────────────────────────
    table = Table(title="\n Summary", show_lines=True)
    table.add_column("Title", style="cyan", max_width=40)
    table.add_column("Method", style="magenta")
    table.add_column("Time", justify="right")
    table.add_column("Status", justify="center")
    for r in results:
        table.add_row(
            r["meta"].get("title", "?")[:38],
            r.get("method", "?"),
            f"{r['elapsed']}s",
            "[green]βœ“[/green]",
        )
    for r in errors:
        table.add_row(r["url"][:38], "β€”", f"{r['elapsed']}s", "[red]βœ—[/red]")
    console.print(table)
    console.print(f"\n[bold]Done:[/bold] {len(results)} ok, {len(errors)} failed β†’ [dim]{output_dir}[/dim]")
# ─────────────────────────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────────────────────────
def main():
    """CLI entry point: parse arguments, collect/expand URLs, run the batch."""
    global WHISPER_MODEL
    parser = argparse.ArgumentParser(
        description=" Universal Media Transcriber β€” YouTube, Spotify, Audio & more",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python transcriber.py https://youtu.be/dQw4w9WgXcQ
python transcriber.py URL1 URL2 URL3 --merge
python transcriber.py --file urls.txt --output ./transcripts
python transcriber.py https://open.spotify.com/track/... --whisper
python transcriber.py https://youtu.be/... --model large-v3
python transcriber.py --playlist https://youtube.com/playlist?list=...
"""
    )
    parser.add_argument("urls", nargs="*", help="One or more media URLs")
    parser.add_argument("--file", "-f", help="Text file with one URL per line")
    parser.add_argument("--output", "-o", default="./transcripts", help="Output directory (default: ./transcripts)")
    parser.add_argument("--merge", "-m", action="store_true", help="Merge all transcripts into one file")
    parser.add_argument("--whisper", "-w", action="store_true", help="Force Whisper (skip caption check)")
    parser.add_argument("--model", default=WHISPER_MODEL,
                        choices=["tiny", "base", "small", "medium", "large-v2", "large-v3"],
                        help="Whisper model size (default: base)")
    parser.add_argument("--workers", type=int, default=MAX_WORKERS, help="Parallel workers (default: 4)")
    parser.add_argument("--no-cache", action="store_true", help="Disable transcript cache")
    parser.add_argument("--playlist", action="store_true", help="Treat URL as playlist β€” expand all videos")
    parser.add_argument("--clear-cache", action="store_true", help="Clear the transcript cache and exit")
    args = parser.parse_args()

    # BUG FIX: --model was parsed but never applied (the `global` declaration
    # existed with no assignment). Propagate it to the module-level setting
    # that get_whisper() reads.
    WHISPER_MODEL = args.model

    if args.clear_cache:
        shutil.rmtree(CACHE_DIR, ignore_errors=True)
        CACHE_DIR.mkdir(exist_ok=True)
        console.print("[green]Cache cleared.[/green]")
        return

    # Collect URLs from positional arguments and/or --file ('#' = comment).
    all_urls = list(args.urls)
    if args.file:
        path = Path(args.file)
        if not path.exists():
            console.print(f"[red]File not found: {path}[/red]")
            sys.exit(1)
        lines = path.read_text().splitlines()
        all_urls += [l.strip() for l in lines if l.strip() and not l.startswith("#")]
    if not all_urls:
        parser.print_help()
        sys.exit(0)

    # Expand playlists. A single URL is always probed, so a playlist link
    # works even without --playlist; a plain video expands to itself.
    if args.playlist or len(all_urls) == 1:
        expanded = []
        for u in all_urls:
            exp = expand_playlist(u)
            if len(exp) > 1:
                console.log(f"[cyan]Playlist expanded:[/cyan] {len(exp)} items")
            expanded.extend(exp)
        all_urls = expanded

    # Deduplicate while preserving first-seen order.
    seen = set()
    deduped = []
    for u in all_urls:
        if u not in seen:
            seen.add(u)
            deduped.append(u)
    all_urls = deduped

    process_batch(
        urls=all_urls,
        output_dir=Path(args.output),
        force_whisper=args.whisper,
        use_cache=not args.no_cache,
        merge=args.merge,
        workers=args.workers,  # BUG FIX: was corrupted to "wocd rkers=" (SyntaxError)
    )
if __name__ == "__main__":
    main()