import os
import subprocess
import mimetypes
from google.cloud import storage
from typing import Literal, Optional, Tuple
import requests
import re
from markdownify import markdownify
from requests.exceptions import RequestException
from langchain_core.tools import convert_runnable_to_tool
from smolagents.utils import truncate_content
from langchain_core.runnables import RunnableLambda
from pytubefix import YouTube
from pytubefix.cli import on_progress
from langchain_core.tools import tool
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_google_vertexai import ChatVertexAI
from langchain.agents import Tool
from langchain_experimental.tools import PythonREPLTool
from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain_community.utilities import GoogleSerperAPIWrapper
from system_prompts import SYSTEM_PROMPT_VIDEO, SYSTEM_PROMPT_AUDIO, SYSTEM_PROMPT_IMAGE

# Shared Gemini model used by the media-analysis tools below.
llm_flash = ChatVertexAI(model="gemini-2.5-flash")

# Extensions whose MIME type we register explicitly, in case the host's
# mimetypes database does not ship them.
_EXTRA_MIME = {
    ".mp3": "audio/mpeg",  # registered MIME type for MP3 audio
    ".mp4": "video/mp4",  # registered MIME type for MP4 video
}
mimetypes.add_type("audio/mpeg", ".mp3")
mimetypes.add_type("video/mp4", ".mp4")


def upload_file_to_bucket(
    local_path: str,
    bucket_name: Optional[str] = None,
    object_name: Optional[str] = None,
) -> str:
    """Upload a local file to Google Cloud Storage and return its ``gs://`` URI.

    The MIME type is inferred from the file extension via ``mimetypes``,
    falling back to ``_EXTRA_MIME``. No generation precondition is used, so
    an existing object with the same name is replaced.

    Args:
        local_path: Path of the file on disk.
        bucket_name: Target bucket. When omitted, the ``GCP_BUCKET_NAME``
            environment variable is read at call time (not at import time).
        object_name: Object path inside the bucket. Defaults to
            ``data<ext>`` (e.g. ``data.mp4``), so successive uploads with the
            same extension overwrite each other.

    Returns:
        The ``gs://<bucket>/<object>`` URI of the uploaded object.

    Raises:
        FileNotFoundError: If ``local_path`` is not an existing file.
        ValueError: If the MIME type cannot be inferred, or no bucket name
            is given and ``GCP_BUCKET_NAME`` is unset.
    """
    if not os.path.isfile(local_path):
        raise FileNotFoundError(f"No existe: {local_path}")

    # (1) Object name: keep the (lower-cased) extension of the local file.
    _, ext = os.path.splitext(local_path)
    ext = ext.lower()
    if object_name is None:
        object_name = f"data{ext}"

    # (2) MIME type: mimetypes first, then the manual table as a fallback.
    file_type, _ = mimetypes.guess_type(local_path)
    if not file_type and ext in _EXTRA_MIME:
        file_type = _EXTRA_MIME[ext]
    if not file_type:
        raise ValueError(f"No se pudo inferir MIME para «{ext}»")

    # (3) Resolve the bucket lazily so env vars loaded after import are honoured.
    if bucket_name is None:
        bucket_name = os.getenv("GCP_BUCKET_NAME")
    if not bucket_name:
        raise ValueError("No bucket name given and GCP_BUCKET_NAME is not set")

    client = storage.Client()
    blob = client.bucket(bucket_name).blob(object_name)
    blob.upload_from_filename(local_path, content_type=file_type)

    gs_uri = f"gs://{bucket_name}/{object_name}"
    print(f"✅ Subido → {gs_uri} ({file_type})")
    return gs_uri


def download_youtube_video(
    url: str, mode: Literal["video", "audio"]
) -> Tuple[str, str]:
    """Download a YouTube video (.mp4) or its audio track (.mp3) and upload it to GCS.

    The file is written under the local ``data/`` folder and then uploaded
    with :func:`upload_file_to_bucket` (default bucket and object name).

    Args:
        url: URL of the YouTube video.
        mode: ``"video"`` downloads the stream returned by
            ``streams.get_highest_resolution()`` as .mp4. ``"audio"``
            downloads the first audio-only stream and converts it to .mp3
            with ``ffmpeg``, which must be on PATH. The intermediate
            download is then deleted.

    Returns:
        A ``(local_path, gcs_uri)`` tuple: the absolute path of the file on
        disk and the ``gs://bucket/object`` URI of the uploaded copy.

    Raises:
        ValueError: If ``mode`` is not "audio" or "video".
        RuntimeError: If no suitable stream is available for the video.
        subprocess.CalledProcessError: If the ffmpeg conversion fails.
        Exception: Any error raised by pytubefix or by the upload.
    """
    if mode not in ("audio", "video"):
        raise ValueError("'Mode' argument is not valid! It should be audio or video.")

    data_folder = "data/"
    yt = YouTube(url, on_progress_callback=on_progress)

    if mode == "video":
        stream = yt.streams.get_highest_resolution()
        if stream is None:
            raise RuntimeError(f"No downloadable video stream found for {url}")
        downloaded = stream.download(output_path=data_folder)
        base, _ = os.path.splitext(downloaded)
        local_path = os.path.abspath(f"{base}.mp4")
    else:
        stream = yt.streams.filter(only_audio=True).first()
        if stream is None:
            raise RuntimeError(f"No audio-only stream found for {url}")
        downloaded = stream.download(output_path=data_folder)  # e.g. .../myvideo.m4a
        base, _ = os.path.splitext(downloaded)
        mp3_path = f"{base}.mp3"

        # Re-encode to MP3 with FFmpeg.
        subprocess.run(
            [
                "ffmpeg",
                "-y",  # overwrite if exists
                "-i", downloaded,  # input
                "-vn",  # no video
                "-ar", "44100",  # sample rate
                "-ab", "192k",  # audio bitrate
                "-loglevel", "error",  # silence ffmpeg output
                mp3_path,
            ],
            check=True,
        )
        os.remove(downloaded)  # keep the data folder clean
        local_path = os.path.abspath(mp3_path)

    return local_path, upload_file_to_bucket(local_path)


