guyu / main.py
tecuts's picture
Update main.py
3e91fb9 verified
import asyncio
import json
import logging
import os
import re
import subprocess
import uuid
from pathlib import Path
from typing import Optional

import yt_dlp
from fastapi import FastAPI, HTTPException, Request, Body
from fastapi.responses import JSONResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, HttpUrl
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# Configure the root logger once at import time: INFO and above, formatted as
# "LEVEL:logger-name:message".
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s:%(message)s")
# Module-wide logger used by every helper and route in this file.
logger = logging.getLogger("main")
# ---------------------------------------------------------------------------
# App setup
# ---------------------------------------------------------------------------
# The FastAPI application object that every route below is registered on.
app = FastAPI(
    title="yt-dlp API",
    description="Download videos, fetch info, and stream HLS via yt-dlp + Deno/EJS.",
    version="1.0.0",
)
# CORS is fully open: every origin, method and header is allowed.
# NOTE(review): confirm that a wildcard origin is intended for this deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Directory (relative to the working directory) where downloaded media is
# written and later served from by the /downloads/{filename} route.
DOWNLOAD_DIR = Path("downloads")
# Created at import time; exist_ok=True makes this a no-op if it is already there.
DOWNLOAD_DIR.mkdir(exist_ok=True)
# Optional cookie file; base_ydl_opts() only passes it to yt-dlp when it exists.
COOKIE_FILE = "www.youtube.com_cookies.txt"
# Map human-friendly quality labels → yt-dlp format selectors.
# For video qualities, we cap the larger dimension (height or width)
# and prefer H.264 for ≤1080; higher qualities take the best available codec.
def _boxed(limit: int) -> str:
    """Return the yt-dlp filter capping both height and width at *limit* px."""
    return f"[height<={limit}][width<={limit}]"


def _any_codec(limit: int) -> str:
    """Selector for the best video within *limit*, whatever its codec."""
    return f"bestvideo{_boxed(limit)}+bestaudio/best"


def _h264_preferred(limit: int) -> str:
    """Selector that tries H.264 within *limit* before any other codec."""
    box = _boxed(limit)
    return f"bestvideo[vcodec^=avc]{box}+bestaudio/bestvideo{box}+bestaudio/best"


# Quality label -> yt-dlp format selector.
# Every pixel limit below is 1.8x its label (e.g. 2160 * 1.8 = 3888) and is
# applied to both height and width.
QUALITY_MAP: dict[str, str] = {
    # --- Video ---
    "best": "bestvideo+bestaudio/best",
    "2160": _any_codec(3888),
    "1440": _any_codec(2592),
    "1080": _h264_preferred(1944),
    "720": _h264_preferred(1296),
    "480": _h264_preferred(864),
    "360": _h264_preferred(648),
    "240": _h264_preferred(432),
    # --- Audio only ---
    "mp3": "bestaudio/best",
    "m4a": "bestaudio[ext=m4a]/bestaudio/best",
    "wav": "bestaudio/best",
    "flac": "bestaudio/best",
    "opus": "bestaudio[ext=webm]/bestaudio/best",
}
# Labels that select audio-only output.
AUDIO_FORMATS = {"mp3", "m4a", "wav", "flac", "opus"}
# Every label accepted in a request's "quality" field.
ALLOWED_QUALITIES = set(QUALITY_MAP)
# ---------------------------------------------------------------------------
# Pydantic models
# ---------------------------------------------------------------------------
class DownloadRequest(BaseModel):
    """Request body accepted by the POST /download route."""

    # URL of the media to download (validated by pydantic's HttpUrl).
    url: HttpUrl
    quality: str = "best"  # any key from QUALITY_MAP
    prefer_h264: bool = True  # ignored for audio / >1080 (already baked in)
class InfoRequest(BaseModel):
    """Request body accepted by the POST /get-info route."""

    # URL whose metadata should be extracted (validated by pydantic's HttpUrl).
    url: HttpUrl
    flat: bool = False  # True = fast playlist-level info only
class HLSRequest(BaseModel):
    """Request body accepted by the POST /hls route."""

    # URL of the media to download (validated by pydantic's HttpUrl).
    url: HttpUrl
    # Quality label; the /hls handler resolves it through resolve_format_selector().
    quality: str = "best"
