Spaces:
Build error
Build error
import os | |
from pathlib import Path | |
from openhands.events.observation import ( | |
ErrorObservation, | |
FileReadObservation, | |
FileWriteObservation, | |
Observation, | |
) | |
def resolve_path(
    file_path: str,
    working_directory: str,
    workspace_base: str,
    workspace_mount_path_in_sandbox: str,
) -> Path:
    """Map a sandbox file path onto the matching path under the host workspace.

    Args:
        file_path: The path to resolve; may be absolute or relative.
        working_directory: Directory that a relative ``file_path`` is joined to.
        workspace_base: The base path of the workspace on the host filesystem.
        workspace_mount_path_in_sandbox: The path of the workspace inside the
            sandbox.

    Returns:
        ``workspace_base`` joined with the part of the resolved path that lies
        below ``workspace_mount_path_in_sandbox``.

    Raises:
        PermissionError: If the resolved path is not located under
            ``workspace_mount_path_in_sandbox``.
    """
    sandbox_root = Path(workspace_mount_path_in_sandbox)

    # Relative paths are interpreted against the working directory.
    requested = Path(file_path)
    candidate = (
        requested if requested.is_absolute() else Path(working_directory) / requested
    )

    # Resolving collapses any ".." components, so a path that climbs out of
    # the workspace is detected by the containment check below.
    resolved = candidate.resolve()
    if not resolved.is_relative_to(sandbox_root):
        raise PermissionError(f'File access not permitted: {file_path}')

    # Re-root the workspace-relative remainder onto the host workspace.
    return Path(workspace_base) / resolved.relative_to(sandbox_root)
def read_lines(all_lines: list[str], start: int = 0, end: int = -1) -> list[str]:
    """Return the slice of ``all_lines`` selected by ``start`` and ``end``.

    Args:
        all_lines: The lines of a file.
        start: Index of the first line to return. Values below zero are
            treated as zero and values past the end are clamped.
        end: Index one past the last line to return. ``-1`` means "through the
            end of the file"; any other negative value is treated as zero and
            values past the end are clamped to ``len(all_lines)``.

    Returns:
        The selected lines. When ``end`` is ``-1`` and ``start`` is ``0`` the
        input list itself is returned. When an explicit ``end`` is given and
        ``all_lines`` is non-empty, at least one line is always returned: the
        start index is pulled back to the last line if necessary and ``end``
        is raised to ``start + 1``.
    """
    num_lines = len(all_lines)
    start = min(max(start, 0), num_lines)

    if end == -1:
        return all_lines if start == 0 else all_lines[start:]

    end = min(max(end, 0), num_lines)
    # Clamp to the last valid index. The previous clamp of ``num_lines - 2``
    # made a request for only the last line also return the line before it.
    begin = max(0, min(start, num_lines - 1))
    # Guarantee a non-empty window even when end <= begin.
    end = max(begin + 1, end)
    return all_lines[begin:end]
async def read_file(
    path: str,
    workdir: str,
    workspace_base: str,
    workspace_mount_path_in_sandbox: str,
    start: int = 0,
    end: int = -1,
) -> Observation:
    """Read a UTF-8 text file from the workspace and return its lines.

    Args:
        path: Path of the file as requested by the caller.
        workdir: Working directory used to resolve a relative ``path``.
        workspace_base: The base path of the workspace on the host filesystem.
        workspace_mount_path_in_sandbox: The path of the workspace inside the
            sandbox.
        start: First line index, forwarded to ``read_lines``.
        end: End line index, forwarded to ``read_lines`` (``-1`` by default).

    Returns:
        A ``FileReadObservation`` holding the selected lines joined into one
        string, or an ``ErrorObservation`` when the path is rejected by
        ``resolve_path``, the file does not exist, the path is a directory, or
        the content is not valid UTF-8.
    """
    try:
        host_path = resolve_path(
            path, workdir, workspace_base, workspace_mount_path_in_sandbox
        )
    except PermissionError:
        return ErrorObservation(
            f"You're not allowed to access this path: {path}. You can only access paths inside the workspace."
        )

    try:
        with open(host_path, 'r', encoding='utf-8') as handle:  # noqa: ASYNC101
            selected = read_lines(handle.readlines(), start, end)
    except IsADirectoryError:
        return ErrorObservation(f'Path is a directory: {path}. You can only read files')
    except FileNotFoundError:
        return ErrorObservation(f'File not found: {path}')
    except UnicodeDecodeError:
        return ErrorObservation(f'File could not be decoded as utf-8: {path}')

    return FileReadObservation(path=path, content=''.join(selected))
