|
|
|
""" |
|
YouTube Tools Module - Fixed version using pytubefix |
|
Addresses network issues, deprecation warnings, and playlist errors |
|
""" |
|
|
|
from pytubefix import YouTube, Playlist |
|
from pytubefix.cli import on_progress |
|
from typing import Optional, Dict, Any, List |
|
import os |
|
import time |
|
import logging |
|
from .utils import logger, validate_file_exists |
|
|
|
import cv2 |
|
import tempfile |
|
import os |
|
from typing import Optional, Dict, Any, List |
|
from PIL import Image |
|
import numpy as np |
|
|
|
class YouTubeTools: |
|
"""YouTube tools with improved error handling and network resilience""" |
|
|
|
def __init__(self, max_retries: int = 3, retry_delay: float = 1.0): |
|
self.supported_formats = ['mp4', '3gp', 'webm'] |
|
self.supported_audio_formats = ['mp3', 'mp4', 'webm'] |
|
self.max_retries = max_retries |
|
self.retry_delay = retry_delay |
|
|
|
def _retry_operation(self, operation, *args, **kwargs): |
|
"""Retry operation with exponential backoff for network issues""" |
|
for attempt in range(self.max_retries): |
|
try: |
|
return operation(*args, **kwargs) |
|
except Exception as e: |
|
if attempt == self.max_retries - 1: |
|
raise e |
|
|
|
error_msg = str(e).lower() |
|
if any(term in error_msg for term in ['network', 'socket', 'timeout', 'connection']): |
|
wait_time = self.retry_delay * (2 ** attempt) |
|
logger.warning(f"Network error (attempt {attempt + 1}/{self.max_retries}): {e}") |
|
logger.info(f"Retrying in {wait_time} seconds...") |
|
time.sleep(wait_time) |
|
else: |
|
raise e |
|
|
|
def get_video_info(self, url: str) -> Optional[Dict[str, Any]]: |
|
""" |
|
Retrieve comprehensive metadata about a YouTube video using pytubefix |
|
""" |
|
try: |
|
def _get_info(): |
|
yt = YouTube(url, on_progress_callback=on_progress) |
|
|
|
|
|
video_streams = [] |
|
try: |
|
streams = yt.streams.filter(progressive=True, file_extension='mp4') |
|
for stream in streams: |
|
try: |
|
video_streams.append({ |
|
'resolution': getattr(stream, 'resolution', 'unknown'), |
|
'fps': getattr(stream, 'fps', 'unknown'), |
|
'video_codec': getattr(stream, 'video_codec', 'unknown'), |
|
'audio_codec': getattr(stream, 'audio_codec', 'unknown'), |
|
'filesize': getattr(stream, 'filesize', None), |
|
'mime_type': getattr(stream, 'mime_type', 'unknown') |
|
}) |
|
except Exception as stream_error: |
|
logger.debug(f"Error processing stream: {stream_error}") |
|
continue |
|
except Exception as e: |
|
logger.warning(f"Could not retrieve stream details: {e}") |
|
|
|
|
|
captions_available = [] |
|
try: |
|
if yt.captions: |
|
captions_available = list(yt.captions.keys()) |
|
except Exception as e: |
|
logger.warning(f"Could not retrieve captions list: {e}") |
|
|
|
info = { |
|
'title': getattr(yt, 'title', 'Unknown'), |
|
'author': getattr(yt, 'author', 'Unknown'), |
|
'channel_url': getattr(yt, 'channel_url', 'Unknown'), |
|
'length': getattr(yt, 'length', 0), |
|
'views': getattr(yt, 'views', 0), |
|
'description': getattr(yt, 'description', ''), |
|
'thumbnail_url': getattr(yt, 'thumbnail_url', ''), |
|
'publish_date': yt.publish_date.isoformat() if getattr(yt, 'publish_date', None) else None, |
|
'keywords': getattr(yt, 'keywords', []), |
|
'video_id': getattr(yt, 'video_id', ''), |
|
'watch_url': getattr(yt, 'watch_url', url), |
|
'available_streams': video_streams, |
|
'captions_available': captions_available |
|
} |
|
|
|
return info |
|
|
|
info = self._retry_operation(_get_info) |
|
if info is not None: |
|
logger.info(f"Retrieved info for video: {info.get('title', 'Unknown')}") |
|
return info |
|
|
|
except Exception as e: |
|
logger.error(f"Failed to get video info for {url}: {e}") |
|
return None |
|
|
|
def download_video(self, url: str, output_path: str = './downloads', |
|
resolution: str = 'highest', filename: Optional[str] = None) -> Optional[str]: |
|
"""Download a YouTube video with retry logic""" |
|
try: |
|
def _download(): |
|
os.makedirs(output_path, exist_ok=True) |
|
|
|
yt = YouTube(url, on_progress_callback=on_progress) |
|
|
|
|
|
if resolution == 'highest': |
|
stream = yt.streams.get_highest_resolution() |
|
elif resolution == 'lowest': |
|
stream = yt.streams.get_lowest_resolution() |
|
else: |
|
stream = yt.streams.filter(res=resolution, progressive=True, file_extension='mp4').first() |
|
if not stream: |
|
logger.warning(f"Resolution {resolution} not found, downloading highest instead") |
|
stream = yt.streams.get_highest_resolution() |
|
|
|
if not stream: |
|
raise Exception("No suitable stream found for download") |
|
|
|
|
|
if filename: |
|
safe_filename = "".join(c for c in filename if c.isalnum() or c in (' ', '-', '_')).rstrip() |
|
file_path = stream.download(output_path=output_path, filename=f"{safe_filename}.{stream.subtype}") |
|
else: |
|
file_path = stream.download(output_path=output_path) |
|
|
|
return file_path |
|
|
|
file_path = self._retry_operation(_download) |
|
logger.info(f"Downloaded video to {file_path}") |
|
return file_path |
|
|
|
except Exception as e: |
|
logger.error(f"Failed to download video from {url}: {e}") |
|
return None |
|
|
|
def download_audio(self, url: str, output_path: str = './downloads', |
|
filename: Optional[str] = None) -> Optional[str]: |
|
"""Download only audio from a YouTube video with retry logic""" |
|
try: |
|
def _download_audio(): |
|
os.makedirs(output_path, exist_ok=True) |
|
|
|
yt = YouTube(url, on_progress_callback=on_progress) |
|
audio_stream = yt.streams.get_audio_only() |
|
|
|
if not audio_stream: |
|
raise Exception("No audio stream found") |
|
|
|
if filename: |
|
safe_filename = "".join(c for c in filename if c.isalnum() or c in (' ', '-', '_')).rstrip() |
|
file_path = audio_stream.download(output_path=output_path, filename=f"{safe_filename}.{audio_stream.subtype}") |
|
else: |
|
file_path = audio_stream.download(output_path=output_path) |
|
|
|
return file_path |
|
|
|
file_path = self._retry_operation(_download_audio) |
|
logger.info(f"Downloaded audio to {file_path}") |
|
return file_path |
|
|
|
except Exception as e: |
|
logger.error(f"Failed to download audio from {url}: {e}") |
|
return None |
|
|
|
def get_captions(self, url: str, language_code: str = 'en') -> Optional[str]: |
|
""" |
|
Get captions/subtitles - FIXED: No more deprecation warning |
|
""" |
|
try: |
|
def _get_captions(): |
|
yt = YouTube(url, on_progress_callback=on_progress) |
|
|
|
if not yt.captions: |
|
logger.warning("No captions available for this video") |
|
return None |
|
|
|
|
|
if language_code in yt.captions: |
|
caption = yt.captions[language_code] |
|
captions_text = caption.generate_srt_captions() |
|
return captions_text |
|
else: |
|
available_langs = list(yt.captions.keys()) |
|
logger.warning(f"Captions not found for language {language_code}. Available: {available_langs}") |
|
return None |
|
|
|
result = self._retry_operation(_get_captions) |
|
if result: |
|
logger.info(f"Retrieved captions in {language_code}") |
|
return result |
|
|
|
except Exception as e: |
|
logger.error(f"Failed to get captions from {url}: {e}") |
|
return None |
|
|
|
def get_playlist_info(self, playlist_url: str) -> Optional[Dict[str, Any]]: |
|
""" |
|
Get information about a YouTube playlist - FIXED: Better error handling |
|
""" |
|
try: |
|
def _get_playlist_info(): |
|
playlist = Playlist(playlist_url) |
|
|
|
|
|
video_urls = list(playlist.video_urls) |
|
|
|
|
|
info = { |
|
'video_count': len(video_urls), |
|
'video_urls': video_urls[:10], |
|
'total_videos': len(video_urls) |
|
} |
|
|
|
|
|
try: |
|
info['title'] = getattr(playlist, 'title', 'Unknown Playlist') |
|
except: |
|
info['title'] = 'Private/Unavailable Playlist' |
|
|
|
try: |
|
info['description'] = getattr(playlist, 'description', '') |
|
except: |
|
info['description'] = 'Description unavailable' |
|
|
|
try: |
|
info['owner'] = getattr(playlist, 'owner', 'Unknown') |
|
except: |
|
info['owner'] = 'Owner unavailable' |
|
|
|
return info |
|
|
|
info = self._retry_operation(_get_playlist_info) |
|
if info is not None: |
|
logger.info(f"Retrieved playlist info: {info['title']} ({info['video_count']} videos)") |
|
return info |
|
|
|
except Exception as e: |
|
logger.error(f"Failed to get playlist info from {playlist_url}: {e}") |
|
return None |
|
|
|
def get_available_qualities(self, url: str) -> Optional[List[Dict[str, Any]]]: |
|
""" |
|
Get all available download qualities - FIXED: Better network handling |
|
""" |
|
try: |
|
def _get_qualities(): |
|
yt = YouTube(url, on_progress_callback=on_progress) |
|
streams = [] |
|
|
|
|
|
for stream in yt.streams.filter(progressive=True): |
|
try: |
|
streams.append({ |
|
'resolution': getattr(stream, 'resolution', 'unknown'), |
|
'fps': getattr(stream, 'fps', 'unknown'), |
|
'filesize_mb': round(stream.filesize / (1024 * 1024), 2) if getattr(stream, 'filesize', None) else None, |
|
'mime_type': getattr(stream, 'mime_type', 'unknown'), |
|
'video_codec': getattr(stream, 'video_codec', 'unknown'), |
|
'audio_codec': getattr(stream, 'audio_codec', 'unknown') |
|
}) |
|
except Exception as stream_error: |
|
logger.debug(f"Error processing stream: {stream_error}") |
|
continue |
|
|
|
|
|
def sort_key(x): |
|
res = x['resolution'] |
|
if res and res != 'unknown' and res[:-1].isdigit(): |
|
return int(res[:-1]) |
|
return 0 |
|
|
|
return sorted(streams, key=sort_key, reverse=True) |
|
|
|
return self._retry_operation(_get_qualities) |
|
|
|
except Exception as e: |
|
logger.error(f"Failed to get qualities for {url}: {e}") |
|
return None |
|
def extract_and_analyze_frames(self, url: str, num_frames: int = 5, analysis_question: str = "Describe what you see in this frame") -> Dict[str, Any]: |
|
""" |
|
Extract key frames and analyze video content visually |
|
Based on search results showing OpenCV and MoviePy approaches |
|
""" |
|
logger.info(f"Starting frame extraction for {url} with {num_frames} frames") |
|
|
|
results = { |
|
'video_info': None, |
|
'frames_analyzed': [], |
|
'extraction_method': None, |
|
'total_frames_extracted': 0, |
|
'analysis_summary': None |
|
} |
|
|
|
try: |
|
|
|
video_info = self.get_video_info(url) |
|
if not video_info: |
|
return {'error': 'Could not retrieve video information'} |
|
|
|
results['video_info'] = { |
|
'title': video_info.get('title', 'Unknown'), |
|
'duration': video_info.get('length', 0), |
|
'author': video_info.get('author', 'Unknown') |
|
} |
|
|
|
|
|
frame_paths = self._strategy_1_opencv_extraction(url, num_frames) |
|
|
|
if frame_paths: |
|
results['extraction_method'] = 'OpenCV Video Download' |
|
results['frames_analyzed'] = self._analyze_extracted_frames(frame_paths, analysis_question) |
|
results['total_frames_extracted'] = len(frame_paths) |
|
|
|
|
|
self._cleanup_files(frame_paths) |
|
else: |
|
|
|
thumbnail_analysis = self._strategy_2_thumbnail_analysis(url, analysis_question) |
|
results['extraction_method'] = 'Thumbnail Analysis (Fallback)' |
|
results['frames_analyzed'] = [thumbnail_analysis] |
|
results['total_frames_extracted'] = 1 |
|
|
|
|
|
results['analysis_summary'] = self._generate_frame_analysis_summary(results) |
|
|
|
return results |
|
|
|
except Exception as e: |
|
logger.error(f"Error in frame extraction: {e}") |
|
return {'error': f'Frame extraction failed: {str(e)}'} |
|
|
|
def _strategy_1_opencv_extraction(self, url: str, num_frames: int) -> List[str]: |
|
""" |
|
Strategy 1: Download video and extract frames using OpenCV |
|
Based on search result [2] OpenCV approach |
|
""" |
|
try: |
|
|
|
if os.getenv("SPACE_ID"): |
|
logger.info("Restricted environment detected, skipping video download") |
|
return [] |
|
|
|
|
|
temp_dir = tempfile.mkdtemp() |
|
video_path = self.download_video(url, output_path=temp_dir, resolution='lowest') |
|
|
|
if not video_path or not os.path.exists(video_path): |
|
logger.warning("Video download failed") |
|
return [] |
|
|
|
|
|
frame_paths = self._extract_frames_opencv(video_path, num_frames) |
|
|
|
|
|
if os.path.exists(video_path): |
|
os.remove(video_path) |
|
|
|
return frame_paths |
|
|
|
except Exception as e: |
|
logger.error(f"Strategy 1 failed: {e}") |
|
return [] |
|
|
|
def _extract_frames_opencv(self, video_path: str, num_frames: int) -> List[str]: |
|
""" |
|
Extract frames using OpenCV - implementation from search results |
|
Based on search result [2] and [4] showing cv2.VideoCapture approach |
|
""" |
|
frame_paths = [] |
|
|
|
try: |
|
|
|
cap = cv2.VideoCapture(video_path) |
|
|
|
if not cap.isOpened(): |
|
logger.error("Error: Could not open video with OpenCV") |
|
return [] |
|
|
|
|
|
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
|
logger.info(f"Total frames in video: {total_frames}") |
|
|
|
if total_frames == 0: |
|
return [] |
|
|
|
|
|
if num_frames >= total_frames: |
|
frame_intervals = list(range(total_frames)) |
|
else: |
|
frame_intervals = [int(total_frames * i / (num_frames - 1)) for i in range(num_frames)] |
|
frame_intervals[-1] = total_frames - 1 |
|
|
|
|
|
for i, frame_num in enumerate(frame_intervals): |
|
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num) |
|
ret, frame = cap.read() |
|
|
|
if ret: |
|
|
|
frame_filename = tempfile.mktemp(suffix=f'_frame_{i}.jpg') |
|
cv2.imwrite(frame_filename, frame) |
|
frame_paths.append(frame_filename) |
|
logger.debug(f"Extracted frame {i} at position {frame_num}") |
|
else: |
|
logger.warning(f"Failed to read frame at position {frame_num}") |
|
|
|
cap.release() |
|
logger.info(f"Successfully extracted {len(frame_paths)} frames using OpenCV") |
|
return frame_paths |
|
|
|
except Exception as e: |
|
logger.error(f"OpenCV frame extraction failed: {e}") |
|
return [] |
|
|
|
def _strategy_2_thumbnail_analysis(self, url: str, analysis_question: str) -> Dict[str, Any]: |
|
""" |
|
Strategy 2: Analyze thumbnail when video download isn't possible |
|
Fallback for HF Spaces environment |
|
""" |
|
try: |
|
from .multimodal_tools import MultimodalTools |
|
multimodal = MultimodalTools() |
|
|
|
|
|
video_info = self.get_video_info(url) |
|
if not video_info or not video_info.get('thumbnail_url'): |
|
return {'error': 'No thumbnail available'} |
|
|
|
|
|
thumbnail_url = video_info['thumbnail_url'] |
|
|
|
|
|
import requests |
|
response = requests.get(thumbnail_url, timeout=10) |
|
response.raise_for_status() |
|
|
|
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as tmp_file: |
|
tmp_file.write(response.content) |
|
thumbnail_path = tmp_file.name |
|
|
|
|
|
analysis = multimodal.analyze_image( |
|
thumbnail_path, |
|
f"This is a thumbnail from a YouTube video. {analysis_question}" |
|
) |
|
|
|
|
|
os.unlink(thumbnail_path) |
|
|
|
return { |
|
'frame_number': 0, |
|
'timestamp': 'thumbnail', |
|
'analysis': analysis, |
|
'extraction_method': 'thumbnail' |
|
} |
|
|
|
except Exception as e: |
|
logger.error(f"Thumbnail analysis failed: {e}") |
|
return {'error': f'Thumbnail analysis failed: {str(e)}'} |
|
|
|
def _analyze_extracted_frames(self, frame_paths: List[str], analysis_question: str) -> List[Dict[str, Any]]: |
|
""" |
|
Analyze extracted frames using multimodal AI |
|
""" |
|
analyzed_frames = [] |
|
|
|
try: |
|
from .multimodal_tools import MultimodalTools |
|
multimodal = MultimodalTools() |
|
|
|
for i, frame_path in enumerate(frame_paths): |
|
try: |
|
analysis = multimodal.analyze_image(frame_path, analysis_question) |
|
|
|
analyzed_frames.append({ |
|
'frame_number': i, |
|
'timestamp': f'frame_{i}', |
|
'analysis': analysis, |
|
'extraction_method': 'opencv' |
|
}) |
|
|
|
except Exception as e: |
|
logger.warning(f"Failed to analyze frame {i}: {e}") |
|
analyzed_frames.append({ |
|
'frame_number': i, |
|
'timestamp': f'frame_{i}', |
|
'analysis': f'Analysis failed: {str(e)}', |
|
'extraction_method': 'opencv' |
|
}) |
|
|
|
return analyzed_frames |
|
|
|
except Exception as e: |
|
logger.error(f"Frame analysis failed: {e}") |
|
return [] |
|
|
|
def _generate_frame_analysis_summary(self, results: Dict[str, Any]) -> str: |
|
"""Generate overall summary of frame analysis""" |
|
try: |
|
if not results.get('frames_analyzed'): |
|
return "No frames were successfully analyzed" |
|
|
|
|
|
all_analyses = [] |
|
for frame in results['frames_analyzed']: |
|
if isinstance(frame, dict) and 'analysis' in frame: |
|
all_analyses.append(frame['analysis']) |
|
|
|
if not all_analyses: |
|
return "No valid frame analyses found" |
|
|
|
|
|
from .multimodal_tools import MultimodalTools |
|
multimodal = MultimodalTools() |
|
|
|
combined_text = "\n\n".join([f"Frame {i}: {analysis}" for i, analysis in enumerate(all_analyses)]) |
|
|
|
summary_prompt = f""" |
|
Based on these frame analyses from a video titled "{results['video_info']['title']}", |
|
create a comprehensive summary of the video's visual content: |
|
|
|
{combined_text} |
|
|
|
Provide a concise summary highlighting the main visual elements, actions, and themes. |
|
""" |
|
|
|
summary = multimodal._make_openrouter_request({ |
|
"model": multimodal.text_model, |
|
"messages": [{"role": "user", "content": summary_prompt}], |
|
"temperature": 0, |
|
"max_tokens": 512 |
|
}) |
|
|
|
return summary |
|
|
|
except Exception as e: |
|
logger.error(f"Summary generation failed: {e}") |
|
return f"Summary generation failed: {str(e)}" |
|
|
|
def _cleanup_files(self, file_paths: List[str]): |
|
"""Clean up temporary files""" |
|
for file_path in file_paths: |
|
try: |
|
if os.path.exists(file_path): |
|
os.remove(file_path) |
|
except Exception as e: |
|
logger.warning(f"Could not remove {file_path}: {e}") |
|
|
|
|
|
def analyze_video_slides(self, url: str) -> Dict[str, Any]: |
|
"""Specialized method for analyzing educational videos with slides""" |
|
return self.extract_and_analyze_frames( |
|
url, |
|
num_frames=8, |
|
analysis_question="Is this a presentation slide? If yes, extract the main title and key points. If no, describe the visual content." |
|
) |
|
|
|
def analyze_video_content(self, url: str, question: str) -> str: |
|
"""Analyze video content and answer specific questions""" |
|
frame_results = self.extract_and_analyze_frames(url, num_frames=5, analysis_question=question) |
|
|
|
if 'error' in frame_results: |
|
return frame_results['error'] |
|
|
|
return frame_results.get('analysis_summary', 'No analysis available') |
|
|
|
|
|
def get_video_info(url: str) -> Optional[Dict[str, Any]]: |
|
"""Standalone function to get video information""" |
|
tools = YouTubeTools() |
|
return tools.get_video_info(url) |
|
|
|
def download_video(url: str, output_path: str = './downloads', |
|
resolution: str = 'highest', filename: Optional[str] = None) -> Optional[str]: |
|
"""Standalone function to download a video""" |
|
tools = YouTubeTools() |
|
return tools.download_video(url, output_path, resolution, filename) |
|
|
|
def download_audio(url: str, output_path: str = './downloads', |
|
filename: Optional[str] = None) -> Optional[str]: |
|
"""Standalone function to download audio only""" |
|
tools = YouTubeTools() |
|
return tools.download_audio(url, output_path, filename) |
|
|
|
def get_captions(url: str, language_code: str = 'en') -> Optional[str]: |
|
"""Standalone function to get video captions""" |
|
tools = YouTubeTools() |
|
return tools.get_captions(url, language_code) |
|
|
|
def get_playlist_info(playlist_url: str) -> Optional[Dict[str, Any]]: |
|
"""Standalone function to get playlist information""" |
|
tools = YouTubeTools() |
|
return tools.get_playlist_info(playlist_url) |
|
|