class DownloadResponse(BaseModel):
    """Response body returned by the POST /download route."""

    # Absolute link to the stored file under /downloads/.
    url: str
    # Name of the stored file inside the downloads directory.
    filename: str
    # The quality label exactly as the client sent it.
    format: str
    # Size of the stored file in bytes, or None when it could not be read.
    filesize_approx: Optional[int] = None
class HLSResponse(BaseModel):
    """Response body returned by the POST /hls route."""

    # Absolute link to the stored file under /downloads/.
    url: str
    # Name of the stored file inside the downloads directory.
    filename: str
    # The three fields below are copied from the yt-dlp info dict's "title",
    # "duration" and "thumbnail" keys and are None when a key is absent.
    title: Optional[str] = None
    duration: Optional[float] = None
    thumbnail: Optional[str] = None
class ErrorResponse(BaseModel):
    """Schema documented for the 400 and 500 responses of the POST routes."""

    # Human-readable error message.
    detail: str
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def base_ydl_opts() -> dict:
    """Build the yt-dlp options shared by every endpoint.

    A fresh dict is created on each call, so callers may add per-request keys
    (``format``, ``outtmpl``, ...) without affecting other requests.

    Returns:
        Options suitable for ``yt_dlp.YoutubeDL(...)``. ``cookiefile`` is only
        present when ``COOKIE_FILE`` exists on disk.
    """
    opts: dict = {
        # yt-dlp's embedding option for JavaScript runtimes is ``js_runtimes``:
        # a mapping of runtime name to its config dict. The former
        # ``javascript_runtime`` key is not a yt-dlp option and was ignored.
        "js_runtimes": {"deno": {}},
        "extractor_args": {
            "youtube": {
                "player_client": ["web", "tv"],
            }
        },
        "quiet": True,
        "noprogress": True,
        # Treat a URL that also names a playlist as a single video.
        "noplaylist": True,
    }
    if os.path.exists(COOKIE_FILE):
        opts["cookiefile"] = COOKIE_FILE
        logger.info("Cookie file found, using it.")
    else:
        logger.warning("Cookie file '%s' not found.", COOKIE_FILE)
    return opts
def resolve_format_selector(quality: str) -> tuple[str, bool]:
    """Translate a quality label into a yt-dlp format selector.

    The label is matched case-insensitively and with surrounding whitespace
    removed.

    Returns:
        ``(format_selector, is_audio_only)``.

    Raises:
        HTTPException: 400 when the label is not a key of ``QUALITY_MAP``.
    """
    key = quality.lower().strip()
    selector = QUALITY_MAP.get(key)
    if selector is None:
        allowed = sorted(ALLOWED_QUALITIES)
        raise HTTPException(
            status_code=400,
            detail=f"Unknown quality '{quality}'. Allowed: {allowed}",
        )
    is_audio_only = key in AUDIO_FORMATS
    return selector, is_audio_only
def perform_download(ydl_opts: dict, url: str, stem: Path) -> Path:
    """Download *url* with yt-dlp (blocking) and return the file it produced.

    Args:
        ydl_opts: Options passed straight to ``yt_dlp.YoutubeDL``.
        url: Media URL to download.
        stem: Output path without extension; the result is looked up as
            ``<stem>.<anything>`` in the same directory.

    Raises:
        HTTPException: 500 when yt-dlp reports a download error, or when no
            output file matching *stem* exists afterwards.
    """
    logger.info(f"Starting download: {url}")
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as downloader:
            downloader.download([url])
    except yt_dlp.utils.DownloadError as e:
        logger.error(f"yt-dlp download error: {e}")
        raise HTTPException(status_code=500, detail=str(e))

    # The final extension is not known in advance, so match on the stem and
    # drop yt-dlp's in-progress artefacts.
    temp_suffixes = (".part", ".ytdl")
    outputs = [
        path
        for path in stem.parent.glob(f"{stem.name}.*")
        if path.suffix not in temp_suffixes
    ]
    if not outputs:
        raise HTTPException(status_code=500, detail="Download completed but output file not found.")

    def _size(path: Path) -> int:
        return path.stat().st_size

    # Should several files share the stem, the largest one wins.
    final = max(outputs, key=_size)
    logger.info(f"Download complete: {final}")
    return final
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.get("/")
async def root():
    """Liveness endpoint: report that the service is up and point at /docs."""
    banner = "yt-dlp API is running. See /docs for usage."
    return dict(status="ok", message=banner)
