"""File operations for AutoGPT""" |
from __future__ import annotations |
import os |
import os.path |
from typing import Generator |
import requests |
from colorama import Back, Fore |
from requests.adapters import HTTPAdapter, Retry |
from autogpt.spinner import Spinner |
from autogpt.utils import readable_file_size |
from autogpt.workspace import WORKSPACE_PATH, path_in_workspace |
# Name of the file that records performed file operations. The functions in
# this module pass it through path_in_workspace() (via read_file() and
# append_to_file()) before opening it.
LOG_FILE = "file_logger.txt"
def check_duplicate_operation(operation: str, filename: str) -> bool:
    """Tell whether an operation was already logged for a file

    The operation log is fetched with read_file() and searched for the exact
    entry text "<operation>: <filename>" followed by a newline.

    Args:
        operation (str): The operation to look up
        filename (str): The file the operation would act on

    Returns:
        bool: True when a matching entry is present in the log content
    """
    recorded = read_file(LOG_FILE)
    return f"{operation}: {filename}\n" in recorded
def log_operation(operation: str, filename: str) -> None:
    """Log the file operation to the file_logger.txt

    If the log file does not exist yet it is created with a header first; the
    entry "<operation>: <filename>" plus a newline is then appended.

    Args:
        operation (str): The operation to log
        filename (str): The name of the file the operation was performed on
    """
    log_entry = f"{operation}: {filename}\n"

    # Resolve the log location with the same call append_to_file() uses, so the
    # header and the appended entries are guaranteed to land in the same file.
    # (The previous code referenced an undefined LOG_FILE_PATH name.)
    log_path = path_in_workspace(LOG_FILE)
    if not os.path.exists(log_path):
        with open(log_path, "w", encoding="utf-8") as f:
            f.write("File Operation Logger ")

    # shouldLog=False: appending to the log must not itself be logged, which
    # would recurse back into this function.
    append_to_file(LOG_FILE, log_entry, shouldLog=False)
def split_file(
    content: str, max_length: int = 4000, overlap: int = 0
) -> Generator[str, None, None]:
    """
    Split text into chunks of a specified maximum length with a specified overlap
    between chunks.

    Consecutive chunks start ``max_length - overlap`` characters apart. A chunk
    that is not the last one spans ``max_length + max(overlap - 1, 0)``
    characters; the last chunk runs to the end of the content. With
    ``overlap=0`` the chunks concatenate back to the original content.

    :param content: The input text to be split into chunks
    :param max_length: The maximum length of each chunk,
        default is 4000 (about 1k token)
    :param overlap: The number of overlapping characters between chunks,
        default is no overlap
    :return: A generator yielding chunks of text
    :raises ValueError: If ``overlap`` is not smaller than ``max_length``
        (raised when iteration starts, since this is a generator)
    """
    step = max_length - overlap
    if step <= 0:
        # A non-positive step would never advance `start` and loop forever.
        raise ValueError("overlap must be smaller than max_length")

    # Extra characters a non-final chunk takes past `end`. Clamped at zero so
    # that overlap=0 no longer drops the last character of every chunk.
    extra = max(overlap - 1, 0)

    start = 0
    content_length = len(content)
    while start < content_length:
        end = start + max_length
        if end + overlap < content_length:
            chunk = content[start : end + extra]
        else:
            chunk = content[start:content_length]
            # A short tail is already contained in the previous chunk and is
            # skipped. The first chunk has no predecessor, so content that is
            # no longer than `overlap` must still be yielded.
            if start > 0 and len(chunk) <= overlap:
                break
        yield chunk
        start += step
def read_file(filename: str) -> str:
    """Return the text content of a file

    The name is passed through path_in_workspace() and the resulting path is
    opened as UTF-8 text. No exception is propagated: any failure is turned
    into a string beginning with "Error: ".

    Args:
        filename (str): The name of the file to read

    Returns:
        str: The file content, or an "Error: ..." message on failure
    """
    try:
        with open(path_in_workspace(filename), "r", encoding="utf-8") as handle:
            return handle.read()
    except Exception as err:
        return f"Error: {str(err)}"
def ingest_file(
    filename: str, memory, max_length: int = 4000, overlap: int = 200
) -> None:
    """
    Ingest a file by reading its content, splitting it into chunks with a specified
    maximum length and overlap, and adding the chunks to the memory storage.

    Progress is printed to stdout. Any failure, including a file that cannot be
    read, is printed as an error and not raised to the caller.

    :param filename: The name of the file to ingest
    :param memory: An object with an add() method to store the chunks in memory
    :param max_length: The maximum length of each chunk, default is 4000
    :param overlap: The number of overlapping characters between chunks, default is 200
    """
    try:
        print(f"Working with file {filename}")

        # Read the file here rather than through read_file(): that helper
        # reports failures by *returning* an "Error: ..." string, which would
        # then be chunked and stored in memory as if it were file content.
        # Opening it directly lets read failures reach the except branch below.
        with open(path_in_workspace(filename), "r", encoding="utf-8") as f:
            content = f.read()

        content_length = len(content)
        print(f"File length: {content_length} characters")

        # Materialised so the total chunk count is known for the progress output.
        chunks = list(split_file(content, max_length=max_length, overlap=overlap))
        num_chunks = len(chunks)

        for i, chunk in enumerate(chunks):
            print(f"Ingesting chunk {i + 1} / {num_chunks} into memory")
            memory_to_add = (
                f"Filename: {filename}\n" f"Content part#{i + 1}/{num_chunks}: {chunk}"
            )
            memory.add(memory_to_add)

        print(f"Done ingesting {num_chunks} chunks from {filename}.")
    except Exception as e:
        print(f"Error while ingesting file '{filename}': {str(e)}")
