|
"""File operations for AutoGPT""" |
|
from __future__ import annotations |
|
|
|
import os |
|
import os.path |
|
from typing import Generator |
|
|
|
import requests |
|
from colorama import Back, Fore |
|
from requests.adapters import HTTPAdapter, Retry |
|
|
|
from autogpt.spinner import Spinner |
|
from autogpt.utils import readable_file_size |
|
from autogpt.workspace import WORKSPACE_PATH, path_in_workspace |
|
|
|
LOG_FILE = "file_logger.txt" |
|
LOG_FILE_PATH = WORKSPACE_PATH / LOG_FILE |
|
|
|
|
|
def check_duplicate_operation(operation: str, filename: str) -> bool:
    """Check if the operation has already been performed on the given file

    Args:
        operation (str): The operation to check for
        filename (str): The name of the file to check for

    Returns:
        bool: True if the operation has already been performed on the file
    """
    log_content = read_file(LOG_FILE)
    # Entries are written by log_operation() as "<operation>: <filename>\n".
    # The filename must be part of the lookup key; a key of only the
    # operation would flag EVERY file as a duplicate after the first one.
    log_entry = f"{operation}: {filename}\n"
    return log_entry in log_content
|
|
|
|
|
def log_operation(operation: str, filename: str) -> None:
    """Log the file operation to the file_logger.txt

    Args:
        operation (str): The operation to log
        filename (str): The name of the file the operation was performed on
    """
    # The entry format must match what check_duplicate_operation() searches
    # for; logging without the filename would make duplicate detection
    # collapse all files into one.
    log_entry = f"{operation}: {filename}\n"

    # Create the log file with a header on first use.
    if not os.path.exists(LOG_FILE_PATH):
        with open(LOG_FILE_PATH, "w", encoding="utf-8") as f:
            f.write("File Operation Logger ")

    # shouldLog=False prevents the logger from recursively logging itself.
    append_to_file(LOG_FILE, log_entry, shouldLog=False)
|
|
|
|
|
def split_file(
    content: str, max_length: int = 4000, overlap: int = 0
) -> Generator[str, None, None]:
    """
    Split text into chunks of a specified maximum length with a specified overlap
    between chunks.

    :param content: The input text to be split into chunks
    :param max_length: The maximum length of each chunk,
        default is 4000 (about 1k token)
    :param overlap: The number of overlapping characters between chunks,
        default is no overlap
    :return: A generator yielding chunks of text
    """
    start = 0
    content_length = len(content)

    while start < content_length:
        end = start + max_length
        if end + overlap < content_length:
            # NOTE: the previous slice used "end + overlap - 1", which
            # silently dropped one character per chunk — with overlap == 0
            # that character was lost entirely.
            chunk = content[start : end + overlap]
        else:
            chunk = content[start:content_length]

        # The tail may already be fully covered by the previous chunk's
        # overlap region; emitting it would duplicate content.
        if len(chunk) <= overlap:
            break

        yield chunk
        start += max_length - overlap
|
|
|
|
|
def read_file(filename: str) -> str:
    """Read a file and return the contents

    Args:
        filename (str): The name of the file to read

    Returns:
        str: The contents of the file, or an error message on failure
    """
    try:
        # Resolve the filename inside the agent workspace before opening.
        workspace_path = path_in_workspace(filename)
        with open(workspace_path, "r", encoding="utf-8") as file_handle:
            return file_handle.read()
    except Exception as e:
        # Errors are reported as a string rather than raised, so callers
        # (the LLM command loop) always receive a printable result.
        return f"Error: {str(e)}"
|
|
|
|
|
def ingest_file(
    filename: str, memory, max_length: int = 4000, overlap: int = 200
) -> None:
    """
    Ingest a file by reading its content, splitting it into chunks with a specified
    maximum length and overlap, and adding the chunks to the memory storage.

    :param filename: The name of the file to ingest
    :param memory: An object with an add() method to store the chunks in memory
    :param max_length: The maximum length of each chunk, default is 4000
    :param overlap: The number of overlapping characters between chunks, default is 200
    """
    try:
        # The filename must be interpolated into these messages; the prior
        # code printed a literal placeholder instead of the actual file.
        print(f"Working with file {filename}")
        content = read_file(filename)
        content_length = len(content)
        print(f"File length: {content_length} characters")

        chunks = list(split_file(content, max_length=max_length, overlap=overlap))

        num_chunks = len(chunks)
        for i, chunk in enumerate(chunks):
            print(f"Ingesting chunk {i + 1} / {num_chunks} into memory")
            # Prefix each chunk with provenance so memory lookups can
            # attribute the text back to its source file and position.
            memory_to_add = (
                f"Filename: {filename}\n" f"Content part#{i + 1}/{num_chunks}: {chunk}"
            )

            memory.add(memory_to_add)

        print(f"Done ingesting {num_chunks} chunks from {filename}.")
    except Exception as e:
        # Best-effort ingestion: report and continue rather than crash.
        print(f"Error while ingesting file '{filename}': {str(e)}")
