```python
from langchain_core.tools import tool, Tool
import math

def calculator_tool(expression: str) -> str:
    """
    Evaluate a mathematical expression.
    """
    # Restricted global and local namespaces: no builtins, only the names below
    safe_globals = {"__builtins__": {}}
    safe_locals = {
        # Math functions
        'sqrt': math.sqrt,
        'sin': math.sin,
        'cos': math.cos,
        'tan': math.tan,
        'log': math.log10,  # log base 10
        'ln': math.log,     # natural log
        'exp': math.exp,
        'pow': pow,
        # Constants
        'pi': math.pi,
        'e': math.e,
        # Built-in math utilities
        'abs': abs,
        'round': round,
        'max': max,
        'min': min,
        'sum': sum,
    }
    try:
        # Evaluate the expression in the restricted environment
        result = eval(expression, safe_globals, safe_locals)
        # Handle None explicitly
        if result is None:
            return "calculator tool produced no valid result"
        # Snap near-zero floats to zero to avoid artifacts like 1.2e-16
        if isinstance(result, float) and abs(result) < 1e-9:
            result = round(result, 10)
        return str(result)
    except SyntaxError as se:
        return f"Syntax error in expression: {str(se)}"
    except NameError as ne:
        return f"Undefined variable or function used: {str(ne)}"
    except ZeroDivisionError:
        return "Error: Division by zero"
    except Exception as e:
        return f"Evaluation error: {str(e)}"
```
```python
from langchain_tavily import TavilySearch

def web_search(query: str) -> str:
    """
    Searches the web and returns a list of the most relevant URLs.
    Use this FIRST for complex queries, metadata questions, or to find the right sources.
    Then follow up with get_webdoc_content or get_website_content on the most promising URL.
    """
    try:
        tavily_search = TavilySearch(
            max_results=5,
            topic="general",
            search_depth="advanced",
            include_raw_content=False,  # Just URLs and snippets
        )
        results = tavily_search.invoke(query)
        # Format results to show URLs and brief descriptions
        web_search_results = "Search Results:\n"
        for i, result in enumerate(results["results"], 1):
            web_search_results += f"{i}. {result['title']}: {result['url']}\n {result['content'][:150]}...\n\n"
        return web_search_results
    except Exception as e:
        return f"web_search tool error: {str(e)}"
```
```python
import os
import tempfile
import requests
import easyocr
from io import BytesIO
from PIL import Image
from openai import OpenAI

def query_image(query: str, source: str, need_ocr: bool = True, need_reasoning: bool = False) -> str:
    """Use ONLY to answer a question about an image using a Vision Language Model.
    NOT used to perform image processing or other tasks EXCEPT asking a question about an image.
    Args:
        query (str): The question about the image, e.g. "How many persons are in the image?"
        source (str): URL to the image
        need_ocr (bool): If True, also extract visible text from the image. Set to False otherwise.
        need_reasoning (bool): Set to True for complex queries that require a reasoning model to answer properly. Set to False otherwise.
    """
    try:
        # OCR extraction (optional)
        ocr_text = ""
        if need_ocr:
            file_to_use = None  # initialized so cleanup is safe even if the download fails
            try:
                # Download the image from the URL
                response = requests.get(source, stream=True, timeout=10)
                response.raise_for_status()
                # Load the image into PIL
                image = Image.open(BytesIO(response.content))
                # Save to a temporary file
                with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as tmpfile:
                    image.save(tmpfile, format=image.format or "JPEG")
                    file_to_use = tmpfile.name
                # Perform OCR
                reader = easyocr.Reader(['en'])
                results = reader.readtext(file_to_use)
                ocr_text = "\n".join([res[1] for res in results])
                ocr_text = f"\n\n[OCR Extracted Text]:\n{ocr_text}"
            except Exception as ocr_error:
                ocr_text = f"\n\n[OCR Error]: {str(ocr_error)}"
            finally:
                # Clean up the temporary file
                if file_to_use and os.path.exists(file_to_use):
                    os.unlink(file_to_use)
        # Query the Vision Language Model
        client = OpenAI()
        model_name = "o4-mini" if need_reasoning else "gpt-4o-mini"
        response = client.chat.completions.create(
            model=model_name,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": query},
                        {"type": "image_url", "image_url": {"url": source}},
                    ],
                }
            ],
            # o-series reasoning models reject max_tokens; max_completion_tokens works for both
            max_completion_tokens=512,
        )
        content = response.choices[0].message.content
        # Combine OCR and VLM output
        final_response = content
        if need_ocr and ocr_text:
            final_response += ocr_text
        return final_response
    except Exception as e:
        return f"Image query failed: {str(e)}"
```
```python
from pydantic import BaseModel, Field
from e2b import Sandbox
import re
import os

class PythonCodeInput(BaseModel):
    code: str = Field(description="The Python code string to execute.")

def python_repl(code: str) -> str:
    """
    Use this to execute single or multi-line Python commands to perform tasks like:
    sort a list in ascending or descending order, reverse an input string, draw a table, photo processing, etc.
    Input should be syntactically valid Python code.
    Make sure to include required imports in the code.
    Always include in your code `print(...)` or `image.save(...)` to return outputs that can be seen.
    You are allowed to access the internet and download files from URLs via code (e.g., using requests).
    Avoid using any system-level commands or libraries that could harm the host system.
    Avoid commands that require user input or block indefinitely (e.g., `input()`).
    """
    # Patterns that are refused before the code ever reaches the sandbox
    FORBIDDEN_PATTERNS = [
        r'\bimport\s+(os|sys|subprocess|shutil|socket)',
        r'\b(eval|exec|input|open)\s*\(',
        r'\b__import__',
        r'\bos\.',
        r'\bsys\.',
        r'\bsubprocess\.',
    ]
    # Step 1: Keyword-based security check
    for pattern in FORBIDDEN_PATTERNS:
        match = re.search(pattern, code)
        if match:
            return f"Blocked unsafe operation: {match.group()}"
    # Step 2: Create the E2B sandbox
    try:
        with Sandbox(api_key=os.getenv("E2B_API_KEY")) as sandbox:
            # Known mismatches: import name -> pip package name
            import_to_pip = {
                "PIL": "pillow",
                "cv2": "opencv-python",
                "yaml": "PyYAML",
                "bs4": "beautifulsoup4",
                "tkinter": "tk",
            }
            # Standard-library modules that don't need installation
            built_in_modules = {
                "math", "re", "json", "csv", "os", "sys", "time", "datetime", "random",
                "itertools", "functools", "__future__", "collections", "pathlib", "io",
            }
            # Step 3: Extract the base module from each import statement
            import_matches = re.findall(
                r'(?:import\s+([a-zA-Z0-9_]+)(?!\.)|\bfrom\s+([a-zA-Z0-9_]+)(?=\s+import\b))',
                code
            )
            # match[0] = 'import X', match[1] = 'from X import Y'
            base_imports = set(match[0] or match[1] for match in import_matches)
            # Step 4: Determine which packages to install
            packages_to_install = set()
            for imp in base_imports:
                # Skip standard-library modules and dunder pseudo-modules
                if imp in built_in_modules or imp.startswith("__"):
                    continue
                # Use the mapped pip name if one exists, else the import name
                packages_to_install.add(import_to_pip.get(imp, imp))
            # Step 5: Install the necessary packages
            if packages_to_install:
                install_cmd = f"pip install {' '.join(packages_to_install)}"
                result = sandbox.commands.run(install_cmd)
                if result.stderr:
                    return f"Failed to install packages:\n{result.stderr}"
            # Step 6: Write the user code to a file and execute it in the sandbox
            CODE_FILE_PATH = "/tmp/code.py"
            sandbox.files.write(CODE_FILE_PATH, code)
            result = sandbox.commands.run(f"python {CODE_FILE_PATH}")
            stdout = result.stdout.strip()
            stderr = result.stderr.strip()
            # Step 7: Return the output
            if stderr:
                return f"Execution error:\n{stderr}"
            return stdout or "No output"
    except Exception as e:
        return f"Sandbox error: {str(e)}"
```
```python
import requests
from bs4 import BeautifulSoup
from PyPDF2 import PdfReader
from io import BytesIO
from markdownify import markdownify

def get_webdoc_content(url: str) -> str:
    """
    Extracts content from PDFs or document-like URLs (academic papers, reports).
    Can be used after web_search to get detailed information.
    Args:
        url (str): the URL of the web page to extract the content from
    """
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        }
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        content_type = response.headers.get('Content-Type', '')
        # PDF handling
        if 'application/pdf' in content_type:
            pdf_file = BytesIO(response.content)
            reader = PdfReader(pdf_file)
            # extract_text() can return None for image-only pages
            text = "\n".join((page.extract_text() or "") for page in reader.pages)
            return f"## PDF Content from {url}\n\n```\n{text[:15000]}\n```"
        # HTML document handling
        elif 'text/html' in content_type:
            soup = BeautifulSoup(response.text, 'html.parser')
            cleaned_html = soup.body or soup  # Fall back to the full document
            return markdownify(str(cleaned_html), strip=['a'])
        # Fallback: raw text extraction
        else:
            return f"## Raw Content from {url}\n\n{response.text[:15000]}"
    except requests.exceptions.RequestException as e:
        return f"HTTP error in get_webdoc_content: {str(e)}"
    except Exception as e:
        return f"Unexpected error in get_webdoc_content: {str(e)}"
```
```python
import requests
from bs4 import BeautifulSoup
from markdownify import markdownify

def get_website_content(url: str) -> str:
    """
    Extracts contents from HTML-based URLs.
    Specializes in Wikipedia, technical documentation, and discussion pages.
    NOT used for document-based URLs (academic papers, reports).
    Used after web_search to get detailed information.
    Args:
        url (str): The URL of the web page to extract content from
    """
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        }
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        # Remove non-content elements
        for element in soup.select('script, style, footer, nav, header, aside'):
            element.decompose()
        # Convert the cleaned HTML to markdown
        cleaned_html = str(soup.body) if soup.body else str(soup)
        markdown_content = markdownify(cleaned_html, strip=['a'])  # Optional: strip links
        return f"## Extracted Content from {url}\n\n{markdown_content[:15000]}"  # Limit length
    except requests.exceptions.RequestException as e:
        return f"HTTP error in get_website_content: {str(e)}"
    except Exception as e:
        return f"Unexpected error in get_website_content: {str(e)}"
```
```python
import os
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import RecursiveCharacterTextSplitter

def extract_answer_from_content(content: str | dict, query: str) -> str:
    """
    Extract relevant information from content based on a user query.
    Args:
        content (str/dict): Raw text, transcribed text from audio, or structured content from any source
        query (str): Natural language question to answer
    Returns:
        str: Concise answer extracted from the content
    """
    try:
        # Normalize the content format
        if isinstance(content, dict):
            text_content = ""
            if "summary" in content:
                text_content += f"SUMMARY: {content['summary']}\n\n"
            if "infobox" in content:
                text_content += "INFOBOX:\n"
                for k, v in content["infobox"].items():
                    text_content += f"{k}: {v}\n"
                text_content += "\n"
            if "sections" in content:
                for section, text in content["sections"].items():
                    text_content += f"{section}:\n{text}\n\n"
        else:
            text_content = content
        # Initialize OpenAI embeddings
        embeddings = OpenAIEmbeddings(
            openai_api_key=os.getenv("OPENAI_API_KEY"),
            model="text-embedding-3-large"
        )
        # Split the content into manageable chunks
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=100
        )
        chunks = text_splitter.split_text(text_content)
        # Create the vector store
        vectorstore = FAISS.from_texts(chunks, embeddings)
        retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
        # Get the most relevant content
        relevant_docs = retriever.invoke(query)
        combined_text = " ".join([doc.page_content for doc in relevant_docs])
        # Return the relevant content with context
        return f"Relevant information found:\n{combined_text[:1500]}"
    except Exception as e:
        return f"Content extraction failed: {str(e)}"
```
```python
import os
import requests
from openai import OpenAI

def transcribe_audio(source: str, file_extension: str) -> str:
    """
    Transcribes an audio file to text from a URL.
    Args:
        source (str): URL to an audio file.
        file_extension (str): Extension of the audio file, e.g. ".mp3".
    Returns:
        str: The transcribed text, or an error message.
    """
    try:
        # Download the audio file
        response = requests.get(source)
        response.raise_for_status()
        # Write it to disk
        file_extension = file_extension.replace('.', '')
        with open(f'tmp.{file_extension}', 'wb') as file:
            file.write(response.content)
        # Transcribe with Whisper
        client = OpenAI()
        with open(f'tmp.{file_extension}', "rb") as audio_file:
            transcription = client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file
            )
        return transcription.text
    except Exception as e:
        return f"Transcription error: {str(e)}"
```
```python
from youtube_transcript_api import YouTubeTranscriptApi
from pytube import extract

def get_youtube_transcript(page_url: str) -> str:
    """Get the transcript of the audio track of a YouTube video.
    Use this for YouTube videos with available transcripts.
    Args:
        page_url (str): YouTube URL of the video
    """
    try:
        # Extract the video ID from the URL
        video_id = extract.video_id(page_url)
        # Fetch the transcript
        transcript = YouTubeTranscriptApi.get_transcript(video_id)
        # Return the concatenated text
        return '\n'.join([s['text'] for s in transcript])
    except Exception as e:
        return f"get_youtube_transcript failed: {str(e)}"
```
```python
from tabulate import tabulate
from typing import Dict, Any, List

def generate_table_from_data(data: List[Dict[str, Any]]) -> str:
    """
    Convert a list of dictionaries to a markdown table.
    Args:
        data (List[Dict]): List of objects with common keys
    Returns:
        str: Markdown-formatted table
    """
    if not data:
        return "No data available"
    headers = data[0].keys()
    rows = [list(item.values()) for item in data]
    return tabulate(rows, headers=headers, tablefmt="pipe")
```
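A small example (the keys of the first dictionary become the column headers):

```python
rows = [
    {"name": "alpha", "score": 0.92},
    {"name": "beta", "score": 0.87},
]
print(generate_table_from_data(rows))  # prints a pipe-style markdown table with 'name' and 'score' columns
```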
```python
from pydantic import BaseModel, Field
from typing import List, Dict

class CommutativeCheckInput(BaseModel):
    table_str: str = Field(..., description="Markdown-formatted string of the operation table (e.g., |*|a|b|c|...)")
    elements: List[str] = Field(..., description="List of elements in the set S")

def check_commutative(table_str: str, elements: List[str]) -> str:
    """
    Analyzes a binary operation table for commutativity.
    Args:
        table_str (str): Markdown-formatted string of the operation table.
        elements (List[str]): List of elements in the set S.
    Returns:
        str: Newline-separated list of element pairs (e.g., "b,e") where x*y ≠ y*x,
             or a message stating that the operation is commutative.
    """
    # Parse the table string into a 2D list
    lines = [line.strip() for line in table_str.strip().split('\n') if line.strip()]
    header = [cell.strip() for cell in lines[0].split('|') if cell.strip()][1:]  # Skip the first cell (the operator symbol)
    rows = []
    for line in lines[2:]:  # Skip the header and the separator row
        cells = [cell.strip() for cell in line.split('|') if cell.strip()]  # Remove empty cells
        if cells:
            rows.append(cells)
    # Validate that all rows have the correct number of cells
    expected_length = len(header) + 1  # the row label plus one cell per header element
    for row in rows:
        if len(row) < expected_length:
            return f"Error: Row '{row[0]}' has {len(row)} cells, but expected {expected_length}."
    # Build a dictionary for the operation: op[x][y] = result
    operation: Dict[str, Dict[str, str]] = {}
    for row in rows:
        x = row[0]
        operation[x] = {}
        for i, y in enumerate(header):
            operation[x][y] = row[i + 1]
    # Check all pairs (x, y) for x*y == y*x
    counterexamples = []
    for x in elements:
        for y in elements:
            if x < y:  # Avoid redundant checks and self-comparison
                try:
                    xy = operation[x][y]
                    yx = operation[y][x]
                    if xy != yx:
                        counterexamples.append(f"{x},{y}")
                except KeyError:
                    return f"Error: Missing data for pair ({x}, {y}) in table."
    return "\n".join(counterexamples) if counterexamples else "The operation is commutative."
```