# ── /download ────────────────────────────────────────────────────────────────
def _h264_first_selector(fallback: str) -> str:
    """Return a selector that tries H.264 video + AAC audio before *fallback*.

    Any ``[height<=N]`` / ``[width<=N]`` filters found in *fallback* are copied
    onto the H.264 alternatives, so the requested resolution cap still applies
    to them instead of being bypassed.
    """
    cap_match = re.search(r"(?:\[(?:height|width)<=\d+\])+", fallback)
    cap = cap_match.group(0) if cap_match else ""
    return (
        f"bestvideo[vcodec^=avc]{cap}+bestaudio[acodec^=mp4a]"
        f"/best[vcodec^=avc]{cap}"
        f"/({fallback})"
    )


@app.post(
    "/download",
    response_model=DownloadResponse,
    responses={400: {"model": ErrorResponse}, 500: {"model": ErrorResponse}},
    summary="Download a video or audio file",
    description=(
        "Download media from any yt-dlp supported URL. "
        "Supported qualities: best, 2160, 1440, 1080, 720, 480, 360, 240, mp3, m4a, wav, flac, opus."
    ),
)
async def download(request: Request, payload: DownloadRequest = Body(...)):
    """Download the requested media and return a link to the stored file.

    Audio qualities are transcoded to the requested codec. Video qualities are
    delivered as MP4; when ``payload.prefer_h264`` is true (the default) an
    H.264 + AAC stream within the requested resolution cap is tried first.
    Because H.264 is tried first for every video quality, clients that want
    another codec at higher resolutions should send ``prefer_h264=false``.

    Raises:
        HTTPException: 400 for an unknown quality label; 500 when the
            download fails or produces no file.
    """
    logger.info(f"/download url={payload.url} quality={payload.quality}")
    format_selector, is_audio = resolve_format_selector(payload.quality)
    # Normalise exactly as resolve_format_selector() does (lower + strip), so a
    # padded label such as " mp3 " yields a valid codec name below.
    quality_key = payload.quality.lower().strip()

    stem = DOWNLOAD_DIR / str(uuid.uuid4())
    opts = base_ydl_opts()
    opts["format"] = format_selector
    opts["outtmpl"] = str(stem) + ".%(ext)s"

    if is_audio:
        # Transcode to the requested audio format; the label doubles as the codec.
        opts["postprocessors"] = [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": quality_key,
                "preferredquality": "192" if quality_key == "mp3" else "0",
            }
        ]
        logger.info(f"Audio mode: extracting as {quality_key}")
    else:
        if payload.prefer_h264:
            # Apple compatibility: try H.264 (avc1) + AAC (mp4a) first, within
            # the same resolution cap, then fall back to the plain selector.
            opts["format"] = _h264_first_selector(format_selector)
        opts["merge_output_format"] = "mp4"
        # Convert the result to an MP4 container when it is not one already.
        # "preferedformat" (sic) is the spelling yt-dlp expects.
        opts["postprocessors"] = [
            {
                "key": "FFmpegVideoConvertor",
                "preferedformat": "mp4",
            }
        ]

    # yt-dlp is blocking, so run it in the default executor.
    loop = asyncio.get_running_loop()
    final_path = await loop.run_in_executor(None, perform_download, opts, str(payload.url), stem)

    filename = final_path.name
    filesize = final_path.stat().st_size if final_path.exists() else None
    download_url = f"{str(request.base_url).rstrip('/')}/downloads/{filename}"
    return DownloadResponse(
        url=download_url,
        filename=filename,
        format=payload.quality,
        filesize_approx=filesize,
    )
