Spaces:
Sleeping
Sleeping
# pylint: disable=no-member | |
import base64 | |
import gc | |
import mimetypes | |
import os | |
import re | |
import tempfile | |
import time | |
import uuid | |
from datetime import timedelta | |
from typing import Dict, List, Optional, TypedDict, Union | |
from urllib.parse import urlparse | |
import cv2 | |
import imageio | |
import pandas as pd | |
import pytesseract | |
import requests | |
import torch | |
import whisper | |
import yt_dlp | |
from bs4 import BeautifulSoup, Tag | |
from dotenv import load_dotenv | |
from duckduckgo_search import DDGS | |
from langchain_core.messages import HumanMessage | |
from langchain_core.tools import tool | |
from langchain_ollama import ChatOllama | |
from PIL import Image | |
from playwright.sync_api import sync_playwright | |
from youtube_transcript_api import ( | |
NoTranscriptFound, | |
TranscriptsDisabled, | |
YouTubeTranscriptApi, | |
) | |
# Load environment variables via python-dotenv (presumably from a local .env
# file) before any of them are read below.
load_dotenv()
# Base URL of the Ollama server; os.getenv returns None when the
# OLLAMA_BASE_URL variable is not set.
base_url = os.getenv("OLLAMA_BASE_URL")
# Chat model used by use_vision_model for combined image + text prompts.
model_vision = ChatOllama(
    model="gemma3:latest",
    base_url=base_url,
)
# Chat model used for text-only prompts (CSV/Excel analysis and transcript
# formatting). It points at the same model tag and server as model_vision.
model_text = ChatOllama(model="gemma3:latest", base_url=base_url)
def use_vision_model(question: str) -> str:
    """Answer a question about an image whose path appears in the question.

    The first existing image path (png/jpg/jpeg/webp) found in ``question``
    is loaded, reduced to a Canny edge map, and sent together with the full
    question text to the module-level ``model_vision`` chat model.

    Args:
        question (str): Free text containing the path of an image file.

    Returns:
        str: The model's answer, or an explanatory message when no usable
        image could be found, decoded or re-encoded.
    """
    # Extract candidate image paths and keep only those that exist on disk.
    candidates = re.findall(r"[\w\-/\.]+\.(?:png|jpg|jpeg|webp)", question)
    image_paths = [p for p in candidates if os.path.exists(p)]
    if not image_paths:
        return "No valid image file found in the question."
    image_path = image_paths[0]

    # cv2.imread reports failure by returning None instead of raising, so
    # check explicitly before handing the array to cvtColor.
    image = cv2.imread(image_path)
    if image is None:
        return f"Could not read image file: {image_path}"

    # Preprocess: grayscale -> contrast/brightness boost -> blur -> edges.
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    gray = cv2.convertScaleAbs(gray, alpha=1.2, beta=20)
    gray = cv2.GaussianBlur(gray, (5, 5), 0)
    edges = cv2.Canny(gray, 50, 150, apertureSize=3)

    # Encode the processed image to PNG in memory. This avoids re-opening a
    # still-open NamedTemporaryFile by name, which is not portable.
    ok, png_buffer = cv2.imencode(".png", edges)
    if not ok:
        return f"Could not encode processed image: {image_path}"
    encoded = base64.b64encode(png_buffer.tobytes()).decode("utf-8")

    # Prepare the prompt and image for the model.
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": question},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/png;base64,{encoded}"},
                },
            ],
        }
    ]

    try:
        response = model_vision.invoke(messages)
    finally:
        # Release the large base64 payload even when the model call fails.
        del messages, encoded
        gc.collect()
        torch.cuda.empty_cache()

    return str(response.content) if hasattr(response, "content") else str(response)
