# Hugging Face Spaces page header captured with this file (Space status: Sleeping).
import os | |
import tempfile | |
import subprocess | |
from typing import Tuple | |
from fastapi import FastAPI, File, UploadFile | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi.responses import JSONResponse | |
# --- Point cache directories at writable paths for Spaces. ---
# This must run BEFORE the Hugging Face imports below. Values already present
# in the environment are left untouched.
if "HF_HOME" not in os.environ:
    os.environ["HF_HOME"] = "/tmp/huggingface"
if "XDG_CACHE_HOME" not in os.environ:
    os.environ["XDG_CACHE_HOME"] = "/tmp"
from faster_whisper import WhisperModel | |
from transformers import pipeline | |
app = FastAPI(title="Video → Title & Summary (Open Source)")

# CORS so a browser front end (e.g. the React app) can call this API.
# Origins default to "*" (any site). Tighten them without a code change by
# setting CORS_ALLOW_ORIGINS to a comma-separated list, for example
# "https://example.com,https://app.example.com".
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    # Browsers reject "Access-Control-Allow-Origin: *" on credentialed
    # requests, so credentials are only enabled for an explicit origin list.
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
# --------- Model loading (once, at import time) ---------
print("Loading models...")

# Whisper checkpoint: tiny | base | small; "small" is more accurate but slower.
# Override with the WHISPER_SIZE environment variable.
WHISPER_SIZE = os.environ.get("WHISPER_SIZE", "small")

# CPU-friendly: int8 compute; uses ~1–2 GB RAM for "small".
whisper_model = WhisperModel(
    WHISPER_SIZE,
    device="cpu",
    compute_type="int8",
)

# Summarizer used to condense the transcript.
summarizer = pipeline(
    "summarization",
    model="philschmid/bart-large-cnn-samsum",
)

# Title generator (tiny T5). Switch to flan-t5-base if you upgrade hardware.
title_gen = pipeline(
    "text2text-generation",
    model="google/flan-t5-small",
)

print("Models loaded.")
# --------- Helpers --------- | |
def extract_audio_wav_16k_mono(video_path: str) -> str:
    """Extract 16kHz mono WAV from a video file using system ffmpeg.

    Args:
        video_path: Path of the input media file handed to ffmpeg.

    Returns:
        Path of a newly created temporary ``.wav`` file. The caller owns the
        file and is responsible for deleting it.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
        OSError: If the ffmpeg executable cannot be started.
    """
    # Reserve a unique output path. The handle is closed immediately so that
    # ffmpeg can overwrite the (empty) file thanks to "-y".
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        audio_path = tmp.name

    cmd = [
        "ffmpeg",
        "-nostdin",  # never read from the server's stdin
        "-y",
        "-i", video_path,
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        audio_path,
    ]
    try:
        subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
    except BaseException:
        # The caller never learns the path when we raise, so the temporary
        # file would otherwise be leaked. Removal is best-effort.
        try:
            os.remove(audio_path)
        except OSError:
            pass
        raise
    return audio_path
def safe_trim(text: str, max_chars: int) -> str:
    """Trim at a word boundary so the result never exceeds ``max_chars``.

    Args:
        text: Text to shorten; ``None`` or empty is treated as ``""``.
            Surrounding whitespace is stripped first.
        max_chars: Maximum length of the returned string, including the
            trailing ``"..."`` that marks a truncation.

    Returns:
        The stripped text if it already fits; otherwise a shortened version
        ending in ``"..."``. When ``max_chars`` is too small to hold the
        ellipsis, a plain hard cut is returned instead.
    """
    text = (text or "").strip()
    if len(text) <= max_chars:
        return text

    ellipsis = "..."
    # Reserve room for the ellipsis so the result stays within max_chars.
    budget = max_chars - len(ellipsis)
    if budget <= 0:
        return text[:max(max_chars, 0)]

    head = text[:budget]
    if not text[budget].isspace():
        # The cut landed mid-word: back up to the previous space, if any.
        # With no space at all, keep the hard cut rather than return nothing.
        boundary = head.rfind(" ")
        if boundary != -1:
            head = head[:boundary]
    return head.rstrip() + ellipsis
def summarize_and_title(transcript: str) -> Tuple[str, str]:
    """Return (title, summary) from transcript using open-source models.

    Args:
        transcript: Full transcript text of the video.

    Returns:
        A ``(title, summary)`` tuple. The summary is produced by the
        module-level ``summarizer`` pipeline and the title by ``title_gen``,
        prompted with that summary.
    """
    # A character cap keeps the input small, but it does not bound the token
    # count. truncation=True asks the pipeline's tokenizer to clip over-long
    # inputs instead of failing on them.
    trimmed = safe_trim(transcript, 4000)

    # Summary
    summary = summarizer(
        trimmed,
        max_length=140,
        min_length=40,
        do_sample=False,
        truncation=True,
    )[0]["summary_text"].strip()

    # Title
    title_prompt = (
        "Write a short, catchy YouTube-style title (<= 8 words) for this summary:\n"
        f"{summary}"
    )
    title = title_gen(
        title_prompt,
        max_new_tokens=16,
        num_return_sequences=1,
        truncation=True,
    )[0]["generated_text"].strip()

    # Tidying: drop quotes and line breaks, then cap the length.
    title = title.replace('"', "").replace("\n", " ").strip()
    title = safe_trim(title, 80)
    return title, summary
# --------- API --------- | |
@app.post("/process_video")
async def process_video(file: UploadFile = File(...)):
    """
    Accepts a video under form field 'file'.
    Returns JSON: { "title": str, "summary": str }

    Error responses are JSON objects with an "error" key:
    - 400 when the transcript is empty (no speech detected);
    - 500 when ffmpeg fails (with a "detail" key) or any other step raises.

    Temporary video/audio files are removed before returning.
    """
    tmp_video = None
    tmp_audio = None
    try:
        # Save the upload to disk in chunks, so a large video is never held
        # in memory in one piece.
        suffix = os.path.splitext(file.filename or "")[1] or ".mp4"
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tv:
            tmp_video = tv.name
            while True:
                chunk = await file.read(1024 * 1024)
                if not chunk:
                    break
                tv.write(chunk)

        # NOTE(review): the steps below are synchronous calls made directly
        # from this coroutine, so they run on the event loop thread.

        # Extract audio with ffmpeg
        tmp_audio = extract_audio_wav_16k_mono(tmp_video)

        # Transcribe (auto language). You can force English via language="en"
        segments, _info = whisper_model.transcribe(tmp_audio)
        transcript = " ".join(seg.text for seg in segments).strip()
        if not transcript:
            return JSONResponse({"error": "No speech detected in the video."}, status_code=400)

        # Summarize + Title
        title, summary = summarize_and_title(transcript)
        return JSONResponse({"title": title, "summary": summary})
    except subprocess.CalledProcessError as e:
        return JSONResponse({"error": "ffmpeg failed", "detail": str(e)}, status_code=500)
    except Exception as e:
        # Top-level boundary: report the failure as JSON instead of crashing.
        return JSONResponse({"error": str(e)}, status_code=500)
    finally:
        # Best-effort cleanup of whatever temporary files were created.
        for p in (tmp_audio, tmp_video):
            if not p:
                continue
            try:
                os.remove(p)
            except OSError:
                pass
@app.get("/")
def root():
    """Return a short usage hint naming the processing endpoint and the docs path."""
    return {
        "ok": True,
        "message": "POST a video to /process_video with form field 'file'.",
        "docs": "/docs",
    }