FinalAssignment / tools.py
alex-i07's picture
debug
a5e2181
raw
history blame
8.97 kB
import os
import base64
import requests
import tempfile
import pandas as pd
from openai import OpenAI
from pytubefix import YouTube
from langchain_community.tools import tool
from bs4 import BeautifulSoup, ResultSet, PageElement, Tag, NavigableString
@tool
def default_file_reader(file_path: str) -> str | None:
    """
    Default file reader tool that opens a file as text, reads its content and returns it as a string.
    Use this default tool if there is no specific file reader for a given file.
    Returns None if the file cannot be opened or is not valid UTF-8 text.
    """
    try:
        # Decode explicitly as UTF-8: the implicit default encoding depends on
        # the platform/locale, which makes results differ between machines.
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    except (OSError, UnicodeDecodeError) as e:
        # OSError includes FileNotFoundError as well as permission errors and
        # "is a directory"; UnicodeDecodeError is raised for binary or
        # non-UTF-8 files. All of them are reported the same way as before.
        print(f"Error:{e}")
        return None
@tool
def image_reader(file_path: str) -> dict[str, str | dict[str, str]] | None:
    """
    Opens and png image and returns it's data as a dictionary.
    """
    # NOTE(review): docstring left word-for-word -- @tool presumably exposes it
    # as the tool description, so rewording it would change runtime behaviour.

    # Step 1: load the raw bytes; a missing file is reported and yields None.
    try:
        with open(file_path, "rb") as png_file:
            raw_bytes = png_file.read()
    except FileNotFoundError as e:
        print(f"Error:{e}")
        return None

    # Step 2: wrap the base64 payload in a data URL. The MIME type is always
    # image/png, whatever the file actually contains.
    encoded = base64.b64encode(raw_bytes).decode('utf-8')
    data_url = f"data:image/png;base64,{encoded}"
    return {"type": "image_url", "image_url": {"url": data_url}}
@tool
def excel_column_reader(file_path: str) -> str | None:
    """
    Opens an Excel file, reads the first row to get the names of the columns and return it as a string.
    Use it to find out what data is available in the Excel file.
    """
    # NOTE(review): docstring left word-for-word -- @tool presumably exposes it
    # as the tool description.

    # A missing workbook is reported and mapped to None; any other error
    # raised by pandas propagates to the caller.
    try:
        sheet = pd.read_excel(file_path)
    except FileNotFoundError as e:
        print(f"Error:{e}")
        return None

    # Column labels are stringified by pandas and joined with single spaces.
    header_labels = sheet.columns.astype(str)
    return ' '.join(header_labels)
@tool
def excel_find_column_values_sum(file_path: str, columns: list[str]) -> None | int | float:
    """Opens an Excel file, find specified columns by column_name and calculates a total sum of all numeric cells of specified columns. Returns None if the file or one of the columns cannot be found."""
    try:
        df = pd.read_excel(file_path)
    except FileNotFoundError as e:
        print(f"Error:{e}")
        return None

    # Report unknown columns the same way as a missing file instead of letting
    # a KeyError escape from the tool.
    missing_columns = [column for column in columns if column not in df.columns]
    if missing_columns:
        print(f"Error:columns not found in file: {missing_columns}")
        return None

    total = 0
    for column in columns:
        # Only numeric cells count: anything that cannot be parsed as a number
        # is coerced to NaN, and NaN is skipped by sum(). Without this, a text
        # column would be concatenated rather than summed.
        total += pd.to_numeric(df[column], errors='coerce').sum()

    # pandas returns NumPy scalars; hand back a plain Python number.
    return total.item() if hasattr(total, 'item') else total
@tool
def wiki_search(query: str) -> str | None:
    """
    Search wikipedia by query string and return content of the first found page.
    Also use it to get information about shows and actors.
    """
    # NOTE(review): docstring left word-for-word -- @tool presumably exposes it
    # as the tool description.
    try:
        anchors = _fetch_ddg_search_result_links(f"wikipedia {query}")
        print(query, anchors)  # debug trace of the query and the raw result links

        for anchor in anchors:
            target_url = anchor.attrs['href']
            if 'en.wikipedia.org' in target_url:
                # The first English-Wikipedia hit wins; later results are ignored.
                return _fetch_specific_page(target_url)

        # No English-Wikipedia link among the results: empty string, not None.
        return ""
    except requests.exceptions.RequestException as e:
        print(f"Error during request: {e}")
        return None
    except Exception as e:
        print(f"Error parsing results: {e}")
        return None