# ── /get-info ────────────────────────────────────────────────────────────────
@app.post(
    "/get-info",
    summary="Fetch raw media info without downloading",
    description=(
        "Returns the raw yt-dlp info dict for the given URL as JSON. "
        "Set flat=true for fast playlist-level info."
    ),
    responses={400: {"model": ErrorResponse}, 500: {"model": ErrorResponse}},
)
async def get_info(payload: InfoRequest = Body(...)):
    """Return yt-dlp's info dict for ``payload.url`` without downloading it.

    Raises:
        HTTPException: 500 when yt-dlp fails or yields no info.
    """
    logger.info(f"/get-info url={payload.url} flat={payload.flat}")
    opts = base_ydl_opts()
    opts["skip_download"] = True
    if payload.flat:
        opts["extract_flat"] = True

    def _extract():
        with yt_dlp.YoutubeDL(opts) as ydl:
            raw = ydl.extract_info(str(payload.url), download=False)
            # Info dicts may hold objects that are not JSON-serialisable;
            # sanitize_info() is yt-dlp's own helper for making them so.
            return None if raw is None else ydl.sanitize_info(raw)

    try:
        # Extraction is blocking, so run it in the default executor.
        info = await asyncio.get_running_loop().run_in_executor(None, _extract)
    except yt_dlp.utils.DownloadError as e:
        logger.error(f"yt-dlp info extraction error: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
    if info is None:
        raise HTTPException(status_code=500, detail="yt-dlp returned no info.")

    # Round-trip through json as a last line of defence; default=str turns any
    # remaining non-serialisable value into its string form.
    try:
        clean = json.loads(json.dumps(info, default=str))
    except (TypeError, ValueError):
        logger.exception("Info dict could not be serialized to JSON.")
        clean = {"error": "Info dict could not be fully serialized.", "title": info.get("title")}
    return JSONResponse(content=clean)
# ── /hls ─────────────────────────────────────────────────────────────────────
@app.post(
    "/hls",
    response_model=HLSResponse,
    summary="Download video and return metadata + download link",
    description=(
        "Extracts metadata and downloads the video in one call. "
        "Returns the local download URL and video info."
    ),
    responses={400: {"model": ErrorResponse}, 500: {"model": ErrorResponse}},
)
async def get_hls(request: Request, payload: HLSRequest = Body(...)):
    """Download ``payload.url`` and return its link plus basic metadata.

    The audio/video flag from the quality lookup is not used here; the merge
    container is always set to MP4.

    Raises:
        HTTPException: 400 for an unknown quality label; 500 when yt-dlp
            fails, yields no info, or leaves no output file behind.
    """
    logger.info(f"/hls (download mode) url={payload.url} quality={payload.quality}")
    selector, _is_audio = resolve_format_selector(payload.quality)
    stem = DOWNLOAD_DIR / str(uuid.uuid4())

    opts = base_ydl_opts()
    opts.update(
        {
            "format": selector,
            "outtmpl": str(stem) + ".%(ext)s",
            "merge_output_format": "mp4",
        }
    )

    def _fetch():
        # A single extract_info(download=True) call both downloads the media
        # and returns its metadata.
        with yt_dlp.YoutubeDL(opts) as ydl:
            return ydl.extract_info(str(payload.url), download=True)

    try:
        info = await asyncio.get_running_loop().run_in_executor(None, _fetch)
    except yt_dlp.utils.DownloadError as e:
        logger.error(f"yt-dlp error in /hls: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    if info is None:
        raise HTTPException(status_code=500, detail="yt-dlp returned no info.")

    # Locate the output by stem, ignoring yt-dlp's in-progress artefacts.
    temp_suffixes = (".part", ".ytdl")
    outputs = [
        path
        for path in stem.parent.glob(f"{stem.name}.*")
        if path.suffix not in temp_suffixes
    ]
    if not outputs:
        raise HTTPException(status_code=500, detail="Download failed or file not found.")

    # Should several files share the stem, the largest one wins.
    final_path = max(outputs, key=lambda path: path.stat().st_size)
    filename = final_path.name
    base = str(request.base_url).rstrip("/")
    return HLSResponse(
        url=f"{base}/downloads/{filename}",
        filename=filename,
        title=info.get("title"),
        duration=info.get("duration"),
        thumbnail=info.get("thumbnail"),
    )
# ── /downloads/{filename} ─────────────────────────────────────────────────────
@app.get(
    "/downloads/{filename}",
    summary="Serve a previously downloaded file",
)
async def serve_file(filename: str):
    """Serve a regular file from the downloads directory.

    Raises:
        HTTPException: 404 when the name does not refer to a regular file
            inside ``DOWNLOAD_DIR``.
    """
    # Path traversal guard: keep only the final path component.
    safe_name = Path(filename).name
    file_path = DOWNLOAD_DIR / safe_name
    # is_file() rather than exists(): names such as ".." or "." survive the
    # guard above yet resolve to directories, which must not be served.
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found.")
    return FileResponse(
        path=file_path,
        filename=safe_name,
        media_type="application/octet-stream",
    )