# YouTube Video Review Tool | |
def review_youtube_video(url: str) -> str:
    """Fetch the timed-text transcript of a YouTube video.

    The transcript is returned as-is (whatever the timed-text endpoint
    serves) so that the calling LLM can answer questions about the video.

    Args:
        url (str): The URL of the YouTube video.

    Returns:
        str: The transcript prefixed with a short introductory sentence.

    Raises:
        ValueError: If no video ID can be extracted from ``url``.
        requests.HTTPError: If the transcript endpoint returns an error status.
    """
    # Use the shared helper instead of url.split("v=")[1], which raised
    # IndexError on youtu.be/embed URLs and kept trailing "&t=..." parameters.
    video_id = _extract_video_id(url)
    if not video_id:
        raise ValueError(f"Could not extract a YouTube video ID from URL: {url!r}")

    response = requests.get(
        "https://www.youtube.com/api/timedtext",
        params={"v": video_id},
        timeout=200,
    )
    # Do not pass an HTTP error page off as a transcript.
    response.raise_for_status()

    return f"Here is the transcript of the video: {response.text}"
# YouTube Frames to Images Tool | |
def video_frames_to_images(
    url: str,
    folder_name: str,
    sample_interval_seconds: int = 5,
) -> List[str]:
    """Download a video and save one frame every few seconds as a JPEG.

    The video is downloaded with yt-dlp into ``folder_name`` (merged to mp4)
    and the sampled frames are written to ``<folder_name>/frames``.

    Args:
        url (str): The URL of the video.
        folder_name (str): Folder that receives the video and the frames.
        sample_interval_seconds (int): Seconds between two sampled frames;
            must be positive.

    Returns:
        List[str]: Paths of the saved image files, in frame order.

    Raises:
        ValueError: If ``sample_interval_seconds`` is not positive.
        RuntimeError: If no mp4 file is found after the download or the
            video metadata carries no FPS value.
    """
    # Validate before doing any I/O; a zero interval used to surface later
    # as a modulo-by-zero inside the frame loop.
    if sample_interval_seconds <= 0:
        raise ValueError("sample_interval_seconds must be a positive number")

    frames_dir = os.path.join(folder_name, "frames")
    os.makedirs(frames_dir, exist_ok=True)

    ydl_opts = {
        "format": "bestvideo[height<=1080]+bestaudio/best[height<=1080]/best",
        "outtmpl": os.path.join(folder_name, "video.%(ext)s"),
        "quiet": True,
        "noplaylist": True,
        "merge_output_format": "mp4",
        "force_ipv4": True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.extract_info(url, download=True)

    # Pick the first mp4 present in the target folder.
    video_path = next(
        (
            os.path.join(folder_name, f)
            for f in os.listdir(folder_name)
            if f.endswith(".mp4")
        ),
        None,
    )
    if not video_path:
        raise RuntimeError("Failed to download video as mp4")

    image_paths: List[str] = []
    reader = imageio.get_reader(video_path)
    try:
        fps = reader.get_meta_data().get("fps")
        if fps is None:
            raise RuntimeError("Unable to determine FPS from video metadata")

        # Never let the step collapse to 0 (e.g. fps * interval < 1).
        frame_interval = max(1, int(fps * sample_interval_seconds))
        num_frames = reader.get_length()

        # Step straight to the sampled indices instead of testing every one.
        for idx in range(0, num_frames, frame_interval):
            frame = reader.get_data(idx)
            image_path = os.path.join(frames_dir, f"frame_{idx:06d}.jpg")
            imageio.imwrite(image_path, frame)
            image_paths.append(image_path)
    finally:
        # Close the reader on every exit path, including exceptions.
        reader.close()

    return image_paths
# File Reading Tool | |
def read_file(filepath: str) -> str:
    """Read the content of a UTF-8 text file.

    Failures are reported as a message string instead of an exception, so
    the caller always receives text.

    Args:
        filepath (str): The path to the file to read.

    Returns:
        str: The content of the file, or an error message when the file is
        missing, unreadable, or not valid UTF-8.
    """
    try:
        with open(filepath, "r", encoding="utf-8") as file:
            return file.read()
    except FileNotFoundError:
        return f"File not found: {filepath}"
    except (OSError, UnicodeDecodeError) as e:
        # UnicodeDecodeError is a ValueError, not an OSError, so it has to be
        # listed explicitly or binary/non-UTF-8 files would crash the tool.
        return f"Error reading file: {str(e)}"
# File Download Tool | |
# Extensions used when neither the headers nor the URL provide a file name.
_CONTENT_TYPE_EXTENSIONS = {
    "image/jpeg": ".jpg",
    "image/png": ".png",
    "image/gif": ".gif",
    "audio/wav": ".wav",
    "audio/mpeg": ".mp3",
    "video/mp4": ".mp4",
    "text/plain": ".txt",
    "text/csv": ".csv",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx",
    "application/vnd.ms-excel": ".xls",
    "application/octet-stream": ".bin",
}


def _choose_download_filename(
    url: str, content_disposition: str, content_type: str
) -> str:
    """Pick a safe local file name from the headers, the URL, or a generated id."""
    # Local import: the module itself only imports urlparse from urllib.parse.
    from urllib.parse import unquote

    filename = ""
    extended = re.search(
        r"filename\*=UTF-8''([^;]+)", content_disposition, re.IGNORECASE
    )
    plain = re.search(r'filename="?([^";]+)"?', content_disposition, re.IGNORECASE)
    if extended:
        # RFC 5987 values are percent-encoded.
        filename = unquote(extended.group(1).strip())
    elif plain:
        filename = plain.group(1).strip()

    # If not in headers, try the last path segment of the URL.
    if not filename:
        filename = url.split("?", 1)[0].split("#", 1)[0]

    # The name is controlled by the remote side: keep only the final path
    # component so it cannot escape the target directory.
    filename = os.path.basename(filename.replace("\\", "/"))

    # Fallback to a generated filename.
    if filename in ("", ".", ".."):
        # Drop parameters such as "; charset=utf-8" before the lookup.
        media_type = content_type.split(";", 1)[0].strip()
        extension = _CONTENT_TYPE_EXTENSIONS.get(media_type, ".bin")
        filename = f"downloaded_{uuid.uuid4().hex[:8]}{extension}"
    return filename


def download_file_from_url(url: str, directory: str) -> Dict[str, Union[str, None]]:
    """Download a file from a URL and save it to a directory.

    The file name is taken from the ``Content-Disposition`` header, then
    from the URL path, and finally generated from the content type.

    Args:
        url (str): The URL to download the file from.
        directory (str): The directory to save the file to (created if
            missing).

    Returns:
        Dict[str, Union[str, None]]: ``{"type": <lower-cased Content-Type
        header>, "path": <path of the saved file>}``.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    # The context manager releases the streamed connection on every path.
    with requests.get(url, stream=True, timeout=10) as response:
        response.raise_for_status()
        content_type = response.headers.get("content-type", "").lower()
        filename = _choose_download_filename(
            url, response.headers.get("content-disposition", ""), content_type
        )

        os.makedirs(directory, exist_ok=True)
        file_path = os.path.join(directory, filename)
        with open(file_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:  # skip empty chunks
                    f.write(chunk)

    return {"type": content_type, "path": file_path}
# Text Extraction from Image Tool | |
def extract_text_from_image(image_path: str) -> str:
    """Extract text from an image using OCR (pytesseract).

    Args:
        image_path (str): The path to the image to extract text from.

    Returns:
        str: The recognised text, prefixed with a short header line.
    """
    # Open the image as a context manager so the file handle is closed as
    # soon as OCR has finished.
    with Image.open(image_path) as image:
        text = pytesseract.image_to_string(image)
    return f"Extracted text from image:\n\n{text}"
# CSV Analysis Tool | |
def analyze_csv_file(file_path: str, query: str) -> str:
    """Analyze a CSV file and answer a question about it with the text model.

    The whole table is rendered as text and embedded in the prompt sent to
    the module-level ``model_text`` chat model.

    Args:
        file_path (str): The path to the CSV file to analyze.
        query (str): The question to answer about the CSV file.

    Returns:
        str: The model's answer.
    """
    # Load the CSV file.
    df = pd.read_csv(file_path)
    df_str = df.to_string(index=False)

    # Compose the prompt; the empty first/last items keep the leading and
    # trailing newline of the prompt text.
    prompt = "\n".join(
        [
            "",
            "You are a data analyst. Analyze the following CSV data and answer the question provided.",
            f"CSV Dimensions: {df.shape[0]} rows × {df.shape[1]} columns",
            "CSV Data:",
            df_str,
            "Please provide:",
            "1. A summary of the data structure and content",
            "2. Key patterns and insights",
            "3. Potential data quality issues",
            "4. Suggestions for analysis",
            "User Query:",
            query,
            "Format your response in markdown with sections and bullet points.",
            "",
        ]
    )

    # A bare {"type": "text", ...} dict is a content part, not a message;
    # wrap the prompt in a HumanMessage like analyze_excel_file does.
    response = model_text.invoke([HumanMessage(content=prompt)])

    del df
    torch.cuda.empty_cache()
    gc.collect()

    if hasattr(response, "content") and isinstance(response.content, str):
        return response.content
    return str(response)
# Excel Analysis Tool | |
def analyze_excel_file(file_path: str) -> str:
    """Analyze every sheet of an Excel file with the Ollama-backed text model.

    Each sheet is rendered as text and sent to the module-level
    ``model_text`` chat model in its own request; the per-sheet answers are
    concatenated into one report.

    Args:
        file_path (str): The path to the Excel file to analyze.

    Returns:
        str: A report with a header listing the sheets followed by one
        section per sheet.
    """
    llm = model_text

    # The context manager closes the workbook even when a sheet fails.
    with pd.ExcelFile(file_path) as excel_file:
        sheet_names = excel_file.sheet_names
        parts = [
            f"Excel file loaded with {len(sheet_names)} sheets: {', '.join(sheet_names)}\n\n"
        ]
        for sheet_name in sheet_names:
            # Parse from the already-open workbook instead of reopening the
            # file for every sheet.
            df = excel_file.parse(sheet_name)
            df_str = df.to_string()

            prompt = "\n".join(
                [
                    "Analyze the following Excel sheet data and answer the user's query.",
                    f"Sheet Name: {sheet_name}",
                    f"Dimensions: {len(df)} rows × {len(df.columns)} columns",
                    "Data:",
                    df_str,
                    "Please provide:",
                    "1. A summary of the data structure and content",
                    "2. Key patterns and insights",
                    "3. Potential data quality issues",
                    "4. Suggestions for analysis",
                    "Format the response clearly using headings and bullet points.",
                ]
            )
            response = llm.invoke([HumanMessage(content=prompt)])

            parts.append(f"=== Sheet: {sheet_name} ===\n")
            parts.append(str(response.content) + "\n")
            parts.append("=" * 50 + "\n\n")

            # Free the sheet before loading the next one.
            del df
            gc.collect()

    torch.cuda.empty_cache()
    return "".join(parts)
# Audio Transcription Tool | |
def transcribe_audio(audio_file_path: str) -> str:
    """Transcribe an audio file with the Whisper ``base`` model.

    The model is loaded for each call and released afterwards.

    Args:
        audio_file_path (str): The path to the audio file to transcribe.

    Returns:
        str: The transcript of the audio file.
    """
    model = whisper.load_model("base")
    try:
        result = model.transcribe(audio_file_path)
    finally:
        # Release the model even when transcription fails.
        del model
        torch.cuda.empty_cache()
        gc.collect()
    # Explicit conversion instead of an assert, which is stripped under -O.
    return str(result["text"])
def _extract_video_id(url: str) -> Optional[str]:
    """Extract the video ID from a YouTube URL.

    Args:
        url (str): The URL of the YouTube video.

    Returns:
        Optional[str]: The video ID, or None when the URL matches none of
        the known YouTube URL shapes.
    """
    patterns = [
        # watch URLs where "v" is not necessarily the first query parameter,
        # e.g. watch?feature=share&v=ID
        r"youtube\.com\/watch\?(?:[^#\s]*&)?v=([^&\n?#]+)",
        r"(?:youtube\.com\/watch\?v=|youtube\.com\/embed\/|youtu\.be\/)([^&\n?#]+)",
        r"(?:youtube\.com\/v\/|youtube\.com\/e\/|youtube\.com\/user\/[^\/]+\/|youtube\.com\/[^\/]+\/|youtube\.com\/embed\/|youtu\.be\/)([^&\n?#]+)",
    ]
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    return None
def _fetch_youtube_captions(url: str) -> str:
    """Return the English captions of a video as ``[H:MM:SS] text`` lines."""
    video_id = _extract_video_id(url)
    if video_id is None:
        raise ValueError(f"Could not extract a YouTube video ID from URL: {url!r}")
    chunks = YouTubeTranscriptApi.get_transcript(video_id, languages=["en"])
    return "".join(
        f"[{timedelta(seconds=int(chunk['start']))}] {chunk['text']}\n"
        for chunk in chunks
    )


def _transcribe_youtube_audio(url: str, cause: Optional[BaseException]) -> str:
    """Download the audio track, transcribe it with Whisper and format it with the text model."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Download the audio from YouTube and convert it to wav.
        ydl_opts = {
            "format": "bestaudio/best",
            "outtmpl": os.path.join(tmpdir, "audio.%(ext)s"),
            "quiet": True,
            "noplaylist": True,
            "postprocessors": [
                {
                    "key": "FFmpegExtractAudio",
                    "preferredcodec": "wav",
                    "preferredquality": "192",
                }
            ],
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True) or {}
        title = info.get("title", "Unknown Title")
        duration = info.get("duration", 0)  # in seconds
        uploader = info.get("uploader", "Unknown Uploader")

        audio_path = next(
            (
                os.path.join(tmpdir, f)
                for f in os.listdir(tmpdir)
                if f.endswith(".wav")
            ),
            None,
        )
        if not audio_path:
            raise RuntimeError("Failed to download or convert audio") from cause

        # Whisper produces the raw, untimed transcript.
        whisper_model = whisper.load_model("base")
        transcription = whisper_model.transcribe(audio_path, verbose=False)
        raw_transcript = transcription["text"]
        del whisper_model
        gc.collect()

    # Ask the text model to add estimated timestamps to the raw transcript.
    prompt = (
        "Please format the following raw transcript into a structured format with timestamps "
        "(e.g., [00:00:00] text). Estimate timestamps based on the natural flow of the text.\n\n"
        f"The following transcript was generated from a YouTube video titled '{title}' "
        f"uploaded by {uploader}. The total video duration is approximately {duration} seconds.\n\n"
        "Use the video’s length to help guide timestamp estimation.\n\n"
        f"Raw transcript:\n{raw_transcript}"
    )
    response = model_text.invoke([HumanMessage(content=prompt)])
    torch.cuda.empty_cache()
    return str(response.content)


def transcribe_youtube(url: str) -> str:
    """Transcribe a YouTube video, preferring existing captions.

    The English transcript is first requested through the YouTube Transcript
    API. If that fails for any reason, or yields no text, the audio is
    downloaded, transcribed with Whisper and formatted with timestamps by
    the module-level ``model_text`` chat model.

    Args:
        url (str): The URL of the YouTube video.

    Returns:
        str: The transcript of the YouTube video.

    Raises:
        RuntimeError: If the Whisper fallback fails as well.
    """
    api_error: Optional[Exception] = None
    try:
        transcript = _fetch_youtube_captions(url)
    except Exception as api_exc:  # deliberately broad: any failure triggers the fallback
        api_error = api_exc
    else:
        if transcript.strip():
            return transcript

    # Reached when the API failed or returned an empty transcript.
    try:
        return _transcribe_youtube_audio(url, api_error)
    except Exception as fallback_exc:
        raise RuntimeError("Fallback Transcription failed") from fallback_exc
def _chunk_words(text: str, max_length: int = 1000) -> List[str]:
    """Split ``text`` into chunks of at most ``max_length`` whitespace-separated words."""
    words = text.split()
    return [
        " ".join(words[i : i + max_length]) for i in range(0, len(words), max_length)
    ]


def website_scrape(url: str) -> str:
    """Scrape a website and return its most relevant text.

    The page is rendered with headless Chromium. Text is taken, in order of
    preference, from a "discography" / "studio albums" section, from the
    first ``<article>`` element, or from the whole page.

    Args:
        url (str): The URL of the website to scrape; must include a scheme
            and a host.

    Returns:
        str: The extracted text (at most 100 chunks of 1000 words), or an
        error message when the URL is invalid or scraping fails.
    """
    max_chunks = 100  # upper bound on the number of 1000-word chunks returned

    try:
        parsed_url = urlparse(url)
        if not parsed_url.scheme or not parsed_url.netloc:
            raise ValueError(
                f"Invalid URL: '{url}'. Call `duckduckgo_search` first to get a valid URL."
            )

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            try:
                page = browser.new_page()
                page.goto(url, wait_until="networkidle", timeout=60000)
                page.wait_for_load_state("domcontentloaded")
                html_content = page.content()
            finally:
                # Close the browser even when navigation times out.
                browser.close()

        soup = BeautifulSoup(html_content, "html.parser")

        # Preferred source: the siblings following a matching h2/h3 heading,
        # up to the next h2/h3.
        relevant_text = ""
        for header in soup.find_all(["h2", "h3"]):
            heading_text = header.get_text().strip().lower()
            if "discography" in heading_text or "studio albums" in heading_text:
                section_texts = []
                tag = header.find_next_sibling()
                while tag and (
                    not isinstance(tag, Tag) or tag.name not in ["h2", "h3"]
                ):
                    section_texts.append(tag.get_text(separator=" ", strip=True))
                    tag = tag.find_next_sibling()
                relevant_text = "\n\n".join(section_texts)
                break

        # Fallbacks: the <article> element, then the whole document.
        if not relevant_text:
            article = soup.find("article")
            if article:
                relevant_text = article.get_text(separator=" ", strip=True)
        if not relevant_text:
            relevant_text = soup.get_text(separator=" ", strip=True)

        # Cap the amount of text handed back to the caller.
        chunks = _chunk_words(relevant_text)
        return "\n\n".join(chunks[:max_chunks])
    except ValueError as e:
        # URL validation errors (and any other ValueError) are returned as text.
        return str(e)
    except Exception as e:
        # Any other failure is reported instead of raised.
        return f"Scraping failed: {str(e)}"
class SearchResult(TypedDict):
    """Result envelope returned by ``duckduckgo_search``."""

    # The search query string that was submitted.
    query: str
    # "success" when results were found, "failed" otherwise.
    status: str
    # 1-based number of the attempt that succeeded; the retry limit on failure.
    attempt: int
    # One dict per hit with "title", "url" and "body" keys; None on failure.
    results: Optional[List[dict]]
    # None on success; a short error message on failure.
    error: Optional[str]
def duckduckgo_search(query: str, max_results: int = 10) -> SearchResult:
    """
    Perform a DuckDuckGo search with retry and backoff.
    Use this FIRST before invoking and scraping tools.

    Up to three attempts are made. After a failed attempt the function waits
    2 s, then 4 s, before retrying; an attempt that returns no results is
    retried immediately.

    Args:
        query: The search query string.
        max_results: Max number of results to return (default 10).

    Returns:
        A dict with the query, results, status, attempt count, and any error.
    """
    max_retries = 3
    base_delay = 2
    backoff_factor = 2

    for attempt in range(max_retries):
        try:
            with DDGS() as ddgs:
                results = ddgs.text(keywords=query, max_results=max_results)
            if results:
                formatted_results = [
                    {
                        "title": result.get("title", ""),
                        "url": result.get("href", ""),
                        "body": result.get("body", ""),
                    }
                    for result in results
                ]
                return {
                    "query": query,
                    "status": "success",
                    "attempt": attempt + 1,
                    "results": formatted_results,
                    "error": None,
                }
        except Exception as e:
            print(f"[DuckDuckGo Tool] Attempt {attempt + 1} failed: {e}")
            # Back off only when another attempt follows; sleeping after the
            # last failure would just delay the failure result.
            if attempt < max_retries - 1:
                time.sleep(base_delay * (backoff_factor**attempt))

    return {
        "query": query,
        "status": "failed",
        "attempt": max_retries,
        "results": None,
        "error": "Max retries exceeded or request failed.",
    }
def reverse_decoder(question: str) -> str:
    """Return the input spelled backward after trimming its edges.

    Surrounding whitespace is removed first, then any ``.``, ``!`` or ``?``
    characters at either end; what remains is reversed character by
    character.

    Args:
        question (str): The possibly reversed question string.

    Returns:
        str: The decoded sentence.
    """
    trimmed = question.strip().strip(".!?")
    return "".join(reversed(trimmed))