| | import base64 |
| | import csv |
| | import openai |
| | import requests |
| | import pandas as pd |
| | import tempfile |
| | import os |
| | import json |
| | from pathlib import Path |
| | from typing import Any, Union |
| | from dotenv import load_dotenv |
| | from bs4 import BeautifulSoup |
| |
|
| |
|
| | from langchain_core.messages import HumanMessage |
| | from langchain_core.output_parsers import StrOutputParser |
| | from langchain.tools import tool |
| | from langchain_openai import ChatOpenAI |
| | from langchain_community.document_loaders import YoutubeLoader |
| | from langchain_community.document_loaders.youtube import TranscriptFormat |
| | from langchain_community.tools import ArxivQueryRun |
| | from langchain_community.utilities import ArxivAPIWrapper |
| | from langchain.agents.agent_types import AgentType |
| | from langchain_experimental.agents.agent_toolkits import create_pandas_dataframe_agent |
| |
|
| | import wikipedia |
| | from langchain_tavily import TavilySearch |
| | import yt_dlp |
| | from yt_dlp.utils import DownloadError, ExtractorError |
| | from utils import setup_llm |
| |
|
| | current_dir = Path(__file__).parent.absolute() |
| | env_path = current_dir / ".env" |
| |
|
| | load_dotenv(dotenv_path=env_path, override=True) |
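| | # Assumption: the .env file supplies the API keys used by the tools below |
| | # (e.g. OPENAI_API_KEY for the LLM/vision calls and TAVILY_API_KEY for web search). |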
| |
|
| | @tool |
| | def read_file(file_path: str) -> str: |
| | """Read the entire content of a text file specified by its path.""" |
| | try: |
| | with open(file_path, 'r', encoding='utf-8') as f: |
| | return f.read() |
| | except Exception as e: |
| | return f"read_file tool error: {str(e)}" |
| |
|
| | @tool |
| | def web_search(query: str) -> str: |
| | """ |
| | Searches the web and returns a list of the most relevant URLs. |
| | Use this FIRST for complex queries, metadata questions, or to find the right sources. |
| | Then follow up with web_content_extract on the most promising URL. |
| | """ |
| | try: |
| | tavily_search = TavilySearch( |
| | max_results=5, |
| | topic="general", |
| | search_depth="advanced", |
| | include_raw_content=False, |
| | ) |
| |
|
| | results = tavily_search.invoke(query) |
| | |
| | web_search_results = "Search Results:\n" |
| | for i, result in enumerate(results["results"], 1): |
| | web_search_results += f"{i}. {result['title']}: {result['url']}\n {result['content'][:150]}...\n\n" |
| |
|
| | return web_search_results |
| | except Exception as e: |
| | return f"web_search tool error: {str(e)}" |
| |
|
| | @tool |
| | def web_content_extract(url: str) -> str: |
| | """ |
| | Extracts and analyzes specific content from a URL using BeautifulSoup. |
| | Particularly effective for Wikipedia metadata pages, discussion pages, |
| | and structured web content. |
| | Can be used after web_search to get detailed information. |
| | """ |
| | try: |
| |
|
| | headers = { |
| | "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" |
| | } |
| |
|
| | response = requests.get(url, headers=headers, timeout=10) |
| | response.raise_for_status() |
| |
|
| | soup = BeautifulSoup(response.text, 'html.parser') |
| | for element in soup.select('script, style, footer, nav, header'): |
| | if element: |
| | element.decompose() |
| | text = soup.body.get_text(separator='\n', strip=True) if soup.body else soup.get_text(separator='\n', strip=True) |
| |
|
| | |
| | return f"Content extracted from {url}:\n\n{text[:10000]}..." if len(text) > 10000 else text |
| |
|
| | except Exception as e: |
| | return f"web_content_extract tool error: {str(e)}" |
| | """ |
| | # not used |
| | def web_search(query: str) -> str: |
| | |
| | Searches the web for general information. Optimal for: |
| | 1. Recent events or time-sensitive information |
| | 2. Metadata about websites |
| | 3. Questions combining multiple specific criteria |
| | 4. Queries that need to search across multiple websites or data sources |
| | |
| | try: |
| | tavily_search = TavilySearch( |
| | max_results=3, |
| | topic="general", |
| | search_depth="basic", |
| | include_raw_content=True |
| | ) |
| | |
| | # This returns a string representation of the results |
| | return tavily_search.invoke(query) |
| | except Exception as e: |
| | return f"web_search tool error: {str(e)}" |
| | """ |
| | @tool |
| | def wikipedia_search(query: str, get_summary: bool = True) -> str: |
| | """ |
| | Searches Wikipedia for factual information contained within articles. |
| | Best for: |
| | 1. Basic encyclopedic facts about topics |
| | 2. Definitions and explanations of concepts |
| | 3. Historical information about well-documented subjects |
| | 4. Classification and categorical information |
| | |
| | Args: |
| | query (str): The query to search Wikipedia for |
| | get_summary (bool): If True, use an LLM to answer the query from the page content instead of returning the full text |
| | """ |
| |
|
| | try: |
| | page = wikipedia.page(title=query, auto_suggest=True, redirect=True) |
| |
|
| | |
| | full_content_html = page.html() |
| |
|
| | |
| | soup = BeautifulSoup(full_content_html, 'html.parser') |
| | text_content = soup.get_text() |
| |
|
| | if get_summary: |
| | llm_agent_management = setup_llm()[0] |
| | message = [ |
| | HumanMessage(content=f"Provide response to the following query: {query}\n\nbased on the following content: {text_content}") |
| | ] |
| | response = llm_agent_management.invoke(message) |
| | return response.content |
| | |
| | |
| | response = f"Page: {page.title}\nSource: {page.url}\n\n{text_content}" |
| | if response: |
| | return StrOutputParser().invoke(response) |
| | else: |
| | return "wikipedia_search tool produced empty response" |
| |
|
| | |
| | except wikipedia.exceptions.PageError: |
| | |
| | return f"Wikipedia page for query '{query}' does not match any known pages." |
| | except wikipedia.exceptions.DisambiguationError as e: |
| | |
| | options = "\n - ".join(e.options[:5]) |
| | return f"Ambiguous query '{query}'. Did you mean:\n - {options}\nPlease refine your search." |
| | except Exception as e: |
| | return f"wikipedia_search tool error: {str(e)}" |
| |
|
| | @tool |
| | def arxiv_search(query: str) -> str: |
| | """ |
| | Search Arxiv for scientific papers |
| | """ |
| | try: |
| | arxiv_api = ArxivAPIWrapper(top_k_results=3) |
| | arxiv_runner = ArxivQueryRun(api_wrapper=arxiv_api) |
| | # ArxivQueryRun.invoke returns a plain string, not a message object |
| | result = arxiv_runner.invoke(query) |
| | response = result + "\n\n" |
| |
|
| | if response: |
| | return response |
| | else: |
| | return "arxiv_search tool produced empty response" |
| | except Exception as e: |
| | return f"arxiv_search tool error: {str(e)}" |
| |
|
| | @tool |
| | def calculator_tool(expression: str) -> str: |
| | """Evaluate a mathematical expression.""" |
| | try: |
| | result = eval(expression, {"__builtins__": {}}, |
| | {"abs": abs, "round": round, "max": max, "min": min}) |
| | # A result of 0 or False is still valid, so only treat None as an empty response |
| | if result is not None: |
| | return str(result) |
| | else: |
| | return "calculator tool produced empty response" |
| | except Exception as e: |
| | return f"calculator tool error: {str(e)}" |
| |
|
| | @tool |
| | def extract_text_from_image(image_path: str) -> str: |
| | """Extract text from a locally saved image.""" |
| |
|
| | try: |
| | llm_vision = setup_llm()[3] |
| | with open(image_path, "rb") as image_file: |
| | image_bytes = image_file.read() |
| | image_base64 = base64.b64encode(image_bytes).decode("utf-8") |
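| | # The raw bytes are base64-encoded so they can be sent inline as a data: URL below |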
| |
|
| | message = [ |
| | HumanMessage( |
| | content=[ |
| | { |
| | "type": "text", |
| | "text": ( |
| | "Extract all the text from this image. " |
| | "Return only the extracted text, no explanations." |
| | ), |
| | }, |
| | { |
| | "type": "image_url", |
| | "image_url": { |
| | "url": f"data:image/png;base64,{image_base64}" |
| | }, |
| | }, |
| | ] |
| | ) |
| | ] |
| |
|
| | result = llm_vision.invoke(message) |
| | response = result.content + "\n\n" |
| |
|
| | if response: |
| | return response |
| | else: |
| | return "extract_text_from_image tool produced empty response" |
| | except Exception as e: |
| | return f"extract_text_from_image tool error: {str(e)}" |
| |
|
| | @tool |
| | def extract_youtube_video(video_url: str) -> str: |
| | """Extract text from a youtube video.""" |
| | try: |
| | loader = YoutubeLoader.from_youtube_url( |
| | video_url, |
| | transcript_format=TranscriptFormat.TEXT, |
| | language="en", |
| | add_video_info=True, |
| | ) |
| |
|
| | result = loader.load() |
| | response = result[0].page_content + "\n\n" |
| |
|
| | if response: |
| | return response |
| | else: |
| | return "extract_youtube_video tool produced empty response" |
| | except Exception as e: |
| | return f"extract_youtube_video tool error: {str(e)}" |
| |
|
| | @tool |
| | def transcribe_audio(audio_path: str) -> str: |
| | """Extract text from a locally saved audio file.""" |
| | |
| | try: |
| | openai_client = setup_llm()[4] |
| |
|
| | with open(audio_path, "rb") as audio_file: |
| | response = openai_client.audio.transcriptions.create( |
| | model="gpt-4o-mini-transcribe", |
| | file=audio_file |
| | ) |
| |
|
| | if response: |
| | return response.text |
| | else: |
| | return "transcribe_audio tool produced empty response" |
| | except Exception as e: |
| | return f"transcribe_audio tool error: {str(e)}" |
| |
|
| | @tool |
| | def query_about_image(query: str, image_url: str) -> str: |
| | """Ask anything about an image from a URL using a Vision Language Model |
| | Args: |
| | query (str): The query about the image |
| | image_url (str): The URL to the image |
| | """ |
| |
|
| | try: |
| | openai_client = setup_llm()[4] |
| | response = openai_client.responses.create( |
| | model="gpt-4o-mini", |
| | input=[{ |
| | "role": "user", |
| | "content": [ |
| | {"type": "input_text", "text": query}, |
| | { |
| | "type": "input_image", |
| | "image_url": image_url, |
| | }, |
| | ], |
| | }], |
| | ) |
| |
|
| | if response: |
| | # The Responses API exposes the generated text via output_text |
| | return response.output_text |
| | else: |
| | return "query_about_image tool produced empty response" |
| | except Exception as e: |
| | return f"query_about_image tool error: {str(e)}" |
| |
|
| | @tool |
| | def execute_python_code(code: str) -> Any: |
| | """Executes Python code safely in a restricted environment.""" |
| | |
| | try: |
| | |
| | allowed_modules = {'math', 're', 'json'} |
| | forbidden_terms = ['import', 'exec', 'eval', 'open', 'os', 'sys', 'subprocess'] |
| |
|
| | |
| | for term in forbidden_terms: |
| | if term in code: |
| | return f"Forbidden term used: {term}" |
| |
|
| | |
| | # __builtins__ may be a module or a dict depending on context, so normalise it first |
| | builtins_dict = __builtins__ if isinstance(__builtins__, dict) else vars(__builtins__) |
| | restricted_globals = { |
| | '__builtins__': { |
| | k: v for k, v in builtins_dict.items() |
| | if k not in ['open', 'exec', 'eval', 'compile'] |
| | } |
| | } |
| |
|
| | |
| | for module_name in allowed_modules: |
| | restricted_globals[module_name] = __import__(module_name) |
| |
|
| | |
| | local_vars = {} |
| | exec(code, restricted_globals, local_vars) |
| |
|
| | |
| | return local_vars |
| |
|
| | except Exception as e: |
| | return f"Code execution error: {str(e)}" |
| | """ |
| | # not used |
| | def csv_reader(file_path: str) -> str: |
| | |
| | try: |
| | df = pd.read_csv(file_path, encoding='utf-8') |
| | |
| | if df.empty: |
| | return f"CSV file found at '{file_path}' but it is empty." |
| | |
| | return df.to_string() |
| | |
| | except FileNotFoundError: |
| | return f"csv_reader tool error: File not found at '{file_path}'" |
| | except pd.errors.EmptyDataError: |
| | return f"csv_reader tool error: No data found in CSV file '{file_path}'." |
| | except pd.errors.ParserError as e: |
| | return f"csv_reader tool error: Error parsing CSV file '{file_path}'. Details: {str(e)}" |
| | except Exception as e: |
| | # Catch any other unexpected errors |
| | import traceback |
| | tb_str = traceback.format_exc() |
| | return f"csv_reader tool error: An unexpected error occurred while reading '{file_path}'. Error: {str(e)}\nTraceback:\n{tb_str}" |
| | """ |
| | @tool |
| | def chess_board_image_analysis(image_path: str, order_of_play: str = "black") -> str: |
| | """Analyze a chess position from an image, given which side is to move (black or white), and return the best move in algebraic notation.""" |
| | import chess |
| |
|
| | def image_to_chess_json(image_path: str) -> Union[dict, str]: |
| | try: |
| | llm_vision = setup_llm()[3] |
| | with open(image_path, "rb") as image_file: |
| | image_bytes = image_file.read() |
| | image_base64 = base64.b64encode(image_bytes).decode("utf-8") |
| |
|
| | message = [ |
| | HumanMessage( |
| | content=[ |
| | { |
| | "type": "text", |
| | "text": |
| | """Analyze this image of a chessboard and return the position of each figure in the following format: |
| | { |
| | "figure_name": "position_on_board" |
| | } |
| | Important instructions: |
| | 1. Each figure on the board is represented by a unique line in the JSON object |
| | 2. Return ONLY the raw JSON without any formatting, markdown, code blocks, or explanations |
| | 3. Do not use triple backticks |
| | 4. Do not include the string "json" before the output |
| | 5. Just return the plain JSON object directly""", |
| | }, |
| | { |
| | "type": "image_url", |
| | "image_url": { |
| | "url": f"data:image/png;base64,{image_base64}" |
| | }, |
| | }, |
| | ] |
| | ) |
| | ] |
| |
|
| | result = llm_vision.invoke(message) |
| | response = result.content |
| |
|
| | try: |
| | position_dict = json.loads(response) |
| | print(position_dict) |
| | return position_dict |
| | except json.JSONDecodeError: |
| | return f"Error: Could not parse response as JSON: {response}" |
| | except Exception as e: |
| | return f"image_to_fen_llm tool error: {str(e)}" |
| |
|
| | def create_fen_from_position(position_dict): |
| | """Convert a position dictionary to FEN notation.""" |
| | |
| | board = [['' for _ in range(8)] for _ in range(8)] |
| |
|
| | |
| | piece_map = { |
| | 'white_king': 'K', 'white_queen': 'Q', |
| | 'white_rook': 'R', 'white_bishop': 'B', |
| | 'white_knight': 'N', 'white_pawn': 'P', |
| | 'black_king': 'k', 'black_queen': 'q', |
| | 'black_rook': 'r', 'black_bishop': 'b', |
| | 'black_knight': 'n', 'black_pawn': 'p' |
| | } |
| |
|
| | |
| | for piece, position in position_dict.items(): |
| | if not position or len(position) < 2: |
| | continue |
| |
|
| | |
| | file, rank = position[0], position[1] |
| | col = ord(file) - ord('a') |
| | row = 8 - int(rank) |
| |
|
| | |
| | if col < 0 or col > 7 or row < 0 or row > 7: |
| | continue |
| |
|
| | |
| | piece_symbol = '' |
| |
|
| | |
| | if piece in piece_map: |
| | piece_symbol = piece_map[piece] |
| | else: |
| | |
| | base_name = piece.rstrip('0123456789') |
| | if base_name.endswith('_'): |
| | base_name = base_name[:-1] |
| |
|
| | if base_name in piece_map: |
| | piece_symbol = piece_map[base_name] |
| | else: |
| | |
| | for key, symbol in piece_map.items(): |
| | if piece.startswith(key): |
| | piece_symbol = symbol |
| | break |
| |
|
| | |
| | if piece_symbol: |
| | board[row][col] = piece_symbol |
| |
|
| | |
| | fen_parts = [] |
| | for row in board: |
| | empty_count = 0 |
| | rank_str = "" |
| |
|
| | for cell in row: |
| | if cell == '': |
| | empty_count += 1 |
| | else: |
| | if empty_count > 0: |
| | rank_str += str(empty_count) |
| | empty_count = 0 |
| | rank_str += cell |
| |
|
| | if empty_count > 0: |
| | rank_str += str(empty_count) |
| |
|
| | fen_parts.append(rank_str) |
| |
|
| | piece_placement = '/'.join(fen_parts) |
| |
|
| | |
| | active_color = "b" if order_of_play == "black" else "w" |
| | castling = "KQkq" |
| | en_passant = "-" |
| | halfmove_clock = "0" |
| | fullmove_number = "1" |
| |
|
| | |
| | fen_notation = f"{piece_placement} {active_color} {castling} {en_passant} {halfmove_clock} {fullmove_number}" |
| |
|
| | return fen_notation |
| | |
| | def get_best_move(fen_notation: str) -> str: |
| | """Get the best move from Stockfish chess engine API.""" |
| | if not fen_notation: |
| | return "Error: Invalid FEN notation" |
| |
|
| | try: |
| | api_url_stockfish = "https://stockfish.online/api/s/v2.php" |
| | depth = 10 |
| | |
| | import urllib.parse |
| | encoded_fen = urllib.parse.quote(fen_notation) |
| | full_url = f"{api_url_stockfish}?fen={encoded_fen}&depth={depth}" |
| |
|
| | |
| | response = requests.get(full_url, timeout=60) |
| |
|
| | |
| | if response.status_code == 200: |
| | result = response.json() |
| | if result.get("success", False): |
| | |
| | best_move = result.get("bestmove", "") |
| |
|
| | |
| | |
| | if " " in best_move: |
| | best_move = best_move.split(" ")[1] |
| |
|
| | return best_move |
| | else: |
| | |
| | return f"Failed to get best move: {result.get('data', 'Unknown error')}" |
| |
|
| | return f"Failed to get best move. Status code: {response.status_code}" |
| | except Exception as e: |
| | return f"Error getting best move: {str(e)}" |
| |
|
| | def convert_uci_to_algebraic(fen_notation, uci_move): |
| | """Convert UCI move notation to algebraic notation.""" |
| | if not fen_notation or not uci_move: |
| | return "Error: Missing FEN notation or UCI move" |
| |
|
| | try: |
| | |
| | board = chess.Board(fen_notation) |
| |
|
| | |
| | move = chess.Move.from_uci(uci_move) |
| |
|
| | |
| | if move not in board.legal_moves: |
| | return f"Error: {uci_move} is not a legal move in this position" |
| |
|
| | |
| | algebraic_move = board.san(move) |
| | return algebraic_move |
| | except ValueError as e: |
| | return f"Error converting move: {str(e)}" |
| | except Exception as e: |
| | return f"Unexpected error: {str(e)}" |
| |
|
| | |
| | try: |
| | |
| | chess_json = image_to_chess_json(image_path) |
| | if isinstance(chess_json, str) and (chess_json.startswith("Error") or chess_json.startswith("Failed")): |
| | return chess_json |
| |
|
| | |
| | fen_notation = create_fen_from_position(chess_json) |
| | uci_move = get_best_move(fen_notation) |
| | if isinstance(uci_move, str) and (uci_move.startswith("Error") or uci_move.startswith("Failed")): |
| | return uci_move |
| |
|
| | |
| | algebraic_move = convert_uci_to_algebraic(fen_notation, uci_move) |
| |
|
| | |
| | if algebraic_move.startswith("Error"): |
| | return algebraic_move |
| | else: |
| | return f"Best move: {algebraic_move}" |
| | except Exception as e: |
| | return f"Chess board analysis failed: {str(e)}" |
| |
|
| | @tool |
| | def download_youtube_audio(url: str, task_id: str): |
| | """Download the audio from a YouTube video""" |
| | temp_dir = tempfile.gettempdir() |
| | output_filename_template = os.path.join(temp_dir, f"{task_id}.%(ext)s") |
| | downloaded_audio_file_path = os.path.join(temp_dir, f"{task_id}.mp3") |
| | |
| | ydl_opts = { |
| | 'format': 'bestaudio/best', |
| | 'outtmpl': output_filename_template, |
| | 'postprocessors': [{ |
| | 'key': 'FFmpegExtractAudio', |
| | 'preferredcodec': 'mp3', |
| | 'preferredquality': '192', |
| | }], |
| | 'quiet': True, |
| | } |
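| | # Note: the FFmpegExtractAudio postprocessor requires ffmpeg to be available on the host |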
| |
|
| | try: |
| | with yt_dlp.YoutubeDL(ydl_opts) as ydl: |
| | ydl.download([url]) |
| | if os.path.exists(downloaded_audio_file_path): |
| | print(f"Successfully downloaded and converted audio to: {downloaded_audio_file_path}") |
| | return downloaded_audio_file_path |
| | else: |
| | print(f"Failed to download audio from {url}") |
| | return None |
| |
|
| | except DownloadError as e: |
| | print(f"yt-dlp download error for {url} (Task ID: {task_id}): {e}") |
| | return None |
| | except ExtractorError as e: |
| | print(f"yt-dlp extractor error for {url} (Task ID: {task_id}): {e}") |
| | return None |
| | except Exception as e: |
| | print(f"An unexpected error occurred while processing {url} (Task ID: {task_id}): {e}") |
| | return None |
| |
|
| | @tool |
| | def find_phrase_in_text(text: dict, phrase: str): |
| | """ |
| | Find a specific phrase in a transcription result and return the segment containing it along with the segment that follows. |
| | |
| | Args: |
| | text (dict): Transcription result containing a 'segments' list |
| | phrase (str): The phrase to look for |
| | |
| | Returns: |
| | tuple: (segment containing the phrase, following segment) |
| | """ |
| | segments = text['segments'] |
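| | # Illustrative input shape (assumed Whisper-style transcription output): |
| | # {"segments": [{"text": "Isn't that hot?"}, {"text": "Extremely."}]} |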
| |
|
| | |
| | phrase_lower = phrase.lower() |
| |
|
| | |
| | phrase_segment = None |
| |
|
| | for i, segment in enumerate(segments): |
| | if phrase_lower in segment['text'].lower(): |
| | phrase_segment = segment |
| | # Return the following segment too when one exists |
| | if i + 1 < len(segments): |
| | response_segment = segments[i + 1] |
| | return phrase_segment, response_segment |
| | # The phrase appears in the final segment, so there is no follow-up segment |
| | return phrase_segment, None |
| |
|
| | return None, None |
| |
|
| | @tool |
| | def analyse_tabular_data(table_path: str, query: str) -> str: |
| | """ |
| | Analyse a table and return the answer to a question. |
| | """ |
| | try: |
| | |
| | file_type = table_path.split(".")[-1] |
| | reader_map = { |
| | "csv": pd.read_csv, |
| | "json": pd.read_json, |
| | "xlsx": pd.read_excel, |
| | "xls": pd.read_excel, |
| | } |
| |
|
| | if file_type not in reader_map: |
| | return f"Unsupported file type: {file_type}" |
| |
|
| | df = reader_map[file_type](table_path) |
| | except Exception as e: |
| | return f"Error reading table: {str(e)}" |
| |
|
| | if df is None or df.empty: |
| | return "Error: table is empty or not in a valid format" |
| | else: |
| | try: |
| | agent = create_pandas_dataframe_agent( |
| | ChatOpenAI(temperature=0, model="gpt-4.1-mini"), |
| | df, |
| | verbose=True, |
| | agent_type=AgentType.OPENAI_FUNCTIONS, |
| | allow_dangerous_code=True |
| | ) |
| | result = agent.invoke({"input": query}) |
| | return str(result) |
| | except Exception as e: |
| | return f"Error analysing table: {str(e)}" |
| | |
| |
|