# pylint: disable=no-member
import base64
import gc
import math
import mimetypes
import multiprocessing
import os
import re
import tempfile
import time
import uuid
from datetime import timedelta
from typing import Dict, List, Optional, TypedDict, Union
from urllib.parse import urlparse

import cv2
import imageio
import pandas as pd
import pytesseract
import requests
import torch
import whisper
import yt_dlp
from bs4 import BeautifulSoup, Tag
from dotenv import load_dotenv
from duckduckgo_search import DDGS
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_ollama import ChatOllama
from PIL import Image
from playwright.sync_api import sync_playwright
from youtube_transcript_api import (
    NoTranscriptFound,
    TranscriptsDisabled,
    YouTubeTranscriptApi,
)

load_dotenv()
base_url = os.getenv("OLLAMA_BASE_URL")

model_vision = ChatOllama(
    model="gemma3:latest",
    base_url=base_url,
)
model_text = ChatOllama(
    model="hf.co/lmstudio-community/Qwen2.5-14B-Instruct-GGUF:Q6_K", base_url=base_url
)
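# Assumes OLLAMA_BASE_URL is set in .env (e.g. http://localhost:11434) and that both
# model tags above have already been pulled into the local Ollama instance.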

def use_vision_model(question: str) -> str:
    """
    A multimodal reasoning model that combines image and text input to answer
    questions about the image.
    """
    # Extract image paths from the question and keep only those that exist on disk
    image_paths = re.findall(r"[\w\-/\.]+\.(?:png|jpg|jpeg|webp)", question)
    image_paths = [p for p in image_paths if os.path.exists(p)]
    if not image_paths:
        return "No valid image file found in the question."
    image_path = image_paths[0]
    # # Preprocess the image using OpenCV
    # image = cv2.imread(image_path)
    # gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # gray = cv2.convertScaleAbs(gray, alpha=1.2, beta=20)
    # gray = cv2.GaussianBlur(gray, (5, 5), 0)
    # edges = cv2.Canny(gray, 50, 150, apertureSize=3)
    # # Create a temporary file for the processed image
    # with tempfile.NamedTemporaryFile(suffix=".png", delete=True) as tmp_file:
    #     temp_image_path = tmp_file.name
    #     cv2.imwrite(temp_image_path, image)
    # Encode the original image as base64 (the OpenCV preprocessing above is disabled)
    mime_type, _ = mimetypes.guess_type(image_path)
    mime_type = mime_type or "image/png"
    with open(image_path, "rb") as f:
        encoded = base64.b64encode(f.read()).decode("utf-8")
    # Prepare the prompt and image for the model
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": question},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:{mime_type};base64,{encoded}"},
                },
            ],
        }
    ]
    # Invoke the vision model
    response = model_vision.invoke(messages)
    # Clean up
    del messages, encoded, image_path
    gc.collect()
    torch.cuda.empty_cache()
    return str(response.content) if hasattr(response, "content") else str(response)
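
# Example usage (illustrative; the image path is an assumption):
#   answer = use_vision_model("How many birds are visible in ./downloads/birds.png?")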

# YouTube Video Review Tool
def review_youtube_video(url: str) -> str:
    """Reviews a YouTube video by fetching its transcript.

    Args:
        url (str): The URL to the YouTube video.

    Returns:
        str: The transcript of the video, for the main LLM to reason over.
    """
    # Extract video ID from URL (assuming the format https://youtube.com/watch?v=VIDEO_ID)
    video_id = url.split("v=")[1].split("&")[0]
    transcript_url = (
        f"https://www.youtube.com/api/timedtext?v={video_id}"  # Getting transcript data
    )
    response = requests.get(transcript_url, timeout=200)
    transcript = response.text  # This is the transcript (XML or SRT format)
    # Prepare the content (just the transcript, no question needed)
    transcript_content = f"Here is the transcript of the video: {transcript}"
    # Return the transcript content so the main LLM can handle question answering
    return transcript_content
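
# Example usage (illustrative; the URL is an assumption):
#   transcript = review_youtube_video("https://www.youtube.com/watch?v=dQw4w9WgXcQ")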

# YouTube Frames to Images Tool
def video_frames_to_images(
    url: str,
    sample_interval_seconds: int = 5,
) -> List[str]:
    """Extracts frames from a video at fixed intervals and saves them as images.

    Args:
        url (str): The URL to the video.
        sample_interval_seconds (int): The interval in seconds between sampled frames.

    Returns:
        List[str]: A list of paths to the saved image files.
    """
    folder_name = "./frames"
    # Create a subdirectory for the frames
    frames_dir = os.path.join(folder_name, "frames")
    os.makedirs(frames_dir, exist_ok=True)
    ydl_opts = {
        "format": "bestvideo[height<=1080]+bestaudio/best[height<=1080]/best",
        "outtmpl": os.path.join(folder_name, "video.%(ext)s"),
        "quiet": True,
        "noplaylist": True,
        "merge_output_format": "mp4",
        "force_ipv4": True,
    }
    info_extracted = []
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        info_extracted.append(info)
    video_path = next(
        (
            os.path.join(folder_name, f)
            for f in os.listdir(folder_name)
            if f.endswith(".mp4")
        ),
        None,
    )
    if not video_path:
        raise RuntimeError("Failed to download video as mp4")
    reader = imageio.get_reader(video_path)
    # metadata = reader.get_meta_data()
    # Assume a fixed frame rate and duration; the reader metadata is often unreliable here
    fps = 25
    duration_seconds = 120
    frame_interval = int(fps * sample_interval_seconds)
    num_frames = int(fps * duration_seconds)
    # if num_frames is None or math.isinf(num_frames):
    #     num_frames = int(fps * duration_seconds)
    # Handle case where the number of frames is infinite or invalid
    # if num_frames == float("inf") or not isinstance(num_frames, int):
    #     reader.close()
    #     raise RuntimeError("Invalid video length (infinite or not an integer)")
    image_paths: List[str] = []
    for idx in range(num_frames):
        if idx % frame_interval == 0:
            # Save frame as image
            frame = reader.get_data(idx)
            image_path = os.path.join(frames_dir, f"frame_{idx:06d}.jpg")
            imageio.imwrite(image_path, frame)
            image_paths.append(image_path)
    reader.close()
    return image_paths
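
# Example usage (illustrative; the URL is an assumption):
#   frames = video_frames_to_images("https://www.youtube.com/watch?v=dQw4w9WgXcQ", 10)
#   # Each returned path can then be passed to use_vision_model for inspection.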

# File Reading Tool
def read_file(filepath: str) -> str:
    """Reads the content of a Python file and asks the text model to analyze it.

    Args:
        filepath (str): The path to the file to read.

    Returns:
        str: The model's analysis of the file.
    """
    try:
        with open(filepath, "r", encoding="utf-8") as file:
            content = file.read()
        # Calculate metadata for the prompt
        filename = os.path.basename(filepath)
        line_count = content.count("\n") + 1
        code_str = content.strip()
        # Compose the prompt
        prompt = f"""
You are a Python expert and code reviewer. Analyze the following Python script and answer the question provided.
Give Final Answer: the output of the code
Script Length: {line_count} lines
Filename: {filename}
Python Code:
```python
{code_str}
```
"""
        model = model_text
        # Call the model
        message = HumanMessage(content=prompt)
        response = model.invoke([message])
        torch.cuda.empty_cache()
        gc.collect()
        # Return the result
        if hasattr(response, "content") and isinstance(response.content, str):
            return response.content
        return str(response)
    except FileNotFoundError:
        return f"File not found: {filepath}"
    except IOError as e:
        return f"Error reading file: {str(e)}"

# To run python code
def execute_code(code: str):
    """Helper function to execute the code in a separate process."""
    try:
        exec(code)
    except Exception as e:
        raise RuntimeError(f"Error executing the code: {str(e)}") from e


def run_code_from_file(file_path: str, timeout: int = 10):
    """
    Reads a Python file and executes it, with timeout handling.

    Args:
        file_path (str): The full path to the Python file to execute.
        timeout (int): The timeout in seconds before forcefully stopping the execution.
    """
    # Check if the file exists
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"The file {file_path} does not exist.")
    # Read the file and get the code to execute
    with open(file_path, "r", encoding="utf-8") as file:
        code = file.read()
    # Start a process to execute the code
    process = multiprocessing.Process(target=execute_code, args=(code,))
    process.start()
    # Wait for the process to finish or timeout
    process.join(timeout)
    # If the process is still alive after the timeout, terminate it
    if process.is_alive():
        process.terminate()  # Stop the execution
        raise TimeoutError(
            f"The code execution took longer than {timeout} seconds and was terminated."
        )
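
# Example usage (illustrative; the path is an assumption):
#   run_code_from_file("./downloads/script.py", timeout=15)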

# File Download Tool
def download_file_from_url(url: str, directory: str) -> Dict[str, Union[str, None]]:
    """Downloads a file from a URL and saves it to a directory.

    Args:
        url (str): The URL to download the file from.
        directory (str): The directory to save the file to.

    Returns:
        Dict[str, Union[str, None]]: A dictionary containing the file type, name, and path.
    """
    response = requests.get(url, stream=True, timeout=10)
    response.raise_for_status()
    content_type = response.headers.get("content-type", "").lower()
    # Try to get filename from headers
    filename = None
    cd = response.headers.get("content-disposition", "")
    match = re.search(r"filename\*=UTF-8\'\'(.+)", cd) or re.search(
        r'filename="?([^"]+)"?', cd
    )
    if match:
        filename = match.group(1)
    # If not in headers, try URL
    if not filename:
        filename = os.path.basename(url.split("?")[0])
    # Fallback to generated filename
    if not filename:
        extension = {
            "image/jpeg": ".jpg",
            "image/png": ".png",
            "image/gif": ".gif",
            "audio/wav": ".wav",
            "audio/mpeg": ".mp3",
            "video/mp4": ".mp4",
            "text/plain": ".txt",
            "text/csv": ".csv",
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx",
            "application/vnd.ms-excel": ".xls",
            "application/octet-stream": ".bin",
        }.get(content_type, ".bin")
        filename = f"downloaded_file{extension}"
    os.makedirs(directory, exist_ok=True)
    file_path = os.path.join(directory, filename)
    print(file_path)
    with open(file_path, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
    # shutil.copy(file_path, os.getcwd())
    return {
        "type": content_type,
        "filename": filename,
        "path": file_path,
    }
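
# Example usage (illustrative; URL and directory are assumptions):
#   meta = download_file_from_url("https://example.com/data.xlsx", "./downloads")
#   print(meta["path"])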

# Text Extraction from Image Tool
def extract_text_from_image(image_path: str) -> str:
    """Extracts text from an image using OCR.

    Args:
        image_path (str): The path to the image to extract text from.

    Returns:
        str: The text extracted from the image.
    """
    image = Image.open(image_path)
    text = pytesseract.image_to_string(image)
    return f"Extracted text from image:\n\n{text}"

# CSV Analysis Tool
def analyze_csv_file(file_path: str, query: str) -> str:
    """Analyzes a CSV file and answers questions about its contents using an
    Ollama model.

    Args:
        file_path (str): The path to the CSV file to analyze.
        query (str): The question to answer about the CSV file.

    Returns:
        str: The result of the analysis.
    """
    # Load the CSV file
    df = pd.read_csv(file_path)
    df_str = df.to_string(index=False)
    # Compose the prompt
    prompt = f"""
You are a data analyst. Analyze the following CSV data and answer the question provided.
CSV Dimensions: {df.shape[0]} rows × {df.shape[1]} columns
CSV Data:
{df_str}
Please provide:
1. A summary of the data structure and content
2. Key patterns and insights
3. Potential data quality issues
4. Suggestions for analysis
User Query:
{query}
Format your response in markdown with sections and bullet points.
"""
    model = model_text
    # Call the model
    response = model.invoke([HumanMessage(content=prompt)])
    del df
    torch.cuda.empty_cache()
    gc.collect()
    # Return the result
    if hasattr(response, "content") and isinstance(response.content, str):
        return response.content
    return str(response)
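
# Example usage (illustrative; the path and question are assumptions):
#   report = analyze_csv_file("./downloads/sales.csv", "Which month had the highest revenue?")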

# Excel Analysis Tool
def analyze_excel_file(file_path: str) -> str:
    """Analyzes an Excel file and answers questions about its contents using an
    Ollama model.

    Args:
        file_path (str): The path to the Excel file to analyze.

    Returns:
        str: The result of the analysis.
    """
    llm = model_text
    print(file_path)
    # Read all sheets from the Excel file
    excel_file = pd.ExcelFile(file_path)
    sheet_names = excel_file.sheet_names
    result = f"Excel file loaded with {len(sheet_names)} sheets: {', '.join(sheet_names)}\n\n"
    for sheet_name in sheet_names:
        df = pd.read_excel(file_path, sheet_name=sheet_name)
        df_str = df.to_string()
        # Build the prompt
        prompt = f"""Analyze the following Excel sheet data and answer the user's query.
Sheet Name: {sheet_name}
Dimensions: {len(df)} rows × {len(df.columns)} columns
Data:
{df_str}
Please provide:
1. A summary of the data structure and content
2. List all the values of the columns in a proper table format.
3. If the file contains food items, assume the figures refer to the
monetary value of the items, not the quantity sold.
4. If the file contains food items, make a new list containing the names
of all the food items in the column (not including drinks).
5. If the file contains any monetary values, they are in USD with two decimal places.
Format the response clearly using headings and bullet points."""
        # Call the LLM with the prompt
        response = llm.invoke([HumanMessage(content=prompt)])
        result += f"=== Sheet: {sheet_name} ===\n"
        result += str(response.content) + "\n"
        result += "=" * 50 + "\n\n"
        del df
        gc.collect()
    excel_file.close()
    torch.cuda.empty_cache()
    return result
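
# Example usage (illustrative; the path is an assumption):
#   summary = analyze_excel_file("./downloads/menu_sales.xlsx")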

# Audio Transcription Tool
def transcribe_audio(audio_file_path: str) -> str:
    """Transcribes an audio file using Whisper.
    Always give the Final Answer to the question in the requested format, for example
    listing all the pages mentioned in increasing order on one line.
    Change vanilla extract to pure vanilla extract in the final answer.

    Args:
        audio_file_path (str): The path to the audio file to transcribe.

    Returns:
        str: The transcript of the audio file.
    """
    model = whisper.load_model("base")
    result = model.transcribe(audio_file_path)
    assert isinstance(result["text"], str)
    del model
    torch.cuda.empty_cache()
    gc.collect()
    return result["text"]

def _extract_video_id(url: str) -> Optional[str]:
    """Extract the video ID from a YouTube URL.

    Args:
        url (str): The URL to the YouTube video.

    Returns:
        Optional[str]: The video ID of the YouTube video, or None if no pattern matches.
    """
    patterns = [
        r"(?:youtube\.com\/watch\?v=|youtube\.com\/embed\/|youtu\.be\/)([^&\n?#]+)",
        r"(?:youtube\.com\/v\/|youtube\.com\/e\/|youtube\.com\/user\/[^\/]+\/|youtube\.com\/[^\/]+\/|youtube\.com\/embed\/|youtu\.be\/)([^&\n?#]+)",
    ]
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    return None
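
# Example (illustrative):
#   _extract_video_id("https://youtu.be/dQw4w9WgXcQ")  # -> "dQw4w9WgXcQ"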

def transcribe_youtube(url: str) -> str:
    """
    Transcribes a YouTube video using the YouTube Transcript API, with Whisper as a fallback.

    This function first tries to fetch the transcript of a YouTube video using the
    YouTube Transcript API. If the transcript is unavailable (e.g., because captions
    are disabled), it falls back to downloading the audio with yt-dlp and transcribing
    it with Whisper.

    Args:
        url (str): The URL to the YouTube video.

    Returns:
        str: The transcript of the YouTube video, or an error message if transcription fails.
    """
    try:
        # Try using the YouTube Transcript API
        video_id = _extract_video_id(url)
        transcript = ""
        transcript_chunks = YouTubeTranscriptApi.get_transcript(
            video_id, languages=["en"]
        )
        for chunk in transcript_chunks:
            timestamp = str(timedelta(seconds=int(chunk["start"])))
            transcript += f"[{timestamp}] {chunk['text']}\n"
        # Return API transcript if available
        if transcript.strip():
            return transcript
    except (TranscriptsDisabled, NoTranscriptFound, Exception) as err:
        try:
            with tempfile.TemporaryDirectory() as tmpdir:
                # Download audio from YouTube
                ydl_opts = {
                    "format": "bestaudio/best",
                    "outtmpl": os.path.join(tmpdir, "audio.%(ext)s"),
                    "quiet": True,
                    "noplaylist": True,
                    "postprocessors": [
                        {
                            "key": "FFmpegExtractAudio",
                            "preferredcodec": "wav",
                            "preferredquality": "192",
                        }
                    ],
                }
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    info = ydl.extract_info(url, download=True)
                    if info is not None:
                        title = info.get("title", "Unknown Title")
                        duration = info.get("duration", 0)  # in seconds
                        uploader = info.get("uploader", "Unknown Uploader")
                    else:
                        title = "Unknown Title"
                        duration = 0
                        uploader = "Unknown Uploader"
                audio_path = next(
                    (
                        os.path.join(tmpdir, f)
                        for f in os.listdir(tmpdir)
                        if f.endswith(".wav")
                    ),
                    None,
                )
                if not audio_path:
                    raise RuntimeError("Failed to download or convert audio") from err
                # Use Whisper for the fallback transcription
                whisper_model = whisper.load_model("base")
                transcription = whisper_model.transcribe(audio_path, verbose=False)
                raw_transcript = transcription["text"]
                del whisper_model
                gc.collect()
                torch.cuda.empty_cache()
                result = f"Title: {title}\nUploader: {uploader}\nDuration: {duration} seconds\nTranscript: {raw_transcript}"
                return result
        except Exception as fallback_exc:
            raise RuntimeError("Fallback transcription failed") from fallback_exc
    return "Transcription failed unexpectedly."

def website_scrape(url: str) -> str:
    """Scrapes a website and returns its text.

    Args:
        url (str): The URL of the website to scrape.

    Returns:
        str: The text of the website.
    """
    try:
        parsed_url = urlparse(url)
        if not parsed_url.scheme or not parsed_url.netloc:
            raise ValueError(
                f"Invalid URL: '{url}'. Call `duckduckgo_search` first to get a valid URL."
            )
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page()
            page.goto(url, wait_until="networkidle", timeout=60000)
            page.wait_for_load_state("domcontentloaded")
            html_content = page.content()
            browser.close()
        soup = BeautifulSoup(html_content, "html.parser")
        relevant_text = ""
        # for header in soup.find_all(["h2", "h3"]):
        #     heading_text = header.get_text().strip().lower()
        #     if "discography" in heading_text or "studio albums" in heading_text:
        #         section_texts = []
        #         tag = header.find_next_sibling()
        #         while tag and (
        #             not isinstance(tag, Tag) or tag.name not in ["h2", "h3"]
        #         ):
        #             section_texts.append(tag.get_text(separator=" ", strip=True))
        #             tag = tag.find_next_sibling()
        #         relevant_text = "\n\n".join(section_texts)
        #         break
        # if not relevant_text:
        #     article = soup.find("article")
        #     if article:
        #         relevant_text = article.get_text(separator=" ", strip=True)
        # if not relevant_text:
        relevant_text = soup.get_text(separator=" ", strip=True)

        # Step 2: chunk the text (optional but recommended)
        def chunk_text(text, max_length=1000):
            words = text.split()
            chunks = []
            for i in range(0, len(words), max_length):
                chunks.append(" ".join(words[i : i + max_length]))
            return chunks

        chunks = chunk_text(relevant_text)
        # Return only the first few chunks to keep the output concise
        return "\n\n".join(chunks[:5])
    except ValueError as e:
        # Catch URL validation errors
        return str(e)
    except Exception as e:
        # Catch other unexpected errors
        return f"Scraping failed: {str(e)}"

class SearchResult(TypedDict):
    query: str
    status: str
    attempt: int
    results: Optional[List[dict]]
    error: Optional[str]

def duckduckgo_search(query: str, max_results: int = 10) -> SearchResult:
    """
    Perform a DuckDuckGo search with retry and exponential backoff.
    Use this FIRST, before invoking any scraping tools.

    Args:
        query: The search query string.
        max_results: Max number of results to return (default 10).

    Returns:
        A dict with the query, results, status, attempt count, and any error.
    """
    max_retries = 3
    base_delay = 2
    backoff_factor = 2
    for attempt in range(max_retries):
        try:
            with DDGS() as ddgs:
                results = ddgs.text(keywords=query, max_results=max_results)
                if results:
                    formatted_results = [
                        {
                            "title": result.get("title", ""),
                            "url": result.get("href", ""),
                            "body": result.get("body", ""),
                        }
                        for result in results
                    ]
                    return {
                        "query": query,
                        "status": "success",
                        "attempt": attempt + 1,
                        "results": formatted_results,
                        "error": None,
                    }
        except Exception as e:
            print(f"[DuckDuckGo Tool] Attempt {attempt + 1} failed: {e}")
            time.sleep(base_delay * (backoff_factor**attempt))
    return {
        "query": query,
        "status": "failed",
        "attempt": max_retries,
        "results": None,
        "error": "Max retries exceeded or request failed.",
    }
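
# Example usage (illustrative):
#   hits = duckduckgo_search("Mercedes Sosa discography", max_results=5)
#   if hits["status"] == "success":
#       first_url = hits["results"][0]["url"]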

def reverse_decoder(question: str) -> str:
    """Decodes a reversed sentence if the input appears to be written backward.

    Args:
        question (str): The possibly reversed question string.

    Returns:
        str: The decoded sentence.
    """
    # Remove leading/trailing punctuation if present
    cleaned = question.strip().strip(".!?")
    # Reverse the string; the caller decides whether the input was actually reversed
    reversed_text = cleaned[::-1]
    return reversed_text
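
# Example (illustrative):
#   reverse_decoder(".elppa drow eht etirw ,ecnetnes siht dnatsrednu uoy fI")
#   # -> "If you understand this sentence, write the word apple"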