def write_to_file(filename: str, text: str) -> str:
    """Create or overwrite a file with the given text

    A write that is already recorded in the operation log for this filename is
    refused. Otherwise missing parent directories are created, the text is
    written as UTF-8 and the write is logged.

    Args:
        filename (str): The name of the file to write to
        text (str): The text to write to the file

    Returns:
        str: A success message, or a message starting with "Error: "
    """
    already_written = check_duplicate_operation("write", filename)
    if already_written:
        return "Error: File has already been updated."

    try:
        target = path_in_workspace(filename)
        parent = os.path.dirname(target)
        if not os.path.exists(parent):
            os.makedirs(parent)
        with open(target, "w", encoding="utf-8") as out:
            out.write(text)
        log_operation("write", filename)
    except Exception as err:
        return f"Error: {str(err)}"
    return "File written to successfully."
def append_to_file(filename: str, text: str, shouldLog: bool = True) -> str:
    """Append text to a file

    Args:
        filename (str): The name of the file to append to
        text (str): The text to append to the file
        shouldLog (bool): Whether to record the append with log_operation().
            Defaults to True; log_operation() itself passes False when it
            appends an entry to the log file.

    Returns:
        str: A message indicating success or failure
    """
    try:
        filepath = path_in_workspace(filename)
        # Explicit UTF-8, matching every other open() in this module. Without
        # it the platform's locale encoding is used, so text appended here
        # (including log entries) could not be reliably read back by
        # read_file(), which decodes as UTF-8.
        with open(filepath, "a", encoding="utf-8") as f:
            f.write(text)

        if shouldLog:
            log_operation("append", filename)

        return "Text appended successfully."
    except Exception as e:
        return f"Error: {str(e)}"
def delete_file(filename: str) -> str:
    """Remove a file

    A delete that is already recorded in the operation log for this filename
    is refused. Otherwise the file is removed and the delete is logged.

    Args:
        filename (str): The name of the file to delete

    Returns:
        str: A success message, or a message starting with "Error: "
    """
    if check_duplicate_operation("delete", filename):
        return "Error: File has already been deleted."

    try:
        os.remove(path_in_workspace(filename))
        log_operation("delete", filename)
    except Exception as err:
        return f"Error: {str(err)}"
    else:
        return "File deleted successfully."
def search_files(directory: str) -> list[str]:
    """List the files below a directory

    The directory tree is walked recursively. Files whose name starts with "."
    are left out (directories are not filtered), and every result is expressed
    relative to WORKSPACE_PATH.

    Args:
        directory (str): The directory to search in; "" or "/" selects
            WORKSPACE_PATH itself

    Returns:
        list[str]: Paths of the files found, relative to WORKSPACE_PATH
    """
    base = WORKSPACE_PATH if directory in {"", "/"} else path_in_workspace(directory)
    return [
        os.path.relpath(os.path.join(parent, name), WORKSPACE_PATH)
        for parent, _, names in os.walk(base)
        for name in names
        if not name.startswith(".")
    ]
def download_file(url: str, filename: str) -> str:
    """Downloads a file

    The response is streamed to disk in 8192-byte chunks while a spinner shows
    the running byte count. Responses with status 502, 503 or 504 are retried
    up to three times with backoff.

    Args:
        url (str): URL of the file to download
        filename (str): Filename to save the file as

    Returns:
        str: A success message including the file size, or a message describing
            the HTTP or other error that occurred
    """
    safe_filename = path_in_workspace(filename)
    try:
        message = f"{Fore.YELLOW}Downloading file from {Back.LIGHTBLUE_EX}{url}{Back.RESET}{Fore.RESET}"
        # The session is used as a context manager so that its pooled
        # connections are released on every exit path, including errors.
        with Spinner(message) as spinner, requests.Session() as session:
            retry = Retry(total=3, backoff_factor=1, status_forcelist=[502, 503, 504])
            adapter = HTTPAdapter(max_retries=retry)
            session.mount("http://", adapter)
            session.mount("https://", adapter)

            with session.get(url, allow_redirects=True, stream=True) as r:
                r.raise_for_status()
                # 0 when the server does not announce a length.
                total_size = int(r.headers.get("Content-Length", 0))
                downloaded_size = 0

                with open(safe_filename, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
                        downloaded_size += len(chunk)

                        progress = f"{readable_file_size(downloaded_size)} / {readable_file_size(total_size)}"
                        spinner.update_message(f"{message} {progress}")

            # Without a Content-Length header total_size stays 0; report the
            # number of bytes actually written instead of a misleading zero.
            final_size = total_size or downloaded_size
            return f'Successfully downloaded and locally stored file: "{filename}"! (Size: {readable_file_size(final_size)})'
    except requests.HTTPError as e:
        return f"Got an HTTP Error whilst trying to download file: {e}"
    except Exception as e:
        return "Error: " + str(e)