@tool
def query_video(gcp_uri: str, query: str) -> str:
    """Analyzes a video file from a Google Cloud Storage (GCS) URI to answer a specific question about its visual content. This tool is the correct choice for any task that requires understanding or describing events, objects, or actions within a video. The video must be accessible via a GCS URI.

    Args:
        gcp_uri (str): The full Google Cloud Storage URI for the video file. It MUST be a .mp4 file and the URI MUST start with 'gs://'.
        query (str): A clear, specific question about the video's content. For example: 'What is the maximum number of birds on screen at the same time?' or 'What color is the car that appears at the 15-second mark?'.

    Returns:
        str: A string containing the answer to the query based on the video analysis.
    """
    _, file_extension = os.path.splitext(gcp_uri)
    if file_extension.lower() != '.mp4':
        return "Error: The video cannot be processed because it is not a .mp4 file. The gcp_uri must point to a .mp4 file."

    # The "{video_uri}" placeholder in the media part is filled from the
    # "video_uri" key passed to chain.invoke below.
    chat_prompt = ChatPromptTemplate.from_messages([
        ("system", SYSTEM_PROMPT_VIDEO),
        ("human", [
            "{query}",
            {
                "type": "media",
                "file_uri": "{video_uri}",
                "mime_type": "video/mp4",
            },
        ]),
    ])
    chain = chat_prompt | llm_flash | StrOutputParser()
    return chain.invoke({"query": query, "video_uri": gcp_uri})


@tool
def query_audio(gcp_uri: str, query: str) -> str:
    """Analyzes an audio file from a Google Cloud Storage (GCS) URI to answer a specific question about its content. This tool is ideal for tasks like transcription, speaker identification, sound analysis, or answering questions about speech or music within an audio file.

    Args:
        gcp_uri (str): The full Google Cloud Storage URI for the audio file. It MUST be a .mp3 file and the URI MUST start with 'gs://'.
        query (str): A clear, specific question about the audio's content. For example: 'Transcribe the speech in this audio,' 'Is the speaker male or female?' or 'What song is playing in the background?'.

    Returns:
        str: A string containing the answer to the query based on the audio analysis.
    """
    _, file_extension = os.path.splitext(gcp_uri)
    if file_extension.lower() != '.mp3':
        return "Error: The audio cannot be processed because it is not a .mp3 file. The gcp_uri must point to a .mp3 file."

    # "{audio_uri}" is filled from the "audio_uri" key passed to chain.invoke.
    chat_prompt = ChatPromptTemplate.from_messages([
        ("system", SYSTEM_PROMPT_AUDIO),
        ("human", [
            "{query}",
            {
                "type": "media",
                "file_uri": "{audio_uri}",
                "mime_type": "audio/mpeg",
            },
        ]),
    ])
    chain = chat_prompt | llm_flash | StrOutputParser()
    return chain.invoke({"query": query, "audio_uri": gcp_uri})


@tool
def query_image(gcp_uri: str, query: str) -> str:
    """Analyzes an image file from a Google Cloud Storage (GCS) URI to answer a question about its visual content. This tool is ideal for tasks like reading text from an image (OCR), identifying objects, describing a scene, or answering any question based on the visual information in a static image.

    Args:
        gcp_uri (str): The full Google Cloud Storage URI for the image file. It MUST be a .png file and the URI MUST start with 'gs://'.
        query (str): A clear, specific question about the image's content. For example: 'What text is written on the street sign?', 'How many people are in this picture?', or 'Describe the main activity in this image.'

    Returns:
        str: A string containing the answer to the query based on the image's content.
    """
    _, file_extension = os.path.splitext(gcp_uri)
    if file_extension.lower() != '.png':
        return "Error: The image cannot be processed because it is not a .png file. The gcp_uri must point to a .png file."

    # Images are passed as a standard "image_url" content part; the URL is
    # the gs:// URI supplied by the caller.
    chat_prompt = ChatPromptTemplate.from_messages([
        ("system", SYSTEM_PROMPT_IMAGE),
        ("human", [
            "{query}",
            {
                "type": "image_url",
                "image_url": {"url": "{gcp_uri}"},
            },
        ]),
    ])
    chain = chat_prompt | llm_flash | StrOutputParser()
    return chain.invoke({"query": query, "gcp_uri": gcp_uri})