@tool
def archive_search(query: str) -> str | None:
    """
    Search archive.org by query string and return content of the first found page.
    Use this search when you need to find scientific paper or specific scientific publication detail.
    """
    # NOTE(review): docstring left word-for-word -- @tool presumably exposes it
    # as the tool description.
    try:
        hits = _fetch_ddg_search_result_links(f"archive.org {query}")
        print(query, hits)  # debug trace of the query and the raw result links

        # Lazily pick the first result whose href mentions archive.org.
        first_url = next(
            (hit.attrs['href'] for hit in hits if 'archive.org' in hit.attrs['href']),
            None,
        )
        if first_url is None:
            # Nothing matched: empty string, not None.
            return ""
        return _fetch_specific_page(first_url)
    except requests.exceptions.RequestException as e:
        print(f"Error during request: {e}")
        return None
    except Exception as e:
        print(f"Error parsing results: {e}")
        return None
@tool
def get_ioc_code(country_name: str) -> str | None:
    """
    Accepts country name as a string and returns IOC code of this country.
    """
    # NOTE(review): docstring left word-for-word -- @tool presumably exposes it
    # as the tool description.
    try:
        # The first HTML table on the page is used as the code list.
        tables = pd.read_html('https://en.wikipedia.org/wiki/List_of_IOC_country_codes')
        ioc_table = tables[0]

        # Only the trailing three characters of each "Code" cell are kept.
        codes = ioc_table['Code'].str[-3:]
        committees = ioc_table['National Olympic Committee']

        # Exact-match lookup; if a name repeats, the last row wins.
        lookup = {}
        for committee, code in zip(committees, codes):
            lookup[committee] = code
        return lookup.get(country_name)
    except Exception as e:
        print(f"Error: {e}")
        return None
def _split_table_row(row: str) -> list[str]:
    # Split one '|'-prefixed table line into stripped cells, keeping empty cells in place.
    # Exactly one leading pipe (and one trailing pipe, if present) is removed so
    # that a blank cell -- e.g. an empty corner cell -- is not silently dropped.
    inner = row[1:-1] if len(row) > 1 and row.endswith('|') else row[1:]
    return [cell.strip() for cell in inner.split('|')]


def _is_separator_row(cells: list[str]) -> bool:
    # True for a markdown delimiter row such as |---|:--:| or | --- | --- |.
    return bool(cells) and all(
        cell and '-' in cell and set(cell) <= {'-', ':'} for cell in cells
    )


@tool
def check_commutativity(table_definition: str) -> str | None:
    """
    Use this tool if you need to verify whether a binary operation defined by a table is commutative.
    Returns a comma-separated, sorted list of the elements that take part in at least one
    counter-example (x*y != y*x), which proves * is not commutative.
    Returns None if the operation is commutative.
    Example of table definition:
    |*|a|b|c|d|e|
    |---|---|---|---|---|---|
    |a|a|b|c|b|d|
    |b|b|c|a|e|c|
    |c|c|a|b|b|a|
    |d|b|e|b|e|d|
    |e|d|b|a|d|c|
    """
    # Collect table rows: only lines starting with '|' count, and the markdown
    # delimiter row is skipped. Rows are judged by their cells, so a data row
    # whose label starts with '-' (e.g. "-1") is kept.
    rows = []
    for raw_line in table_definition.strip().splitlines():
        line = raw_line.strip()
        if not line.startswith('|'):
            continue
        cells = _split_table_row(line)
        if _is_separator_row(cells):
            continue
        rows.append(cells)

    if not rows:
        raise ValueError("Table definition does not contain any table rows.")

    # Header: the first cell is the operator symbol (or blank), the rest are the elements.
    elements = rows[0][1:]
    if not elements:
        raise ValueError("Table header does not list any elements.")

    operation_table = {}
    for cells in rows[1:]:
        row_label, values = cells[0], cells[1:]
        if len(values) != len(elements):
            raise ValueError(f"Row {row_label} does not have the correct number of entries.")
        operation_table[row_label] = dict(zip(elements, values))

    # Every header element needs its own row, otherwise x*y cannot be compared with y*x.
    missing_rows = [element for element in elements if element not in operation_table]
    if missing_rows:
        raise ValueError(f"Table has no row for element(s): {', '.join(missing_rows)}")

    # Each unordered pair is checked once; x == y can never be a counter-example.
    counter_example_elements = set()
    for index, x in enumerate(elements):
        for y in elements[index + 1:]:
            if operation_table[x][y] != operation_table[y][x]:
                counter_example_elements.update((x, y))

    if not counter_example_elements:
        return None
    return ', '.join(sorted(counter_example_elements))
