|
import requests |
|
from pydantic import BaseModel, Field |
|
from huggingface_hub import InferenceClient |
|
from openai import OpenAI |
|
from bs4 import BeautifulSoup |
|
from markdownify import markdownify as md |
|
from langchain_core.tools import tool, Tool |
|
from langchain_experimental.utilities import PythonREPL |
|
from pypdf import PdfReader |
|
from io import BytesIO |
|
from youtube_transcript_api import YouTubeTranscriptApi |
|
from pytube import extract |
|
|
|
|
|
|
|
|
|
@tool |
|
def multiply(a: float, b: float) -> float: |
|
"""Multiplies two numbers. |
|
|
|
Args: |
|
a (float): the first number |
|
b (float): the second number |
|
""" |
|
return a * b |
|
|
|
|
|
@tool |
|
def add(a: float, b: float) -> float: |
|
"""Adds two numbers. |
|
|
|
Args: |
|
a (float): the first number |
|
b (float): the second number |
|
""" |
|
return a + b |
|
|
|
|
|
@tool |
|
def subtract(a: float, b: float) -> int: |
|
"""Subtracts two numbers. |
|
|
|
Args: |
|
a (float): the first number |
|
b (float): the second number |
|
""" |
|
return a - b |
|
|
|
|
|
@tool |
|
def divide(a: float, b: float) -> float: |
|
"""Divides two numbers. |
|
|
|
Args: |
|
a (float): the first float number |
|
b (float): the second float number |
|
""" |
|
if b == 0: |
|
raise ValueError("Cannot divided by zero.") |
|
return a / b |
|
|
|
|
|
@tool |
|
def modulus(a: int, b: int) -> int: |
|
"""Get the modulus of two numbers. |
|
|
|
Args: |
|
a (int): the first number |
|
b (int): the second number |
|
""" |
|
return a % b |
|
|
|
|
|
@tool |
|
def power(a: float, b: float) -> float: |
|
"""Get the power of two numbers. |
|
|
|
Args: |
|
a (float): the first number |
|
b (float): the second number |
|
""" |
|
return a**b |
|
|
|
|
|
|
|
|
|
@tool |
|
def query_image(query: str, image_url: str, need_reasoning: bool = False) -> str: |
|
"""Ask anything about an image using a Vision Language Model |
|
|
|
Args: |
|
query (str): The query about the image, e.g. how many persons are on the image? |
|
image_url (str): The URL to the image |
|
need_reasoning (bool): Set to True for complex query that require a reasoning model to answer properly. Set to False otherwise. |
|
""" |
|
|
|
|
|
PROVIDER = 'openai' |
|
|
|
try: |
|
if PROVIDER == 'huggingface': |
|
client = InferenceClient(provider="nebius") |
|
completion = client.chat.completions.create( |
|
|
|
model="Qwen/Qwen2.5-VL-72B-Instruct", |
|
messages=[ |
|
{ |
|
"role": "user", |
|
"content": [ |
|
{ |
|
"type": "text", |
|
"text": query |
|
}, |
|
{ |
|
"type": "image_url", |
|
"image_url": { |
|
"url": image_url |
|
} |
|
} |
|
] |
|
} |
|
], |
|
max_tokens=512, |
|
) |
|
return completion.choices[0].message |
|
|
|
elif PROVIDER == 'openai': |
|
if need_reasoning: |
|
model_name = "o4-mini" |
|
else: |
|
model_name = "gpt-4.1-mini" |
|
client = OpenAI() |
|
response = client.responses.create( |
|
model=model_name, |
|
input=[{ |
|
"role": "user", |
|
"content": [ |
|
{"type": "input_text", "text": query}, |
|
{ |
|
"type": "input_image", |
|
"image_url": image_url, |
|
}, |
|
], |
|
}], |
|
) |
|
|
|
return response.output_text |
|
|
|
else: |
|
raise AttributeError(f'PROVIDER must be "openai" or "huggingface", received "{PROVIDER}"') |
|
|
|
except Exception as e: |
|
return f"query_image failed: {e}" |
|
|
|
|
|
@tool |
|
def automatic_speech_recognition(file_url: str, file_extension: str) -> str: |
|
"""Transcribe an audio file to text |
|
|
|
Args: |
|
file_url (str): the URL to the audio file |
|
file_extension (str): the file extension, e.g. mp3 |
|
""" |
|
|
|
|
|
PROVIDER = 'openai' |
|
|
|
try: |
|
if PROVIDER == 'huggingface': |
|
client = InferenceClient(provider="fal-ai") |
|
return client.automatic_speech_recognition(file_url, model="openai/whisper-large-v3") |
|
|
|
elif PROVIDER == 'openai': |
|
|
|
response = requests.get(file_url) |
|
response.raise_for_status() |
|
|
|
file_extension = file_extension.replace('.','') |
|
with open(f'tmp.{file_extension}', 'wb') as file: |
|
file.write(response.content) |
|
|
|
audio_file = open(f'tmp.{file_extension}', "rb") |
|
client = OpenAI() |
|
transcription = client.audio.transcriptions.create( |
|
model="whisper-1", |
|
file=audio_file |
|
) |
|
return transcription.text |
|
|
|
else: |
|
raise AttributeError(f'PROVIDER must be "openai" or "huggingface", received "{PROVIDER}"') |
|
|
|
except Exception as e: |
|
return f"automatic_speech_recognition failed: {e}" |
|
|
|
|
|
@tool |
|
def get_webpage_content(page_url: str) -> str: |
|
"""Load a web page and return it to markdown if possible |
|
|
|
Args: |
|
page_url (str): the URL of web page to get |
|
""" |
|
try: |
|
r = requests.get(page_url) |
|
r.raise_for_status() |
|
text = "" |
|
|
|
if r.headers.get('Content-Type', '') == 'application/pdf': |
|
pdf_file = BytesIO(r.content) |
|
reader = PdfReader(pdf_file) |
|
for page in reader.pages: |
|
text += page.extract_text() |
|
else: |
|
soup = BeautifulSoup((r.text), 'html.parser') |
|
if soup.body: |
|
|
|
text = md(str(soup.body)) |
|
else: |
|
|
|
text = r.text |
|
return text |
|
except Exception as e: |
|
return f"get_webpage_content failed: {e}" |
|
|
|
|
|
|
|
|
|
|
|
class PythonREPLInput(BaseModel): |
|
code: str = Field(description="The Python code string to execute.") |
|
|
|
python_repl = PythonREPL() |
|
|
|
python_repl_tool = Tool( |
|
name="python_repl", |
|
description="""A Python REPL shell (Read-Eval-Print Loop). |
|
Use this to execute single or multi-line python commands. |
|
Input should be syntactically valid Python code. |
|
Always end your code with `print(...)` to see the output. |
|
Do NOT execute code that could be harmful to the host system. |
|
You are allowed to download files from URLs. |
|
Do NOT send commands that block indefinitely (e.g., `input()`).""", |
|
func=python_repl.run, |
|
args_schema=PythonREPLInput |
|
) |
|
|
|
@tool |
|
def get_youtube_transcript(page_url: str) -> str: |
|
"""Get the transcript of a YouTube video |
|
|
|
Args: |
|
page_url (str): YouTube URL of the video |
|
""" |
|
try: |
|
|
|
video_id = extract.video_id(page_url) |
|
|
|
|
|
ytt_api = YouTubeTranscriptApi() |
|
transcript = ytt_api.fetch(video_id) |
|
|
|
|
|
txt = '\n'.join([s.text for s in transcript.snippets]) |
|
return txt |
|
except Exception as e: |
|
return f"get_youtube_transcript failed: {e}" |
|
|