|
|
|
|
|
def write_to_file(filename: str, text: str) -> str:
    """Write text to a file

    Args:
        filename (str): The name of the file to write to
        text (str): The text to write to the file

    Returns:
        str: A message indicating success or failure
    """
    if check_duplicate_operation("write", filename):
        return "Error: File has already been updated."
    try:
        filepath = path_in_workspace(filename)
        # exist_ok=True avoids the check-then-create race of the previous
        # exists() + makedirs() pair (another process/thread could create
        # the directory in between and makedirs would raise).
        os.makedirs(os.path.dirname(filepath), exist_ok=True)
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(text)
        log_operation("write", filename)
        return "File written to successfully."
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
def append_to_file(filename: str, text: str, shouldLog: bool = True) -> str:
    """Append text to a file

    Args:
        filename (str): The name of the file to append to
        text (str): The text to append to the file
        shouldLog (bool): Whether to record this operation in the file
            logger (disabled when the logger itself calls this function)

    Returns:
        str: A message indicating success or failure
    """
    try:
        filepath = path_in_workspace(filename)
        # Explicit encoding keeps appends consistent with read_file and
        # write_to_file, which both use UTF-8; the default is
        # platform-dependent and can corrupt non-ASCII text on Windows.
        with open(filepath, "a", encoding="utf-8") as f:
            f.write(text)

        if shouldLog:
            log_operation("append", filename)

        return "Text appended successfully."
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
def delete_file(filename: str) -> str:
    """Delete a file

    Args:
        filename (str): The name of the file to delete

    Returns:
        str: A message indicating success or failure
    """
    # Refuse a second delete of the same file (tracked in the op log).
    if check_duplicate_operation("delete", filename):
        return "Error: File has already been deleted."
    try:
        target_path = path_in_workspace(filename)
        os.remove(target_path)
        log_operation("delete", filename)
        return "File deleted successfully."
    except Exception as e:
        # Surface failures (missing file, permissions) as a printable string.
        return f"Error: {str(e)}"
|
|
|
|
|
def search_files(directory: str) -> list[str]:
    """Search for files in a directory

    Args:
        directory (str): The directory to search in

    Returns:
        list[str]: A list of files found in the directory
    """
    # An empty or root path means "search the whole workspace".
    if directory in {"", "/"}:
        search_root = WORKSPACE_PATH
    else:
        search_root = path_in_workspace(directory)

    found_files: list[str] = []
    for dirpath, _, filenames in os.walk(search_root):
        # Hidden files (dotfiles) are excluded; paths are reported
        # relative to the workspace root.
        found_files.extend(
            os.path.relpath(os.path.join(dirpath, name), WORKSPACE_PATH)
            for name in filenames
            if not name.startswith(".")
        )

    return found_files
|
|
|
|
|
def download_file(url, filename):
    """Downloads a file

    Args:
        url (str): URL of the file to download
        filename (str): Filename to save the file as

    Returns:
        str: A success message with the stored size, or an error message
    """
    safe_filename = path_in_workspace(filename)
    try:
        message = f"{Fore.YELLOW}Downloading file from {Back.LIGHTBLUE_EX}{url}{Back.RESET}{Fore.RESET}"
        with Spinner(message) as spinner:
            session = requests.Session()
            # Retry transient gateway failures with exponential backoff.
            retry = Retry(total=3, backoff_factor=1, status_forcelist=[502, 503, 504])
            adapter = HTTPAdapter(max_retries=retry)
            session.mount("http://", adapter)
            session.mount("https://", adapter)

            # Stream to disk in chunks so large files never sit fully in memory.
            with session.get(url, allow_redirects=True, stream=True) as r:
                r.raise_for_status()
                total_size = int(r.headers.get("Content-Length", 0))
                downloaded_size = 0

                with open(safe_filename, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
                        downloaded_size += len(chunk)

                        # Update the spinner with download progress
                        progress = f"{readable_file_size(downloaded_size)} / {readable_file_size(total_size)}"
                        spinner.update_message(f"{message} {progress}")

            # The stored filename must appear in the result; the prior code
            # emitted a literal placeholder instead.
            return f'Successfully downloaded and locally stored file: "{filename}"! (Size: {readable_file_size(total_size)})'
    except requests.HTTPError as e:
        return f"Got an HTTP Error whilst trying to download file: {e}"
    except Exception as e:
        return "Error: " + str(e)
|
|