| import os |
| import uuid |
| import requests |
| import tempfile |
| from PIL import Image |
| import pytesseract |
| import pandas as pd |
| from urllib.parse import urlparse |
| from langchain_core.tools import tool |
| from typing import Optional |
| import logging |
| import pandasql as psql |
|
|
| |
def setup_logger():
    """
    Build (or fetch) the "FileToolLogger" logger used by the file tools.

    The logger level is set to INFO on every call. A stream handler with a
    "time - level - message" format is attached only when the logger has no
    handlers yet, so calling this function repeatedly does not stack handlers.

    Returns:
        logging.Logger: the configured "FileToolLogger" logger.
    """
    file_tool_logger = logging.getLogger("FileToolLogger")
    file_tool_logger.setLevel(logging.INFO)

    # Already configured by an earlier call: nothing more to attach.
    if file_tool_logger.handlers:
        return file_tool_logger

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    )
    file_tool_logger.addHandler(stream_handler)
    return file_tool_logger


# Module-level logger shared by the tools in this file.
logger = setup_logger()
|
|
| |
| @tool |
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
    """
    Save content to a file in the system temp directory and return the path.

    Despite the name, this function only writes the file; nothing is read back.

    Args:
        content (str): the content to save to the file
        filename (str, optional): the name of the file. Only the final path
            component is used, so the file is always created directly inside
            the temp directory. If not provided (or empty), a randomly named
            file is created.

    Returns:
        str: a message containing the path of the saved file.

    Raises:
        OSError: if the file cannot be created or written.
    """
    temp_dir = tempfile.gettempdir()

    # Keep only the last path component so "../x" or "/abs/x" cannot escape temp_dir.
    safe_name = os.path.basename(filename) if filename else ""

    if safe_name in ("", ".", ".."):
        # mkstemp hands back an open descriptor; close it immediately so the
        # handle is not leaked before the file is reopened for writing below.
        fd, filepath = tempfile.mkstemp(dir=temp_dir)
        os.close(fd)
    else:
        filepath = os.path.join(temp_dir, safe_name)

    # Explicit UTF-8 so non-ASCII content does not depend on the platform locale.
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(content)

    return f"File saved to {filepath}. You can read this file to process its contents."
|
|
| |
|
|
| @tool |
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """
    Download a file from a URL and save it in the system temp directory.

    Args:
        url (str): the URL of the file to download.
        filename (str, optional): the name of the file. Only the final path
            component is used, so the file is always created directly inside
            the temp directory. If not provided, the last segment of the URL
            path is used; if that is empty too, a random
            "downloaded_<hex>" name is generated.

    Returns:
        str: a message containing the path of the downloaded file, or a
        message starting with "Error downloading file:" if anything fails
        (errors are reported in the return value rather than raised).
    """
    try:
        unusable_names = ("", ".", "..")

        # Keep only the last path component so a caller-supplied name such as
        # "../x" or "/abs/x" cannot escape the temp directory.
        safe_name = os.path.basename(filename) if filename else ""
        if safe_name in unusable_names:
            safe_name = os.path.basename(urlparse(url).path)
        if safe_name in unusable_names:
            safe_name = f"downloaded_{uuid.uuid4().hex[:8]}"

        filepath = os.path.join(tempfile.gettempdir(), safe_name)

        # The timeout keeps an unresponsive server from blocking forever; the
        # context manager releases the streamed connection even on failure.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()

            with open(filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)

        return f"File downloaded to {filepath}. You can read this file to process its contents."
    except Exception as e:
        return f"Error downloading file: {str(e)}"
|
|
| @tool |
def extract_text_from_image(image_path: str) -> str:
    """
    Extract text from an image using the OCR library pytesseract.

    Args:
        image_path (str): the path to the image file.

    Returns:
        str: the recognized text prefixed with "Extracted text from image:",
        or a message starting with "Error extracting text from image:" if the
        image cannot be opened or OCR fails (errors are reported in the
        return value rather than raised).
    """
    try:
        # Open via a context manager so the underlying file handle is closed
        # as soon as OCR is done instead of lingering until garbage collection.
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image)
        return f"Extracted text from image:\n\n{text}"
    except Exception as e:
        return f"Error extracting text from image: {str(e)}"
|
|
| @tool |
def analyze_csv_file(file_path: str, query: Optional[str] = None) -> str:
    """
    Load a CSV file with pandas and either summarize it or run a SQL query on it.

    Args:
        file_path (str): the path to the CSV file. The ".csv" extension is
            matched case-insensitively.
        query (str, optional): a SQL query executed with pandasql; the loaded
            data is available as the table ``df``
            (e.g. ``SELECT * FROM df LIMIT 5``). If omitted, a
            ``describe(include='all')`` summary is returned instead.

    Returns:
        str: the shape and column names followed by the query result or the
        summary; "Invalid or missing csv file." if the path is not an
        existing .csv file; or a message starting with
        "Error analyzing CSV file:" if loading or querying fails.
    """
    if not os.path.isfile(file_path) or not file_path.lower().endswith(".csv"):
        return "Invalid or missing csv file."

    try:
        df = pd.read_csv(file_path)
        # str() each label so non-string column names cannot break the join.
        column_names = ", ".join(str(column) for column in df.columns)
        result = [f"CSV loaded with shape: {df.shape}", f" Columns: {column_names}"]

        if query:
            result.append(f"\n Query: {query}")
            result_df = psql.sqldf(query, {"df": df})
            result.append("Query Result:\n" + result_df.to_string(index=False))
        else:
            result.append("\nSummary:\n" + str(df.describe(include='all')))

        return "\n".join(result)
    except Exception as e:
        return f"Error analyzing CSV file: {str(e)}"
|
|
| @tool |
def analyze_excel_file(file_path: str, query: Optional[str] = None) -> str:
    """
    Load an Excel file with pandas and either summarize it or run a SQL query on it.

    Only the sheet that ``pd.read_excel`` loads by default is analyzed.

    Args:
        file_path (str): the path to the xls or xlsx file. The extension is
            matched case-insensitively.
        query (str, optional): a SQL query executed with pandasql; the loaded
            data is available as the table ``df``
            (e.g. ``SELECT * FROM df LIMIT 5``). If omitted, a
            ``describe(include='all')`` summary is returned instead.

    Returns:
        str: the shape and column names followed by the query result or the
        summary; "Invalid or missing Excel file." if the path is not an
        existing .xls/.xlsx file; or a message starting with
        "Error analyzing Excel file:" if loading or querying fails.
    """
    if not os.path.isfile(file_path) or not file_path.lower().endswith((".xls", ".xlsx")):
        return "Invalid or missing Excel file."

    try:
        df = pd.read_excel(file_path)
        # Header cells may be numbers or dates, so str() each label before
        # joining; a plain join would raise TypeError on them.
        column_names = ", ".join(str(column) for column in df.columns)
        result = [f"Excel loaded with shape: {df.shape}", f" Columns: {column_names}"]

        if query:
            result.append(f"\n Query: {query}")
            result_df = psql.sqldf(query, {"df": df})
            result.append("Query Result:\n" + result_df.to_string(index=False))
        else:
            result.append("\nSummary:\n" + str(df.describe(include='all')))

        return "\n".join(result)
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"
| |