def insert_lines(
    to_insert: list[str], original: list[str], start: int = 0, end: int = -1
) -> list[str]:
    """Splice ``to_insert`` into ``original`` in place of ``original[start:end]``.

    Each entry of ``to_insert`` gets a trailing newline appended. When
    ``start`` is ``0`` nothing of ``original`` is kept before the new content,
    and when ``end`` is ``-1`` nothing of it is kept after; in both cases a
    single empty string is emitted as a placeholder instead. ``original`` is
    not modified.
    """
    if start == 0:
        head = ['']
    else:
        head = original[:start]

    if end == -1:
        tail = ['']
    else:
        tail = original[end:]

    body = [line + '\n' for line in to_insert]
    return [*head, *body, *tail]
async def write_file(
    path: str,
    workdir: str,
    workspace_base: str,
    workspace_mount_path_in_sandbox: str,
    content: str,
    start: int = 0,
    end: int = -1,
) -> Observation:
    """Write ``content`` to a workspace file, creating it or splicing into it.

    ``content`` is split on ``'\\n'`` and every resulting piece is written
    with a newline appended. If the target file does not exist it is created
    (together with a missing parent directory) and filled with the new lines;
    ``start`` and ``end`` are not used in that case. If it exists, the new
    lines are merged with the current ones via ``insert_lines`` and the file
    is rewritten and truncated to the new length.

    Args:
        path: Path of the file as requested by the caller.
        workdir: Working directory used to resolve a relative ``path``.
        workspace_base: The base path of the workspace on the host filesystem.
        workspace_mount_path_in_sandbox: The path of the workspace inside the
            sandbox.
        content: Text to write.
        start: Start line index, forwarded to ``insert_lines``.
        end: End line index, forwarded to ``insert_lines`` (``-1`` by default).

    Returns:
        A ``FileWriteObservation`` with empty content on success, otherwise an
        ``ErrorObservation`` describing the failure.
    """
    content_lines = content.split('\n')

    try:
        host_path = resolve_path(
            path, workdir, workspace_base, workspace_mount_path_in_sandbox
        )

        parent_dir = os.path.dirname(host_path)
        if not os.path.exists(parent_dir):
            os.makedirs(parent_dir)

        # An existing file is opened read/write so its current lines can be
        # merged; a missing file is simply created.
        mode = 'r+' if os.path.exists(host_path) else 'w'
        try:
            with open(host_path, mode, encoding='utf-8') as handle:  # noqa: ASYNC101
                if mode == 'w':
                    updated = [line + '\n' for line in content_lines]
                else:
                    existing = handle.readlines()
                    updated = insert_lines(content_lines, existing, start, end)

                # Rewind before writing, then drop whatever is left over from
                # a longer previous version of the file.
                handle.seek(0)
                handle.writelines(updated)
                handle.truncate()
        except FileNotFoundError:
            return ErrorObservation(f'File not found: {path}')
        except IsADirectoryError:
            return ErrorObservation(
                f'Path is a directory: {path}. You can only write to files'
            )
        except UnicodeDecodeError:
            return ErrorObservation(f'File could not be decoded as utf-8: {path}')
    except PermissionError as e:
        # Covers a rejection from resolve_path as well as permission failures
        # raised by the filesystem calls above.
        return ErrorObservation(f'Permission error on {path}: {e}')

    return FileWriteObservation(content='', path=path)