import json
import os
import re
import uuid
import asyncio
from datetime import datetime
from typing import Dict, List

from curl_cffi import requests as curl_requests
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from pydantic import BaseModel, HttpUrl

from src.api.downloader import YouTubeDownloader
from src.auth.dependencies import get_current_user
from src.db.models import User
from src.summarization.note_generator import NoteGenerator
from src.utils.config import settings
from src.utils.logger import setup_logger

logger = setup_logger(__name__)

router = APIRouter(tags=["Notes"])
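
# In-memory task store keyed by task_id; state is per-process and is lost on restart.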
tasks: Dict[str, Dict] = {}


def _set_task_status(task_id: str, status: str, message: str) -> None:
    tasks[task_id]["status"] = status
    tasks[task_id]["message"] = message
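

# _proxy_dict reads PROXY_URL / YOUTUBE_PROXY; these are assumed to be full proxy
# URLs such as "http://user:pass@host:port" (illustrative format, not confirmed here).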
def _proxy_dict() -> dict | None:
    proxy_url = os.environ.get("PROXY_URL", "").strip() or os.environ.get("YOUTUBE_PROXY", "").strip()
    if not proxy_url:
        return None
    return {
        "http": proxy_url,
        "https": proxy_url,
    }


def _extract_video_id(url: str) -> str:
    """Extract the 11-character YouTube video ID from any URL format."""
    match = re.search(r"(?:v=|youtu\.be/|shorts/|embed/)([A-Za-z0-9_-]{11})", str(url))
    return match.group(1) if match else ""
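
# _extract_video_id examples (illustrative): "https://youtu.be/dQw4w9WgXcQ" and
# "https://www.youtube.com/watch?v=dQw4w9WgXcQ" both yield "dQw4w9WgXcQ".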


def _duration_via_supadata(video_id: str) -> int:
    """Estimate video duration from Supadata transcript segment timestamps."""
    api_key = os.environ.get("SUPADATA_API_KEY", "").strip()
    if not api_key:
        return 0
    try:
        api_url = (
            "https://api.supadata.ai/v1/youtube/transcript"
            f"?url=https://www.youtube.com/watch?v={video_id}"
        )
        resp = curl_requests.get(
            api_url,
            headers={"x-api-key": api_key},
            impersonate="chrome124",
            timeout=20,
            proxies=_proxy_dict(),
        )
        resp.raise_for_status()
        data = resp.json()
        segments = data.get("segments") or data.get("content", [])
        if isinstance(segments, list) and segments:
            last = segments[-1]
            offset_ms = last.get("offset", 0) or last.get("start", 0)
            dur_ms = last.get("duration", 0) or last.get("dur", 0)
            total_s = (int(offset_ms) + int(dur_ms)) // 1000
            if total_s > 0:
                logger.info("[S2-supadata] duration~%ds", total_s)
                return total_s
    except Exception as exc:
        logger.warning("[S2-supadata] failed: %s", exc)
    return 0


def _duration_via_html_scrape(url: str) -> int:
    """Scrape the watch page and parse duration hints."""
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/124.0.0.0 Safari/537.36"
        ),
        "Accept-Language": "en-US,en;q=0.9",
        "Accept": (
            "text/html,application/xhtml+xml,application/xml;"
            "q=0.9,image/avif,image/webp,*/*;q=0.8"
        ),
        "Connection": "keep-alive",
        "DNT": "1",
        "Upgrade-Insecure-Requests": "1",
    }
    try:
        resp = curl_requests.get(
            url,
            headers=headers,
            impersonate="chrome124",
            timeout=15,
            proxies=_proxy_dict(),
        )
        resp.raise_for_status()
        html = resp.text
    except Exception as exc:
        logger.warning("[S3-scrape] HTTP fetch failed: %s", exc)
        return 0

    match = re.search(r'"lengthSeconds"\s*:\s*"(\d+)"', html)
    if match:
        duration = int(match.group(1))
        logger.info("[S3a-regex-quoted] duration=%ds", duration)
        return duration

    match = re.search(r'"approxDurationMs"\s*:\s*"(\d+)"', html)
    if match:
        duration = int(match.group(1)) // 1000
        logger.info("[S3b-approxMs] duration=%ds", duration)
        return duration

    match = re.search(
        r"var\s+ytInitialPlayerResponse\s*=\s*(\{.*?\})\s*;",
        html,
        re.DOTALL,
    )
    if match:
        try:
            data = json.loads(match.group(1))
            seconds_str = data.get("videoDetails", {}).get("lengthSeconds", "")
            if seconds_str and str(seconds_str).isdigit():
                duration = int(seconds_str)
                logger.info("[S3c-jsonParse] duration=%ds", duration)
                return duration
        except (json.JSONDecodeError, AttributeError) as exc:
            logger.warning("[S3c-jsonParse] JSON decode failed: %s", exc)
    return 0


def get_youtube_duration(
    url: str,
    preferred_duration: int = 0,
    strategy: str | None = None,
) -> int:
    """Fetch YouTube duration in seconds using Supadata, then page scraping."""
    video_id = _extract_video_id(url)
    if preferred_duration > 0:
        return preferred_duration
    if video_id:
        duration = _duration_via_supadata(video_id)
        if duration > 0:
            return duration
    duration = _duration_via_html_scrape(url)
    if duration > 0:
        return duration
    logger.warning("[duration] All strategies exhausted for: %s", url)
    return 0


class GenerateNotesRequest(BaseModel):
    youtube_url: HttpUrl
    language: str = "en"
    deep_scan: bool = False


class TaskResponse(BaseModel):
    task_id: str
    status: str
    message: str


class GeneratedNoteFile(BaseModel):
    filename: str
    title: str
    created_at: float
    size: int
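

# The listing defines `router` but shows no route decorators; the registrations
# below are illustrative assumptions (the actual paths are not confirmed by the source).
@router.post("/generate", response_model=TaskResponse)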
async def generate_note(
    request: GenerateNotesRequest,
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_user),
):
    task_id = str(uuid.uuid4())
    user_id = current_user.id
    tasks[task_id] = {
        "status": "pending",
        "message": "Initializing...",
        "youtube_url": str(request.youtube_url),
        "user_id": user_id,
        "usedDeepScan": False,
        "created_at": datetime.now(),
    }
    background_tasks.add_task(
        process_video_task,
        task_id,
        str(request.youtube_url),
        request.language,
        user_id,
        request.deep_scan,
    )
    return TaskResponse(
        task_id=task_id,
        status="pending",
        message="Generation started successfully.",
    )
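

@router.get("/status/{task_id}")  # assumed path; see the note above generate_note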
async def get_task_status(task_id: str):
    if task_id not in tasks:
        raise HTTPException(status_code=404, detail="Task not found")
    return tasks[task_id]
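

# process_video_task status progression (from the calls below): pending ->
# validating_url -> extracting_content -> [extracting_audio -> transcribing_audio
# on deep scan] -> transcript_ready -> ai_processing -> structuring_notes ->
# complete, or failed on any error.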
async def process_video_task(
    task_id: str,
    youtube_url: str,
    language: str,
    user_id: str,
    deep_scan: bool = False,
):
    downloader = YouTubeDownloader()
    try:
        video_id = _extract_video_id(youtube_url)
        video_title = "YouTube Video"
        _set_task_status(task_id, "validating_url", "Validating video URL...")
        prefetched_duration = _duration_via_html_scrape(youtube_url)
        _set_task_status(
            task_id,
            "extracting_content",
            "Checking for available subtitles...",
        )
        if deep_scan:
            transcript_text = await asyncio.to_thread(
                _transcribe_audio_fallback,
                task_id,
                youtube_url,
                language,
                downloader,
            )
        else:
            try:
                transcript_text = await asyncio.to_thread(
                    downloader.get_transcript,
                    youtube_url,
                )
            except Exception as transcript_exc:
                logger.info(
                    "Subtitle transcript unavailable for task %s, starting deep scan: %s",
                    task_id,
                    transcript_exc,
                )
                transcript_text = await asyncio.to_thread(
                    _transcribe_audio_fallback,
                    task_id,
                    youtube_url,
                    language,
                    downloader,
                )
        _set_task_status(
            task_id,
            "transcript_ready",
            "Transcript ready. Preparing summary...",
        )
        _set_task_status(
            task_id,
            "ai_processing",
            "Generating intelligent summary...",
        )
        note_gen = NoteGenerator()
        summary_json = note_gen.generateSummary(transcript_text, video_title)
        resolved_video_title = video_title
        if resolved_video_title == "YouTube Video":
            resolved_video_title = str(summary_json.get("title") or resolved_video_title)
        video_duration = get_youtube_duration(
            youtube_url,
            preferred_duration=prefetched_duration,
        )
        final_markdown = note_gen.format_final_notes(
            note_gen.format_notes_to_markdown(summary_json),
            resolved_video_title,
            youtube_url,
            video_duration,
            detected_language=summary_json.get("detected_language", "English"),
        )
        segments = summary_json.get("segments", [])
        key_points_list = [
            seg["key_insight"]
            for seg in segments
            if isinstance(seg, dict) and seg.get("key_insight")
        ]
        from src.summarization.topic_classifier import classify_topics

        _set_task_status(
            task_id,
            "structuring_notes",
            "Structuring notes and key points...",
        )
        raw_topics = summary_json.get("topics", [])
        categories = classify_topics(raw_topics) if raw_topics else ["Education & Science"]
        _set_task_status(task_id, "complete", "Generation completed successfully.")
        tasks[task_id]["notes"] = final_markdown
        tasks[task_id]["topics"] = categories
        tasks[task_id]["category"] = categories
        tasks[task_id]["keyPoints"] = key_points_list
        tasks[task_id]["videoTitle"] = resolved_video_title
        tasks[task_id]["thumbnail"] = (
            f"https://img.youtube.com/vi/{video_id}/mqdefault.jpg" if video_id else ""
        )
        logger.info("Task %s completed successfully", task_id)
    except Exception as exc:
        logger.error("Task %s failed: %s", task_id, exc)
        _set_task_status(task_id, "failed", str(exc))


def _transcribe_audio_fallback(
    task_id: str,
    youtube_url: str,
    language: str,
    downloader: YouTubeDownloader,
) -> str:
    audio_path = None
    try:
        _set_task_status(
            task_id,
            "extracting_audio",
            "No subtitles found. Extracting audio for deep scan...",
        )
        tasks[task_id]["usedDeepScan"] = True
        audio_path = downloader.download_audio(youtube_url, task_id)
        _set_task_status(
            task_id,
            "transcribing_audio",
            "Transcribing audio with deep scan...",
        )
        from src.transcription.whisper_transcriber import WhisperTranscriber

        transcript_data = WhisperTranscriber().transcribe(
            audio_path,
            language=language,
            verbose=False,
        )
        transcript_text = str(transcript_data.get("text", "")).strip()
        if not transcript_text:
            raise RuntimeError("Deep scan produced an empty transcript.")
        return transcript_text
    except Exception as exc:
        raise RuntimeError(
            "Deep scan failed: audio extraction or transcription could not be completed. "
            "The video may be private, restricted, DRM-protected, unavailable, "
            "or YouTube may require YOUTUBE_COOKIES_B64/YOUTUBE_COOKIES for this Space. "
            f"Details: {exc}"
        ) from exc
    finally:
        if audio_path is not None:
            downloader.cleanup(audio_path)
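

@router.get("/generated", response_model=List[GeneratedNoteFile])  # assumed path; see the note above generate_note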
async def list_generated_notes():
    notes = []
    output_dir = settings.output_dir
    if not output_dir.exists():
        return []
    for file_path in output_dir.glob("*_notes.md"):
        stats = file_path.stat()
        notes.append(
            GeneratedNoteFile(
                filename=file_path.name,
                title=file_path.name.replace("_notes.md", ""),
                created_at=stats.st_mtime,
                size=stats.st_size,
            )
        )
    notes.sort(key=lambda item: item.created_at, reverse=True)
    return notes
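

# Wiring sketch (assumption; not part of this listing): the router above would
# typically be mounted in the application entrypoint, for example:
#
#   from fastapi import FastAPI
#   from src.api.notes import router as notes_router  # hypothetical module path
#
#   app = FastAPI()
#   app.include_router(notes_router, prefix="/api")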