File size: 6,231 Bytes
51ff9e5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import os
from typing import Any, TypedDict

import boto3
import botocore

from openhands.storage.files import FileStore


class S3ObjectDict(TypedDict):
    """One entry of an S3 object listing, limited to the field this module reads."""

    # Full key of the object inside the bucket (e.g. "foo/bar/zap.txt").
    Key: str


class GetObjectOutputDict(TypedDict):
    """Subset of the ``get_object`` response that this module reads."""

    # Object payload. ``S3FileStore.read`` uses it as a context manager and
    # calls ``.read()`` on it to obtain the raw bytes.
    Body: Any


class ListObjectsV2OutputDict(TypedDict, total=False):
    """Subset of the ``list_objects_v2`` response that this module reads.

    Declared with ``total=False`` because S3 leaves the ``Contents`` key out
    of the response entirely when no object matches the requested prefix, so
    consumers must read it with ``.get('Contents')`` rather than indexing.
    """

    # Objects matching the request; missing (or None) for an empty listing.
    Contents: list[S3ObjectDict] | None


class S3FileStore(FileStore):
    def __init__(self, bucket_name: str | None) -> None:
        """Create the S3 client and remember which bucket to operate on.

        Configuration is read from the environment:

        * ``AWS_ACCESS_KEY_ID`` / ``AWS_SECRET_ACCESS_KEY`` - credentials
          handed to ``boto3.client`` (``None`` when unset).
        * ``AWS_S3_SECURE`` - SSL is used only when the value, compared
          case-insensitively, equals ``'true'``; defaults to ``'true'``.
        * ``AWS_S3_ENDPOINT`` - optional endpoint URL; its scheme is forced to
          match the secure flag.
        * ``AWS_S3_BUCKET`` - bucket used when ``bucket_name`` is ``None``.

        Args:
            bucket_name: Bucket to use, or ``None`` to take it from
                ``AWS_S3_BUCKET``.

        Raises:
            KeyError: If ``bucket_name`` is ``None`` and ``AWS_S3_BUCKET`` is
                not set.
        """
        use_ssl = os.getenv('AWS_S3_SECURE', 'true').lower() == 'true'
        endpoint_url = self._ensure_url_scheme(use_ssl, os.getenv('AWS_S3_ENDPOINT'))
        self.bucket: str = (
            os.environ['AWS_S3_BUCKET'] if bucket_name is None else bucket_name
        )
        client_options: dict[str, Any] = {
            'aws_access_key_id': os.getenv('AWS_ACCESS_KEY_ID'),
            'aws_secret_access_key': os.getenv('AWS_SECRET_ACCESS_KEY'),
            'endpoint_url': endpoint_url,
            'use_ssl': use_ssl,
        }
        self.client: Any = boto3.client('s3', **client_options)

    def write(self, path: str, contents: str | bytes) -> None:
        try:
            as_bytes = (
                contents.encode('utf-8') if isinstance(contents, str) else contents
            )
            self.client.put_object(Bucket=self.bucket, Key=path, Body=as_bytes)
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == 'AccessDenied':
                raise FileNotFoundError(
                    f"Error: Access denied to bucket '{self.bucket}'."
                )
            elif e.response['Error']['Code'] == 'NoSuchBucket':
                raise FileNotFoundError(
                    f"Error: The bucket '{self.bucket}' does not exist."
                )
            raise FileNotFoundError(
                f"Error: Failed to write to bucket '{self.bucket}' at path {path}: {e}"
            )

    def read(self, path: str) -> str:
        try:
            response: GetObjectOutputDict = self.client.get_object(
                Bucket=self.bucket, Key=path
            )
            with response['Body'] as stream:
                return str(stream.read().decode('utf-8'))
        except botocore.exceptions.ClientError as e:
            # Catch all S3-related errors
            if e.response['Error']['Code'] == 'NoSuchBucket':
                raise FileNotFoundError(
                    f"Error: The bucket '{self.bucket}' does not exist."
                )
            elif e.response['Error']['Code'] == 'NoSuchKey':
                raise FileNotFoundError(
                    f"Error: The object key '{path}' does not exist in bucket '{self.bucket}'."
                )
            else:
                raise FileNotFoundError(
                    f"Error: Failed to read from bucket '{self.bucket}' at path {path}: {e}"
                )
        except Exception as e:
            raise FileNotFoundError(
                f"Error: Failed to read from bucket '{self.bucket}' at path {path}: {e}"
            )

    def list(self, path: str) -> list[str]:
        if not path or path == '/':
            path = ''
        elif not path.endswith('/'):
            path += '/'
        # The delimiter logic screens out directories, so we can't use it. :(
        # For example, given a structure:
        #   foo/bar/zap.txt
        #   foo/bar/bang.txt
        #   ping.txt
        # prefix=None, delimiter="/"   yields  ["ping.txt"]  # :(
        # prefix="foo", delimiter="/"  yields  []  # :(
        results: set[str] = set()
        prefix_len = len(path)
        response: ListObjectsV2OutputDict = self.client.list_objects_v2(
            Bucket=self.bucket, Prefix=path
        )
        contents = response.get('Contents')
        if not contents:
            return []
        paths = [obj['Key'] for obj in contents]
        for sub_path in paths:
            if sub_path == path:
                continue
            try:
                index = sub_path.index('/', prefix_len + 1)
                if index != prefix_len:
                    results.add(sub_path[: index + 1])
            except ValueError:
                results.add(sub_path)
        return list(results)

    def delete(self, path: str) -> None:
        try:
            # Sanitize path
            if not path or path == '/':
                path = ''
            if path.endswith('/'):
                path = path[:-1]

            # Try to delete any child resources (Assume the path is a directory)
            response = self.client.list_objects_v2(
                Bucket=self.bucket, Prefix=f'{path}/'
            )
            for content in response.get('Contents') or []:
                self.client.delete_object(Bucket=self.bucket, Key=content['Key'])

            # Next try to delete item as a file
            self.client.delete_object(Bucket=self.bucket, Key=path)

        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchBucket':
                raise FileNotFoundError(
                    f"Error: The bucket '{self.bucket}' does not exist."
                )
            elif e.response['Error']['Code'] == 'AccessDenied':
                raise FileNotFoundError(
                    f"Error: Access denied to bucket '{self.bucket}'."
                )
            elif e.response['Error']['Code'] == 'NoSuchKey':
                raise FileNotFoundError(
                    f"Error: The object key '{path}' does not exist in bucket '{self.bucket}'."
                )
            else:
                raise FileNotFoundError(
                    f"Error: Failed to delete key '{path}' from bucket '{self.bucket}': {e}"
                )
        except Exception as e:
            raise FileNotFoundError(
                f"Error: Failed to delete key '{path}' from bucket '{self.bucket}: {e}"
            )

    def _ensure_url_scheme(self, secure: bool, url: str | None) -> str | None:
        if not url:
            return None
        if secure:
            if not url.startswith('https://'):
                url = 'https://' + url.removeprefix('http://')
        else:
            if not url.startswith('http://'):
                url = 'http://' + url.removeprefix('https://')
        return url