# omarequalmars
# Added wikipedia search
# 7b71bd5
# tools/youtube_tools.py (Updated with fixes)
"""
YouTube Tools Module - Fixed version using pytubefix
Addresses network issues, deprecation warnings, and playlist errors
"""
from pytubefix import YouTube, Playlist
from pytubefix.cli import on_progress
from typing import Optional, Dict, Any, List
import os
import time
import logging
from .utils import logger, validate_file_exists
import cv2
import tempfile
import os
from typing import Optional, Dict, Any, List
from PIL import Image
import numpy as np
class YouTubeTools:
    """YouTube tools with improved error handling and network resilience.

    Wraps pytubefix (``YouTube`` / ``Playlist``) for metadata lookup,
    downloads, captions and playlist listing, and adds visual analysis of a
    video through OpenCV frame extraction with a thumbnail fallback.

    Most public methods log the failure and return ``None`` (or a dict with
    an ``'error'`` key) rather than raising.
    """

    # Substrings of an exception message that mark the failure as a
    # transient network problem worth retrying.
    _NETWORK_ERROR_TERMS = ('network', 'socket', 'timeout', 'connection')

    def __init__(self, max_retries: int = 3, retry_delay: float = 1.0):
        """Store retry configuration.

        Args:
            max_retries: Maximum number of attempts for a network operation.
                Values below 1 are treated as a single attempt.
            retry_delay: Base delay in seconds; doubled after every failed
                attempt.
        """
        self.supported_formats = ['mp4', '3gp', 'webm']
        self.supported_audio_formats = ['mp3', 'mp4', 'webm']
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _retry_operation(self, operation, *args, **kwargs):
        """Call ``operation`` and retry with exponential backoff on network errors.

        Only exceptions whose message contains one of
        ``_NETWORK_ERROR_TERMS`` are retried; anything else is re-raised
        immediately, as is the error from the final attempt.

        Args:
            operation: Callable to invoke.
            *args: Positional arguments forwarded to ``operation``.
            **kwargs: Keyword arguments forwarded to ``operation``.

        Returns:
            Whatever ``operation`` returns.

        Raises:
            Exception: The last exception raised by ``operation``.
        """
        # Always attempt at least once; otherwise max_retries <= 0 would
        # skip the operation entirely and silently return None.
        attempts = max(1, self.max_retries)
        for attempt in range(attempts):
            try:
                return operation(*args, **kwargs)
            except Exception as e:
                error_msg = str(e).lower()
                is_network_error = any(term in error_msg for term in self._NETWORK_ERROR_TERMS)
                if attempt == attempts - 1 or not is_network_error:
                    raise
                wait_time = self.retry_delay * (2 ** attempt)
                logger.warning(f"Network error (attempt {attempt + 1}/{self.max_retries}): {e}")
                logger.info(f"Retrying in {wait_time} seconds...")
                time.sleep(wait_time)

    @staticmethod
    def _sanitize_filename(filename: str) -> str:
        """Keep only alphanumerics, spaces, ``-`` and ``_``; strip trailing whitespace."""
        return "".join(c for c in filename if c.isalnum() or c in (' ', '-', '_')).rstrip()

    def _download_stream(self, stream, output_path: str, filename: Optional[str]) -> Optional[str]:
        """Download ``stream`` into ``output_path`` and return the file path.

        A custom ``filename`` is sanitized and given the stream's subtype as
        extension. When the name sanitizes to an empty string the default
        download name is used instead of producing a file called ``.ext``.
        """
        safe_filename = self._sanitize_filename(filename) if filename else ''
        if safe_filename:
            return stream.download(output_path=output_path, filename=f"{safe_filename}.{stream.subtype}")
        return stream.download(output_path=output_path)

    @staticmethod
    def _select_frame_indices(total_frames: int, num_frames: int) -> List[int]:
        """Pick evenly distributed frame positions.

        Args:
            total_frames: Number of frames in the video.
            num_frames: Number of frames wanted.

        Returns:
            Frame positions in ascending order. Empty when either argument
            is not positive; every frame when ``num_frames`` covers the whole
            video; the middle frame when exactly one frame is requested;
            otherwise ``num_frames`` positions from the first frame to the
            last frame.
        """
        if total_frames <= 0 or num_frames <= 0:
            return []
        if num_frames >= total_frames:
            return list(range(total_frames))
        if num_frames == 1:
            # The spacing formula below divides by (num_frames - 1).
            return [total_frames // 2]
        indices = [int(total_frames * i / (num_frames - 1)) for i in range(num_frames)]
        indices[-1] = total_frames - 1  # The formula lands one past the end.
        return indices

    # ------------------------------------------------------------------
    # Metadata / download API
    # ------------------------------------------------------------------
    def get_video_info(self, url: str) -> Optional[Dict[str, Any]]:
        """Retrieve metadata about a YouTube video using pytubefix.

        Args:
            url: Video URL.

        Returns:
            Dict with title, author, length, views, description, thumbnail,
            publish date, keywords, ids, progressive mp4 stream details and
            caption language codes; ``None`` when the lookup fails.
        """
        def _get_info():
            yt = YouTube(url, on_progress_callback=on_progress)

            # Stream details are best-effort: a failure here must not
            # prevent returning the rest of the metadata.
            video_streams = []
            try:
                streams = yt.streams.filter(progressive=True, file_extension='mp4')
                for stream in streams:
                    try:
                        video_streams.append({
                            'resolution': getattr(stream, 'resolution', 'unknown'),
                            'fps': getattr(stream, 'fps', 'unknown'),
                            'video_codec': getattr(stream, 'video_codec', 'unknown'),
                            'audio_codec': getattr(stream, 'audio_codec', 'unknown'),
                            'filesize': getattr(stream, 'filesize', None),
                            'mime_type': getattr(stream, 'mime_type', 'unknown')
                        })
                    except Exception as stream_error:
                        logger.debug(f"Error processing stream: {stream_error}")
                        continue
            except Exception as e:
                logger.warning(f"Could not retrieve stream details: {e}")

            # Caption languages are best-effort as well.
            captions_available = []
            try:
                if yt.captions:
                    captions_available = list(yt.captions.keys())
            except Exception as e:
                logger.warning(f"Could not retrieve captions list: {e}")

            return {
                'title': getattr(yt, 'title', 'Unknown'),
                'author': getattr(yt, 'author', 'Unknown'),
                'channel_url': getattr(yt, 'channel_url', 'Unknown'),
                'length': getattr(yt, 'length', 0),
                'views': getattr(yt, 'views', 0),
                'description': getattr(yt, 'description', ''),
                'thumbnail_url': getattr(yt, 'thumbnail_url', ''),
                'publish_date': yt.publish_date.isoformat() if getattr(yt, 'publish_date', None) else None,
                'keywords': getattr(yt, 'keywords', []),
                'video_id': getattr(yt, 'video_id', ''),
                'watch_url': getattr(yt, 'watch_url', url),
                'available_streams': video_streams,
                'captions_available': captions_available
            }

        try:
            info = self._retry_operation(_get_info)
            if info is not None:
                logger.info(f"Retrieved info for video: {info.get('title', 'Unknown')}")
            return info
        except Exception as e:
            logger.error(f"Failed to get video info for {url}: {e}")
            return None

    def download_video(self, url: str, output_path: str = './downloads',
                       resolution: str = 'highest', filename: Optional[str] = None) -> Optional[str]:
        """Download a YouTube video with retry logic.

        Args:
            url: Video URL.
            output_path: Directory to download into; created if missing.
            resolution: ``'highest'``, ``'lowest'`` or an explicit resolution
                string matched against progressive mp4 streams. Falls back to
                the highest resolution when no stream matches.
            filename: Optional base name (without extension) for the file.

        Returns:
            Path of the downloaded file, or ``None`` on failure.
        """
        def _download():
            os.makedirs(output_path, exist_ok=True)
            yt = YouTube(url, on_progress_callback=on_progress)

            if resolution == 'highest':
                stream = yt.streams.get_highest_resolution()
            elif resolution == 'lowest':
                stream = yt.streams.get_lowest_resolution()
            else:
                stream = yt.streams.filter(res=resolution, progressive=True, file_extension='mp4').first()
                if not stream:
                    logger.warning(f"Resolution {resolution} not found, downloading highest instead")
                    stream = yt.streams.get_highest_resolution()

            if not stream:
                raise Exception("No suitable stream found for download")
            return self._download_stream(stream, output_path, filename)

        try:
            file_path = self._retry_operation(_download)
            logger.info(f"Downloaded video to {file_path}")
            return file_path
        except Exception as e:
            logger.error(f"Failed to download video from {url}: {e}")
            return None

    def download_audio(self, url: str, output_path: str = './downloads',
                       filename: Optional[str] = None) -> Optional[str]:
        """Download only the audio of a YouTube video with retry logic.

        Args:
            url: Video URL.
            output_path: Directory to download into; created if missing.
            filename: Optional base name (without extension) for the file.

        Returns:
            Path of the downloaded file, or ``None`` on failure.
        """
        def _download_audio():
            os.makedirs(output_path, exist_ok=True)
            yt = YouTube(url, on_progress_callback=on_progress)
            audio_stream = yt.streams.get_audio_only()
            if not audio_stream:
                raise Exception("No audio stream found")
            return self._download_stream(audio_stream, output_path, filename)

        try:
            file_path = self._retry_operation(_download_audio)
            logger.info(f"Downloaded audio to {file_path}")
            return file_path
        except Exception as e:
            logger.error(f"Failed to download audio from {url}: {e}")
            return None

    def get_captions(self, url: str, language_code: str = 'en') -> Optional[str]:
        """Get captions/subtitles in SRT form.

        Args:
            url: Video URL.
            language_code: Caption language code to fetch.

        Returns:
            SRT caption text, or ``None`` when the video has no captions,
            the language is unavailable, or the lookup fails.
        """
        def _get_captions():
            yt = YouTube(url, on_progress_callback=on_progress)
            if not yt.captions:
                logger.warning("No captions available for this video")
                return None

            # Dictionary-style access instead of the deprecated lookup method.
            if language_code in yt.captions:
                return yt.captions[language_code].generate_srt_captions()

            available_langs = list(yt.captions.keys())
            logger.warning(f"Captions not found for language {language_code}. Available: {available_langs}")
            return None

        try:
            result = self._retry_operation(_get_captions)
            if result:
                logger.info(f"Retrieved captions in {language_code}")
            return result
        except Exception as e:
            logger.error(f"Failed to get captions from {url}: {e}")
            return None

    def get_playlist_info(self, playlist_url: str) -> Optional[Dict[str, Any]]:
        """Get information about a YouTube playlist.

        Args:
            playlist_url: Playlist URL.

        Returns:
            Dict with ``video_count``/``total_videos``, the first ten
            ``video_urls``, and ``title``/``description``/``owner`` (with
            placeholder text when a property cannot be read); ``None`` on
            failure.
        """
        def _get_playlist_info():
            playlist = Playlist(playlist_url)
            # Reading the URLs first triggers the playlist loading.
            video_urls = list(playlist.video_urls)

            info = {
                'video_count': len(video_urls),
                'video_urls': video_urls[:10],  # Limit to first 10 for performance
                'total_videos': len(video_urls)
            }

            # Optional properties: reading them can raise, so each one is
            # fetched separately and replaced by a placeholder on failure.
            optional_fields = (
                ('title', 'Unknown Playlist', 'Private/Unavailable Playlist'),
                ('description', '', 'Description unavailable'),
                ('owner', 'Unknown', 'Owner unavailable'),
            )
            for key, default, fallback in optional_fields:
                try:
                    info[key] = getattr(playlist, key, default)
                except Exception:
                    info[key] = fallback
            return info

        try:
            info = self._retry_operation(_get_playlist_info)
            if info is not None:
                logger.info(f"Retrieved playlist info: {info['title']} ({info['video_count']} videos)")
            return info
        except Exception as e:
            logger.error(f"Failed to get playlist info from {playlist_url}: {e}")
            return None

    def get_available_qualities(self, url: str) -> Optional[List[Dict[str, Any]]]:
        """Get all available progressive (video + audio) download qualities.

        Args:
            url: Video URL.

        Returns:
            Stream descriptions sorted by resolution, highest first, or
            ``None`` on failure.
        """
        def _get_qualities():
            yt = YouTube(url, on_progress_callback=on_progress)
            streams = []
            for stream in yt.streams.filter(progressive=True):
                try:
                    streams.append({
                        'resolution': getattr(stream, 'resolution', 'unknown'),
                        'fps': getattr(stream, 'fps', 'unknown'),
                        'filesize_mb': round(stream.filesize / (1024 * 1024), 2) if getattr(stream, 'filesize', None) else None,
                        'mime_type': getattr(stream, 'mime_type', 'unknown'),
                        'video_codec': getattr(stream, 'video_codec', 'unknown'),
                        'audio_codec': getattr(stream, 'audio_codec', 'unknown')
                    })
                except Exception as stream_error:
                    logger.debug(f"Error processing stream: {stream_error}")
                    continue

            def sort_key(x):
                # Sort on the numeric part of the resolution (all but the
                # last character); unparsable values sort last.
                res = x['resolution']
                if res and res != 'unknown' and res[:-1].isdigit():
                    return int(res[:-1])
                return 0

            return sorted(streams, key=sort_key, reverse=True)

        try:
            return self._retry_operation(_get_qualities)
        except Exception as e:
            logger.error(f"Failed to get qualities for {url}: {e}")
            return None

    # ------------------------------------------------------------------
    # Visual analysis
    # ------------------------------------------------------------------
    def extract_and_analyze_frames(self, url: str, num_frames: int = 5, analysis_question: str = "Describe what you see in this frame") -> Dict[str, Any]:
        """Extract key frames and analyze the video content visually.

        Tries a full download plus OpenCV frame extraction first and falls
        back to analyzing the video thumbnail when no frames are obtained.

        Args:
            url: Video URL.
            num_frames: Number of frames to sample from the video.
            analysis_question: Prompt used for every frame.

        Returns:
            Dict with ``video_info``, ``frames_analyzed``,
            ``extraction_method``, ``total_frames_extracted`` and
            ``analysis_summary``; or a dict with only an ``'error'`` key.
        """
        logger.info(f"Starting frame extraction for {url} with {num_frames} frames")
        results = {
            'video_info': None,
            'frames_analyzed': [],
            'extraction_method': None,
            'total_frames_extracted': 0,
            'analysis_summary': None
        }
        try:
            video_info = self.get_video_info(url)
            if not video_info:
                return {'error': 'Could not retrieve video information'}

            results['video_info'] = {
                'title': video_info.get('title', 'Unknown'),
                'duration': video_info.get('length', 0),
                'author': video_info.get('author', 'Unknown')
            }

            # Strategy 1: full video download and OpenCV frame extraction.
            frame_paths = self._strategy_1_opencv_extraction(url, num_frames)
            if frame_paths:
                try:
                    results['extraction_method'] = 'OpenCV Video Download'
                    results['frames_analyzed'] = self._analyze_extracted_frames(frame_paths, analysis_question)
                    results['total_frames_extracted'] = len(frame_paths)
                finally:
                    # Frame files are temporary; remove them even if the
                    # analysis step fails.
                    self._cleanup_files(frame_paths)
            else:
                # Strategy 2: thumbnail analysis fallback. The metadata is
                # passed along so it is not fetched a second time.
                thumbnail_analysis = self._strategy_2_thumbnail_analysis(
                    url, analysis_question, video_info=video_info
                )
                results['extraction_method'] = 'Thumbnail Analysis (Fallback)'
                results['frames_analyzed'] = [thumbnail_analysis]
                # A failed thumbnail analysis yields an error dict, which
                # must not be counted as an extracted frame.
                results['total_frames_extracted'] = 0 if 'error' in thumbnail_analysis else 1

            results['analysis_summary'] = self._generate_frame_analysis_summary(results)
            return results
        except Exception as e:
            logger.error(f"Error in frame extraction: {e}")
            return {'error': f'Frame extraction failed: {str(e)}'}

    def _strategy_1_opencv_extraction(self, url: str, num_frames: int) -> List[str]:
        """Strategy 1: download the video and extract frames using OpenCV.

        Skipped when the ``SPACE_ID`` environment variable is set (treated
        as a restricted environment) or when no frames are requested.

        Returns:
            Paths of the extracted frame images; empty on any failure. The
            caller is responsible for deleting them.
        """
        import shutil

        temp_dir = None
        try:
            if os.getenv("SPACE_ID"):
                logger.info("Restricted environment detected, skipping video download")
                return []
            if num_frames <= 0:
                # Nothing to extract, so do not pay for the download.
                return []

            temp_dir = tempfile.mkdtemp()
            video_path = self.download_video(url, output_path=temp_dir, resolution='lowest')
            if not video_path or not os.path.exists(video_path):
                logger.warning("Video download failed")
                return []

            return self._extract_frames_opencv(video_path, num_frames)
        except Exception as e:
            logger.error(f"Strategy 1 failed: {e}")
            return []
        finally:
            # Removes the downloaded video together with its directory.
            # Frame images are written elsewhere and survive this.
            if temp_dir is not None:
                shutil.rmtree(temp_dir, ignore_errors=True)

    def _extract_frames_opencv(self, video_path: str, num_frames: int) -> List[str]:
        """Extract evenly distributed frames from a video file using OpenCV.

        Args:
            video_path: Path of the local video file.
            num_frames: Number of frames to sample.

        Returns:
            Paths of the JPEG files written, one per frame that could be
            read and saved; empty when the video cannot be opened, has no
            frames, or extraction fails.
        """
        frame_paths: List[str] = []
        cap = None
        try:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                logger.error("Error: Could not open video with OpenCV")
                return []

            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            logger.info(f"Total frames in video: {total_frames}")

            frame_positions = self._select_frame_indices(total_frames, num_frames)
            if not frame_positions:
                return []

            for i, frame_num in enumerate(frame_positions):
                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
                ret, frame = cap.read()
                if not ret:
                    logger.warning(f"Failed to read frame at position {frame_num}")
                    continue

                # mkstemp creates the file atomically, unlike mktemp.
                fd, frame_filename = tempfile.mkstemp(suffix=f'_frame_{i}.jpg')
                os.close(fd)
                if cv2.imwrite(frame_filename, frame):
                    frame_paths.append(frame_filename)
                    logger.debug(f"Extracted frame {i} at position {frame_num}")
                else:
                    logger.warning(f"Failed to write frame at position {frame_num}")
                    self._cleanup_files([frame_filename])

            logger.info(f"Successfully extracted {len(frame_paths)} frames using OpenCV")
            return frame_paths
        except Exception as e:
            logger.error(f"OpenCV frame extraction failed: {e}")
            # The caller receives no paths, so delete what was written.
            self._cleanup_files(frame_paths)
            return []
        finally:
            if cap is not None:
                cap.release()

    def _strategy_2_thumbnail_analysis(self, url: str, analysis_question: str,
                                       video_info: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Strategy 2: analyze the thumbnail when video download isn't possible.

        Args:
            url: Video URL.
            analysis_question: Prompt appended to the thumbnail preamble.
            video_info: Already-fetched result of ``get_video_info``; looked
                up from ``url`` when omitted.

        Returns:
            Frame-analysis dict for the thumbnail, or a dict with an
            ``'error'`` key.
        """
        thumbnail_path = None
        try:
            from .multimodal_tools import MultimodalTools
            import requests

            multimodal = MultimodalTools()

            if video_info is None:
                video_info = self.get_video_info(url)
            if not video_info or not video_info.get('thumbnail_url'):
                return {'error': 'No thumbnail available'}

            response = requests.get(video_info['thumbnail_url'], timeout=10)
            response.raise_for_status()

            with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as tmp_file:
                thumbnail_path = tmp_file.name
                tmp_file.write(response.content)

            analysis = multimodal.analyze_image(
                thumbnail_path,
                f"This is a thumbnail from a YouTube video. {analysis_question}"
            )
            return {
                'frame_number': 0,
                'timestamp': 'thumbnail',
                'analysis': analysis,
                'extraction_method': 'thumbnail'
            }
        except Exception as e:
            logger.error(f"Thumbnail analysis failed: {e}")
            return {'error': f'Thumbnail analysis failed: {str(e)}'}
        finally:
            # Remove the temporary thumbnail on success and failure alike.
            if thumbnail_path is not None:
                self._cleanup_files([thumbnail_path])

    def _analyze_extracted_frames(self, frame_paths: List[str], analysis_question: str) -> List[Dict[str, Any]]:
        """Analyze extracted frames with ``MultimodalTools.analyze_image``.

        A frame whose analysis raises is still reported, with the failure
        message in its ``'analysis'`` field.

        Returns:
            One dict per frame, or an empty list when the analyzer itself
            cannot be set up.
        """
        analyzed_frames = []
        try:
            from .multimodal_tools import MultimodalTools
            multimodal = MultimodalTools()

            for i, frame_path in enumerate(frame_paths):
                try:
                    analysis = multimodal.analyze_image(frame_path, analysis_question)
                except Exception as e:
                    logger.warning(f"Failed to analyze frame {i}: {e}")
                    analysis = f'Analysis failed: {str(e)}'
                analyzed_frames.append({
                    'frame_number': i,
                    'timestamp': f'frame_{i}',
                    'analysis': analysis,
                    'extraction_method': 'opencv'
                })
            return analyzed_frames
        except Exception as e:
            logger.error(f"Frame analysis failed: {e}")
            return []

    def _generate_frame_analysis_summary(self, results: Dict[str, Any]) -> str:
        """Generate an overall summary from the per-frame analyses.

        Args:
            results: Result dict built by ``extract_and_analyze_frames``.

        Returns:
            Summary text, or a short message explaining why none was made.
        """
        try:
            if not results.get('frames_analyzed'):
                return "No frames were successfully analyzed"

            all_analyses = [
                frame['analysis']
                for frame in results['frames_analyzed']
                if isinstance(frame, dict) and 'analysis' in frame
            ]
            if not all_analyses:
                return "No valid frame analyses found"

            from .multimodal_tools import MultimodalTools
            multimodal = MultimodalTools()

            # video_info may be missing or None; fall back to a placeholder.
            title = (results.get('video_info') or {}).get('title', 'Unknown')
            combined_text = "\n\n".join(
                f"Frame {i}: {analysis}" for i, analysis in enumerate(all_analyses)
            )
            summary_prompt = (
                "\n"
                f'Based on these frame analyses from a video titled "{title}",\n'
                "create a comprehensive summary of the video's visual content:\n"
                f"{combined_text}\n"
                "Provide a concise summary highlighting the main visual elements, actions, and themes.\n"
            )
            summary = multimodal._make_openrouter_request({
                "model": multimodal.text_model,
                "messages": [{"role": "user", "content": summary_prompt}],
                "temperature": 0,
                "max_tokens": 512
            })
            return summary
        except Exception as e:
            logger.error(f"Summary generation failed: {e}")
            return f"Summary generation failed: {str(e)}"

    def _cleanup_files(self, file_paths: List[str]):
        """Delete the given files, logging (not raising) on failure."""
        for file_path in file_paths:
            try:
                if os.path.exists(file_path):
                    os.remove(file_path)
            except Exception as e:
                logger.warning(f"Could not remove {file_path}: {e}")

    # Convenience methods for specific use cases
    def analyze_video_slides(self, url: str) -> Dict[str, Any]:
        """Analyze eight frames with a prompt aimed at presentation slides."""
        return self.extract_and_analyze_frames(
            url,
            num_frames=8,
            analysis_question="Is this a presentation slide? If yes, extract the main title and key points. If no, describe the visual content."
        )

    def analyze_video_content(self, url: str, question: str) -> str:
        """Analyze five frames against ``question`` and return the summary.

        Returns:
            The analysis summary, the error message when extraction failed,
            or ``'No analysis available'`` when no summary was produced.
        """
        frame_results = self.extract_and_analyze_frames(url, num_frames=5, analysis_question=question)
        if 'error' in frame_results:
            return frame_results['error']
        return frame_results.get('analysis_summary') or 'No analysis available'
# Convenience functions (unchanged)
def get_video_info(url: str) -> Optional[Dict[str, Any]]:
    """Module-level shortcut: fetch video metadata via a fresh ``YouTubeTools``."""
    return YouTubeTools().get_video_info(url)
def download_video(url: str, output_path: str = './downloads',
                   resolution: str = 'highest', filename: Optional[str] = None) -> Optional[str]:
    """Module-level shortcut: download a video via a fresh ``YouTubeTools``."""
    return YouTubeTools().download_video(
        url,
        output_path=output_path,
        resolution=resolution,
        filename=filename,
    )
def download_audio(url: str, output_path: str = './downloads',
                   filename: Optional[str] = None) -> Optional[str]:
    """Module-level shortcut: download audio only via a fresh ``YouTubeTools``."""
    return YouTubeTools().download_audio(
        url,
        output_path=output_path,
        filename=filename,
    )
def get_captions(url: str, language_code: str = 'en') -> Optional[str]:
    """Module-level shortcut: fetch video captions via a fresh ``YouTubeTools``."""
    return YouTubeTools().get_captions(url, language_code=language_code)
def get_playlist_info(playlist_url: str) -> Optional[Dict[str, Any]]:
    """Module-level shortcut: fetch playlist information via a fresh ``YouTubeTools``."""
    return YouTubeTools().get_playlist_info(playlist_url)