def _fetch_webpage_markdown(url: str) -> str:
    """Fetch ``url`` and return its HTML converted to Markdown (raises on failure).

    Blank-line runs are collapsed and the result is truncated to 10000
    characters with ``truncate_content``.
    """
    # Send a GET request with a 20-second timeout; raise for 4xx/5xx.
    response = requests.get(url, timeout=20)
    response.raise_for_status()

    markdown_content = markdownify(response.text).strip()
    # Collapse three or more consecutive newlines into a single blank line.
    markdown_content = re.sub(r"\n{3,}", "\n\n", markdown_content)
    return truncate_content(markdown_content, 10000)


def _describe_fetch_error(exc: Exception) -> str:
    """Turn an exception raised while fetching a page into a tool-friendly message."""
    # Timeout is a subclass of RequestException, so it must be checked first.
    if isinstance(exc, requests.exceptions.Timeout):
        return "The request timed out. Please try again later or check the URL."
    if isinstance(exc, RequestException):
        return f"Error fetching the webpage: {str(exc)}"
    return f"An unexpected error occurred: {str(exc)}"


def visit_webpage(url: str) -> str:
    """Fetch ``url`` once and return its content as Markdown.

    Never raises: any failure is converted into a human-readable error
    string so an agent can read it as the tool result.
    """
    try:
        return _fetch_webpage_markdown(url)
    except Exception as e:
        return _describe_fetch_error(e)


def _make_visit_webpage_runnable():
    """Build a runnable that fetches a page, retrying transient network errors.

    :func:`visit_webpage` swallows every exception, so wrapping it in
    ``with_retry`` (as before) could never trigger a retry. Here the retry
    wraps the raising fetcher. Only timeouts and connection errors are
    retried, up to 3 attempts with jittered exponential backoff. Any
    remaining error becomes the same message string as in
    :func:`visit_webpage`.
    """
    fetch_with_retry = RunnableLambda(_fetch_webpage_markdown).with_retry(
        retry_if_exception_type=(
            requests.exceptions.Timeout,
            requests.exceptions.ConnectionError,
        ),
        wait_exponential_jitter=True,
        stop_after_attempt=3,
    )

    def visit_webpage_retrying(url: str) -> str:
        try:
            return fetch_with_retry.invoke(url)
        except Exception as e:
            return _describe_fetch_error(e)

    return RunnableLambda(visit_webpage_retrying)


def _make_visit_webpage_tool(runnable):
    """Wrap a page-fetching runnable as the ``visit_webpage`` agent tool."""
    return convert_runnable_to_tool(
        runnable,
        name="visit_webpage",
        description=(
            "Visits a webpage at the given url and reads its content as a markdown string. Use this to browse webpages."
        ),
        arg_types={"url": "str"},
    )


def _make_search_tool(search_wrapper):
    """Wrap a Serper search wrapper as the ``web_search`` agent tool."""
    return Tool(
        name="web_search",
        func=search_wrapper.run,
        description="useful for when you need to ask with search on the internet",
    )


def _make_wikipedia_tool(wikipedia_runner):
    """Wrap a Wikipedia query runner as the ``wikipedia_search`` agent tool."""
    return Tool(
        name="wikipedia_search",
        func=wikipedia_runner.run,
        description="useful for when you need to ask with search on Wikipedia",
    )


# Module-level tool instances, kept for importers that reference them directly.
visit_webpage_with_retry = _make_visit_webpage_runnable()
visit_webpage_tool = _make_visit_webpage_tool(visit_webpage_with_retry)
# NOTE: PythonREPLTool executes arbitrary Python code supplied by the agent.
python_tool = PythonREPLTool()
search = GoogleSerperAPIWrapper()
search_tool = _make_search_tool(search)
wikipedia = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
wikipedia_tool = _make_wikipedia_tool(wikipedia)


def get_tools():
    """Return the list of tools available to the agent.

    The generic tools (Python REPL, web search, Wikipedia, web page reader)
    are freshly instantiated on every call rather than reusing the
    module-level instances. The media tools are the module-level ``@tool``
    functions.
    """
    visit_tool = _make_visit_webpage_tool(_make_visit_webpage_runnable())
    repl_tool = PythonREPLTool()
    web_search_tool = _make_search_tool(GoogleSerperAPIWrapper())
    wiki_tool = _make_wikipedia_tool(
        WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
    )
    return [
        repl_tool,
        web_search_tool,
        wiki_tool,
        visit_tool,
        query_video,
        query_image,
        query_audio,
    ]