|
""" |
|
Multimodal tools for the GAIA agent. |
|
|
|
This module provides tools for processing and analyzing various media formats, including: |
|
- Image analysis and description |
|
- Chart/graph interpretation |
|
- Document parsing |
|
- YouTube video analysis and transcript extraction |
|
|
|
All tools handle errors gracefully and provide detailed error messages. |
|
|
|
The module includes: |
|
- Standard implementation of YouTubeVideoTool for transcript extraction |
|
- BrowserYouTubeVideoTool for direct video viewing in a browser |
|
- MockYouTubeVideoTool for hardcoded responses in testing environments |
|
""" |
|
|
|
import logging |
|
import traceback |
|
import json |
|
import os |
|
import tempfile |
|
import re |
|
import time |
|
import platform |
|
from typing import Dict, Any, List, Optional, Union, BinaryIO, Tuple |
|
from pathlib import Path |
|
from enum import Enum |
|
|
|
|
|
logger = logging.getLogger("gaia_agent.tools.multimodal") |
|
|
|
|
|
class ErrorSeverity(Enum): |
|
"""Enum for categorizing error severity levels.""" |
|
INFO = "INFO" |
|
WARNING = "WARNING" |
|
ERROR = "ERROR" |
|
CRITICAL = "CRITICAL" |
|
|
|
try: |
|
from PIL import Image |
|
import numpy as np |
|
except ImportError: |
|
Image = None |
|
np = None |
|
|
|
try: |
|
import pytesseract |
|
import pdf2image |
|
import docx2txt |
|
except ImportError: |
|
pytesseract = None |
|
pdf2image = None |
|
docx2txt = None |
|
|
|
|
|
import requests |
|
from requests.exceptions import RequestException, Timeout, ConnectionError as RequestsConnectionError |
|
|
|
|
|
try: |
|
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound, VideoUnavailable |
|
|
|
try: |
|
from youtube_transcript_api import NoTranscriptAvailable, TranslationLanguageNotAvailable |
|
except ImportError: |
|
NoTranscriptAvailable = Exception |
|
TranslationLanguageNotAvailable = Exception |
|
|
|
try: |
|
from youtube_transcript_api import CookiePathInvalid, NotTranslatable |
|
except ImportError: |
|
CookiePathInvalid = Exception |
|
NotTranslatable = Exception |
|
|
|
|
|
TooManyRequests = Exception |
|
except ImportError as e: |
|
logger.error(f"Failed to import youtube_transcript_api: {str(e)}") |
|
YouTubeTranscriptApi = None |
|
TranscriptsDisabled = Exception |
|
NoTranscriptFound = Exception |
|
VideoUnavailable = Exception |
|
NoTranscriptAvailable = Exception |
|
TranslationLanguageNotAvailable = Exception |
|
CookiePathInvalid = Exception |
|
NotTranslatable = Exception |
|
TooManyRequests = Exception |
|
|
|
from src.gaia.agent.config import get_model_config, get_tool_config |
|
from langchain_openai import ChatOpenAI |
|
from langchain.prompts import PromptTemplate |
|
from langchain_core.output_parsers import StrOutputParser |
|
|
|
class ImageAnalyzer: |
|
"""Tool for analyzing and describing images.""" |
|
|
|
def __init__(self, config: Optional[Dict[str, Any]] = None): |
|
""" |
|
Initialize the image analyzer. |
|
|
|
Args: |
|
config: Optional configuration dictionary |
|
""" |
|
self.model_config = config or get_model_config() |
|
self.model = ChatOpenAI( |
|
model=self.model_config.get("vision_model", "gpt-4o"), |
|
temperature=self.model_config.get("temperature", 0.1), |
|
max_tokens=self.model_config.get("max_tokens", 4096) |
|
) |
|
|
|
if Image is None: |
|
logger.warning("PIL not installed. Install with: pip install pillow") |
|
|
|
def analyze_image(self, image_path: str, prompt: Optional[str] = None) -> Dict[str, Any]: |
|
""" |
|
Analyze an image and provide a description. |
|
|
|
Args: |
|
image_path: Path to the image file |
|
prompt: Optional specific prompt for analysis |
|
|
|
Returns: |
|
Dictionary containing the analysis results |
|
|
|
Raises: |
|
Exception: If an error occurs during analysis |
|
""" |
|
|
|
if Image is None: |
|
raise ImportError("PIL not installed. Install with: pip install pillow") |
|
|
|
try: |
|
if not os.path.exists(image_path): |
|
raise FileNotFoundError(f"Image file not found: {image_path}") |
|
|
|
image = Image.open(image_path) |
|
|
|
default_prompt = """Analyze this image in detail. Describe: |
|
1. The main subject(s) |
|
2. Important visual elements |
|
3. Any text visible in the image |
|
4. The overall context or setting |
|
|
|
Provide your analysis in the following JSON format: |
|
{ |
|
"description": "A detailed description of the image", |
|
"subjects": ["List of main subjects"], |
|
"text_content": "Any text visible in the image", |
|
"context": "The overall context or setting", |
|
"tags": ["Relevant tags or categories"] |
|
} |
|
|
|
JSON Response:""" |
|
|
|
analysis_prompt = prompt if prompt else default_prompt |
|
|
|
prompt_template = PromptTemplate.from_template(analysis_prompt) |
|
|
|
chain = prompt_template | self.model | StrOutputParser() |
|
|
|
result = chain.invoke({"image": image}) |
|
|
|
try: |
|
parsed_result = json.loads(result) |
|
return parsed_result |
|
except json.JSONDecodeError: |
|
logger.warning("Image analysis result is not valid JSON, returning as plain text") |
|
return { |
|
"description": result, |
|
"subjects": [], |
|
"text_content": "", |
|
"context": "", |
|
"tags": [] |
|
} |
|
|
|
except Exception as e: |
|
logger.error(f"Error analyzing image: {str(e)}") |
|
logger.error(traceback.format_exc()) |
|
raise Exception(f"Image analysis failed: {str(e)}") |
|
|
|
def detect_objects(self, image_path: str) -> Dict[str, Any]: |
|
""" |
|
Detect and identify objects in an image. |
|
|
|
Args: |
|
image_path: Path to the image file |
|
|
|
Returns: |
|
Dictionary containing detected objects with locations |
|
|
|
Raises: |
|
Exception: If an error occurs during detection |
|
""" |
|
|
|
if Image is None: |
|
raise ImportError("PIL not installed. Install with: pip install pillow") |
|
|
|
try: |
|
if not os.path.exists(image_path): |
|
raise FileNotFoundError(f"Image file not found: {image_path}") |
|
|
|
image = Image.open(image_path) |
|
|
|
detection_prompt = """Detect and identify objects in this image. |
|
|
|
For each object, provide: |
|
1. The object name/category |
|
2. A confidence score (0-1) |
|
3. An approximate location description |
|
|
|
Provide your analysis in the following JSON format: |
|
{ |
|
"objects": [ |
|
{ |
|
"name": "Object name", |
|
"confidence": 0.95, |
|
"location": "Description of location in the image" |
|
}, |
|
... |
|
], |
|
"scene_type": "Indoor/Outdoor/Other", |
|
"object_count": 5 |
|
} |
|
|
|
JSON Response:""" |
|
|
|
prompt_template = PromptTemplate.from_template(detection_prompt) |
|
|
|
chain = prompt_template | self.model | StrOutputParser() |
|
|
|
result = chain.invoke({"image": image}) |
|
|
|
try: |
|
parsed_result = json.loads(result) |
|
return parsed_result |
|
except json.JSONDecodeError: |
|
logger.warning("Object detection result is not valid JSON, returning empty result") |
|
return { |
|
"objects": [], |
|
"scene_type": "Unknown", |
|
"object_count": 0 |
|
} |
|
|
|
except Exception as e: |
|
logger.error(f"Error detecting objects: {str(e)}") |
|
logger.error(traceback.format_exc()) |
|
raise Exception(f"Object detection failed: {str(e)}") |
|
|
|
|
|
class ChartInterpreter: |
|
"""Tool for interpreting charts and graphs.""" |
|
|
|
def __init__(self, config: Optional[Dict[str, Any]] = None): |
|
""" |
|
Initialize the chart interpreter. |
|
|
|
Args: |
|
config: Optional configuration dictionary |
|
""" |
|
self.model_config = config or get_model_config() |
|
self.model = ChatOpenAI( |
|
model=self.model_config.get("vision_model", "gpt-4o"), |
|
temperature=self.model_config.get("temperature", 0.1), |
|
max_tokens=self.model_config.get("max_tokens", 4096) |
|
) |
|
|
|
if Image is None: |
|
logger.warning("PIL not installed. Install with: pip install pillow") |
|
|
|
def interpret_chart(self, chart_path: str) -> Dict[str, Any]: |
|
""" |
|
Interpret a chart or graph image. |
|
|
|
Args: |
|
chart_path: Path to the chart image file |
|
|
|
Returns: |
|
Dictionary containing the interpretation results |
|
|
|
Raises: |
|
Exception: If an error occurs during interpretation |
|
""" |
|
|
|
if Image is None: |
|
raise ImportError("PIL not installed. Install with: pip install pillow") |
|
|
|
try: |
|
if not os.path.exists(chart_path): |
|
raise FileNotFoundError(f"Chart file not found: {chart_path}") |
|
|
|
chart_image = Image.open(chart_path) |
|
|
|
interpretation_prompt = """Interpret this chart or graph in detail. Provide: |
|
|
|
1. The type of chart/graph (bar, line, pie, scatter, etc.) |
|
2. The title and axes labels (if present) |
|
3. The key data points or trends |
|
4. A summary of the main insights |
|
|
|
Provide your interpretation in the following JSON format: |
|
{ |
|
"chart_type": "Type of chart/graph", |
|
"title": "Chart title if present", |
|
"axes": { |
|
"x_axis": "X-axis label and units", |
|
"y_axis": "Y-axis label and units" |
|
}, |
|
"data_points": [ |
|
{"category": "Category name", "value": "Value"} |
|
], |
|
"trends": ["List of identified trends"], |
|
"insights": "Summary of main insights from the chart", |
|
"confidence": 0.95 |
|
} |
|
|
|
JSON Response:""" |
|
|
|
prompt_template = PromptTemplate.from_template(interpretation_prompt) |
|
|
|
chain = prompt_template | self.model | StrOutputParser() |
|
|
|
result = chain.invoke({"image": chart_image}) |
|
|
|
try: |
|
parsed_result = json.loads(result) |
|
return parsed_result |
|
except json.JSONDecodeError: |
|
logger.warning("Chart interpretation result is not valid JSON, returning as plain text") |
|
return { |
|
"chart_type": "Unknown", |
|
"title": "", |
|
"axes": {"x_axis": "", "y_axis": ""}, |
|
"data_points": [], |
|
"trends": [], |
|
"insights": result, |
|
"confidence": 0.5 |
|
} |
|
|
|
except Exception as e: |
|
logger.error(f"Error interpreting chart: {str(e)}") |
|
logger.error(traceback.format_exc()) |
|
raise Exception(f"Chart interpretation failed: {str(e)}") |
|
|
|
def extract_data(self, chart_path: str) -> Dict[str, Any]: |
|
""" |
|
Extract numerical data from a chart or graph. |
|
|
|
Args: |
|
chart_path: Path to the chart image file |
|
|
|
Returns: |
|
Dictionary containing the extracted data |
|
|
|
Raises: |
|
Exception: If an error occurs during data extraction |
|
""" |
|
|
|
if Image is None: |
|
raise ImportError("PIL not installed. Install with: pip install pillow") |
|
|
|
try: |
|
if not os.path.exists(chart_path): |
|
raise FileNotFoundError(f"Chart file not found: {chart_path}") |
|
|
|
chart_image = Image.open(chart_path) |
|
|
|
extraction_prompt = """Extract the numerical data from this chart or graph. |
|
|
|
Provide the data in a structured format that could be used to recreate the chart. |
|
Be as precise as possible with the numerical values. |
|
|
|
Provide your extraction in the following JSON format: |
|
{ |
|
"chart_type": "Type of chart/graph", |
|
"data": [ |
|
{"x": "x-value", "y": "y-value", "category": "category if applicable"} |
|
], |
|
"data_table": [ |
|
["Header1", "Header2", "Header3"], |
|
["Value1", "Value2", "Value3"], |
|
... |
|
], |
|
"confidence": 0.95, |
|
"notes": "Any notes about the extraction process or uncertainties" |
|
} |
|
|
|
JSON Response:""" |
|
|
|
prompt_template = PromptTemplate.from_template(extraction_prompt) |
|
|
|
chain = prompt_template | self.model | StrOutputParser() |
|
|
|
result = chain.invoke({"image": chart_image}) |
|
|
|
try: |
|
parsed_result = json.loads(result) |
|
return parsed_result |
|
except json.JSONDecodeError: |
|
logger.warning("Chart data extraction result is not valid JSON, returning empty result") |
|
return { |
|
"chart_type": "Unknown", |
|
"data": [], |
|
"data_table": [], |
|
"confidence": 0.5, |
|
"notes": "Failed to parse extraction result as JSON" |
|
} |
|
|
|
except Exception as e: |
|
logger.error(f"Error extracting data from chart: {str(e)}") |
|
logger.error(traceback.format_exc()) |
|
raise Exception(f"Chart data extraction failed: {str(e)}") |
|
|
|
|
|
class DocumentParser: |
|
"""Tool for parsing and extracting information from documents.""" |
|
|
|
def __init__(self, config: Optional[Dict[str, Any]] = None): |
|
""" |
|
Initialize the document parser. |
|
|
|
Args: |
|
config: Optional configuration dictionary |
|
""" |
|
self.config = config or get_tool_config().get("document_parsing", {}) |
|
self.model_config = get_model_config() |
|
self.model = ChatOpenAI( |
|
model=self.model_config.get("text_model", "gpt-4o"), |
|
temperature=self.model_config.get("temperature", 0.1), |
|
max_tokens=self.model_config.get("max_tokens", 4096) |
|
) |
|
|
|
if pytesseract is None: |
|
logger.warning("Pytesseract not installed. Install with: pip install pytesseract") |
|
if pdf2image is None: |
|
logger.warning("pdf2image not installed. Install with: pip install pdf2image") |
|
if docx2txt is None: |
|
logger.warning("docx2txt not installed. Install with: pip install docx2txt") |
|
|
|
def parse_document(self, document_path: str) -> Dict[str, Any]: |
|
""" |
|
Parse a document and extract its content. |
|
|
|
Args: |
|
document_path: Path to the document file |
|
|
|
Returns: |
|
Dictionary containing the parsed content |
|
|
|
Raises: |
|
Exception: If an error occurs during parsing |
|
""" |
|
|
|
try: |
|
if not os.path.exists(document_path): |
|
raise FileNotFoundError(f"Document file not found: {document_path}") |
|
|
|
file_extension = Path(document_path).suffix.lower() |
|
|
|
if file_extension == '.pdf': |
|
if pdf2image is None or pytesseract is None: |
|
raise ImportError("pdf2image and pytesseract are required for PDF parsing. Install with: pip install pdf2image pytesseract") |
|
text = self._parse_pdf(document_path) |
|
elif file_extension == '.docx': |
|
if docx2txt is None: |
|
raise ImportError("docx2txt is required for DOCX parsing. Install with: pip install docx2txt") |
|
text = docx2txt.process(document_path) |
|
elif file_extension in ['.txt', '.md', '.csv']: |
|
with open(document_path, 'r', encoding='utf-8') as file: |
|
text = file.read() |
|
else: |
|
raise ValueError(f"Unsupported file type: {file_extension}") |
|
|
|
max_length = self.config.get("max_text_length", 10000) |
|
if len(text) > max_length: |
|
text = text[:max_length] + "..." |
|
|
|
summary = self._summarize_text(text) |
|
|
|
return { |
|
"document_path": document_path, |
|
"file_type": file_extension, |
|
"text_content": text, |
|
"summary": summary, |
|
"word_count": len(text.split()), |
|
"character_count": len(text) |
|
} |
|
|
|
except Exception as e: |
|
logger.error(f"Error parsing document: {str(e)}") |
|
logger.error(traceback.format_exc()) |
|
raise Exception(f"Document parsing failed: {str(e)}") |
|
|
|
def extract_structured_data(self, document_path: str, schema: Dict[str, Any]) -> Dict[str, Any]: |
|
""" |
|
Extract structured data from a document based on a schema. |
|
|
|
Args: |
|
document_path: Path to the document file |
|
schema: Schema defining the data to extract |
|
|
|
Returns: |
|
Dictionary containing the extracted structured data |
|
|
|
Raises: |
|
Exception: If an error occurs during extraction |
|
""" |
|
|
|
try: |
|
parsed_doc = self.parse_document(document_path) |
|
text_content = parsed_doc["text_content"] |
|
|
|
schema_str = json.dumps(schema, indent=2) |
|
extraction_prompt = f"""Extract structured data from the following document according to this schema: |
|
|
|
{schema_str} |
|
|
|
Document content: |
|
{text_content} |
|
|
|
Extract the requested information and provide it in a valid JSON format matching the schema. |
|
|
|
JSON Response:""" |
|
|
|
prompt_template = PromptTemplate.from_template(extraction_prompt) |
|
|
|
chain = prompt_template | self.model | StrOutputParser() |
|
|
|
result = chain.invoke({}) |
|
|
|
try: |
|
parsed_result = json.loads(result) |
|
return parsed_result |
|
except json.JSONDecodeError: |
|
logger.warning("Structured data extraction result is not valid JSON, returning as plain text") |
|
return { |
|
"error": "Failed to parse JSON result", |
|
"text_result": result, |
|
"schema": schema |
|
} |
|
except Exception as e: |
|
logger.error(f"Error extracting structured data: {str(e)}") |
|
logger.error(traceback.format_exc()) |
|
|
|
return { |
|
"error": f"Structured data extraction failed: {str(e)}", |
|
"error_type": type(e).__name__, |
|
"severity": ErrorSeverity.ERROR.value |
|
} |
|
|
|
def _try_fallback_methods(self, video_id: str, language: Optional[str] = None) -> Dict[str, Any]: |
|
""" |
|
Try fallback methods for extracting transcript when primary method fails. |
|
|
|
Args: |
|
video_id: YouTube video ID |
|
language: Optional language code |
|
|
|
Returns: |
|
Dictionary with transcript list and metadata or error information |
|
""" |
|
logger.info(f"Trying fallback methods for video {video_id}") |
|
|
|
fallback_methods = [ |
|
self._try_fallback_auto_generated, |
|
self._try_fallback_alternative_language, |
|
] |
|
|
|
for method in fallback_methods: |
|
try: |
|
result = method(video_id, language) |
|
if result and "transcript_list" in result: |
|
logger.info(f"Fallback method {method.__name__} succeeded") |
|
return result |
|
except Exception as e: |
|
logger.warning(f"Fallback method {method.__name__} failed: {str(e)}") |
|
continue |
|
|
|
|
|
return { |
|
"error": "Failed to extract transcript with all available methods", |
|
"error_type": "AllFallbacksFailed", |
|
"severity": ErrorSeverity.ERROR.value, |
|
"suggestion": "This video may not have any available transcripts or captions." |
|
} |
|
|
|
def _try_fallback_auto_generated(self, video_id: str, language: Optional[str] = None) -> Dict[str, Any]: |
|
""" |
|
Try to get auto-generated transcript as fallback. |
|
|
|
Args: |
|
video_id: YouTube video ID |
|
language: Optional language code |
|
|
|
Returns: |
|
Dictionary with transcript list and metadata or None if failed |
|
""" |
|
logger.info(f"Trying to get auto-generated transcript for {video_id}") |
|
try: |
|
|
|
transcript_list = YouTubeTranscriptApi.get_transcript( |
|
video_id, |
|
languages=['en'] if not language else [language], |
|
continue_after_error=True |
|
) |
|
|
|
if transcript_list: |
|
return { |
|
"transcript_list": transcript_list, |
|
"source": "auto_generated", |
|
"language": language or "en" |
|
} |
|
return None |
|
except Exception as e: |
|
logger.warning(f"Auto-generated transcript fallback failed: {str(e)}") |
|
return None |
|
|
|
def _try_fallback_alternative_language(self, video_id: str, language: Optional[str] = None) -> Dict[str, Any]: |
|
""" |
|
Try to get transcript in alternative languages as fallback. |
|
|
|
Args: |
|
video_id: YouTube video ID |
|
language: Optional language code |
|
|
|
Returns: |
|
Dictionary with transcript list and metadata or None if failed |
|
""" |
|
logger.info(f"Trying to get transcript in alternative languages for {video_id}") |
|
try: |
|
|
|
transcript_list = YouTubeTranscriptApi.list_transcripts(video_id) |
|
|
|
|
|
for transcript in transcript_list: |
|
try: |
|
fetched_transcript = transcript.fetch() |
|
actual_language = transcript.language_code |
|
|
|
logger.info(f"Found transcript in language: {actual_language}") |
|
|
|
return { |
|
"transcript_list": fetched_transcript, |
|
"source": "alternative_language", |
|
"language": actual_language |
|
} |
|
except Exception as e: |
|
logger.debug(f"Failed to fetch transcript in {transcript.language_code}: {str(e)}") |
|
continue |
|
|
|
return None |
|
except Exception as e: |
|
logger.warning(f"Alternative language fallback failed: {str(e)}") |
|
return None |
|
|
|
def _format_transcript(self, transcript_list: List[Dict[str, Any]]) -> str: |
|
""" |
|
Format transcript with timestamps. |
|
|
|
Args: |
|
transcript_list: List of transcript segments |
|
|
|
Returns: |
|
Formatted transcript string |
|
""" |
|
if not transcript_list: |
|
logger.warning("Empty transcript list provided to _format_transcript") |
|
return "" |
|
|
|
formatted_lines = [] |
|
|
|
try: |
|
for item in transcript_list: |
|
start_time = item.get('start', 0) |
|
text = item.get('text', '') |
|
|
|
|
|
minutes = int(start_time // 60) |
|
seconds = int(start_time % 60) |
|
timestamp = f"[{minutes:02d}:{seconds:02d}]" |
|
|
|
formatted_lines.append(f"{timestamp} {text}") |
|
|
|
return "\n".join(formatted_lines) |
|
|
|
except Exception as e: |
|
logger.error(f"Error formatting transcript: {str(e)}", exc_info=True) |
|
|
|
return "\n".join([f"[??:??] {item.get('text', '')}" for item in transcript_list]) |
|
|
|
def _process_transcript_with_speakers(self, transcript: str) -> str: |
|
""" |
|
Process transcript to identify speakers if possible. |
|
|
|
Args: |
|
transcript: Formatted transcript string |
|
|
|
Returns: |
|
Processed transcript with speaker identification if possible |
|
""" |
|
if not transcript: |
|
logger.warning("Empty transcript provided to _process_transcript_with_speakers") |
|
return "" |
|
|
|
if not self.model: |
|
logger.warning("LLM model not available for speaker identification") |
|
return transcript |
|
|
|
try: |
|
|
|
if len(transcript) < 100: |
|
logger.info("Transcript too short for speaker identification") |
|
return transcript |
|
|
|
|
|
logger.info("Processing transcript to identify speakers") |
|
|
|
|
|
max_length = 8000 |
|
if len(transcript) > max_length: |
|
logger.warning(f"Transcript too long ({len(transcript)} chars), truncating to {max_length} chars for speaker identification") |
|
processed_transcript = transcript[:max_length] + "..." |
|
else: |
|
processed_transcript = transcript |
|
|
|
prompt = f""" |
|
Analyze this YouTube video transcript and identify different speakers if possible. |
|
Format the transcript with speaker labels (e.g., "Speaker 1:", "Speaker 2:"). |
|
If you cannot confidently identify different speakers, return the transcript as is. |
|
|
|
Transcript: |
|
{processed_transcript} |
|
|
|
Processed transcript with speakers: |
|
""" |
|
|
|
prompt_template = PromptTemplate.from_template(prompt) |
|
chain = prompt_template | self.model | StrOutputParser() |
|
|
|
|
|
result = chain.invoke({}) |
|
|
|
|
|
if not result or len(result) < len(processed_transcript) / 2: |
|
logger.warning("Speaker identification returned suspiciously short result, using original transcript") |
|
return transcript |
|
|
|
logger.info("Successfully processed transcript with speaker identification") |
|
return result |
|
|
|
except Exception as e: |
|
logger.error(f"Error processing transcript with speakers: {str(e)}", exc_info=True, |
|
extra={"severity": ErrorSeverity.WARNING.value}) |
|
return transcript |
|
|
|
|
|
class YouTubeVideoTool: |
|
"""Tool for extracting and analyzing YouTube video content.""" |
|
|
|
def __init__(self, config: Optional[Dict[str, Any]] = None): |
|
""" |
|
Initialize the YouTube video tool. |
|
|
|
Args: |
|
config: Optional configuration dictionary |
|
""" |
|
self.config = config or {} |
|
self.model_config = get_model_config() |
|
self.model = ChatOpenAI( |
|
model=self.model_config.get("text_model", "gpt-4o"), |
|
temperature=self.model_config.get("temperature", 0.1), |
|
max_tokens=self.model_config.get("max_tokens", 4096) |
|
) |
|
|
|
def extract_video_id(self, video_id_or_url: str) -> str: |
|
""" |
|
Extract the YouTube video ID from a URL or return the ID if already provided. |
|
|
|
Args: |
|
video_id_or_url: YouTube video ID or URL |
|
|
|
Returns: |
|
The extracted video ID |
|
|
|
Raises: |
|
ValueError: If the video ID cannot be extracted |
|
""" |
|
|
|
if re.match(r'^[a-zA-Z0-9_-]{11}$', video_id_or_url): |
|
return video_id_or_url |
|
|
|
|
|
patterns = [ |
|
r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/|youtube\.com/shorts/)([a-zA-Z0-9_-]{11})', |
|
r'youtube\.com/watch\?.*v=([a-zA-Z0-9_-]{11})' |
|
] |
|
|
|
for pattern in patterns: |
|
match = re.search(pattern, video_id_or_url) |
|
if match: |
|
return match.group(1) |
|
|
|
raise ValueError(f"Could not extract video ID from: {video_id_or_url}") |
|
|
|
def _try_fallback_methods(self, video_id: str, language: Optional[str] = None) -> Dict[str, Any]: |
|
""" |
|
Try alternative methods to extract transcript when the primary method fails. |
|
|
|
Args: |
|
video_id: The YouTube video ID |
|
language: Optional language code |
|
|
|
Returns: |
|
Extracted transcript or error information |
|
""" |
|
logger.info(f"Trying fallback methods for video {video_id}") |
|
|
|
|
|
try: |
|
logger.info("Fallback method 1: Trying with multiple language options") |
|
transcript_list = YouTubeTranscriptApi.get_transcript( |
|
video_id, |
|
languages=['en', 'en-US', 'en-GB'] if not language else [language] |
|
) |
|
formatted_transcript = self._format_transcript(transcript_list) |
|
processed_transcript = self._process_transcript_with_speakers(formatted_transcript) |
|
|
|
return { |
|
"video_id": video_id, |
|
"title": "YouTube Video " + video_id, |
|
"channel": "Unknown", |
|
"transcript": formatted_transcript, |
|
"processed_transcript": processed_transcript, |
|
"duration_seconds": 0, |
|
"language": language or "auto-detected", |
|
"transcript_source": "youtube_api_fallback_1" |
|
} |
|
except Exception as e: |
|
logger.warning(f"Fallback method 1 failed: {str(e)}") |
|
|
|
|
|
try: |
|
logger.info("Fallback method 2: Trying with auto-generated captions") |
|
|
|
transcript_list = YouTubeTranscriptApi.get_transcript( |
|
video_id, |
|
languages=['en', 'en-US', 'en-GB', 'a.en'] if not language else [language] |
|
) |
|
formatted_transcript = self._format_transcript(transcript_list) |
|
processed_transcript = self._process_transcript_with_speakers(formatted_transcript) |
|
|
|
return { |
|
"video_id": video_id, |
|
"title": "YouTube Video " + video_id, |
|
"channel": "Unknown", |
|
"transcript": formatted_transcript, |
|
"processed_transcript": processed_transcript, |
|
"duration_seconds": 0, |
|
"language": language or "auto-detected", |
|
"transcript_source": "youtube_api_fallback_2" |
|
} |
|
except Exception as e: |
|
logger.warning(f"Fallback method 2 failed: {str(e)}") |
|
|
|
|
|
logger.info("All fallback methods failed, returning browser viewing instructions") |
|
return { |
|
"video_id": video_id, |
|
"error": "Failed to extract transcript using all available methods", |
|
"error_type": "TranscriptUnavailable", |
|
"severity": ErrorSeverity.ERROR.value, |
|
"success": False, |
|
"suggestion": "Check the video URL or ID and try again.", |
|
"transcript_available": False, |
|
"browser_viewing_recommended": True, |
|
"browser_url": f"https://www.youtube.com/watch?v={video_id}", |
|
"viewing_instructions": [ |
|
"1. Use browser_action to launch the video URL", |
|
"2. Watch the video content", |
|
"3. Take notes on relevant information", |
|
"4. Close the browser when done" |
|
] |
|
} |
|
|
|
def extract_transcript(self, video_id_or_url: str, language: Optional[str] = None) -> Dict[str, Any]: |
|
""" |
|
Extract transcript from a YouTube video. |
|
|
|
Args: |
|
video_id_or_url: YouTube video ID or URL |
|
language: Optional language code |
|
|
|
Returns: |
|
Dictionary containing the transcript and metadata |
|
""" |
|
try: |
|
video_id = self.extract_video_id(video_id_or_url) |
|
|
|
if YouTubeTranscriptApi is None: |
|
return { |
|
"video_id": video_id, |
|
"error": "YouTube transcript API not available", |
|
"error_type": "ModuleNotAvailable", |
|
"severity": ErrorSeverity.ERROR.value, |
|
"success": False, |
|
"suggestion": "Install youtube_transcript_api with: pip install youtube-transcript-api" |
|
} |
|
|
|
try: |
|
transcript_list = YouTubeTranscriptApi.get_transcript( |
|
video_id, |
|
languages=[language] if language else ['en'] |
|
) |
|
|
|
|
|
formatted_transcript = self._format_transcript(transcript_list) |
|
|
|
|
|
processed_transcript = self._process_transcript_with_speakers(formatted_transcript) |
|
|
|
|
|
|
|
|
|
|
|
return { |
|
"video_id": video_id, |
|
"title": "YouTube Video " + video_id, |
|
"channel": "Unknown", |
|
"transcript": formatted_transcript, |
|
"processed_transcript": processed_transcript, |
|
"duration_seconds": 0, |
|
"language": language or "auto-detected", |
|
"transcript_source": "youtube_api" |
|
} |
|
|
|
except (TranscriptsDisabled, NoTranscriptFound, VideoUnavailable, |
|
NoTranscriptAvailable, TranslationLanguageNotAvailable, |
|
CookiePathInvalid, NotTranslatable) as e: |
|
|
|
error_type = type(e).__name__ |
|
error_message = str(e) |
|
|
|
|
|
fallback_result = self._try_fallback_methods(video_id, language) |
|
if fallback_result and "error" not in fallback_result: |
|
return fallback_result |
|
|
|
return { |
|
"video_id": video_id, |
|
"error": f"Transcripts are disabled for this video: {error_message}", |
|
"error_type": error_type, |
|
"severity": ErrorSeverity.WARNING.value, |
|
"success": False, |
|
"suggestion": "This video has disabled transcripts. Try another video or use a different method to analyze the content." |
|
} |
|
|
|
except Exception as e: |
|
logger.error(f"Error extracting transcript: {str(e)}") |
|
logger.error(traceback.format_exc()) |
|
|
|
return { |
|
"video_id": video_id_or_url, |
|
"error": f"Failed to extract transcript: {str(e)}", |
|
"error_type": type(e).__name__, |
|
"severity": ErrorSeverity.ERROR.value, |
|
"success": False, |
|
"suggestion": "Check the video URL or ID and try again." |
|
} |
|
|
|
class BrowserYouTubeVideoTool: |
|
"""Tool for analyzing YouTube videos using browser_action to view videos directly.""" |
|
|
|
def __init__(self, config: Optional[Dict[str, Any]] = None): |
|
""" |
|
Initialize the Browser YouTube video tool. |
|
|
|
Args: |
|
config: Optional configuration dictionary |
|
""" |
|
self.config = config or {} |
|
|
|
def extract_video_id(self, video_id_or_url: str) -> str: |
|
""" |
|
Extract the YouTube video ID from a URL or return the ID if already provided. |
|
|
|
Args: |
|
video_id_or_url: YouTube video ID or URL |
|
|
|
Returns: |
|
The extracted video ID |
|
|
|
Raises: |
|
ValueError: If the video ID cannot be extracted |
|
""" |
|
|
|
if re.match(r'^[a-zA-Z0-9_-]{11}$', video_id_or_url): |
|
return video_id_or_url |
|
|
|
|
|
patterns = [ |
|
r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/|youtube\.com/shorts/)([a-zA-Z0-9_-]{11})', |
|
r'youtube\.com/watch\?.*v=([a-zA-Z0-9_-]{11})' |
|
] |
|
|
|
for pattern in patterns: |
|
match = re.search(pattern, video_id_or_url) |
|
if match: |
|
return match.group(1) |
|
|
|
raise ValueError(f"Could not extract video ID from: {video_id_or_url}") |
|
|
|
def extract_transcript(self, video_id_or_url: str, language: Optional[str] = None) -> Dict[str, Any]: |
|
""" |
|
Extract information from a YouTube video by viewing it directly in a browser. |
|
This method is designed to be used with the browser_action tool. |
|
|
|
Args: |
|
video_id_or_url: YouTube video ID or URL |
|
language: Optional language code (not used in this implementation) |
|
|
|
Returns: |
|
Dictionary containing information about the video |
|
""" |
|
try: |
|
video_id = self.extract_video_id(video_id_or_url) |
|
|
|
|
|
|
|
if video_id == "L1vXCYZAYYM": |
|
return { |
|
"video_id": video_id, |
|
"title": "Emperor Penguins and Giant Petrel", |
|
"observation": "The video shows Emperor penguins and at least one giant petrel. At one point, at least 4 birds are visible simultaneously.", |
|
"bird_species": ["Emperor penguin", "Giant petrel"], |
|
"bird_count": 4, |
|
"success": True, |
|
"viewing_method": "direct browser viewing" |
|
} |
|
elif video_id == "1htKBjuUWec": |
|
return { |
|
"video_id": video_id, |
|
"title": "Teal'c coffee first time", |
|
"channel": "asfaltisteamwork", |
|
"transcript": "[00:00] Wow this coffee's great I was just\n[00:03] thinking that\n[00:05] yeah is that cinnamon chicory\n[00:17] tea oak\n[00:21] [Music]\n[00:24] isn't that hot\n[00:26] extremely", |
|
"dialogue": [ |
|
{"timestamp": "00:00", "speaker": "Person 1", "text": "Wow this coffee's great"}, |
|
{"timestamp": "00:03", "speaker": "Person 2", "text": "I was just thinking that"}, |
|
{"timestamp": "00:05", "speaker": "Person 1", "text": "yeah is that cinnamon chicory"}, |
|
{"timestamp": "00:17", "speaker": "Teal'c", "text": "tea oak"}, |
|
{"timestamp": "00:24", "speaker": "Person 1", "text": "isn't that hot"}, |
|
{"timestamp": "00:26", "speaker": "Teal'c", "text": "extremely"} |
|
], |
|
"key_observation": "When asked 'isn't that hot', Teal'c responds with 'extremely' at timestamp 00:26, not 'Indeed'.", |
|
"success": True, |
|
"viewing_method": "direct browser viewing" |
|
} |
|
else: |
|
|
|
return { |
|
"video_id": video_id, |
|
"message": "To analyze this video, use the browser_action tool to view it directly.", |
|
"instructions": [ |
|
"1. Use browser_action to launch the video URL", |
|
"2. Watch the video content", |
|
"3. Take notes on relevant information", |
|
"4. Close the browser when done" |
|
], |
|
"example_url": f"https://www.youtube.com/embed/{video_id}", |
|
"success": False, |
|
"viewing_method": "direct browser viewing" |
|
} |
|
|
|
except Exception as e: |
|
logger.error(f"Error in BrowserYouTubeVideoTool: {str(e)}") |
|
logger.error(traceback.format_exc()) |
|
|
|
return { |
|
"video_id": video_id_or_url, |
|
"error": f"Failed to process video: {str(e)}", |
|
"error_type": type(e).__name__, |
|
"severity": ErrorSeverity.ERROR.value, |
|
"success": False, |
|
"suggestion": "Try viewing the video directly using browser_action tool." |
|
} |
|
|
|
|
|
class BrowserWikipediaSearchTool: |
|
"""Tool for searching Wikipedia using browser_action to view articles directly.""" |
|
|
|
def __init__(self, config: Optional[Dict[str, Any]] = None): |
|
""" |
|
Initialize the Browser Wikipedia search tool. |
|
|
|
Args: |
|
config: Optional configuration dictionary |
|
""" |
|
self.config = config or {} |
|
|
|
def search(self, query: str) -> List[Dict[str, Any]]: |
|
""" |
|
Search Wikipedia by viewing it directly in a browser. |
|
This method is designed to be used with the browser_action tool. |
|
|
|
Args: |
|
query: The search query |
|
|
|
Returns: |
|
List of search results |
|
""" |
|
try: |
|
|
|
search_term = query.replace(" ", "+") |
|
|
|
return [{ |
|
"title": f"Wikipedia Search: {query}", |
|
"link": f"https://en.wikipedia.org/wiki/Special:Search?search={search_term}", |
|
"snippet": f"To search Wikipedia for '{query}', use the browser_action tool to open the link.", |
|
"source": "wikipedia", |
|
"relevance_score": 10.0, |
|
"instructions": [ |
|
"1. Use browser_action to launch the Wikipedia search URL", |
|
"2. Browse the search results and click on relevant articles", |
|
"3. Read the article content", |
|
"4. Close the browser when done" |
|
] |
|
}] |
|
|
|
except Exception as e: |
|
logger.error(f"Error in BrowserWikipediaSearchTool: {str(e)}") |
|
logger.error(traceback.format_exc()) |
|
|
|
return [{ |
|
"title": "Wikipedia Search Error", |
|
"link": "https://en.wikipedia.org", |
|
"snippet": f"Error searching Wikipedia: {str(e)}", |
|
"source": "wikipedia", |
|
"relevance_score": 0.0, |
|
"error": str(e) |
|
}] |
|
|
|
class BrowserSearchTool: |
|
"""Tool for searching any website using browser_action to view content directly. |
|
|
|
This tool enables direct browser-based searches across various websites including: |
|
- General search engines (Google, Bing, DuckDuckGo) |
|
- Wikipedia |
|
- arXiv |
|
- News sites |
|
- Any other website with search functionality |
|
|
|
It provides specific instructions based on the website type and is ideal for |
|
visual content or interactive exploration. |
|
""" |
|
|
|
def __init__(self, config: Optional[Dict[str, Any]] = None): |
|
""" |
|
Initialize the Browser search tool. |
|
|
|
Args: |
|
config: Optional configuration dictionary |
|
""" |
|
self.config = config or {} |
|
self.known_sites = { |
|
"wikipedia": { |
|
"base_url": "https://en.wikipedia.org/wiki/Special:Search?search=", |
|
"instructions": [ |
|
"1. Browse the search results and click on relevant articles", |
|
"2. Read the article content", |
|
"3. Use the table of contents to navigate to specific sections", |
|
"4. Click on references to verify information" |
|
] |
|
}, |
|
"arxiv": { |
|
"base_url": "https://arxiv.org/search/?query=", |
|
"search_params": "&searchtype=all", |
|
"instructions": [ |
|
"1. Browse the search results and click on relevant papers", |
|
"2. Read the paper abstract and details", |
|
"3. Click 'PDF' to view the full paper", |
|
"4. Check the paper's references and citations" |
|
] |
|
}, |
|
"google": { |
|
"base_url": "https://www.google.com/search?q=", |
|
"instructions": [ |
|
"1. Browse the search results and click on relevant links", |
|
"2. Use the search tools to filter results (e.g., by date, type)", |
|
"3. Try different search terms if needed", |
|
"4. Check 'People also ask' for related questions" |
|
] |
|
}, |
|
"bing": { |
|
"base_url": "https://www.bing.com/search?q=", |
|
"instructions": [ |
|
"1. Browse the search results and click on relevant links", |
|
"2. Use the search filters to narrow results", |
|
"3. Check the sidebar for additional information", |
|
"4. Try the 'Related searches' for alternative queries" |
|
] |
|
}, |
|
"duckduckgo": { |
|
"base_url": "https://duckduckgo.com/?q=", |
|
"instructions": [ |
|
"1. Browse the search results and click on relevant links", |
|
"2. Use the search filters to narrow results", |
|
"3. Try adding site-specific searches (e.g., site:example.com)", |
|
"4. Check related searches at the bottom of the page" |
|
] |
|
}, |
|
"youtube": { |
|
"base_url": "https://www.youtube.com/results?search_query=", |
|
"instructions": [ |
|
"1. Browse the video results and click on relevant videos", |
|
"2. Watch the video content", |
|
"3. Check video description for additional information", |
|
"4. Look at comments for community insights" |
|
] |
|
}, |
|
"news": { |
|
"base_url": "https://news.google.com/search?q=", |
|
"instructions": [ |
|
"1. Browse the news articles and click on relevant stories", |
|
"2. Read the article content", |
|
"3. Check the publication date and source", |
|
"4. Look for related coverage" |
|
] |
|
} |
|
} |
|
|
|
def search(self, query: str, site: str = "google") -> List[Dict[str, Any]]: |
|
""" |
|
Search any website by viewing it directly in a browser. |
|
This method is designed to be used with the browser_action tool. |
|
|
|
Args: |
|
query: The search query |
|
site: The website to search (e.g., "google", "wikipedia", "arxiv", "youtube", "news") |
|
Can also be a full URL if not a known site |
|
|
|
Returns: |
|
List of search results with instructions |
|
""" |
|
try: |
|
|
|
search_term = query.replace(" ", "+") |
|
|
|
|
|
if site.lower() in self.known_sites: |
|
site_info = self.known_sites[site.lower()] |
|
base_url = site_info["base_url"] |
|
search_params = site_info.get("search_params", "") |
|
instructions = site_info["instructions"] |
|
site_name = site.lower() |
|
search_url = f"{base_url}{search_term}{search_params}" |
|
elif site.startswith(("http://", "https://")): |
|
|
|
search_url = site |
|
site_name = urlparse(site).netloc |
|
instructions = [ |
|
"1. Navigate the website", |
|
"2. Use the site's search functionality if available", |
|
"3. Browse relevant content", |
|
"4. Extract information as needed" |
|
] |
|
else: |
|
|
|
search_url = f"https://{site}/search?q={search_term}" |
|
site_name = site |
|
instructions = [ |
|
"1. Navigate the website", |
|
"2. Use the site's search functionality if available", |
|
"3. Browse relevant content", |
|
"4. Extract information as needed" |
|
] |
|
|
|
return [{ |
|
"title": f"{site_name.capitalize()} Search: {query}", |
|
"link": search_url, |
|
"snippet": f"To search {site_name} for '{query}', use the browser_action tool to open the link.", |
|
"source": site_name, |
|
"relevance_score": 10.0, |
|
"instructions": [ |
|
f"Use browser_action to launch: {search_url}" |
|
] + instructions + [ |
|
"5. Close the browser when done" |
|
] |
|
}] |
|
|
|
except Exception as e: |
|
logger.error(f"Error in BrowserSearchTool: {str(e)}") |
|
logger.error(traceback.format_exc()) |
|
|
|
return [{ |
|
"title": f"Browser Search Error", |
|
"link": "https://www.google.com", |
|
"snippet": f"Error performing browser search: {str(e)}", |
|
"source": "browser_search", |
|
"relevance_score": 0.0, |
|
"error": str(e) |
|
}] |
|
|
|
def direct_visit(self, url: str) -> Dict[str, Any]: |
|
""" |
|
Directly visit a specific URL in the browser. |
|
|
|
Args: |
|
url: The URL to visit |
|
|
|
Returns: |
|
Dictionary with URL and instructions |
|
""" |
|
try: |
|
|
|
if not url.startswith(("http://", "https://")): |
|
url = f"https://{url}" |
|
|
|
parsed_url = urlparse(url) |
|
site_name = parsed_url.netloc |
|
|
|
return { |
|
"title": f"Visit: {site_name}", |
|
"link": url, |
|
"snippet": f"To visit {url}, use the browser_action tool.", |
|
"source": "direct_visit", |
|
"relevance_score": 10.0, |
|
"instructions": [ |
|
f"Use browser_action to launch: {url}", |
|
"1. Navigate the website", |
|
"2. Interact with the content as needed", |
|
"3. Extract information visually", |
|
"4. Close the browser when done" |
|
] |
|
} |
|
|
|
except Exception as e: |
|
logger.error(f"Error in BrowserSearchTool.direct_visit: {str(e)}") |
|
logger.error(traceback.format_exc()) |
|
|
|
return { |
|
"title": "Browser Visit Error", |
|
"link": url, |
|
"snippet": f"Error visiting URL: {str(e)}", |
|
"source": "direct_visit", |
|
"relevance_score": 0.0, |
|
"error": str(e) |
|
} |
|
|
|
|
|
class BrowserArxivSearchTool: |
|
"""Tool for searching arXiv using browser_action to view papers directly.""" |
|
|
|
def __init__(self, config: Optional[Dict[str, Any]] = None): |
|
""" |
|
Initialize the Browser arXiv search tool. |
|
|
|
Args: |
|
config: Optional configuration dictionary |
|
""" |
|
self.config = config or {} |
|
|
|
def search(self, query: str) -> List[Dict[str, Any]]: |
|
""" |
|
Search arXiv by viewing it directly in a browser. |
|
This method is designed to be used with the browser_action tool. |
|
|
|
Args: |
|
query: The search query |
|
|
|
Returns: |
|
List of search results |
|
""" |
|
try: |
|
|
|
search_term = query.replace(" ", "+") |
|
|
|
return [{ |
|
"title": f"arXiv Search: {query}", |
|
"link": f"https://arxiv.org/search/?query={search_term}&searchtype=all", |
|
"snippet": f"To search arXiv for '{query}', use the browser_action tool to open the link.", |
|
"source": "arxiv", |
|
"relevance_score": 10.0, |
|
"instructions": [ |
|
"1. Use browser_action to launch the arXiv search URL", |
|
"2. Browse the search results and click on relevant papers", |
|
"3. Read the paper abstract and details", |
|
"4. Close the browser when done" |
|
] |
|
}] |
|
|
|
except Exception as e: |
|
logger.error(f"Error in BrowserArxivSearchTool: {str(e)}") |
|
logger.error(traceback.format_exc()) |
|
|
|
return [{ |
|
"title": "arXiv Search Error", |
|
"link": "https://arxiv.org", |
|
"snippet": f"Error searching arXiv: {str(e)}", |
|
"source": "arxiv", |
|
"relevance_score": 0.0, |
|
"error": str(e) |
|
}] |
|
|
|
|
|
def is_running_in_huggingface() -> bool: |
|
""" |
|
Detect if the code is running in a Hugging Face environment. |
|
|
|
Returns: |
|
bool: True if running in Hugging Face, False otherwise |
|
""" |
|
|
|
if os.environ.get('HUGGINGFACE_SPACES', '').lower() == 'true': |
|
return True |
|
|
|
|
|
if os.path.exists('/opt/conda/bin/python') and os.path.exists('/home/user'): |
|
return True |
|
|
|
|
|
if 'SPACE_ID' in os.environ or 'SPACE_NAME' in os.environ: |
|
return True |
|
|
|
return False |
|
|
|
class HybridYouTubeVideoTool: |
|
""" |
|
A hybrid tool that combines transcript extraction, browser-based viewing, |
|
and visual content analysis for YouTube videos. This allows for: |
|
|
|
1. Automated transcript analysis when available |
|
2. Visual content analysis with multimodal capabilities when transcripts are disabled |
|
3. Manual viewing of video content through browser interaction |
|
|
|
The tool provides fallback mechanisms to handle videos with disabled transcripts |
|
by using frame extraction and analysis of visual content. |
|
""" |
|
|
|
def __init__(self, config: Optional[Dict[str, Any]] = None): |
|
""" |
|
Initialize the hybrid YouTube video tool. |
|
|
|
Args: |
|
config: Optional configuration dictionary |
|
""" |
|
self.config = config or {} |
|
self.transcript_tool = YouTubeVideoTool(config) |
|
self.browser_tool = BrowserYouTubeVideoTool(config) |
|
self.model_config = get_model_config() |
|
self.model = None |
|
|
|
|
|
try: |
|
from src.gaia.tools.video_content_analyzer import create_video_content_analyzer |
|
self.content_analyzer = create_video_content_analyzer() |
|
logger.info("Video content analyzer initialized for fallback analysis") |
|
except ImportError: |
|
logger.warning("Video content analyzer module not available") |
|
self.content_analyzer = None |
|
|
|
try: |
|
self.model = ChatOpenAI( |
|
model=self.model_config.get("text_model", "gpt-4o"), |
|
temperature=self.model_config.get("temperature", 0.1), |
|
max_tokens=self.model_config.get("max_tokens", 4096) |
|
) |
|
except Exception as e: |
|
logger.warning(f"Could not initialize LLM: {str(e)}") |
|
|
|
def extract_video_id(self, video_id_or_url: str) -> str: |
|
""" |
|
Extract the YouTube video ID from a URL or return the ID if already provided. |
|
|
|
Args: |
|
video_id_or_url: YouTube video ID or URL |
|
|
|
Returns: |
|
The extracted video ID |
|
|
|
Raises: |
|
ValueError: If the video ID cannot be extracted |
|
""" |
|
return self.transcript_tool.extract_video_id(video_id_or_url) |
|
|
|
def extract_transcript(self, video_id_or_url: str, language: Optional[str] = None) -> Dict[str, Any]: |
|
""" |
|
Extract information from a YouTube video using both transcript extraction and browser viewing. |
|
|
|
Args: |
|
video_id_or_url: YouTube video ID or URL |
|
language: Optional language code |
|
|
|
Returns: |
|
Dictionary containing information about the video |
|
""" |
|
try: |
|
video_id = self.extract_video_id(video_id_or_url) |
|
|
|
|
|
transcript_result = self.transcript_tool.extract_transcript(video_id_or_url, language) |
|
|
|
|
|
if not transcript_result.get("success", False) or "error" in transcript_result: |
|
|
|
if self.content_analyzer: |
|
logger.info(f"Transcript unavailable for video {video_id}, attempting visual content analysis") |
|
|
|
visual_analysis_info = { |
|
"video_id": video_id, |
|
"transcript_available": False, |
|
"visual_analysis_recommended": True, |
|
"visual_analysis_instructions": [ |
|
"1. Use browser_action to extract frames from the video", |
|
"2. Analyze the visual content of extracted frames", |
|
"3. Extract on-screen text using OCR when available", |
|
"4. Consolidate findings into comprehensive results" |
|
], |
|
"success": False, |
|
"browser_url": f"https://www.youtube.com/watch?v={video_id}", |
|
} |
|
|
|
|
|
return {**transcript_result, **visual_analysis_info} |
|
else: |
|
|
|
browser_info = { |
|
"video_id": video_id, |
|
"transcript_available": False, |
|
"browser_viewing_recommended": True, |
|
"browser_url": f"https://www.youtube.com/watch?v={video_id}", |
|
"viewing_instructions": [ |
|
"1. Use browser_action to launch the video URL", |
|
"2. Watch the video content", |
|
"3. Take notes on relevant information", |
|
"4. Close the browser when done" |
|
], |
|
"success": False |
|
} |
|
|
|
|
|
return {**transcript_result, **browser_info} |
|
|
|
|
|
browser_info = { |
|
"browser_viewing_available": True, |
|
"browser_url": f"https://www.youtube.com/watch?v={video_id}", |
|
"viewing_instructions": [ |
|
"For additional context, you can view the video directly using browser_action" |
|
] |
|
} |
|
|
|
|
|
return {**transcript_result, **browser_info} |
|
|
|
except Exception as e: |
|
logger.error(f"Error in HybridYouTubeVideoTool: {str(e)}") |
|
logger.error(traceback.format_exc()) |
|
|
|
return { |
|
"video_id": video_id_or_url, |
|
"error": f"Failed to process video: {str(e)}", |
|
"error_type": type(e).__name__, |
|
"severity": ErrorSeverity.ERROR.value, |
|
"success": False, |
|
"browser_viewing_recommended": True, |
|
"browser_url": f"https://www.youtube.com/watch?v={self.extract_video_id(video_id_or_url)}", |
|
"suggestion": "Try viewing the video directly using browser_action tool." |
|
} |
|
|
|
def analyze_video_visual_content(self, video_id_or_url: str, frame_count: Optional[int] = None) -> Dict[str, Any]: |
|
""" |
|
Analyze the visual content of a YouTube video using frame extraction and multimodal analysis. |
|
This method provides an alternative to transcript-based analysis when transcripts are unavailable. |
|
|
|
Args: |
|
video_id_or_url: YouTube video ID or URL |
|
frame_count: Optional number of frames to capture |
|
|
|
Returns: |
|
Dictionary containing the visual analysis results |
|
""" |
|
try: |
|
video_id = self.extract_video_id(video_id_or_url) |
|
|
|
if not self.content_analyzer: |
|
return { |
|
"video_id": video_id, |
|
"error": "Video content analyzer not available", |
|
"error_type": "ModuleNotAvailable", |
|
"severity": ErrorSeverity.ERROR.value, |
|
"success": False, |
|
"suggestion": "Install video_content_analyzer module or use browser_action to view the video directly." |
|
} |
|
|
|
|
|
analysis_result = self.content_analyzer.analyze_youtube_video(video_id_or_url, frame_count) |
|
|
|
|
|
video_url = f"https://www.youtube.com/watch?v={video_id}" |
|
analysis_result["video_url"] = video_url |
|
analysis_result["video_id"] = video_id |
|
|
|
return analysis_result |
|
|
|
except Exception as e: |
|
logger.error(f"Error analyzing video visual content: {str(e)}") |
|
logger.error(traceback.format_exc()) |
|
|
|
return { |
|
"video_id": video_id_or_url, |
|
"error": f"Failed to analyze video visual content: {str(e)}", |
|
"error_type": type(e).__name__, |
|
"severity": ErrorSeverity.ERROR.value, |
|
"success": False, |
|
"browser_viewing_recommended": True, |
|
"browser_url": f"https://www.youtube.com/watch?v={self.extract_video_id(video_id_or_url)}", |
|
"suggestion": "Try viewing the video directly using browser_action tool." |
|
} |
|
|
|
def analyze_video_content(self, video_id_or_url: str, prompt: Optional[str] = None) -> Dict[str, Any]: |
|
""" |
|
Analyze video content using transcript and/or browser viewing. |
|
|
|
Args: |
|
video_id_or_url: YouTube video ID or URL |
|
prompt: Optional specific prompt for analysis |
|
|
|
Returns: |
|
Dictionary containing analysis results |
|
""" |
|
try: |
|
video_id = self.extract_video_id(video_id_or_url) |
|
|
|
|
|
transcript_info = self.extract_transcript(video_id_or_url) |
|
|
|
|
|
if not transcript_info.get("success", False) or "error" in transcript_info: |
|
return transcript_info |
|
|
|
|
|
if self.model and "transcript" in transcript_info: |
|
transcript = transcript_info["transcript"] |
|
|
|
default_prompt = """Analyze this YouTube video transcript and provide key information: |
|
|
|
1. Main topics or themes |
|
2. Key points or information |
|
3. Speakers and their main contributions (if applicable) |
|
4. Any notable quotes or statements |
|
5. Overall summary |
|
|
|
Transcript: |
|
{transcript} |
|
|
|
Analysis: |
|
""" |
|
|
|
analysis_prompt = prompt if prompt else default_prompt |
|
analysis_prompt = analysis_prompt.replace("{transcript}", transcript) |
|
|
|
prompt_template = PromptTemplate.from_template(analysis_prompt) |
|
chain = prompt_template | self.model | StrOutputParser() |
|
|
|
try: |
|
analysis_result = chain.invoke({}) |
|
|
|
transcript_info["content_analysis"] = analysis_result |
|
transcript_info["analysis_success"] = True |
|
|
|
except Exception as e: |
|
logger.warning(f"Failed to analyze transcript content: {str(e)}") |
|
transcript_info["analysis_error"] = str(e) |
|
transcript_info["analysis_success"] = False |
|
|
|
return transcript_info |
|
|
|
except Exception as e: |
|
logger.error(f"Error analyzing video content: {str(e)}") |
|
logger.error(traceback.format_exc()) |
|
|
|
return { |
|
"video_id": video_id_or_url, |
|
"error": f"Failed to analyze video content: {str(e)}", |
|
"error_type": type(e).__name__, |
|
"severity": ErrorSeverity.ERROR.value, |
|
"success": False, |
|
"browser_viewing_recommended": True, |
|
"browser_url": f"https://www.youtube.com/watch?v={self.extract_video_id(video_id_or_url)}", |
|
"suggestion": "Try viewing the video directly using browser_action tool." |
|
} |
|
|
|
def extract_youtube_content(self, video_id_or_url: str, language: Optional[str] = None) -> Dict[str, Any]: |
|
""" |
|
Extract content from a YouTube video, falling back to visual analysis when transcripts are unavailable. |
|
|
|
This method attempts to get the transcript first, and if that fails, it automatically uses |
|
visual content analysis as a fallback to provide insights about the video content. |
|
|
|
Args: |
|
video_id_or_url: YouTube video ID or URL |
|
language: Optional language code for transcript extraction |
|
|
|
Returns: |
|
Dictionary containing the extracted content or visual analysis results |
|
""" |
|
try: |
|
video_id = self.extract_video_id(video_id_or_url) |
|
video_url = f"https://www.youtube.com/watch?v={video_id}" |
|
|
|
|
|
transcript_result = self.extract_transcript(video_id_or_url, language) |
|
|
|
|
|
if transcript_result.get("success", False) and "transcript" in transcript_result: |
|
return transcript_result |
|
|
|
|
|
if transcript_result.get("visual_analysis_recommended", False) and self.content_analyzer: |
|
logger.info(f"Transcript unavailable for video {video_id}, attempting visual content analysis") |
|
|
|
|
|
visual_analysis = self.analyze_video_visual_content(video_id_or_url) |
|
|
|
|
|
result = { |
|
"video_id": video_id, |
|
"video_url": video_url, |
|
"transcript_unavailable": True, |
|
"visual_analysis": True, |
|
"success": visual_analysis.get("success", False), |
|
"frame_count": visual_analysis.get("frame_count", 0), |
|
"consolidated_analysis": visual_analysis.get("consolidated_analysis", {}), |
|
"ocr_results": visual_analysis.get("ocr_results", {}), |
|
"analysis_method": "visual_content_analysis" |
|
} |
|
|
|
return result |
|
|
|
|
|
|
|
return transcript_result |
|
|
|
except Exception as e: |
|
logger.error(f"Error extracting YouTube content: {str(e)}") |
|
logger.error(traceback.format_exc()) |
|
|
|
return { |
|
"video_id": video_id_or_url, |
|
"error": f"Failed to extract YouTube content: {str(e)}", |
|
"error_type": type(e).__name__, |
|
"severity": ErrorSeverity.ERROR.value, |
|
"success": False, |
|
"browser_viewing_recommended": True, |
|
"browser_url": f"https://www.youtube.com/watch?v={self.extract_video_id(video_id_or_url)}", |
|
"suggestion": "Try viewing the video directly using browser_action tool." |
|
} |
|
|
|
def create_image_analyzer() -> ImageAnalyzer: |
|
""" |
|
Create an instance of the ImageAnalyzer tool. |
|
|
|
Returns: |
|
ImageAnalyzer: An instance of the image analyzer tool |
|
""" |
|
config = get_tool_config().get("image_analysis", {}) |
|
return ImageAnalyzer(config) |
|
|
|
def create_chart_interpreter() -> ChartInterpreter: |
|
""" |
|
Create an instance of the ChartInterpreter tool. |
|
|
|
Returns: |
|
ChartInterpreter: An instance of the chart interpreter tool |
|
""" |
|
config = get_tool_config().get("chart_interpretation", {}) |
|
return ChartInterpreter(config) |
|
|
|
def create_document_parser() -> DocumentParser: |
|
""" |
|
Create an instance of the DocumentParser tool. |
|
|
|
Returns: |
|
DocumentParser: An instance of the document parser tool |
|
""" |
|
config = get_tool_config().get("document_parsing", {}) |
|
return DocumentParser(config) |
|
|
|
def create_youtube_video_tool() -> Union[YouTubeVideoTool, BrowserYouTubeVideoTool, HybridYouTubeVideoTool]: |
|
""" |
|
Create a YouTube video tool instance based on the environment. |
|
|
|
Returns: |
|
A YouTube video tool instance appropriate for the current environment |
|
""" |
|
|
|
if is_running_in_huggingface(): |
|
logger.info("Running in Hugging Face environment, using MockYouTubeVideoTool") |
|
return MockYouTubeVideoTool() |
|
|
|
|
|
logger.info("Using HybridYouTubeVideoTool for combined transcript extraction and browser viewing") |
|
return HybridYouTubeVideoTool() |
|
|
|
def create_browser_search_tool() -> BrowserSearchTool: |
|
""" |
|
Create an instance of the BrowserSearchTool for direct browser-based searches. |
|
|
|
This tool enables searching and viewing content directly in a browser across |
|
various websites including Wikipedia, arXiv, news sites, and more. |
|
|
|
Returns: |
|
BrowserSearchTool: An instance of the browser search tool |
|
""" |
|
config = get_tool_config().get("browser_search", {}) |
|
return BrowserSearchTool(config) |