|  | import requests | 
					
						
						|  | from pydantic import BaseModel, Field | 
					
						
						|  | from huggingface_hub import InferenceClient | 
					
						
						|  | from openai import OpenAI | 
					
						
						|  | from bs4 import BeautifulSoup | 
					
						
						|  | from markdownify import markdownify as md | 
					
						
						|  | from langchain_core.tools import tool, Tool | 
					
						
						|  | from langchain_experimental.utilities import PythonREPL | 
					
						
						|  | from pypdf import PdfReader | 
					
						
						|  | from io import BytesIO | 
					
						
						|  | from youtube_transcript_api import YouTubeTranscriptApi | 
					
						
						|  | from pytube import extract | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @tool | 
					
						
						|  | def multiply(a: float, b: float) -> float: | 
					
						
						|  | """Multiplies two numbers. | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | a (float): the first number | 
					
						
						|  | b (float): the second number | 
					
						
						|  | """ | 
					
						
						|  | return a * b | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @tool | 
					
						
						|  | def add(a: float, b: float) -> float: | 
					
						
						|  | """Adds two numbers. | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | a (float): the first number | 
					
						
						|  | b (float): the second number | 
					
						
						|  | """ | 
					
						
						|  | return a + b | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @tool | 
					
						
						|  | def subtract(a: float, b: float) -> int: | 
					
						
						|  | """Subtracts two numbers. | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | a (float): the first number | 
					
						
						|  | b (float): the second number | 
					
						
						|  | """ | 
					
						
						|  | return a - b | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @tool | 
					
						
						|  | def divide(a: float, b: float) -> float: | 
					
						
						|  | """Divides two numbers. | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | a (float): the first float number | 
					
						
						|  | b (float): the second float number | 
					
						
						|  | """ | 
					
						
						|  | if b == 0: | 
					
						
						|  | raise ValueError("Cannot divided by zero.") | 
					
						
						|  | return a / b | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @tool | 
					
						
						|  | def modulus(a: int, b: int) -> int: | 
					
						
						|  | """Get the modulus of two numbers. | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | a (int): the first number | 
					
						
						|  | b (int): the second number | 
					
						
						|  | """ | 
					
						
						|  | return a % b | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @tool | 
					
						
						|  | def power(a: float, b: float) -> float: | 
					
						
						|  | """Get the power of two numbers. | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | a (float): the first number | 
					
						
						|  | b (float): the second number | 
					
						
						|  | """ | 
					
						
						|  | return a**b | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @tool | 
					
						
						|  | def query_image(query: str, image_url: str, need_reasoning: bool = False) -> str: | 
					
						
						|  | """Ask anything about an image using a Vision Language Model | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | query (str): The query about the image, e.g. how many persons are on the image? | 
					
						
						|  | image_url (str): The URL to the image | 
					
						
						|  | need_reasoning (bool): Set to True for complex query that require a reasoning model to answer properly. Set to False otherwise. | 
					
						
						|  | """ | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | PROVIDER = 'openai' | 
					
						
						|  |  | 
					
						
						|  | try: | 
					
						
						|  | if PROVIDER == 'huggingface': | 
					
						
						|  | client = InferenceClient(provider="nebius") | 
					
						
						|  | completion = client.chat.completions.create( | 
					
						
						|  |  | 
					
						
						|  | model="Qwen/Qwen2.5-VL-72B-Instruct", | 
					
						
						|  | messages=[ | 
					
						
						|  | { | 
					
						
						|  | "role": "user", | 
					
						
						|  | "content": [ | 
					
						
						|  | { | 
					
						
						|  | "type": "text", | 
					
						
						|  | "text": query | 
					
						
						|  | }, | 
					
						
						|  | { | 
					
						
						|  | "type": "image_url", | 
					
						
						|  | "image_url": { | 
					
						
						|  | "url": image_url | 
					
						
						|  | } | 
					
						
						|  | } | 
					
						
						|  | ] | 
					
						
						|  | } | 
					
						
						|  | ], | 
					
						
						|  | max_tokens=512, | 
					
						
						|  | ) | 
					
						
						|  | return completion.choices[0].message | 
					
						
						|  |  | 
					
						
						|  | elif PROVIDER == 'openai': | 
					
						
						|  | if need_reasoning: | 
					
						
						|  | model_name = "o4-mini" | 
					
						
						|  | else: | 
					
						
						|  | model_name = "gpt-4.1-mini" | 
					
						
						|  | client = OpenAI() | 
					
						
						|  | response = client.responses.create( | 
					
						
						|  | model=model_name, | 
					
						
						|  | input=[{ | 
					
						
						|  | "role": "user", | 
					
						
						|  | "content": [ | 
					
						
						|  | {"type": "input_text", "text": query}, | 
					
						
						|  | { | 
					
						
						|  | "type": "input_image", | 
					
						
						|  | "image_url": image_url, | 
					
						
						|  | }, | 
					
						
						|  | ], | 
					
						
						|  | }], | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | return response.output_text | 
					
						
						|  |  | 
					
						
						|  | else: | 
					
						
						|  | raise AttributeError(f'PROVIDER must be "openai" or "huggingface", received "{PROVIDER}"') | 
					
						
						|  |  | 
					
						
						|  | except Exception as e: | 
					
						
						|  | return f"query_image failed: {e}" | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @tool | 
					
						
						|  | def automatic_speech_recognition(file_url: str, file_extension: str) -> str: | 
					
						
						|  | """Transcribe an audio file to text | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | file_url (str): the URL to the audio file | 
					
						
						|  | file_extension (str): the file extension, e.g. mp3 | 
					
						
						|  | """ | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | PROVIDER = 'openai' | 
					
						
						|  |  | 
					
						
						|  | try: | 
					
						
						|  | if PROVIDER == 'huggingface': | 
					
						
						|  | client = InferenceClient(provider="fal-ai") | 
					
						
						|  | return client.automatic_speech_recognition(file_url, model="openai/whisper-large-v3") | 
					
						
						|  |  | 
					
						
						|  | elif PROVIDER == 'openai': | 
					
						
						|  |  | 
					
						
						|  | response = requests.get(file_url) | 
					
						
						|  | response.raise_for_status() | 
					
						
						|  |  | 
					
						
						|  | file_extension = file_extension.replace('.','') | 
					
						
						|  | with open(f'tmp.{file_extension}', 'wb') as file: | 
					
						
						|  | file.write(response.content) | 
					
						
						|  |  | 
					
						
						|  | audio_file = open(f'tmp.{file_extension}', "rb") | 
					
						
						|  | client = OpenAI() | 
					
						
						|  | transcription = client.audio.transcriptions.create( | 
					
						
						|  | model="whisper-1", | 
					
						
						|  | file=audio_file | 
					
						
						|  | ) | 
					
						
						|  | return transcription.text | 
					
						
						|  |  | 
					
						
						|  | else: | 
					
						
						|  | raise AttributeError(f'PROVIDER must be "openai" or "huggingface", received "{PROVIDER}"') | 
					
						
						|  |  | 
					
						
						|  | except Exception as e: | 
					
						
						|  | return f"automatic_speech_recognition failed: {e}" | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | @tool | 
					
						
						|  | def get_webpage_content(page_url: str) -> str: | 
					
						
						|  | """Load a web page and return it to markdown if possible | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | page_url (str): the URL of web page to get | 
					
						
						|  | """ | 
					
						
						|  | try: | 
					
						
						|  | r = requests.get(page_url) | 
					
						
						|  | r.raise_for_status() | 
					
						
						|  | text = "" | 
					
						
						|  |  | 
					
						
						|  | if r.headers.get('Content-Type', '') == 'application/pdf': | 
					
						
						|  | pdf_file = BytesIO(r.content) | 
					
						
						|  | reader = PdfReader(pdf_file) | 
					
						
						|  | for page in reader.pages: | 
					
						
						|  | text += page.extract_text() | 
					
						
						|  | else: | 
					
						
						|  | soup = BeautifulSoup((r.text), 'html.parser') | 
					
						
						|  | if soup.body: | 
					
						
						|  |  | 
					
						
						|  | text = md(str(soup.body)) | 
					
						
						|  | else: | 
					
						
						|  |  | 
					
						
						|  | text = r.text | 
					
						
						|  | return text | 
					
						
						|  | except Exception as e: | 
					
						
						|  | return f"get_webpage_content failed: {e}" | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | class PythonREPLInput(BaseModel): | 
					
						
						|  | code: str = Field(description="The Python code string to execute.") | 
					
						
						|  |  | 
					
						
						|  | python_repl = PythonREPL() | 
					
						
						|  |  | 
					
						
						|  | python_repl_tool = Tool( | 
					
						
						|  | name="python_repl", | 
					
						
						|  | description="""A Python REPL shell (Read-Eval-Print Loop). | 
					
						
						|  | Use this to execute single or multi-line python commands. | 
					
						
						|  | Input should be syntactically valid Python code. | 
					
						
						|  | Always end your code with `print(...)` to see the output. | 
					
						
						|  | Do NOT execute code that could be harmful to the host system. | 
					
						
						|  | You are allowed to download files from URLs. | 
					
						
						|  | Do NOT send commands that block indefinitely (e.g., `input()`).""", | 
					
						
						|  | func=python_repl.run, | 
					
						
						|  | args_schema=PythonREPLInput | 
					
						
						|  | ) | 
					
						
						|  |  | 
					
						
						|  | @tool | 
					
						
						|  | def get_youtube_transcript(page_url: str) -> str: | 
					
						
						|  | """Get the transcript of a YouTube video | 
					
						
						|  |  | 
					
						
						|  | Args: | 
					
						
						|  | page_url (str): YouTube URL of the video | 
					
						
						|  | """ | 
					
						
						|  | try: | 
					
						
						|  |  | 
					
						
						|  | video_id = extract.video_id(page_url) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | ytt_api = YouTubeTranscriptApi() | 
					
						
						|  | transcript = ytt_api.fetch(video_id) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | txt = '\n'.join([s.text for s in transcript.snippets]) | 
					
						
						|  | return txt | 
					
						
						|  | except Exception as e: | 
					
						
						|  | return f"get_youtube_transcript failed: {e}" | 
					
						
						|  |  |