@tool
def audio_to_text(file_path: str) -> str | None:
    """
    Transcribes audio file to text and returns text as a string.
    Returns None if the file cannot be read or the transcription fails.
    """
    try:
        client = OpenAI()
        # The context manager closes the file handle even when the API call
        # raises; previously the handle was never closed.
        with open(file_path, "rb") as audio_file:
            transcription = client.audio.transcriptions.create(
                model="gpt-4o-transcribe",
                file=audio_file
            )
        return transcription.text
    except Exception as e:
        # Best-effort tool: any failure is reported and mapped to None.
        print(f"Error: {e}")
        return None
@tool
def video_to_text(video_url: str) -> str | None:
    """
    Downloads YouTube video by url, transcribes it to text and returns text as a string.
    Returns None if the download or the transcription fails.
    """
    file_path = ""
    try:
        ytx = YouTube(video_url)
        ysx = ytx.streams.get_highest_resolution()
        # Download into a private, per-call directory that is removed when the
        # block exits, so downloaded videos no longer pile up in the shared
        # temp directory.
        with tempfile.TemporaryDirectory() as temp_dir:
            file_path = ysx.download(output_path=temp_dir)
            client = OpenAI()
            # Close the handle before the directory is cleaned up (and even if
            # the API call raises); previously the handle was never closed.
            with open(file_path, "rb") as video_file:
                transcription = client.audio.transcriptions.create(
                    model="gpt-4o-transcribe",
                    file=video_file,
                    temperature=0.0,
                    prompt="Ignore music playing in the background and transcribe all conversations."
                )
        return transcription.text
    except FileNotFoundError:
        print(f"Error: File {file_path} was not found.")
        return None
    except Exception as e:
        # Best-effort tool: any other failure is reported and mapped to None.
        print(f"Error: {e}")
        return None
def _fetch_ddg_search_result_links(query: str, timeout: float = 10.0) -> ResultSet[PageElement | Tag | NavigableString]:
    """
    Run a DuckDuckGo Lite search and return the result anchor tags.

    Args:
        query: Search string, sent as the ``q`` parameter.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, requests waits indefinitely and a stalled connection
            would hang the calling tool.

    Returns:
        All ``<a class="result-link">`` elements found in the response HTML.

    Raises:
        requests.exceptions.RequestException: On connection problems, timeouts
            or a non-2xx HTTP status.
    """
    url = "https://lite.duckduckgo.com/lite/"
    # Browser-like headers are sent with the request.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
    }
    params = {
        'q': query,
        'kl': 'us-en'
    }
    ddg_response = requests.get(url, headers=headers, params=params, timeout=timeout)
    ddg_response.raise_for_status()
    soup = BeautifulSoup(ddg_response.text, 'html.parser')
    return soup.find_all('a', {'class': 'result-link'})
def _fetch_specific_page(url: str, timeout: float = 10.0) -> str:
    """
    Download a web page and return its visible text with all markup removed.

    Args:
        url: Address of the page to fetch.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, requests waits indefinitely and a stalled connection
            would hang the calling tool.

    Returns:
        The text content of the parsed HTML document.

    Raises:
        requests.exceptions.RequestException: On connection problems, timeouts
            or a non-2xx HTTP status.
    """
    # Browser-like headers are sent with the request.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
    }
    page_response = requests.get(url, headers=headers, timeout=timeout)
    page_response.raise_for_status()
    soup = BeautifulSoup(page_response.text, 'html.parser')
    return soup.get_text()